Ver Fonte

Add recovery plug support to group messaging.

(Logical change 1.74)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@263 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake há 21 anos atrás
pai
commit
7239bf25ad
1 ficheiros alterados com 408 adições e 62 exclusões
  1. 408 62
      exec/gmi.c

+ 408 - 62
exec/gmi.c

@@ -82,6 +82,8 @@
 #include "gmi.h"
 #include "../include/queue.h"
 #include "../include/sq.h"
+#include "../include/list.h"
+#include "hdb.h"
 
 #include "crypto.h"
 #define AUTHENTICATION 1 /* use authentication */
@@ -93,7 +95,7 @@
 #define QUEUE_RTR_ITEMS_SIZE_MAX	256
 #define QUEUE_PEND_TRANS_SIZE_MAX	((MESSAGE_SIZE_MAX / 1472) + 1)
 #define MAXIOVS						4
-#define RTR_TOKEN_SIZE_MAX			32
+#define RETRANSMIT_ENTRIES_MAX		50
 #define MISSING_MCAST_WINDOW		64
 #define TIMEOUT_STATE_GATHER		100
 #define TIMEOUT_TOKEN				1500
@@ -113,6 +115,15 @@ prng_state gmi_prng_state;
 unsigned char gmi_private_key[1024];
 unsigned int gmi_private_key_len;
 
+enum plug_state { /* recovery plug state of this processor and its group */
+	GMI_PLUG_PROCESSOR_PLUGGED, /* this processor is waiting for recovery */
+	GMI_PLUG_PROCESSOR_REQUEST_UNPLUG, /* this processor should be unplugged at next token possession */
+	GMI_PLUG_PROCESSOR_UNPLUGGED, /* this processor is done with recovery and is waiting for the rest of the group */
+	GMI_PLUG_GROUP_UNPLUGGED /* all processors in group have recovered */
+};
+
+enum plug_state plug_state;
+
 int stats_sent = 0;
 int stats_recv = 0;
 int stats_delv = 0;
@@ -209,6 +220,22 @@ struct gmi_socket gmi_sockets[2];
  */
 int gmi_arut = 0;
 
+struct plug_instance { /* one recovery plug, stored in plug_instance_database */
+	int plugged; /* set to 1 by recovery_plug_set(), cleared by gmi_recovery_plug_unplug() */
+	struct list_head list; /* links this plug onto plug_listhead */
+};
+
+/*
+ * Handle database holding every recovery plug instance (struct plug_instance); starts out empty
+ */
+static struct saHandleDatabase plug_instance_database = {
+	.handleCount				= 0,
+	.handles					= 0,
+	.handleInstanceDestructor	= 0
+};
+
+DECLARE_LIST_INIT (plug_listhead);
+
 /*
  * Delivered up to and including
  */
@@ -226,7 +253,9 @@ int gmi_fragment = 0;
 
 int gmi_pend_queue_priority = 0;
 
-struct orf_token orf_token_retransmit;
+char orf_token_retransmit[PACKET_SIZE_MAX];
+
+int orf_token_retransmit_size;
 
 int gmi_token_seqid = 0;
 
@@ -308,8 +337,10 @@ struct orf_token {
 	int group_arut;
 	struct in_addr addr_arut;
 	short int fcc;
-	struct rtr_item rtr_list[RTR_TOKEN_SIZE_MAX];
 	int rtr_list_entries;
+	int bitmap_bytes;
+	/* struct rtr_item rtr_list[0] goes here */
+	/* bitmap_bytes goes here */
 };
 
 struct conf_desc {
@@ -433,6 +464,7 @@ void (*gmi_deliver_fn) (
 	int iov_len) = 0;
 
 void (*gmi_confchg_fn) (
+	enum gmi_configuration_type configuration_type,
 	struct sockaddr_in *member_list, int member_list_entries,
 	struct sockaddr_in *left_list, int left_list_entries,
 	struct sockaddr_in *joined_list, int joined_list_entries) = 0;
@@ -459,7 +491,8 @@ static int orf_token_mcast (struct orf_token *orf_token,
 static void queues_queue_frag_memb_new ();
 static void calculate_group_arut (struct orf_token *orf_token);
 static int messages_free (int group_arut);
-static int orf_token_send (struct orf_token *orf_token, int reset_timer);
+
+static int orf_token_send (struct orf_token *orf_token, struct rtr_item *rtr_list, unsigned char *plug_bitmap, int reset_timer);
 static void encrypt_and_sign (struct iovec *iovec, int iov_len);
 static int authenticate_and_decrypt (struct iovec *iov);
 static int recv_handler (poll_handle handle, int fd, int revents, void *data, unsigned int *prio);
@@ -593,6 +626,7 @@ int gmi_join (
 		struct iovec *iovec,
 		int iov_len),
 	void (*confchg_fn) (
+		enum gmi_configuration_type configuration_type,
 		struct sockaddr_in *member_list, int member_list_entries,
 		struct sockaddr_in *left_list, int left_list_entries,
 		struct sockaddr_in *joined_list, int joined_list_entries),
@@ -1034,6 +1068,126 @@ int gmi_send_ok (
 	return (1);
 }
 
+/*
+ * Create a new recovery plug and hand its handle back through handle_plug.
+ *
+ * The new instance starts with plugged == 0 and is linked onto
+ * plug_listhead.  Returns 0 on success, or -1 when the handle could not
+ * be created or its instance could not be looked up.
+ */
+int gmi_recovery_plug_create (
+	gmi_recovery_plug_handle *handle_plug)
+{
+	struct plug_instance *instance;
+	SaErrorT res;
+
+	res = saHandleCreate (&plug_instance_database,
+		sizeof (struct plug_instance), handle_plug);
+	if (res != SA_OK) {
+		return (-1);
+	}
+
+	res = saHandleInstanceGet (&plug_instance_database,
+		*handle_plug,
+		(void *)&instance);
+	if (res != SA_OK) {
+		/*
+		 * Lookup failed, so give back the handle that was just created
+		 */
+		saHandleDestroy (&plug_instance_database, *handle_plug);
+		return (-1);
+	}
+
+	instance->plugged = 0;
+	list_add (&instance->list, &plug_listhead);
+
+	saHandleInstancePut (&plug_instance_database, *handle_plug);
+
+	return (0);
+}
+
+/*
+ * Remove a recovery plug: unlink it from plug_listhead and destroy its
+ * handle.  Returns 0 on success, or -1 when handle_plug cannot be looked
+ * up in plug_instance_database.
+ */
+int gmi_recovery_plug_destroy (
+	gmi_recovery_plug_handle handle_plug)
+{
+	struct plug_instance *instance;
+
+	if (saHandleInstanceGet (&plug_instance_database,
+		handle_plug, (void *)&instance) != SA_OK) {
+
+		return (-1);
+	}
+
+	list_del (&instance->list);
+
+	/*
+	 * Destroy is requested first, then the reference taken above is dropped
+	 */
+	saHandleDestroy (&plug_instance_database, handle_plug);
+	saHandleInstancePut (&plug_instance_database, handle_plug);
+
+	return (0);
+}
+
+/*
+ * Mark the recovery plug identified by handle_plug as unplugged.
+ *
+ * Once no plug on plug_listhead remains plugged, the processor state
+ * advances: directly to GMI_PLUG_GROUP_UNPLUGGED when this processor is
+ * the only member, otherwise to GMI_PLUG_PROCESSOR_REQUEST_UNPLUG.
+ *
+ * Returns 0 on success, or -1 when handle_plug cannot be looked up
+ * (matching gmi_recovery_plug_create and gmi_recovery_plug_destroy).
+ */
+int gmi_recovery_plug_unplug (
+	gmi_recovery_plug_handle handle_plug)
+{
+	SaErrorT error;
+	struct plug_instance *plug_instance;
+	struct list_head *list;
+	int plugged = 0;
+
+	error = saHandleInstanceGet (&plug_instance_database,
+		handle_plug, (void *)&plug_instance);
+	if (error != SA_OK) {
+		/*
+		 * Report the bad handle instead of silently claiming success
+		 */
+		return (-1);
+	}
+
+	plug_instance->plugged = 0;
+
+	saHandleInstancePut (&plug_instance_database, handle_plug);
+
+	/*
+	 * Determine if all plugs have been unplugged
+	 */
+	for (list = plug_listhead.next; list != &plug_listhead;
+		list = list->next) {
+
+		plug_instance = list_entry (list, struct plug_instance, list);
+
+		if (plug_instance->plugged) {
+			plugged = 1;
+			break; /* one plugged instance is enough to keep the processor plugged */
+		}
+	}
+
+	if (plugged == 0) {
+		gmi_log_printf (gmi_log_level_notice, "All services unplugged, unplugging processor\n");
+		if (memb_list_entries == 1) {
+			/*
+			 * Sole member, so there is no other processor to wait for
+			 */
+			plug_state = GMI_PLUG_GROUP_UNPLUGGED;
+			gmi_log_printf (gmi_log_level_notice, "All processors unplugged, allowing messages to be transmitted.\n");
+		} else {
+			plug_state = GMI_PLUG_PROCESSOR_REQUEST_UNPLUG;
+		}
+	}
+
+	return (0);
+}
+
+/*
+ * Plug every recovery plug on plug_listhead and move this processor
+ * into the GMI_PLUG_PROCESSOR_PLUGGED state.
+ */
+void recovery_plug_set (void)
+{
+	struct list_head *pos = plug_listhead.next;
+
+	/*
+	 * Walk the list of plugs and mark each one as plugged
+	 */
+	while (pos != &plug_listhead) {
+		struct plug_instance *instance =
+			list_entry (pos, struct plug_instance, list);
+
+		instance->plugged = 1;
+		pos = pos->next;
+	}
+
+	plug_state = GMI_PLUG_PROCESSOR_PLUGGED;
+}
+
+
 static int netif_determine (struct sockaddr_in *bindnet,
 	struct sockaddr_in *bound_to)
 {
@@ -1354,7 +1508,7 @@ static int orf_token_mcast (
 		 * from if this is not a message fragment
 		 */
 		if (gmi_fragment == 0) {
-			gmi_pend_queue_priority = 0;
+			gmi_pend_queue_priority = GMI_PRIO_RECOVERY;
 			do {
 				queue_pend_trans = &queues_pend_trans[gmi_pend_queue_priority];
 
@@ -1369,6 +1523,11 @@ static int orf_token_mcast (
 		if (gmi_pend_queue_priority == PRIORITY_MAX) {
 			break; /* all queues are empty, break from for */
 		}
+		if (plug_state != GMI_PLUG_GROUP_UNPLUGGED &&
+			gmi_pend_queue_priority != GMI_PRIO_RECOVERY) {
+
+			break; /* group isn't unplugged and this isn't a recovery message */
+		}
 //		printf ("selecting pending queue %d\n", gmi_pend_queue_priority);
 
 		gmi_pend_trans_item = (struct gmi_pend_trans_item *)queue_item_get (queue_pend_trans);
@@ -1486,30 +1645,45 @@ static int orf_token_mcast (
  */
 static void orf_token_rtr (
 	struct orf_token *orf_token,
+	struct rtr_item *rtr_list_new,
 	int *fcc_allowed)
 {
 	int res;
 	int i, j;
 	int found;
+	int index_old = 0;
+	int index_new = 0;
+	struct rtr_item *rtr_list_old;
+	int total_entries;
+
+	rtr_list_old = (struct rtr_item *)(((char *)orf_token) + sizeof (struct orf_token));
 
-#ifdef COMPLE_OUT
+/*
+if (orf_token->rtr_list_entries) {
 printf ("Retransmit List %d\n", orf_token->rtr_list_entries);
 for (i = 0; i < orf_token->rtr_list_entries; i++) {
-	printf ("%d ", orf_token->rtr_list[i].seqid);
+	printf ("%d ", rtr_list_old[i].seqid);
 }
 printf ("\n");
-#endif
+}
+*/
+
+	total_entries = orf_token->rtr_list_entries;
 
 	/*
 	 * Retransmit messages on orf_token's RTR list from RTR queue
 	 */
-	for (fcc_remcast_current = 0, i = 0;
-		fcc_remcast_current <= *fcc_allowed && i < orf_token->rtr_list_entries;) {
+	for (fcc_remcast_current = 0, index_old = 0;
+		fcc_remcast_current <= *fcc_allowed &&
+		index_old < total_entries;) {
+
+		assert (index_new < RETRANSMIT_ENTRIES_MAX);
+
 #ifdef COMPILE_OUT
 printf ("%d.%d.%d vs %d.%d.%d\n",
-	orf_token->rtr_list[i].conf_id.rep.s_addr,
-	orf_token->rtr_list[i].conf_id.tv.tv_sec,
-	orf_token->rtr_list[i].conf_id.tv.tv_usec,
+	rtr_list_old[index_old].conf_id.rep.s_addr,
+	rtr_list_old[index_old].conf_id.tv.tv_sec,
+	rtr_list_old[index_old].conf_id.tv.tv_usec,
 	memb_form_token_conf_id.rep.s_addr,
 	memb_form_token_conf_id.tv.tv_sec,
 	memb_form_token_conf_id.tv.tv_usec);
@@ -1518,32 +1692,60 @@ printf ("%d.%d.%d vs %d.%d.%d\n",
 		 * If this retransmit request isn't from this configuration,
 		 * try next rtr entry
 		 */
- 		if (memcmp (&orf_token->rtr_list[i].conf_id, &memb_form_token_conf_id,
+ 		if (memcmp (&rtr_list_old[index_old].conf_id, &memb_form_token_conf_id,
 			sizeof (struct memb_conf_id)) != 0) {
 
-			i++;
+			/*
+			 * Copy retransmit request to new retransmit list because its a
+			 * retransmit request for another configuration
+			 */
+			memcpy (&rtr_list_new[index_new],
+				&rtr_list_old[index_old],
+				sizeof (struct rtr_item));
+
+			index_old += 1;
+			index_new += 1;
 			continue;
 		}
-		assert (orf_token->rtr_list[i].seqid > 0);
-		res = orf_token_remcast (orf_token->rtr_list[i].seqid);
+
+		assert (rtr_list_old[index_old].seqid > 0);
+		res = orf_token_remcast (rtr_list_old[index_old].seqid);
 		if (res == 0) {
+			/*
+			 * Multicasted message, so no need to copy to new retransmit list
+			 */
 			orf_token->rtr_list_entries -= 1;
 			assert (orf_token->rtr_list_entries >= 0);
-			memmove (&orf_token->rtr_list[i],
-				&orf_token->rtr_list[i + 1],
-				sizeof (struct rtr_item) * (orf_token->rtr_list_entries));
+			index_old += 1;
 			fcc_remcast_current++;
 			stats_remcasts++;
 		} else {
-			i++;
-//printf ("couldn't remcast %d\n", i);
+			/*
+			 * Couldn't remulticast, so copy request to new retransmit list
+			 */
+			memcpy (&rtr_list_new[index_new],
+				&rtr_list_old[index_old],
+				sizeof (struct rtr_item));
+
+			index_old += 1;
+			index_new += 1;
 		}
 	}
 	*fcc_allowed = *fcc_allowed - fcc_remcast_current - 1;
 
+	/*
+	 * Copy unsent requests because flow control limit was reached
+	 */
+if (orf_token->rtr_list_entries - index_new) {
+	memcpy (&rtr_list_new[index_new],
+		&rtr_list_old[index_old],
+		sizeof (struct rtr_item) * (orf_token->rtr_list_entries - index_new));
+	index_new += orf_token->rtr_list_entries - index_new;
+}
+
 #ifdef COMPILE_OUT
 for (i = 0; i < orf_token->rtr_list_entries; i++) {
-	assert (orf_token->rtr_list[i].seqid != -1);
+	assert (rtr_list_old[index_old].seqid != -1);
 }
 #endif
 
@@ -1552,25 +1754,92 @@ for (i = 0; i < orf_token->rtr_list_entries; i++) {
 	 * but only retry if there is room in the retransmit list
 	 */
 	for (i = gmi_arut + 1;
-			orf_token->rtr_list_entries < RTR_TOKEN_SIZE_MAX &&
-		//	i <= orf_token->header.seqid; /* TODO this worked previously but not correct for EVS */
+			orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX &&
 			i <= gmi_highest_seq;
 			i++) {
 
+		/*
+		 * Find if a message is missing from this processor
+		 */
 		res = sq_item_inuse (&queue_rtr_items, i);
 		if (res == 0) {
+			/*
+			 * Determine if missing message is already in retransmit list
+			 */
 			found = 0;
 			for (j = 0; j < orf_token->rtr_list_entries; j++) {
-				if (i == orf_token->rtr_list[j].seqid) {
+				if (i == rtr_list_new[j].seqid) {
 					found = 1;
 				}
 			}
 			if (found == 0) {
-				memcpy (&orf_token->rtr_list[orf_token->rtr_list_entries].conf_id,
+				/*
+				 * Missing message not found in current retransmit list so add it
+				 */
+				memcpy (&rtr_list_new[orf_token->rtr_list_entries].conf_id,
 					&memb_form_token_conf_id, sizeof (struct memb_conf_id));
-				orf_token->rtr_list[orf_token->rtr_list_entries].seqid = i;
+				rtr_list_new[orf_token->rtr_list_entries].seqid = i;
 				orf_token->rtr_list_entries++;
-//printf ("adding to retransmit list %d\n", i);
+			}
+		}
+	}
+}
+
+static void orf_token_plug_calculate (
+	struct orf_token *orf_token,
+	unsigned char *plug_bitmap_new)
+{
+	unsigned char *plug_bitmap_old;
+	int bitmap_reps;
+	int i;
+	int set_unplugged = 1;
+
+	/*
+	 * If the group is already unplugged, do nothing
+	 */
+	if (plug_state == GMI_PLUG_GROUP_UNPLUGGED) {
+		return;
+	}
+	plug_bitmap_old = (((char *)orf_token) +
+		sizeof (struct orf_token) +
+		(sizeof (struct rtr_item) * orf_token->rtr_list_entries));
+
+	memcpy (plug_bitmap_new, plug_bitmap_old, orf_token->bitmap_bytes);
+	/*
+	 * Unplug this processor if an unplug request is pending and
+	 * no further messages are pending in the pending queue
+	 */
+	if (queue_is_empty (&queues_pend_trans[GMI_PRIO_RECOVERY]) &&
+		plug_state == GMI_PLUG_PROCESSOR_REQUEST_UNPLUG) {
+
+		plug_state = GMI_PLUG_PROCESSOR_UNPLUGGED;
+		
+		for (i = 0; i < memb_list_entries; i++) {
+			if (memb_list[i].sin_addr.s_addr == memb_local_sockaddr_in.sin_addr.s_addr) {
+				plug_bitmap_new[i / 8] |= 1 << (i % 8);
+				break;
+			}
+		}
+	}
+
+	/*
+	 * Determine if group is unplugged
+	 */
+	if (plug_state == GMI_PLUG_PROCESSOR_UNPLUGGED) {
+		bitmap_reps = (1 << (memb_list_entries % 8)) - 1;
+		if ((bitmap_reps & plug_bitmap_new[memb_list_entries / 8]) == bitmap_reps) {
+			/*
+			 * If bits 0..(1<<memb_list_entries/8) not set, don't unplug
+			 */
+			for (i = 0; i < (memb_list_entries / 8); i++) {
+				if (plug_bitmap_new[i] != 0xff) {
+					set_unplugged = 0;
+					break;
+				}
+			}
+			if (set_unplugged) {
+				plug_state = GMI_PLUG_GROUP_UNPLUGGED;
+				gmi_log_printf (gmi_log_level_notice, "All processors unplugged, allowing messages to be transmitted.\n");
 			}
 		}
 	}
@@ -1785,6 +2054,12 @@ printf ("CONFCHG ENTRIES %d\n", memb_list_entries_confchg);
 			}
 		}
 
+		/*
+		 * Disallow all but RECOVERY priority messages
+	 	 */
+printf ("calling recovery\n");
+		recovery_plug_set ();
+
 		/*
 		 * MAIN STEP:
 		 * Deliver transitional configuration
@@ -1792,7 +2067,7 @@ printf ("CONFCHG ENTRIES %d\n", memb_list_entries_confchg);
 		if (gmi_confchg_fn &&
 			(trans_memb_list_entries != memb_list_entries ||
 			(memcmp (trans_memb_list, memb_list, sizeof (struct sockaddr_in) * memb_list_entries) != 0))) {
-			gmi_confchg_fn (trans_memb_list, trans_memb_list_entries,
+			gmi_confchg_fn (GMI_CONFIGURATION_TRANSITIONAL, trans_memb_list, trans_memb_list_entries,
 				left_list, left_list_entries,
 				0, 0);
 		}
@@ -1848,7 +2123,7 @@ printf ("CONFCHG ENTRIES %d\n", memb_list_entries_confchg);
 		 * Deliver regular configuration
 		 */
 		if (gmi_confchg_fn) {
-			gmi_confchg_fn (memb_list, memb_list_entries,
+			gmi_confchg_fn (GMI_CONFIGURATION_REGULAR, memb_list, memb_list_entries,
 				left_list, 0,
 				joined_list, joined_list_entries);
 		}
@@ -1889,9 +2164,25 @@ static int orf_fcc_allowed (struct orf_token *token)
 
 void timer_function_token_retransmit_timeout (void *data)
 {
+	struct iovec iovec;
+	struct msghdr msg_orf_token;
+	int res;
+
 	gmi_log_printf (gmi_log_level_warning, "Token being retransmitted.\n");
 
-	orf_token_send (&orf_token_retransmit, 0);
+	iovec.iov_base = orf_token_retransmit; /* bytes saved by orf_token_send after encrypt_and_sign, resent as-is */
+	iovec.iov_len = orf_token_retransmit_size; /* length recorded together with the saved bytes */
+
+	msg_orf_token.msg_name = (caddr_t)&memb_next; /* same destination orf_token_send addresses */
+	msg_orf_token.msg_namelen = sizeof (struct sockaddr_in);
+	msg_orf_token.msg_iov = &iovec;
+	msg_orf_token.msg_iovlen = 1;
+	msg_orf_token.msg_control = 0;
+	msg_orf_token.msg_controllen = 0;
+	msg_orf_token.msg_flags = 0;
+	
+	res = sendmsg (gmi_sockets[0].token, &msg_orf_token, MSG_NOSIGNAL);
+	assert (res != -1); /* NOTE(review): any sendmsg failure aborts here - confirm that is intended */
 }
 
 void timer_function_form_token_timeout (void *data)
@@ -1944,10 +2235,13 @@ void orf_timer_function_token_timeout (void *data)
  */
 static int orf_token_send (
 	struct orf_token *orf_token,
+	struct rtr_item *rtr_list,
+	unsigned char *plug_bitmap,
 	int reset_timer)
 {
 	struct msghdr msg_orf_token;
-	struct iovec iovec_orf_token;
+	struct iovec iovec_orf_token[3];
+	int iov_len;
 	int res;
 
 	if (reset_timer) {
@@ -1957,11 +2251,28 @@ static int orf_token_send (
 			orf_timer_function_token_timeout, &timer_orf_token_timeout);
 	}
 
-	iovec_orf_token.iov_base = (char *)orf_token;
-	iovec_orf_token.iov_len = sizeof (struct orf_token);
+	iovec_orf_token[0].iov_base = (char *)orf_token;
+	iovec_orf_token[0].iov_len = sizeof (struct orf_token);
+	iovec_orf_token[1].iov_base = (char *)rtr_list;
+	iovec_orf_token[1].iov_len = sizeof (struct rtr_item) * orf_token->rtr_list_entries;
+	iov_len = 2;
+	if (plug_bitmap) {
+		iovec_orf_token[2].iov_base = (char *)plug_bitmap;
+		iovec_orf_token[2].iov_len = sizeof (unsigned char) * (MAX_MEMBERS / 8);
+		iov_len = 3;
+	}
+
+	encrypt_and_sign (iovec_orf_token, iov_len);
 
-	encrypt_and_sign (&iovec_orf_token, 1);
+	/*
+	 * Keep an encrypted copy in case the token retransmit timer expires
+	 */
+	memcpy (orf_token_retransmit, iov_encrypted.iov_base, iov_encrypted.iov_len);
+	orf_token_retransmit_size = iov_encrypted.iov_len;
 
+	/*
+	 * Send the message
+	 */
 	msg_orf_token.msg_name = (caddr_t)&memb_next;
 	msg_orf_token.msg_namelen = sizeof (struct sockaddr_in);
 	msg_orf_token.msg_iov = &iov_encrypted;
@@ -1993,18 +2304,20 @@ int orf_token_send_initial (void)
 {
 	struct orf_token orf_token;
 	int res;
+	unsigned char bitmap[MAX_MEMBERS / 8];
 
 	orf_token.header.seqid = 0;
 	orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
 	orf_token.token_seqid = 0;
 	orf_token.group_arut = gmi_highest_seq;
 	orf_token.addr_arut.s_addr = gmi_bound_to.sin_addr.s_addr;
+	orf_token.bitmap_bytes = MAX_MEMBERS / 8;
 	orf_token.fcc = 0;
+	memset (bitmap, 0, sizeof (bitmap));
 
 	orf_token.rtr_list_entries = 0;
-	memset (orf_token.rtr_list, 0, sizeof (struct rtr_item) * RTR_TOKEN_SIZE_MAX);
 
-	res = orf_token_send (&orf_token, 1);
+	res = orf_token_send (&orf_token, 0, bitmap, 1);
 
 	return (res);
 }
@@ -2113,6 +2426,11 @@ static void memb_timer_function_state_commit_timeout (void *data)
 		 */
 		if (memb_list_entries == 1) {
 			gmi_log_printf (gmi_log_level_notice, "I am the only member.\n");
+			/*
+			 * Disallow all but RECOVERY priority messages
+	 		 */
+			recovery_plug_set ();
+
 			if (gmi_confchg_fn) {
 				/*
 				 * Determine nodes that left the configuration
@@ -2127,10 +2445,16 @@ static void memb_timer_function_state_commit_timeout (void *data)
 					}
 				}
 
-				gmi_confchg_fn (&memb_local_sockaddr_in, 1,
+				gmi_confchg_fn (GMI_CONFIGURATION_TRANSITIONAL,
+					&memb_local_sockaddr_in, 1,
 					left_list, left_list_entries,
 					0, 0);
 
+				gmi_confchg_fn (GMI_CONFIGURATION_REGULAR,
+					&memb_local_sockaddr_in, 1,
+					0, 0,
+					0, 0);
+
 				memb_list_entries_confchg = 1;
 				memb_list[0].sin_addr.s_addr = memb_local_sockaddr_in.sin_addr.s_addr; 
 			}
@@ -2590,15 +2914,29 @@ static int message_handler_orf_token (
 	int iov_len,
 	int bytes_received)
 {
-	struct orf_token orf_token;
+	struct orf_token *orf_token = (struct orf_token *)orf_token_retransmit;
 	int transmits_allowed;
 	int starting_group_arut;
 	int prio = UINT_MAX;
 	struct pollfd ufd;
 	int nfds;
+	struct rtr_item rtr_list[RETRANSMIT_ENTRIES_MAX];
+	struct orf_token *orf_token_ref = (struct orf_token *)iovec->iov_base;
+	unsigned char plug_bitmap[MAX_MEMBERS / 8];
 
-	assert (bytes_received == sizeof (struct orf_token));
-	memcpy (&orf_token, iovec->iov_base, sizeof (struct orf_token));
+#ifdef RANDOMDROP
+if (random () % 100 < 20) {
+	return (0);
+}
+#endif
+	assert (bytes_received >= sizeof (struct orf_token));
+	assert (bytes_received == sizeof (struct orf_token) +
+		(sizeof (struct rtr_item) * orf_token_ref->rtr_list_entries) +
+		(sizeof (unsigned char) * orf_token_ref->bitmap_bytes));
+	memcpy (orf_token, iovec->iov_base,
+		sizeof (struct orf_token) +
+		(sizeof (struct rtr_item) * orf_token_ref->rtr_list_entries) +
+		(sizeof (unsigned char) * orf_token_ref->bitmap_bytes));
 
 	/*
 	* flush multicast messages
@@ -2626,18 +2964,18 @@ static int message_handler_orf_token (
 	 * to this processor because the retransmit timer on a previous
 	 * processor timed out, so ignore the token
 	 */
-	if (orf_token.token_seqid > 0 && gmi_token_seqid >= orf_token.token_seqid) {
-printf ("already received token %d %d\n", orf_token.token_seqid, gmi_token_seqid);
+	if (orf_token->token_seqid > 0 && gmi_token_seqid >= orf_token->token_seqid) {
+printf ("already received token %d %d\n", orf_token->token_seqid, gmi_token_seqid);
 //exit(1);
 		return (0);
 	}
-	gmi_token_seqid = orf_token.token_seqid;
+	gmi_token_seqid = orf_token->token_seqid;
 
 	poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout);
 	timer_orf_token_retransmit_timeout = 0;
 
 #ifdef PRINT_STATS
-	if (orf_token.header.seqid > 10000) {
+	if (orf_token->header.seqid > 10000) {
 		print_stats ();
 	}
 #endif
@@ -2657,19 +2995,24 @@ printf ("already received token %d %d\n", orf_token.token_seqid, gmi_token_seqid
 		return (0);
 	}
 
+	/*
+	 * Determine if the processor should be unplugged,
+	 * and if the entire group has been unplugged
+	 */
+	orf_token_plug_calculate (orf_token, plug_bitmap);
+
 //printf ("Got orf token from %s\n", inet_ntoa (system_from->sin_addr));
-	starting_group_arut = orf_token.group_arut;
+	starting_group_arut = orf_token->group_arut;
 	stats_orf_token++;
 	
-	transmits_allowed = orf_fcc_allowed (&orf_token);
+	transmits_allowed = orf_fcc_allowed (orf_token);
+
 
 //printf ("retransmit allowed %d\n", transmits_allowed);
 	/*
 	 * Retransmit failed messages and request retransmissions
 	 */
-
-	orf_token_rtr (&orf_token, &transmits_allowed);
-//printf ("multicasts allowed %d\n", transmits_allowed);
+	orf_token_rtr (orf_token, rtr_list, &transmits_allowed);
 
 	/*
 	 * TODO Ok this is ugly and I dont like it.
@@ -2681,7 +3024,7 @@ printf ("already received token %d %d\n", orf_token.token_seqid, gmi_token_seqid
 	 * be overrun or cause the form token to be large.
 	 */
 
-	if ((gmi_brake + MISSING_MCAST_WINDOW) < orf_token.header.seqid) {
+	if ((gmi_brake + MISSING_MCAST_WINDOW) < orf_token->header.seqid) {
 		transmits_allowed = 0;
 	}
 
@@ -2689,34 +3032,32 @@ printf ("already received token %d %d\n", orf_token.token_seqid, gmi_token_seqid
 	 * Set the group arut and free any messages that can be freed
 	 */
 	if (memb_state != MEMB_STATE_EVS) {
-		calculate_group_arut (&orf_token);
+		calculate_group_arut (orf_token);
 	}
 
 	/*
 	 * Multicast queued messages
 	 */
-	orf_token_mcast (&orf_token, transmits_allowed, system_from);
+	orf_token_mcast (orf_token, transmits_allowed, system_from);
 
 	/*
 	 * Calculate flow control count
 	 */
-	orf_token_fcc (&orf_token);
+	orf_token_fcc (orf_token);
 
 	/*
 	 * Deliver membership and messages required by EVS
 	 */
-	orf_token_evs (&orf_token, starting_group_arut);
+	orf_token_evs (orf_token, starting_group_arut);
 
 	if (memb_state == MEMB_STATE_EVS) {
-		calculate_group_arut (&orf_token);
+		calculate_group_arut (orf_token);
 	}
 
 	/*
 	 * Increment the token seqid and store for later retransmit
 	 */
-	orf_token.token_seqid += 1;
-	memcpy (&orf_token_retransmit, &orf_token,
-		sizeof (struct orf_token));
+	orf_token->token_seqid += 1;
 
 	poll_timer_delete (*gmi_poll_handle, timer_orf_token_retransmit_timeout);
 
@@ -2727,7 +3068,7 @@ printf ("already received token %d %d\n", orf_token.token_seqid, gmi_token_seqid
 	/*
 	 * Transmit orf_token to next member
 	 */
-	orf_token_send (&orf_token, 1);
+	orf_token_send (orf_token, rtr_list, plug_bitmap, 1);
 
 	return (0);
 }
@@ -3089,7 +3430,7 @@ static void pending_queues_deliver (void)
 		assembly_queue_item.iov_len = gmi_rtr_item_p->iov_len;
 		memcpy (&assembly_queue_item.iovec, gmi_rtr_item_p->iovec,
 			sizeof (struct iovec) * gmi_rtr_item_p->iov_len);
-		assert (gmi_rtr_item_p->iov_len < MAXIOVS);
+		assert (gmi_rtr_item_p->iov_len <= MAXIOVS);
 
 		assert (mcast->source.s_addr != 0);
 		queue_frag = pend_delv_find (mcast->source);
@@ -3136,6 +3477,11 @@ static int message_handler_mcast (
 
 	mcast = iovec[0].iov_base;
 
+#ifdef RANDOMDROP
+if (random()%100 < 70) {
+	return (0);
+}
+#endif
 	/*
 	 * Ignore multicasts for other configurations
 	 * TODO shouldn't we enter gather here?