Просмотр исходного кода

Flush all multicast messages by delivering them before
processing the token. This ensures that the mcast fd
doesn't buffer too many old messages and avoids an assert.

(Logical change 1.61)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@208 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake 21 лет назад
Родитель
Сommit
287a5c5359
1 измененных файлов с 41 добавлено и 20 удалено
  1. 41 20
      exec/gmi.c

+ 41 - 20
exec/gmi.c

@@ -90,7 +90,7 @@
 #define LOCALHOST_IP				inet_addr("127.0.0.1")
 #define QUEUE_PEND_SIZE_MAX			51
 #define QUEUE_ASSEMBLY_SIZE_MAX		((MESSAGE_SIZE_MAX / 1472) + 1)
-#define QUEUE_RTR_ITEMS_SIZE_MAX	8192
+#define QUEUE_RTR_ITEMS_SIZE_MAX	256
 #define QUEUE_PEND_TRANS_SIZE_MAX	((MESSAGE_SIZE_MAX / 1472) + 1)
 #define MAXIOVS						8
 #define RTR_TOKEN_SIZE_MAX			32
@@ -442,6 +442,7 @@ static int messages_free (int group_arut);
 static int orf_token_send (struct orf_token *orf_token, int reset_timer);
 static void encrypt_and_sign (struct iovec *iovec, int iov_len);
 static int authenticate_and_decrypt (struct iovec *iov);
+static int recv_handler (poll_handle handle, int fd, int revents, void *data, unsigned int *prio);
 
 struct message_handlers gmi_message_handlers = {
 	5,
@@ -2520,9 +2521,29 @@ static int message_handler_orf_token (
 	int iov_len,
 	int bytes_received)
 {
-	struct orf_token *orf_token;
+	struct orf_token orf_token;
 	int transmits_allowed;
 	int starting_group_arut;
+	int prio = UINT_MAX;
+	struct pollfd ufd;
+	int nfds;
+
+	assert (bytes_received == sizeof (struct orf_token));
+	memcpy (&orf_token, iovec->iov_base, sizeof (struct orf_token));
+
+	/*
+	* flush multicast messages
+	*/
+	do {
+		ufd.fd = gmi_sockets[0].mcast;
+		ufd.events = POLLIN;
+		nfds = poll (&ufd, 1, 0);
+		if (nfds == 1 && ufd.revents & POLLIN) {
+			gmi_iov_recv.iov_len = 1500;
+			recv_handler (0, gmi_sockets[0].mcast, ufd.revents, 0,
+				&prio);
+		}
+	} while (nfds == 1);
 
 #ifdef TESTTOKENRETRANSMIT
 	if ((random() % 500) == 0) {
@@ -2530,25 +2551,24 @@ static int message_handler_orf_token (
 		return (0);
 	}
 #endif
-	orf_token = iovec[0].iov_base;
 
 	/*
 	 * Already received this token, but it was retransmitted
 	 * to this processor because the retransmit timer on a previous
 	 * processor timed out, so ignore the token
 	 */
-	if (orf_token->token_seqid > 0 && gmi_token_seqid >= orf_token->token_seqid) {
-printf ("already received token %d %d\n", orf_token->token_seqid, gmi_token_seqid);
+	if (orf_token.token_seqid > 0 && gmi_token_seqid >= orf_token.token_seqid) {
+printf ("already received token %d %d\n", orf_token.token_seqid, gmi_token_seqid);
 //exit(1);
 		return (0);
 	}
-	gmi_token_seqid = orf_token->token_seqid;
+	gmi_token_seqid = orf_token.token_seqid;
 
 	poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout);
 	timer_orf_token_retransmit_timeout = 0;
 
 #ifdef PRINT_STATS
-	if (orf_token->header.seqid > 10000) {
+	if (orf_token.header.seqid > 10000) {
 		print_stats ();
 	}
 #endif
@@ -2569,17 +2589,17 @@ printf ("already received token %d %d\n", orf_token->token_seqid, gmi_token_seqi
 	}
 
 //printf ("Got orf token from %s\n", inet_ntoa (system_from->sin_addr));
-	starting_group_arut = orf_token->group_arut;
+	starting_group_arut = orf_token.group_arut;
 	stats_orf_token++;
 	
-	transmits_allowed = orf_fcc_allowed (orf_token);
+	transmits_allowed = orf_fcc_allowed (&orf_token);
 
 //printf ("retransmit allowed %d\n", transmits_allowed);
 	/*
 	 * Retransmit failed messages and request retransmissions
 	 */
 
-	orf_token_rtr (orf_token, &transmits_allowed);
+	orf_token_rtr (&orf_token, &transmits_allowed);
 //printf ("multicasts allowed %d\n", transmits_allowed);
 
 	/*
@@ -2592,7 +2612,7 @@ printf ("already received token %d %d\n", orf_token->token_seqid, gmi_token_seqi
 	 * be overrun or cause the form token to be large.
 	 */
 
-	if ((gmi_brake + MISSING_MCAST_WINDOW) < orf_token->header.seqid) {
+	if ((gmi_brake + MISSING_MCAST_WINDOW) < orf_token.header.seqid) {
 		transmits_allowed = 0;
 	}
 
@@ -2600,33 +2620,34 @@ printf ("already received token %d %d\n", orf_token->token_seqid, gmi_token_seqi
 	 * Set the group arut and free any messages that can be freed
 	 */
 	if (memb_state != MEMB_STATE_EVS) {
-		calculate_group_arut (orf_token);
+		calculate_group_arut (&orf_token);
 	}
 
 	/*
 	 * Multicast queued messages
 	 */
-	orf_token_mcast (orf_token, transmits_allowed, system_from);
+	orf_token_mcast (&orf_token, transmits_allowed, system_from);
 
 	/*
 	 * Calculate flow control count
 	 */
-	orf_token_fcc (orf_token);
+	orf_token_fcc (&orf_token);
 
 	/*
 	 * Deliver membership and messages required by EVS
 	 */
-	orf_token_evs (orf_token, starting_group_arut);
+	orf_token_evs (&orf_token, starting_group_arut);
 
 	if (memb_state == MEMB_STATE_EVS) {
-		calculate_group_arut (orf_token);
+		calculate_group_arut (&orf_token);
 	}
 
 	/*
 	 * Increment the token seqid and store for later retransmit
 	 */
-	orf_token->token_seqid += 1;
-	memcpy (&orf_token_retransmit, orf_token, sizeof (struct orf_token));
+	orf_token.token_seqid += 1;
+	memcpy (&orf_token_retransmit, &orf_token,
+		sizeof (struct orf_token));
 
 	poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout);
 
@@ -2637,7 +2658,7 @@ printf ("already received token %d %d\n", orf_token->token_seqid, gmi_token_seqi
 	/*
 	 * Transmit orf_token to next member
 	 */
-	orf_token_send (orf_token, 1);
+	orf_token_send (&orf_token, 1);
 
 	return (0);
 }
@@ -3357,7 +3378,7 @@ printf ("setting barrier seq to %d\n", gmi_barrier_seq);
 	return (res);
 }
 
-int recv_handler (poll_handle handle, int fd, int revents, void *data, unsigned int *prio)
+static int recv_handler (poll_handle handle, int fd, int revents, void *data, unsigned int *prio)
 {
 	struct msghdr msg_recv;
 	struct message_header *message_header;