Просмотр исходного кода

Revert "totemsrp: Remove recv_flush code"

This reverts commit 1a7b7a39f445be63c697170c1680eeca9834de39.

Reversion is needed to remove overflow of receive buffers and dropping
messages.

Signed-off-by: Jan Friesse <jfriesse@redhat.com>
(cherry picked from commit ddb5214c2c57194fe8e12d775398bfc5726743c4)
Jan Friesse 14 лет назад
Родитель
Сommit
ee0a0dd75e
8 измененных файлов с 122 добавлено и 1 удалено
  1. 10 0
      exec/totemiba.c
  2. 15 0
      exec/totemnet.c
  3. 2 0
      exec/totemnet.h
  4. 54 0
      exec/totemrrp.c
  5. 3 0
      exec/totemrrp.h
  6. 2 0
      exec/totemsrp.c
  7. 29 1
      exec/totemudp.c
  8. 7 0
      exec/totemudpu.c

+ 10 - 0
exec/totemiba.c

@@ -1331,6 +1331,16 @@ int totemiba_processor_count_set (
 	return (res);
 }
 
+int totemiba_recv_flush (void *iba_context)
+{
+	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;
+	int res = 0;
+
+	instance = NULL;
+
+	return (res);
+}
+
 int totemiba_send_flush (void *iba_context)
 {
 	struct totemiba_instance *instance = (struct totemiba_instance *)iba_context;

+ 15 - 0
exec/totemnet.c

@@ -87,6 +87,8 @@ struct transport {
 		const void *msg,
 		unsigned int msg_len);
 
+	int (*recv_flush) (void *transport_context);
+
 	int (*send_flush) (void *transport_context);
 
 	int (*iface_check) (void *transport_context);
@@ -129,6 +131,7 @@ struct transport transport_entries[] = {
 		.token_send = totemudp_token_send,
 		.mcast_flush_send = totemudp_mcast_flush_send,
 		.mcast_noflush_send = totemudp_mcast_noflush_send,
+		.recv_flush = totemudp_recv_flush,
 		.send_flush = totemudp_send_flush,
 		.iface_check = totemudp_iface_check,
 		.finalize = totemudp_finalize,
@@ -146,6 +149,7 @@ struct transport transport_entries[] = {
 		.token_send = totemudpu_token_send,
 		.mcast_flush_send = totemudpu_mcast_flush_send,
 		.mcast_noflush_send = totemudpu_mcast_noflush_send,
+		.recv_flush = totemudpu_recv_flush,
 		.send_flush = totemudpu_send_flush,
 		.iface_check = totemudpu_iface_check,
 		.finalize = totemudpu_finalize,
@@ -166,6 +170,7 @@ struct transport transport_entries[] = {
 		.token_send = totemiba_token_send,
 		.mcast_flush_send = totemiba_mcast_flush_send,
 		.mcast_noflush_send = totemiba_mcast_noflush_send,
+		.recv_flush = totemiba_recv_flush,
 		.send_flush = totemiba_send_flush,
 		.iface_check = totemiba_iface_check,
 		.finalize = totemiba_finalize,
@@ -302,6 +307,16 @@ int totemnet_processor_count_set (
 	return (res);
 }
 
+int totemnet_recv_flush (void *net_context)
+{
+	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;
+	int res = 0;
+
+	res = instance->transport->recv_flush (instance->transport_context);
+
+	return (res);
+}
+
 int totemnet_send_flush (void *net_context)
 {
 	struct totemnet_instance *instance = (struct totemnet_instance *)net_context;

+ 2 - 0
exec/totemnet.h

@@ -88,6 +88,8 @@ extern int totemnet_mcast_noflush_send (
 	const void *msg,
 	unsigned int msg_len);
 
+extern int totemnet_recv_flush (void *net_context);
+
 extern int totemnet_send_flush (void *net_context);
 
 extern int totemnet_iface_check (void *net_context);

+ 54 - 0
exec/totemrrp.c

@@ -143,6 +143,9 @@ struct rrp_algo {
 		const void *msg,
 		unsigned int msg_len);
 
+	void (*recv_flush) (
+		struct totemrrp_instance *instance);
+
 	void (*send_flush) (
 		struct totemrrp_instance *instance);
 
@@ -273,6 +276,9 @@ static void none_token_send (
 	const void *msg,
 	unsigned int msg_len);
 
+static void none_recv_flush (
+	struct totemrrp_instance *instance);
+
 static void none_send_flush (
 	struct totemrrp_instance *instance);
 
@@ -340,6 +346,9 @@ static void passive_token_send (
 	const void *msg,
 	unsigned int msg_len);
 
+static void passive_recv_flush (
+	struct totemrrp_instance *instance);
+
 static void passive_send_flush (
 	struct totemrrp_instance *instance);
 
@@ -407,6 +416,9 @@ static void active_token_send (
 	const void *msg,
 	unsigned int msg_len);
 
+static void active_recv_flush (
+	struct totemrrp_instance *instance);
+
 static void active_send_flush (
 	struct totemrrp_instance *instance);
 
@@ -458,6 +470,7 @@ struct rrp_algo none_algo = {
 	.mcast_flush_send	= none_mcast_flush_send,
 	.token_recv		= none_token_recv,
 	.token_send		= none_token_send,
+	.recv_flush		= none_recv_flush,
 	.send_flush		= none_send_flush,
 	.iface_check		= none_iface_check,
 	.processor_count_set	= none_processor_count_set,
@@ -476,6 +489,7 @@ struct rrp_algo passive_algo = {
 	.mcast_flush_send	= passive_mcast_flush_send,
 	.token_recv		= passive_token_recv,
 	.token_send		= passive_token_send,
+	.recv_flush		= passive_recv_flush,
 	.send_flush		= passive_send_flush,
 	.iface_check		= passive_iface_check,
 	.processor_count_set	= passive_processor_count_set,
@@ -494,6 +508,7 @@ struct rrp_algo active_algo = {
 	.mcast_flush_send	= active_mcast_flush_send,
 	.token_recv		= active_token_recv,
 	.token_send		= active_token_send,
+	.recv_flush		= active_recv_flush,
 	.send_flush		= active_send_flush,
 	.iface_check		= active_iface_check,
 	.processor_count_set	= active_processor_count_set,
@@ -579,6 +594,11 @@ static void none_token_send (
 		msg, msg_len);
 }
 
+static void none_recv_flush (struct totemrrp_instance *instance)
+{
+	totemnet_recv_flush (instance->net_handles[0]);
+}
+
 static void none_send_flush (struct totemrrp_instance *instance)
 {
 	totemnet_send_flush (instance->net_handles[0]);
@@ -910,6 +930,19 @@ static void passive_token_send (
 
 }
 
+static void passive_recv_flush (struct totemrrp_instance *instance)
+{
+	struct passive_instance *rrp_algo_instance = (struct passive_instance *)instance->rrp_algo_instance;
+	unsigned int i;
+
+	for (i = 0; i < instance->interface_count; i++) {
+		if (rrp_algo_instance->faulty[i] == 0) {
+
+			totemnet_recv_flush (instance->net_handles[i]);
+		}
+	}
+}
+
 static void passive_send_flush (struct totemrrp_instance *instance)
 {
 	struct passive_instance *rrp_algo_instance = (struct passive_instance *)instance->rrp_algo_instance;
@@ -1292,6 +1325,19 @@ static void active_token_send (
 	}
 }
 
+static void active_recv_flush (struct totemrrp_instance *instance)
+{
+	struct active_instance *rrp_algo_instance = (struct active_instance *)instance->rrp_algo_instance;
+	unsigned int i;
+
+	for (i = 0; i < instance->interface_count; i++) {
+		if (rrp_algo_instance->faulty[i] == 0) {
+
+			totemnet_recv_flush (instance->net_handles[i]);
+		}
+	}
+}
+
 static void active_send_flush (struct totemrrp_instance *instance)
 {
 	struct active_instance *rrp_algo_instance = (struct active_instance *)instance->rrp_algo_instance;
@@ -1641,6 +1687,14 @@ int totemrrp_token_target_set (
 
 	return (0);
 }
+int totemrrp_recv_flush (void *rrp_context)
+{
+	struct totemrrp_instance *instance = (struct totemrrp_instance *)rrp_context;
+
+	instance->rrp_algo->recv_flush (instance);
+
+	return (0);
+}
 
 int totemrrp_send_flush (void *rrp_context)
 {

+ 3 - 0
exec/totemrrp.h

@@ -98,6 +98,9 @@ extern int totemrrp_mcast_flush_send (
 	const void *msg,
 	unsigned int msg_len);
 
+extern int totemrrp_recv_flush (
+	void *rrp_context);
+
 extern int totemrrp_send_flush (
 	void *rrp_context);
 

+ 2 - 0
exec/totemsrp.c

@@ -3418,6 +3418,8 @@ static int message_handler_orf_token (
 	}
 #endif
 
+	totemrrp_recv_flush (instance->totemrrp_context);
+
 	/*
 	 * Determine if we should hold (in reality drop) the token
 	 */

+ 29 - 1
exec/totemudp.c

@@ -1173,7 +1173,11 @@ static int net_deliver_fn (
 	unsigned char *msg_offset;
 	unsigned int size_delv;
 
-	iovec = &instance->totemudp_iov_recv;
+	if (instance->flushing == 1) {
+		iovec = &instance->totemudp_iov_recv_flush;
+	} else {
+		iovec = &instance->totemudp_iov_recv;
+	}
 
 	/*
 	 * Receive datagram
@@ -1849,6 +1853,30 @@ int totemudp_processor_count_set (
 	return (res);
 }
 
+int totemudp_recv_flush (void *udp_context)
+{
+	struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
+	struct pollfd ufd;
+	int nfds;
+	int res = 0;
+
+	instance->flushing = 1;
+
+	do {
+		ufd.fd = instance->totemudp_sockets.mcast_recv;
+		ufd.events = POLLIN;
+		nfds = poll (&ufd, 1, 0);
+		if (nfds == 1 && ufd.revents & POLLIN) {
+		net_deliver_fn (0, instance->totemudp_sockets.mcast_recv,
+			ufd.revents, instance);
+		}
+	} while (nfds == 1);
+
+	instance->flushing = 0;
+
+	return (res);
+}
+
 int totemudp_send_flush (void *udp_context)
 {
 	struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;

+ 7 - 0
exec/totemudpu.c

@@ -1499,6 +1499,13 @@ int totemudpu_processor_count_set (
 	return (res);
 }
 
+int totemudpu_recv_flush (void *udpu_context)
+{
+	int res = 0;
+
+	return (res);
+}
+
 int totemudpu_send_flush (void *udpu_context)
 {
 	int res = 0;