Просмотр исходного кода

Use unix socket for local multicast loop

Instead of rely on multicast loop functionality of kernel, we now use
unix socket created by socketpair to deliver multicast messages to
local node. This handles problems with improperly configured local
firewall. So if output/input to/from ethernet interface is blocked, node
is still able to create single node membership.

Dark side of the patch is fact, that membership is always created, so
"Totem is unable to form a cluster..." will never appear (same applies
to continuous_gather key).

Signed-off-by: Jan Friesse <jfriesse@redhat.com>
Reviewed-by: Fabio M. Di Nitto <fdinitto@redhat.com>
Jan Friesse 13 лет назад
Родитель
Сommit
cea2b1a302
1 измененных файлов с 144 добавлено и 26 удалено
  1. 144 26
      exec/totemudp.c

+ 144 - 26
exec/totemudp.c

@@ -108,6 +108,13 @@ struct totemudp_socket {
 	int mcast_recv;
 	int mcast_send;
 	int token;
+	/*
+	 * Socket used for local multicast delivery. We don't rely on multicast
+	 * loop and rather this UNIX DGRAM socket is used. Socket is created by
+	 * socketpair call and they are used in same way as pipe (so [0] is read
+	 * end and [1] is write end)
+	 */
+	int local_mcast_loop[2];
 };
 
 struct totemudp_instance {
@@ -1061,6 +1068,20 @@ static inline void mcast_sendmsg (
 		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
 			"sendmsg(mcast) failed (non-critical)");
 	}
+
+	/*
+	 * Transmit multicast message to local unix mcast loop
+	 * An error here is recovered by totemsrp
+	 */
+	msg_mcast.msg_name = NULL;
+	msg_mcast.msg_namelen = 0;
+
+	res = sendmsg (instance->totemudp_sockets.local_mcast_loop[1], &msg_mcast,
+		MSG_NOSIGNAL);
+	if (res < 0) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"sendmsg(local mcast loop) failed (non-critical)");
+	}
 }
 
 static void totemudp_mcast_thread_state_constructor (
@@ -1157,6 +1178,12 @@ int totemudp_finalize (
 	if (instance->totemudp_sockets.mcast_send > 0) {
 		close (instance->totemudp_sockets.mcast_send);
 	}
+	if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
+		poll_dispatch_delete (instance->totemudp_poll_handle,
+			instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[1]);
+	}
 	if (instance->totemudp_sockets.token > 0) {
 		close (instance->totemudp_sockets.token);
 		poll_dispatch_delete (instance->totemudp_poll_handle,
@@ -1338,6 +1365,12 @@ static void timer_function_netif_check_timeout (
 	if (instance->totemudp_sockets.mcast_send > 0) {
 		close (instance->totemudp_sockets.mcast_send);
 	}
+	if (instance->totemudp_sockets.local_mcast_loop[0] > 0) {
+		poll_dispatch_delete (instance->totemudp_poll_handle,
+			instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[0]);
+		close (instance->totemudp_sockets.local_mcast_loop[1]);
+	}
 	if (instance->totemudp_sockets.token > 0) {
 		close (instance->totemudp_sockets.token);
 		poll_dispatch_delete (instance->totemudp_poll_handle,
@@ -1380,6 +1413,11 @@ static void timer_function_netif_check_timeout (
 		instance->totemudp_sockets.mcast_recv,
 		POLLIN, instance, net_deliver_fn);
 
+	poll_dispatch_add (
+		instance->totemudp_poll_handle,
+		instance->totemudp_sockets.local_mcast_loop[0],
+		POLLIN, instance, net_deliver_fn);
+
 	poll_dispatch_add (
 		instance->totemudp_poll_handle,
 		instance->totemudp_sockets.token,
@@ -1454,6 +1492,7 @@ static int totemudp_build_sockets_ip (
 	int addrlen;
 	int res;
 	int flag;
+	int i;
 
 	/*
 	 * Create multicast recv socket
@@ -1495,6 +1534,25 @@ static int totemudp_build_sockets_ip (
 		return (-1);
 	}
 
+	/*
+	 * Create local multicast loop socket
+	 */
+	if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets->local_mcast_loop) == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
+			"socket() failed");
+		return (-1);
+	}
+
+	for (i = 0; i < 2; i++) {
+		totemip_nosigpipe (sockets->local_mcast_loop[i]);
+		res = fcntl (sockets->local_mcast_loop[i], F_SETFL, O_NONBLOCK);
+		if (res == -1) {
+			LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
+				"Could not set non-blocking operation on multicast socket");
+			return (-1);
+		}
+	}
+
 	/*
 	 * Setup mcast send socket
 	 */
@@ -1577,12 +1635,34 @@ static int totemudp_build_sockets_ip (
 	/*
 	 * Set buffer sizes to avoid overruns
 	 */
-	 res = setsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
-	 res = setsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
+	res = setsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_RCVBUF size on UDP mcast socket");
+		return (-1);
+	}
+	res = setsockopt (sockets->mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_SNDBUF size on UDP mcast socket");
+		return (-1);
+	}
+	res = setsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_RCVBUF size on UDP local mcast loop socket");
+		return (-1);
+	}
+	res = setsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
+	if (res == -1) {
+		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
+			"Unable to set SO_SNDBUF size on UDP local mcast loop socket");
+		return (-1);
+	}
 
 	res = getsockopt (sockets->mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
 	if (res == 0) {
-	 	log_printf (instance->totemudp_log_level_debug,
+		log_printf (instance->totemudp_log_level_debug,
 			"Receive multicast socket recv buffer size (%d bytes).\n", recvbuf_size);
 	}
 
@@ -1592,6 +1672,19 @@ static int totemudp_build_sockets_ip (
 			"Transmit multicast socket send buffer size (%d bytes).\n", sendbuf_size);
 	}
 
+	res = getsockopt (sockets->local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
+	if (res == 0) {
+		log_printf (instance->totemudp_log_level_debug,
+			"Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
+	}
+
+	res = getsockopt (sockets->local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
+	if (res == 0) {
+		log_printf (instance->totemudp_log_level_debug,
+			"Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
+	}
+
+
 	/*
 	 * Join group membership on socket
 	 */
@@ -1644,10 +1737,10 @@ static int totemudp_build_sockets_ip (
 	}
 
 	/*
-	 * Turn on multicast loopback
+	 * Turn off multicast loopback
 	 */
 
-	flag = 1;
+	flag = 0;
 	switch ( bindnet_address->family ) {
 		case AF_INET:
 		res = setsockopt (sockets->mcast_send, IPPROTO_IP, IP_MULTICAST_LOOP,
@@ -1659,7 +1752,7 @@ static int totemudp_build_sockets_ip (
 	}
 	if (res == -1) {
 		LOGSYS_PERROR (errno, instance->totemudp_log_level_warning,
-			"Unable to turn on multicast loopback");
+			"Unable to turn off multicast loopback");
 		return (-1);
 	}
 
@@ -1889,18 +1982,30 @@ int totemudp_recv_flush (void *udp_context)
 	struct pollfd ufd;
 	int nfds;
 	int res = 0;
+	int i;
+	int sock;
 
 	instance->flushing = 1;
 
-	do {
-		ufd.fd = instance->totemudp_sockets.mcast_recv;
-		ufd.events = POLLIN;
-		nfds = poll (&ufd, 1, 0);
-		if (nfds == 1 && ufd.revents & POLLIN) {
-		net_deliver_fn (0, instance->totemudp_sockets.mcast_recv,
-			ufd.revents, instance);
+	for (i = 0; i < 2; i++) {
+		sock = -1;
+		if (i == 0) {
+		    sock = instance->totemudp_sockets.mcast_recv;
 		}
-	} while (nfds == 1);
+		if (i == 1) {
+		    sock = instance->totemudp_sockets.local_mcast_loop[0];
+		}
+		assert(sock != -1);
+
+		do {
+			ufd.fd = sock;
+			ufd.events = POLLIN;
+			nfds = poll (&ufd, 1, 0);
+			if (nfds == 1 && ufd.revents & POLLIN) {
+				net_deliver_fn (0, sock, ufd.revents, instance);
+			}
+		} while (nfds == 1);
+	}
 
 	instance->flushing = 0;
 
@@ -2032,6 +2137,8 @@ extern int totemudp_recv_mcast_empty (
 	struct pollfd ufd;
 	int nfds;
 	int msg_processed = 0;
+	int i;
+	int sock;
 
 	/*
 	 * Receive datagram
@@ -2049,19 +2156,30 @@ extern int totemudp_recv_mcast_empty (
 	msg_recv.msg_accrightslen = 0;
 #endif
 
-	do {
-		ufd.fd = instance->totemudp_sockets.mcast_recv;
-		ufd.events = POLLIN;
-		nfds = poll (&ufd, 1, 0);
-		if (nfds == 1 && ufd.revents & POLLIN) {
-			res = recvmsg (instance->totemudp_sockets.mcast_recv, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
-			if (res != -1) {
-				msg_processed = 1;
-			} else {
-				msg_processed = -1;
-			}
+	for (i = 0; i < 2; i++) {
+		sock = -1;
+		if (i == 0) {
+		    sock = instance->totemudp_sockets.mcast_recv;
+		}
+		if (i == 1) {
+		    sock = instance->totemudp_sockets.local_mcast_loop[0];
 		}
-	} while (nfds == 1);
+		assert(sock != -1);
+
+		do {
+			ufd.fd = sock;
+			ufd.events = POLLIN;
+			nfds = poll (&ufd, 1, 0);
+			if (nfds == 1 && ufd.revents & POLLIN) {
+				res = recvmsg (sock, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
+				if (res != -1) {
+					msg_processed = 1;
+				} else {
+					msg_processed = -1;
+				}
+			}
+		} while (nfds == 1);
+	}
 
 	return (msg_processed);
 }