Просмотр исходного кода

Use unix socket for local multicast loop

Instead of rely on multicast loop functionality of kernel, we now use
unix socket created by socketpair to deliver multicast messages to
local node. This handles problems with improperly configured local
firewall. So if output/input to/from ethernet interface is blocked, node
is still able to create single node membership.

Dark side of the patch is fact, that membership is always created, so
"Totem is unable to form a cluster..." will never appear (same applies
to continuous_gather key).

Signed-off-by: Jan Friesse <jfriesse@redhat.com>
Reviewed-by: Fabio M. Di Nitto <fdinitto@redhat.com>
Jan Friesse 13 лет назад
Родитель
Сommit
6c3b337b37
1 измененных файлов с 148 добавлено и 27 удалено
  1. 148 27
      exec/totemudp.c

+ 148 - 27
exec/totemudp.c

@@ -94,6 +94,13 @@ struct totemudp_socket {
 	int mcast_recv;
 	int mcast_recv;
 	int mcast_send;
 	int mcast_send;
 	int token;
 	int token;
+	/*
+	 * Socket used for local multicast delivery. We don't rely on multicast
+	 * loop and rather this UNIX DGRAM socket is used. Socket is created by
+	 * socketpair call and they are used in same way as pipe (so [0] is read
+	 * end and [1] is write end)
+	 */
+	int local_mcast_loop[2];
 };
 };
 
 
 struct totemudp_instance {
 struct totemudp_instance {
@@ -381,6 +388,20 @@ static inline void mcast_sendmsg (
 		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
 		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
 			"sendmsg(mcast) failed (non-critical)");
 			"sendmsg(mcast) failed (non-critical)");
 	}
 	}
+
+	/*
+	 * Transmit multicast message to local unix mcast loop
+	 * An error here is recovered by totemsrp
+	 */
+	msg_mcast.msg_name = NULL;
+	msg_mcast.msg_namelen = 0;
+
+	res = sendmsg (instance->totemudp_sockets.local_mcast_loop[1], &msg_mcast,
+		MSG_NOSIGNAL);
+	if (res < 0) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"sendmsg(local mcast loop) failed (non-critical)");
+	}
 }
 }
 
 
 
 
@@ -398,6 +419,12 @@ int totemudp_finalize (
 	if (instance->totemudp_sockets.mcast_send > 0) {
 	if (instance->totemudp_sockets.mcast_send > 0) {
 		close (instance->totemudp_sockets.mcast_send);
 		close (instance->totemudp_sockets.mcast_send);
 	}
 	}
+	if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
+		qb_loop_poll_del (instance->totemudp_poll_handle,
+			instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[1]);
+	}
 	if (instance->totemudp_sockets.token > 0) {
 	if (instance->totemudp_sockets.token > 0) {
 		qb_loop_poll_del (instance->totemudp_poll_handle,
 		qb_loop_poll_del (instance->totemudp_poll_handle,
 			instance->totemudp_sockets.token);
 			instance->totemudp_sockets.token);
@@ -564,6 +591,12 @@ static void timer_function_netif_check_timeout (
 	if (instance->totemudp_sockets.mcast_send > 0) {
 	if (instance->totemudp_sockets.mcast_send > 0) {
 		close (instance->totemudp_sockets.mcast_send);
 		close (instance->totemudp_sockets.mcast_send);
 	}
 	}
+	if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
+		qb_loop_poll_del (instance->totemudp_poll_handle,
+			instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[1]);
+	}
 	if (instance->totemudp_sockets.token > 0) {
 	if (instance->totemudp_sockets.token > 0) {
 		qb_loop_poll_del (instance->totemudp_poll_handle,
 		qb_loop_poll_del (instance->totemudp_poll_handle,
 			instance->totemudp_sockets.token);
 			instance->totemudp_sockets.token);
@@ -608,6 +641,12 @@ static void timer_function_netif_check_timeout (
 		instance->totemudp_sockets.mcast_recv,
 		instance->totemudp_sockets.mcast_recv,
 		POLLIN, instance, net_deliver_fn);
 		POLLIN, instance, net_deliver_fn);
 
 
+	qb_loop_poll_add (
+		instance->totemudp_poll_handle,
+		QB_LOOP_MED,
+		instance->totemudp_sockets.local_mcast_loop[0],
+		POLLIN, instance, net_deliver_fn);
+
 	qb_loop_poll_add (
 	qb_loop_poll_add (
 		instance->totemudp_poll_handle,
 		instance->totemudp_poll_handle,
 		QB_LOOP_MED,
 		QB_LOOP_MED,
@@ -685,6 +724,7 @@ static int totemudp_build_sockets_ip (
 	int res;
 	int res;
 	int flag;
 	int flag;
 	uint8_t sflag;
 	uint8_t sflag;
+	int i;
 
 
 	/*
 	/*
 	 * Create multicast recv socket
 	 * Create multicast recv socket
@@ -726,6 +766,27 @@ static int totemudp_build_sockets_ip (
 		return (-1);
 		return (-1);
 	}
 	}
 
 
+	/*
+	 * Create local multicast loop socket
+	 */
+	if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets->local_mcast_loop) == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
+			"socket() failed");
+		return (-1);
+	}
+
+	for (i = 0; i < 2; i++) {
+		totemip_nosigpipe (sockets->local_mcast_loop[i]);
+		res = fcntl (sockets->local_mcast_loop[i], F_SETFL, O_NONBLOCK);
+		if (res == -1) {
+			LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
+				"Could not set non-blocking operation on multicast socket");
+			return (-1);
+		}
+	}
+
+
+
 	/*
 	/*
 	 * Setup mcast send socket
 	 * Setup mcast send socket
 	 */
 	 */
@@ -808,12 +869,34 @@ static int totemudp_build_sockets_ip (
 	/*
 	/*
 	 * Set buffer sizes to avoid overruns
 	 * Set buffer sizes to avoid overruns
 	 */
 	 */
-	 res = setsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
-	 res = setsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
+	res = setsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_RCVBUF size on UDP mcast socket");
+		return (-1);
+	}
+	res = setsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_SNDBUF size on UDP mcast socket");
+		return (-1);
+	}
+	res = setsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_RCVBUF size on UDP local mcast loop socket");
+		return (-1);
+	}
+	res = setsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_SNDBUF size on UDP local mcast loop socket");
+		return (-1);
+	}
 
 
 	res = getsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
 	res = getsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
 	if (res == 0) {
 	if (res == 0) {
-	 	log_printf (instance->totemudp_log_level_debug,
+		log_printf (instance->totemudp_log_level_debug,
 			"Receive multicast socket recv buffer size (%d bytes).", recvbuf_size);
 			"Receive multicast socket recv buffer size (%d bytes).", recvbuf_size);
 	}
 	}
 
 
@@ -823,6 +906,19 @@ static int totemudp_build_sockets_ip (
 			"Transmit multicast socket send buffer size (%d bytes).", sendbuf_size);
 			"Transmit multicast socket send buffer size (%d bytes).", sendbuf_size);
 	}
 	}
 
 
+	res = getsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
+	if (res == 0) {
+		log_printf (instance->totemudp_log_level_debug,
+			"Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
+	}
+
+	res = getsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
+	if (res == 0) {
+		log_printf (instance->totemudp_log_level_debug,
+			"Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
+	}
+
+
 	/*
 	/*
 	 * Join group membership on socket
 	 * Join group membership on socket
 	 */
 	 */
@@ -875,13 +971,13 @@ static int totemudp_build_sockets_ip (
 	}
 	}
 
 
 	/*
 	/*
-	 * Turn on multicast loopback
+	 * Turn off multicast loopback
 	 */
 	 */
 
 
-	flag = 1;
+	flag = 0;
 	switch ( bindnet_address->family ) {
 	switch ( bindnet_address->family ) {
 		case AF_INET:
 		case AF_INET:
-		sflag = 1;
+		sflag = 0;
 		res = setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_LOOP,
 		res = setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_LOOP,
 			&sflag, sizeof (sflag));
 			&sflag, sizeof (sflag));
 		break;
 		break;
@@ -891,7 +987,7 @@ static int totemudp_build_sockets_ip (
 	}
 	}
 	if (res == -1) {
 	if (res == -1) {
 		LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
 		LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
-			"Unable to turn on multicast loopback");
+			"Unable to turn off multicast loopback");
 		return (-1);
 		return (-1);
 	}
 	}
 
 
@@ -1126,18 +1222,30 @@ int totemudp_recv_flush (void *udp_context)
 	struct pollfd ufd;
 	struct pollfd ufd;
 	int nfds;
 	int nfds;
 	int res = 0;
 	int res = 0;
+	int i;
+	int sock;
 
 
 	instance->flushing = 1;
 	instance->flushing = 1;
 
 
-	do {
-		ufd.fd = instance->totemudp_sockets.mcast_recv;
-		ufd.events = POLLIN;
-		nfds = poll (&ufd, 1, 0);
-		if (nfds == 1 && ufd.revents & POLLIN) {
-		net_deliver_fn (instance->totemudp_sockets.mcast_recv,
-			ufd.revents, instance);
+	for (i = 0; i < 2; i++) {
+		sock = -1;
+		if (i == 0) {
+		    sock = instance->totemudp_sockets.mcast_recv;
+		}
+		if (i == 1) {
+		    sock = instance->totemudp_sockets.local_mcast_loop[0];
 		}
 		}
-	} while (nfds == 1);
+		assert(sock != -1);
+
+		do {
+			ufd.fd = sock;
+			ufd.events = POLLIN;
+			nfds = poll (&ufd, 1, 0);
+			if (nfds == 1 && ufd.revents & POLLIN) {
+			net_deliver_fn (sock, ufd.revents, instance);
+			}
+		} while (nfds == 1);
+	}
 
 
 	instance->flushing = 0;
 	instance->flushing = 0;
 
 
@@ -1251,6 +1359,8 @@ extern int totemudp_recv_mcast_empty (
 	struct pollfd ufd;
 	struct pollfd ufd;
 	int nfds;
 	int nfds;
 	int msg_processed = 0;
 	int msg_processed = 0;
+	int i;
+	int sock;
 
 
 	/*
 	/*
 	 * Receive datagram
 	 * Receive datagram
@@ -1275,19 +1385,30 @@ extern int totemudp_recv_mcast_empty (
 	msg_recv.msg_accrightslen = 0;
 	msg_recv.msg_accrightslen = 0;
 #endif
 #endif
 
 
-	do {
-		ufd.fd = instance->totemudp_sockets.mcast_recv;
-		ufd.events = POLLIN;
-		nfds = poll (&ufd, 1, 0);
-		if (nfds == 1 && ufd.revents & POLLIN) {
-			res = recvmsg (instance->totemudp_sockets.mcast_recv, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
-			if (res != -1) {
-				msg_processed = 1;
-			} else {
-				msg_processed = -1;
-			}
+	for (i = 0; i < 2; i++) {
+		sock = -1;
+		if (i == 0) {
+		    sock = instance->totemudp_sockets.mcast_recv;
+		}
+		if (i == 1) {
+		    sock = instance->totemudp_sockets.local_mcast_loop[0];
 		}
 		}
-	} while (nfds == 1);
+		assert(sock != -1);
+
+		do {
+			ufd.fd = sock;
+			ufd.events = POLLIN;
+			nfds = poll (&ufd, 1, 0);
+			if (nfds == 1 && ufd.revents & POLLIN) {
+				res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
+				if (res != -1) {
+					msg_processed = 1;
+				} else {
+					msg_processed = -1;
+				}
+			}
+		} while (nfds == 1);
+	}
 
 
 	return (msg_processed);
 	return (msg_processed);
 }
 }