Просмотр исходного кода

This fixes a potential problem where, because of a config change, a joining
node may not have a previous fragment of a message. It now discards
continuations of that message until it is complete and a new message
arrives.

(Logical change 1.132)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@476 fd59a12c-fef9-0310-b244-a6a79926bd2f

Mark Haverkamp 21 лет назад
Родитель
Сommit
c720930bfb
1 измененных файлов с 80 добавлено и 39 удалено
  1. 80 39
      exec/totempg.c

+ 80 - 39
exec/totempg.c

@@ -99,9 +99,21 @@ struct totempg_mcast_header {
 	short type;
 };
 
+/*
+ * totempg_mcast structure
+ *
+ * header:				Identify the mcast.
+ * fragmented:			Set if this message continues into next message
+ * continuation:		Set if this message is a continuation from last message
+ * msg_count:			Indicates how many packed messages are contained
+ * 						in the mcast.
+ * Also, the size of each packed message and the messages themselves are
+ * appended to the end of this structure when sent.
+ */
 struct totempg_mcast {
 	struct totempg_mcast_header header;
-	short fragmented; /* This message continues into next message */
+	unsigned char fragmented; 
+	unsigned char continuation; 
 	short msg_count;
 	/*
 	 * short msg_len[msg_count];
@@ -114,7 +126,8 @@ struct totempg_mcast {
 /*
  * Maximum packet size for totem pg messages
  */
-#define TOTEMPG_PACKET_SIZE (TOTEMSRP_PACKET_SIZE_MAX - sizeof (struct totempg_mcast))
+#define TOTEMPG_PACKET_SIZE (TOTEMSRP_PACKET_SIZE_MAX - \
+				sizeof (struct totempg_mcast))
 
 /*
  * Local variables used for packing small messages
@@ -147,9 +160,19 @@ struct assembly {
 struct assembly *assembly_list[16]; // MAX PROCESSORS TODO
 int assembly_list_entries = 0;
 
+/*
+ * Staging buffer for packed messages.  Messages are staged in this buffer
+ * before sending.  Multiple messages may fit which cuts down on the 
+ * number of mcasts sent.  If a message doesn't completely fit, then 
+ * the mcast header has a fragment bit set that says that there is more
+ * data to follow.  fragment_size is an index into the buffer.  It indicates
+ * the size of message data and where to place new message data.  
+ * fragment_continuation indicates whether the first packed message in
+ * the buffer is a continuation of a previously packed fragment.
+ */
 static unsigned char fragmentation_data[TOTEMPG_PACKET_SIZE];
-
 int fragment_size = 0;
+int fragment_continuation = 0;
 
 static struct iovec iov_delv;
 
@@ -233,16 +256,20 @@ static void totempg_deliver_fn (
 	int h_index;
 	int a_i = 0;
 	int msg_count;
+	int continuation;
 
 	assembly = find_assembly (source_addr);
 	assert (assembly);
 
 	/*
-	 * Assemble the header into one block of data
-	 * Assemble the packet contents into one block of data to simplify delivery
+	 * Assemble the header into one block of data and
+	 * assemble the packet contents into one block of data to simplify delivery
 	 */
 	if (iov_len == 1) {
-		/* message originated from external processor - 1 iovec for full msg */
+		/* 
+		 * This message originated from external processor 
+		 * because there is only one iovec for the full msg.
+		 */
 		char *data;
 		int datasize;
 
@@ -263,7 +290,10 @@ static void totempg_deliver_fn (
 		memcpy (&assembly->data[assembly->index], &data[datasize],
 			iovec[0].iov_len - datasize);
 	} else {
-		/* message originated from local processor  - <1 iovec for full msg */
+		/* 
+		 * The message originated from local processor  
+		 * because there is more than one iovec for the full msg.
+		 */
 		h_index = 0;
 		for (i = 0; i < 2; i++) {
 			memcpy (&header[h_index], iovec[i].iov_base, iovec[i].iov_len);
@@ -284,7 +314,6 @@ static void totempg_deliver_fn (
 	}
 
 	if (endian_conversion_required) {
-		mcast->fragmented = swab16 (mcast->fragmented);
 		mcast->msg_count = swab16 (mcast->msg_count);
 		for (i = 0; i < mcast->msg_count; i++) {
 			msg_lens[i] = swab16 (msg_lens[i]);
@@ -299,48 +328,47 @@ printf ("Message fragmented %d count %d\n", mcast->fragmented, mcast->msg_count)
 */
 
 	/*
-	 * Deliver all full messages in packed message
+	 * If the last message in the buffer is a fragment, then we
+	 * can't deliver it.  We'll first deliver the full messages
+	 * then adjust the assembly buffer so we can add the rest of the 
+	 * fragment when it arrives.
 	 */
+	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
+	continuation = mcast->continuation;
+	iov_delv.iov_base = &assembly->data[0];
+	iov_delv.iov_len = assembly->index + msg_lens[0];
 
-	/*
-	 * this message's last packed message is not a fragment
-	 */
-	if (mcast->fragmented == 0) {
-		iov_delv.iov_base = &assembly->data[0];
-		iov_delv.iov_len = assembly->index + msg_lens[0];
-		for  (i = 0; i < mcast->msg_count; i++) {
-			assembly->index += msg_lens[i];
-//printf ("app deliver\n");
-			app_deliver_fn (source_addr, &iov_delv, 1,
+	for  (i = 0; i < msg_count; i++) {
+		/*
+		 * If the first packed message is a continuation
+		 * of a previous message, but the assembly buffer
+		 * is empty, then we need to discard it since we can't
+		 * assemble a complete message.
+		 */
+		if (continuation && (assembly->index == 0)) {
+			continuation = 0;
+		} else {
+			app_deliver_fn(source_addr, &iov_delv, 1,
 				endian_conversion_required);
-			iov_delv.iov_base = &assembly->data[assembly->index];
-			iov_delv.iov_len = msg_lens[i + 1];
 		}
-		assembly->index = 0;
-	} else
-
-	/*
-	 * This message's last packed message is a fragment
-	 */
-	if (mcast->fragmented == 1) {
-		iov_delv.iov_base = &assembly->data[0];
-		iov_delv.iov_len = assembly->index + msg_lens[0];
-		for  (i = 0; i < mcast->msg_count - 1; i++) {
-			assembly->index += msg_lens[i];
-//printf ("app deliver\n");
-			app_deliver_fn (source_addr, &iov_delv, 1,
-				endian_conversion_required);
-			iov_delv.iov_base = &assembly->data[assembly->index];
+		assembly->index += msg_lens[i];
+		iov_delv.iov_base = &assembly->data[assembly->index];
+		if (i < (msg_count - 1)) {
 			iov_delv.iov_len = msg_lens[i + 1];
 		}
+	}
+
+	if (mcast->fragmented) {
 		if (mcast->msg_count > 1) {
 			memmove (&assembly->data[0],
 				&assembly->data[assembly->index],
-				msg_lens[mcast->msg_count - 1]);
+				msg_lens[msg_count]);
 
 			assembly->index = 0;
 		}
-		assembly->index += msg_lens[mcast->msg_count - 1];
+		assembly->index += msg_lens[msg_count];
+	} else {
+		assembly->index = 0;
 	}
 }
 
@@ -381,6 +409,14 @@ int callback_token_received_fn (enum totemsrp_callback_token_type type,
 		return (0);
 	}
 	mcast.fragmented = 0;
+
+	/*
+	 * Was the first message in this buffer a continuation of a
+	 * fragmented message?
+	 */
+	mcast.continuation = fragment_continuation;
+	fragment_continuation = 0;
+
 	mcast.msg_count = mcast_packed_msg_count;
 
 	iovecs[0].iov_base = &mcast;
@@ -468,6 +504,7 @@ int totempg_mcast (
 
 	for (i = 0; i < iov_len; ) {
 		mcast.fragmented = 0;
+		mcast.continuation = fragment_continuation;
 		copy_len = iovec[i].iov_len - copy_base;
 
 		/*
@@ -498,11 +535,15 @@ int totempg_mcast (
 
 			/*
 			 * if we're not on the last iovec or the iovec is too large to
-			 * fit, then indicate a fragment.
+			 * fit, then indicate a fragment. This also means that the next
+			 * message will have the continuation of this one.
 			 */
 			if ((i < (iov_len - 1)) || 
 					((copy_base + copy_len) < iovec[i].iov_len)) {
 				mcast.fragmented = 1;
+				fragment_continuation = 1;
+			} else {
+				fragment_continuation = 0;
 			}
 
 			/*