Просмотр исходного кода

Major improvements to fragmentation. It works alot better now.

(Logical change 1.39)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@114 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 21 лет назад
Родитель
Сommit
7298c7d9c0
1 измененных файлов с 236 добавлено и 181 удалено
  1. 236 181
      exec/gmi.c

+ 236 - 181
exec/gmi.c

@@ -37,7 +37,22 @@
  *	http://www.cs.jhu.edu/~yairamir/phd.ps) (ch4,5). 
  *
  * Some changes have been made to the design to support things like fragmentation,
- * multiple I/O queues, and other things.
+ * multiple I/O queues.
+ *
+ * Fragmentation Assembly Algorithm:
+ * Messages are read from the rtr list and stored in assembly queues
+ * identified by the ip address of the source of the mcast message.  Every
+ * time a fragmented message has been fully assembled, it is added to the
+ * pending delivery queue.
+
+ * Every time an item is added to the pending delivery queue:
+ * The pending delivery queue with the smallest starting sequence number
+ * is found.  If a message is waiting on that pending delivery queue, it will
+ * be delivered.  This process will be repeated until the pending delivery queue
+ * with the smallest sequence number has no pending messages.
+ * This ensures VS semantics because an assembled message is ordered vs other
+ * assembled messages based upon the first sequence number of the collection of
+ * packets.
  */
 
 #include <assert.h>
@@ -69,15 +84,16 @@
 #include "../include/sq.h"
 
 #define LOCALHOST_IP				inet_addr("127.0.0.1")
-#define QUEUE_PEND_DELV_SIZE_MAX	((MESSAGE_SIZE_MAX / 1472) + 1) * 2
-#define QUEUE_RTR_ITEMS_SIZE_MAX	512
+#define QUEUE_PEND_SIZE_MAX			50
+#define QUEUE_ASSEMBLY_SIZE_MAX		((MESSAGE_SIZE_MAX / 1472) + 1)
+#define QUEUE_RTR_ITEMS_SIZE_MAX	8192
 #define QUEUE_PEND_TRANS_SIZE_MAX	((MESSAGE_SIZE_MAX / 1472) + 1) * 500
 #define MAXIOVS						8
 #define RTR_TOKEN_SIZE_MAX			32
 #define MISSING_MCAST_WINDOW		64
 #define TIMEOUT_STATE_GATHER		100
-#define TIMEOUT_TOKEN				100
-#define TIMEOUT_TOKEN_RETRANSMIT	50
+#define TIMEOUT_TOKEN				200
+#define TIMEOUT_TOKEN_RETRANSMIT	100	
 #define TIMEOUT_STATE_COMMIT		100
 #define MAX_MEMBERS					16
 #define HOLE_LIST_MAX				MISSING_MCAST_WINDOW
@@ -115,14 +131,31 @@ struct queue queues_pend_trans[PRIORITY_MAX];
 /*
  * In-order pending delivery queue
  */
-struct pend_delv {
-	struct in_addr ip;
+struct assembly_queue_item {
+	struct iovec iovec[MAXIOVS];
+	int iov_len;
+};
+
+struct assembly_queue {
 	int seqid;
 	int first_delivery;
 	struct queue queue;
 };
 
-struct pend_delv queues_pend_delv[MAX_MEMBERS];
+struct pend_queue_item {
+	int seqid;
+	struct iovec iovec[256];
+	int iov_len;
+};
+
+struct queue_frag {
+	int seqid;
+	struct in_addr source_addr;
+	struct assembly_queue assembly;
+	struct queue pend_queue;
+};
+ 
+struct queue_frag queues_frag[MAX_MEMBERS];
 
 /*
  * Sorted delivery/retransmit queue
@@ -280,11 +313,6 @@ struct gmi_pend_trans_item {
 	int iov_len;
 };
 
-struct gmi_pend_delv_item {
-	struct iovec iovec[MAXIOVS];
-	int iov_len;
-};
-
 struct gmi_rtr_item {
 	struct iovec iovec[MAXIOVS+2]; /* +2 is for mcast msg + group name  TODO is this right */
 	int iov_len;
@@ -382,7 +410,7 @@ static int memb_state_gather_enter (void);
 static void pending_queues_deliver (void);
 static int orf_token_mcast (struct orf_token *orf_token,
 	int fcc_mcasts_allowed, struct sockaddr_in *system_from);
-static void queues_pend_delv_memb_new (void);
+static void queues_queue_frag_memb_new ();
 static void calculate_group_arut (struct orf_token *orf_token);
 static int messages_free (int group_arut);
 static int orf_token_send (struct orf_token *orf_token, int reset_timer);
@@ -463,7 +491,7 @@ printf ("mcast is %d token is %d\n", gmi_fd_mcast, gmi_fd_token);
 
 	memset (&memb_next, 0, sizeof (struct sockaddr_in));
 
-	queues_pend_delv_memb_new ();
+	queues_queue_frag_memb_new ();
 
 	return (0);
 }
@@ -1256,18 +1284,23 @@ static void orf_token_fcc (
 	orf_token->fcc = orf_token->fcc - fcc_mcast_last - fcc_remcast_last
 		+ fcc_mcast_current + fcc_remcast_current;
 
+//printf ("orf token fcc is %d %d %d %d %d\n", orf_token->fcc, fcc_mcast_last,
+//	fcc_remcast_last, fcc_mcast_current, fcc_remcast_current);
+
 	fcc_mcast_last = fcc_mcast_current;
 	fcc_remcast_last = fcc_remcast_current;
+	fcc_mcast_current = 0;
+	fcc_remcast_current = 0;
 }
 
-static void queues_pend_delv_memb_new (void)
+static void queues_queue_frag_memb_new (void)
 {
-	struct pend_delv pend_delv_new[MAX_MEMBERS];
+	struct queue_frag queues_frag_new[MAX_MEMBERS];
 	int item_index = 0;
 	int i, j;
 	int found;
 
-	memset (pend_delv_new, 0, sizeof (struct pend_delv) * MAX_MEMBERS);
+	memset (queues_frag_new, 0, sizeof (struct queue_frag) * MAX_MEMBERS);
 
 	/*
 	 * Build new pending list
@@ -1278,9 +1311,9 @@ static void queues_pend_delv_memb_new (void)
 			/*
 			 * If membership item in queues pending delivery list, copy it
 			 */
-			if (memb_list[i].sin_addr.s_addr == queues_pend_delv[j].ip.s_addr) {
-				memcpy (&pend_delv_new[item_index], &queues_pend_delv[j],
-					sizeof (struct pend_delv));
+			if (memb_list[i].sin_addr.s_addr == queues_frag[j].source_addr.s_addr) {
+				memcpy (&queues_frag_new[item_index], &queues_frag[j],
+					sizeof (struct queue_frag));
 				item_index += 1;
 				found = 1;
 				break; /* for j = */
@@ -1290,10 +1323,15 @@ static void queues_pend_delv_memb_new (void)
 		 * If membership item not found in pending delivery list, make new entry
 		 */
 		if (found == 0) {
-			queue_init (&pend_delv_new[item_index].queue, QUEUE_PEND_DELV_SIZE_MAX,
-				sizeof (struct gmi_pend_delv_item));
-			pend_delv_new[item_index].seqid = 0;
-			pend_delv_new[item_index].ip.s_addr = memb_list[i].sin_addr.s_addr;
+			queue_init (&queues_frag_new[item_index].assembly.queue,
+				QUEUE_ASSEMBLY_SIZE_MAX,
+				sizeof (struct assembly_queue_item));
+			queue_init (&queues_frag_new[item_index].pend_queue,
+				QUEUE_PEND_SIZE_MAX, sizeof (struct pend_queue_item));
+			queues_frag_new[item_index].assembly.seqid = 0;
+			queues_frag_new[item_index].source_addr.s_addr =
+				memb_list[i].sin_addr.s_addr;
+printf ("New queue for ip %s\n", inet_ntoa (queues_frag_new[item_index].source_addr));
 			item_index += 1;
 		}
 	}
@@ -1301,9 +1339,14 @@ static void queues_pend_delv_memb_new (void)
 	/*
 	 * Copy new list into system list
 	 */
-	memcpy (queues_pend_delv, pend_delv_new,
-		sizeof (struct pend_delv) * MAX_MEMBERS);
+	memcpy (queues_frag, queues_frag_new,
+		sizeof (struct queue_frag) * MAX_MEMBERS);
 
+	for (i = 0; i < memb_list_entries_confchg; i++) {
+		queues_frag[i].seqid = 0;
+		queues_frag[i].assembly.seqid = 0;
+	}
+#ifdef TODO
 	for (i = 0; i < memb_list_entries_confchg; i++) {
 		/*
 		 * If queue not empty, mark it for first delivery
@@ -1315,6 +1358,7 @@ static void queues_pend_delv_memb_new (void)
 			queues_pend_delv[i].seqid = 0;
 		}
 	}
+#endif
 }
 
 static int orf_token_evs (
@@ -1370,7 +1414,7 @@ printf ("EVS STATE group arut %d gmi arut %d highest %d barrier %d starting grou
 		gmi_adut_old = gmi_adut;
 		gmi_adut = 0;
 
-		gmi_token_seqid = 0;
+//		gmi_token_seqid = 0;
 
 		gmi_highest_seq_old = gmi_highest_seq;
 		gmi_highest_seq = 0;
@@ -1472,7 +1516,7 @@ printf ("EVS STATE group arut %d gmi arut %d highest %d barrier %d starting grou
 		 */
 		memb_list_entries = memb_form_token.member_list_entries;
 		memb_list_entries_confchg = memb_list_entries;
-		queues_pend_delv_memb_new ();
+		queues_queue_frag_memb_new ();
 
 		/*
 		 * Install new conf id
@@ -1495,8 +1539,8 @@ printf ("EVS STATE group arut %d gmi arut %d highest %d barrier %d starting grou
 	return (0);
 }
 
-int gwin = 90;
-int pwin = 45;
+int gwin = 80;
+int pwin = 20;
 
 
 static int orf_fcc_allowed (struct orf_token *token)
@@ -2398,17 +2442,18 @@ static int memb_state_gather_enter (void) {
 	return (res);
 }
 
-struct pend_delv *pend_delv_next_delivery_find (void)
+struct queue_frag *queue_frag_delivery_find (void)
 {
-	struct pend_delv *pend_delv = 0;
+	struct queue_frag *queue_frag = 0;
 	int i;
 
+#ifdef ABBA
 	/*
 	 * Find first_delivery queue that is not empty
 	 * this sets the first pend_delv
 	 */
 	for (i = 0; i < memb_list_entries_confchg; i++) {
-		if (queues_pend_delv[i].first_delivery && 
+		if (queues_frag[i].first_delivery && 
 			queue_is_empty (&queues_pend_delv[i].queue) == 0) {
 
 			pend_delv = &queues_pend_delv[i];
@@ -2423,12 +2468,12 @@ struct pend_delv *pend_delv_next_delivery_find (void)
 	 */
 	for (++i; i < memb_list_entries_confchg; i++) {
 		assert (pend_delv);
-		if (queues_pend_delv[i].first_delivery &&
-			(queue_is_empty (&queues_pend_delv[i].queue) == 0) &&
+		if (queues_frag[i].first_delivery &&
+			(queue_is_empty (&queues_frag[i].queue) == 0) &&
 			(queues_pend_delv[i].seqid < pend_delv->seqid)) {
 
 			pend_delv = &queues_pend_delv[i];
-//			printf ("Selecting first queue %s\n", inet_ntoa (pend_delv->ip));
+//			printf ("Selecting first from %d in second phase %s\n", i,  inet_ntoa (pend_delv->ip));
 		}
 	}
 		
@@ -2438,13 +2483,21 @@ struct pend_delv *pend_delv_next_delivery_find (void)
 	if (pend_delv) {
 		return (pend_delv);
 	}
+#endif
+
 	/*
 	 * No first delivery queues, repeat same
 	 * process looking for any queue
 	 */
 	for (i = 0; i < memb_list_entries_confchg; i++) {
-		if (queue_is_empty (&queues_pend_delv[i].queue) == 0) {
-			pend_delv = &queues_pend_delv[i];
+#ifdef DEBUG
+printf ("Queue empty[%d] %d queues seqid %d\n", i,
+	queue_is_empty (&queues_frag[i].pend_queue),
+	queues_frag[i].seqid);
+#endif
+		if (queue_is_empty (&queues_frag[i].pend_queue) == 0 ||
+			queue_is_empty (&queues_frag[i].assembly.queue) == 0) {
+			queue_frag = &queues_frag[i];
 			break;
 		}
 	}
@@ -2453,184 +2506,184 @@ struct pend_delv *pend_delv_next_delivery_find (void)
 	 * Find lowest sequence number queue
 	 */
 	for (++i; i < memb_list_entries_confchg; i++) {
-		assert (pend_delv);
-		if ((queue_is_empty (&queues_pend_delv[i].queue) == 0) &&
-			(queues_pend_delv[i].seqid < pend_delv->seqid)) {
-			pend_delv = &queues_pend_delv[i];
+		assert (queue_frag);
+#ifdef DEBUG
+printf ("Queue empty[%d] %d queues seqid %d lowest so far %d\n", i,
+	queue_is_empty (&queues_frag[i].pend_queue),
+	queues_frag[i].seqid, queues_frag->seqid);
+#endif
+		if (queue_is_empty (&queues_frag[i].pend_queue) == 0 &&
+			(queues_frag[i].seqid < queue_frag->seqid)) {
+			queue_frag = &queues_frag[i];
+		}
+		if (queue_is_empty (&queues_frag[i].assembly.queue) == 0 &&
+			(queues_frag[i].assembly.seqid < queue_frag->seqid)) {
+//printf ("assembly seqid is %d\n",
+//			queues_frag[i].assembly.seqid);
+				queue_frag = &queues_frag[i];
 		}
 	}
 
-	return (pend_delv);
+	return (queue_frag);
+}
+
+/*
+ * This delivers all available messages that can be delivered in VS semantics
+ * from the fragmentation pend queue to the registered deliver function
+ */
+static void app_deliver (void) {
+	struct queue_frag *queue_frag;
+	struct pend_queue_item *pend_queue_item;
+
+	do {
+		queue_frag = queue_frag_delivery_find ();
+		if (queue_frag == 0) {
+			break;
+		}
+assert (queue_frag);
+
+		/*
+		 * There is an assembly taking place that was selected but its not completed
+		 */
+		if (queue_is_empty (&queue_frag->pend_queue) == 1) {
+			break;
+		}
+
+//printf ("Delivering from pending queue %s seq id %d\n", inet_ntoa (queue_frag->source_addr), queue_frag->seqid);
+
+		pend_queue_item = queue_item_get (&queue_frag->pend_queue);
+		assert (pend_queue_item);
+		queue_item_remove (&queue_frag->pend_queue);	
+
+//&mcast->groupname, /* TODO figure out how to pass this from the frag queue */
+		gmi_deliver_fn (
+			0,
+			queue_frag->source_addr,
+			pend_queue_item->iovec,
+			pend_queue_item->iov_len);
+
+		/*
+		 * Release messages that can be freed
+		 */
+		gmi_adut = queue_frag->seqid;
+
+		/*
+		 * Reset lowest seqid for this pending queue from next assembled message
+		 */
+		if (queue_is_empty (&queue_frag->pend_queue) == 0) {
+			pend_queue_item = queue_item_get (&queue_frag->pend_queue);
+			queue_frag->seqid = pend_queue_item->seqid;
+		}
+	} while (queue_frag);
+
 }
 
-static int user_deliver ()
+/*
+ * This delivers an assembled message into the fragmentation pend queue
+ * This must only be called once the full message has been assembled
+ */
+static void assembly_deliver (struct queue_frag *queue_frag)
 {
-	struct gmi_pend_delv_item *pend_delv_item;
-	int i = 0;
+	struct assembly_queue_item *assembly_queue_item;
+	struct pend_queue_item pend_queue_item;
 	int res = 0;
 	struct iovec iovec_delv[256];
 	int iov_len_delv = 0;
 	struct mcast *mcast = 0;
-	int messages_delivered = 0;
-	struct pend_delv *pend_delv;
-	int retval = 0;
 
-	/*
-	 * Find pend_delv with lowest sequence number.  This pend_delv is
-	 * the queue that should be delivered from next
-	 */
-	pend_delv = pend_delv_next_delivery_find ();
-	assert (pend_delv); // TODO this assertion fails sometimes
-//printf ("Delivering from queue %s\n", inet_ntoa (pend_delv->ip));
+	memset (iovec_delv, 0, sizeof (iovec_delv));
+
+	queue_item_iterator_init (&queue_frag->assembly.queue);
+	assert (queue_is_empty (&queue_frag->assembly.queue) == 0);
+
+	assembly_queue_item = queue_item_iterator_get (&queue_frag->assembly.queue);
 
 	/*
-	 * If a message was not assembled on the queue with the lowest
-	 * sequence number, return since there is no reason to attempt assembly.
+	 * Assemble all of the message iovectors into one iovector for delivery
 	 */
-	memset (iovec_delv, 0, sizeof (iovec_delv));
-	queue_item_iterator_init (&pend_delv->queue);
-	assert (queue_is_empty (&pend_delv->queue) == 0);
-//printf ("Starting a packet assembly\n");
 	do {
-		pend_delv_item = queue_item_iterator_get (&pend_delv->queue);
-		mcast = pend_delv_item->iovec[0].iov_base;
-
-		assert (pend_delv_item);
-		assert (pend_delv_item->iovec[0].iov_len < MESSAGE_SIZE_MAX);
-		assert (pend_delv_item->iovec[0].iov_len != 0);
-		assert (pend_delv_item->iovec[0].iov_base != 0);
-		assert (mcast != (struct mcast *)0xdeadbeef);
-		assert (pend_delv->ip.s_addr == mcast->source.s_addr);
-
-		messages_delivered += 1;
+		assembly_queue_item = queue_item_iterator_get (&queue_frag->assembly.queue);
 
 		/*
 		 * Assemble io vector
 		 */
-		if (pend_delv_item->iovec[0].iov_len == sizeof (struct mcast)) {
+		if (assembly_queue_item->iov_len != 1 &&
+			assembly_queue_item->iovec[0].iov_len == sizeof (struct mcast)) {
 			/*
 			 * Copy iovec from second iovec if this is self-delivered
 			 */
 			memcpy (&iovec_delv[iov_len_delv],
-				&pend_delv_item->iovec[1],
-				sizeof (struct iovec) * pend_delv_item->iov_len - 1);
-			iov_len_delv += pend_delv_item->iov_len - 1;
+				&assembly_queue_item->iovec[1],
+				sizeof (struct iovec) * assembly_queue_item->iov_len - 1);
+			iov_len_delv += assembly_queue_item->iov_len - 1;
 		} else {
 			/*
 			 * Copy iovec from first iovec if this is an external message
 			 */
 			iovec_delv[iov_len_delv].iov_base =
-				pend_delv_item->iovec[0].iov_base + sizeof (struct mcast);
+				assembly_queue_item->iovec[0].iov_base + sizeof (struct mcast);
 			iovec_delv[iov_len_delv].iov_len =
-				pend_delv_item->iovec[0].iov_len - sizeof (struct mcast);
+				assembly_queue_item->iovec[0].iov_len - sizeof (struct mcast);
 			assert (iovec_delv[iov_len_delv].iov_len < MESSAGE_SIZE_MAX);
 			iov_len_delv += 1;
-			if (pend_delv_item->iov_len > 1) {
+			if (assembly_queue_item->iov_len > 1) {
 				memcpy (&iovec_delv[iov_len_delv],
-					&pend_delv_item->iovec[1],
-					sizeof (struct iovec) * pend_delv_item->iov_len - 1);
-				iov_len_delv += pend_delv_item->iov_len - 1;
+					&assembly_queue_item->iovec[1],
+					sizeof (struct iovec) * assembly_queue_item->iov_len - 1);
+				iov_len_delv += assembly_queue_item->iov_len - 1;
 			}
 		}
-
 		assert (iov_len_delv < 256);
 		assert (iov_len_delv > 0);
 
-//printf ("Assembling from packet %d of %d of total %d\n",
-//	mcast->packet_number, mcast->packet_count, mcast->packet_seq);
-		/*
-		 * Deliver message if this is the last packet
-		 */
-		if (mcast->packet_number == mcast->packet_count) {
-			gmi_log_printf (gmi_log_level_debug, "Last packet, delivering iovec %d entries seq %d\n",
-				iov_len_delv, i);
-
-			if (gmi_deliver_fn) {
-				gmi_deliver_fn (
-					&mcast->groupname,
-					pend_delv->ip,
-					iovec_delv,
-					iov_len_delv);
-			}
+		res = queue_item_iterator_next (&queue_frag->assembly.queue);
+	} while (res == 0);
 
-			/*
-			 * On the first message delivery:
-			 * Free items in the pending queue up to the barrier message
-			 * set gmi_adut to rut so that message_free may free any messages.
-			 */
-			if (pend_delv->first_delivery) {
-//				printf ("releasing all messages up to %d\n", gmi_adut);
-// TODO actually release the messages from the previous configuration
-// TODO without a fix here, those messages are leaked
-			}
+	/*
+	 * assert that this really is the end of the packet
+	 */
+	mcast = assembly_queue_item->iovec[0].iov_base;
+	assert (mcast->packet_number == mcast->packet_count);
 
-			/*
-			 * Because of the ordering guarantees, we are guaranteed that 
-			 * pend_delv->seqid on every invocation of user_deliver shall
-			 * increase (or reset to zero).  This allows us to set the
-			 * low water mark (gmi_adut) for freeing of messages to atleast
-			 * the beginning of this message.
-			 */ 
-			gmi_adut = pend_delv->seqid;
+	memcpy (pend_queue_item.iovec, iovec_delv,
+		sizeof (pend_queue_item.iovec));
+	pend_queue_item.iov_len = iov_len_delv;
+	pend_queue_item.seqid = queue_frag->assembly.seqid;
 
-			/*
-			 * Determine if there are more messages on this queue
-			 */
-			res = queue_item_iterator_next (&pend_delv->queue);
-			if (res == 0) {
-				/*
-				 * More items to deliver set queues seqid head so
-				 * correct pending queue can be selected next time
-				 */
-				pend_delv_item = queue_item_iterator_get (&pend_delv->queue);
-				mcast = pend_delv_item->iovec[0].iov_base;
-				pend_delv->seqid = mcast->header.seqid;
-				for (i = 0; i < messages_delivered; i++) {
-					queue_item_remove (&pend_delv->queue);
-				}
-			} else {
-				/*
-				 * No more items to deliver
-				 */
-				pend_delv->seqid = 0;
-				queue_reinit (&pend_delv->queue);
-			}
-			
-			retval = 1;
-			break; /* From do loop */
-		}
+	/*
+	 * Add IO vector to pend queue
+	 */
+//printf ("assembling message for %s\n", inet_ntoa (queue_frag->source_addr));
+	queue_item_add (&queue_frag->pend_queue, &pend_queue_item);
 
-		res = queue_item_iterator_next (&pend_delv->queue);
-	} while (res == 0);
-	return (retval);
+	queue_reinit (&queue_frag->assembly.queue);
+
+	app_deliver ();
 }
 
-struct pend_delv *pend_delv_find (struct in_addr source)
+struct queue_frag *pend_delv_find (struct in_addr source)
 {
-	struct pend_delv *pend_delv = 0;
+	struct queue_frag *queue_frag = 0;
 	int i;
 
 	for (i = 0; i < memb_list_entries_confchg; i++) {
-		if (source.s_addr == queues_pend_delv[i].ip.s_addr) {
-			pend_delv = &queues_pend_delv[i];
+		if (source.s_addr == queues_frag[i].source_addr.s_addr) {
+			queue_frag = &queues_frag[i];
 			break;
 		}
 	}
 
-	return (pend_delv);
+	return (queue_frag);
 }
 
-static int delivery_outstanding = 0;
-
 static void pending_queues_deliver (void)
 {
 	struct gmi_rtr_item *gmi_rtr_item_p;
 	int i;
 	int res;
 	struct mcast *mcast;
-	struct gmi_pend_delv_item pend_delv_item;
-	struct pend_delv *pend_delv;
-	int delivered;
+	struct assembly_queue_item assembly_queue_item;
+	struct queue_frag *queue_frag;
 
 //printf ("Delivering messages to pending queues\n");
 	/*
@@ -2658,48 +2711,42 @@ static void pending_queues_deliver (void)
 			"Delivering MCAST message with seqid %d to pending delivery queue\n",
 			mcast->header.seqid);
 
-//printf ("Delivering MCAST from packet %d of %d of total %d seqid %d\n", mcast->packet_number, mcast->packet_count, mcast->packet_seq, mcast->header.seqid);
 		gmi_arut = i;
 
 		/*
 		 * Create pending delivery item
 		 */
-		pend_delv_item.iov_len = gmi_rtr_item_p->iov_len;
-		memcpy (&pend_delv_item.iovec, gmi_rtr_item_p->iovec,
+		assembly_queue_item.iov_len = gmi_rtr_item_p->iov_len;
+		memcpy (&assembly_queue_item.iovec, gmi_rtr_item_p->iovec,
 			sizeof (struct iovec) * gmi_rtr_item_p->iov_len);
 		assert (gmi_rtr_item_p->iov_len < MAXIOVS);
 
 		assert (mcast->source.s_addr != 0);
-		pend_delv = pend_delv_find (mcast->source);
-
-		if (pend_delv == 0) {
-			printf ("mcast source is %s\n", inet_ntoa (mcast->source));
-		}
-		assert (pend_delv != 0);
-		assert (pend_delv->ip.s_addr != 0);
+		queue_frag = pend_delv_find (mcast->source);
 
+		/*
+		 * Setup sequence id numbers for use in assembly and delivery
+		 */
 		if (mcast->packet_number == 0) {
-			pend_delv->seqid = mcast->header.seqid;
+			queue_frag->assembly.seqid = mcast->header.seqid;
+//			printf ("Setting %s assembly seqid to %d\n",
+//				inet_ntoa (queue_frag->source_addr), queue_frag->assembly.seqid);
+
+			if (queue_is_empty (&queue_frag->pend_queue) == 1) {
+				queue_frag->seqid = mcast->header.seqid;
+			}
 		}
 
 		/*
-		 * Add pending delivery item to pending delivery queue
+		 * Add pending delivery item to assembly queue
 		 */
-		queue_item_add (&pend_delv->queue, &pend_delv_item);
+		queue_item_add (&queue_frag->assembly.queue, &assembly_queue_item);
 
 		/*
-		 * If message is complete, attempt delivery of all messages
-		 * that are currently outstanding
+		 * If message is complete, deliver to user the pending delivery message
 		 */
 		if (mcast->packet_number == mcast->packet_count) {
-//printf ("Starting delivery\n");
-			delivery_outstanding += 1;
-			do {
-				delivered = user_deliver ();
-				if (delivered) {
-					delivery_outstanding -= 1;
-				}
-			} while (delivery_outstanding && delivered);
+			assembly_deliver (queue_frag);
 		}
 	}
 //printf ("Done delivering messages to pending queues\n");
@@ -3007,6 +3054,14 @@ printf ("Got membership form token\n");
 		memb_state = MEMB_STATE_EVS;
 		memb_form_token_update_highest_seq (&memb_form_token);
 
+		/*
+		 * Reset flow control local variables since we are starting a new token
+		 */
+		fcc_mcast_current = 0;
+		fcc_remcast_current = 0;
+		fcc_mcast_last = 0;
+		fcc_remcast_last = 0;
+
 		/*
 		 * FORM token has rotated once, now install local variables
 		 *