Просмотр исходного кода

totem srp merge from whitetank

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1427 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 18 лет назад
Родитель
Сommit
4336be7c4c
1 измененных файлов с 159 добавлено и 126 удалено
  1. 159 126
      exec/totemsrp.c

+ 159 - 126
exec/totemsrp.c

@@ -48,9 +48,10 @@
  *   usage on 1.6ghz xeon from 35% to less then .1 % as measured by top
  */
 
-#ifndef OPENAIS_BSD
+#ifdef OPENAIS_BSD
 #include <alloca.h>
 #endif
+
 #include <assert.h>
 #include <sys/mman.h>
 #include <sys/types.h>
@@ -82,6 +83,7 @@
 #include "../include/sq.h"
 #include "../include/list.h"
 #include "../include/hdb.h"
+#include "swab.h"
 
 #include "crypto.h"
 
@@ -91,6 +93,7 @@
 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX		500 /* allow 500 messages to be queued */
 #define MAXIOVS					5	
 #define RETRANSMIT_ENTRIES_MAX			30
+#define TOKEN_SIZE_MAX				64000 /* bytes */
 
 /*
  * Rollover handling:
@@ -323,6 +326,8 @@ struct totemsrp_instance {
 
 	struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX];
 
+	struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX];
+
 	int my_proc_list_entries;
 
 	int my_failed_list_entries;
@@ -335,7 +340,7 @@ struct totemsrp_instance {
 
 	int my_deliver_memb_entries;
 
-	int my_nodeid_lookup_entries;
+	int my_left_memb_entries;
 
 	struct memb_ring_id my_ring_id;
 
@@ -387,14 +392,12 @@ struct totemsrp_instance {
 
 	struct list_head token_callback_sent_listhead;
 
-	char *orf_token_retransmit; // sizeof (struct orf_token) + sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX];
+	char *orf_token_retransmit[TOKEN_SIZE_MAX];
 
 	int orf_token_retransmit_size;
 
 	unsigned int my_token_seq;
 
-	unsigned int my_commit_token_seq;
-
 	/*
 	 * Timers
 	 */
@@ -551,8 +554,10 @@ static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token
 	int fcc_mcasts_allowed);
 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
 
-static void memb_ring_id_store (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
+static void memb_ring_id_set_and_store (struct totemsrp_instance *instance,
+	struct memb_ring_id *ring_id);
 static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
+static void memb_state_commit_token_target_set (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
 static int memb_state_commit_token_send (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
 static void memb_state_commit_token_create (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
 static int token_hold_cancel_send (struct totemsrp_instance *instance);
@@ -618,10 +623,6 @@ void totemsrp_instance_initialize (struct totemsrp_instance *instance)
 
 	instance->my_token_seq = SEQNO_START_TOKEN - 1;
 
-	instance->my_commit_token_seq = SEQNO_START_TOKEN - 1;
-
-	instance->orf_token_retransmit = malloc (15000);
-
 	instance->memb_state = MEMB_STATE_OPERATIONAL;
 
 	instance->set_aru = -1;
@@ -945,7 +946,6 @@ error_exit:
 	return (res);
 }
 
-
 /*
  * Set operations for use by the membership algorithm
  */
@@ -1186,7 +1186,7 @@ static void memb_set_merge (
 			}	
 		}
 		if (found == 0) {
-			srp_addr_copy (&fullset[j], &subset[i]);
+			srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
 			*fullset_entries = *fullset_entries + 1;
 		}
 		found = 0;
@@ -1388,7 +1388,7 @@ static void memb_state_consensus_timeout_expired (
 
 		memb_set_merge (no_consensus_list, no_consensus_list_entries,
 			instance->my_failed_list, &instance->my_failed_list_entries);
-		memb_state_gather_enter (instance, 1);
+		memb_state_gather_enter (instance, 0);
 	}
 }
 
@@ -1569,13 +1569,13 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
 {
 	struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
 	int joined_list_entries = 0;
-	struct srp_addr left_list[PROCESSOR_COUNT_MAX];
-	int left_list_entries = 0;
 	unsigned int aru_save;
-	unsigned int left_list_totemip[PROCESSOR_COUNT_MAX];
 	unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
 	unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
 	unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
+	unsigned int left_list[PROCESSOR_COUNT_MAX];
+
+	memb_consensus_reset (instance);
 
 	old_ring_state_reset (instance);
 	ring_reset (instance);
@@ -1593,7 +1593,8 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
 	/*
 	 * Calculate joined and left list
 	 */
-	memb_set_subtract (left_list, &left_list_entries,
+	memb_set_subtract (instance->my_left_memb_list,
+		&instance->my_left_memb_entries,
 		instance->my_memb_list, instance->my_memb_entries,
 		instance->my_trans_memb_list, instance->my_trans_memb_entries);
 
@@ -1613,12 +1614,13 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
 	/*
 	 * Deliver transitional configuration to application
 	 */
-	srp_addr_to_nodeid (left_list_totemip, left_list, left_list_entries);
+	srp_addr_to_nodeid (left_list, instance->my_left_memb_list,
+		instance->my_left_memb_entries);
 	srp_addr_to_nodeid (trans_memb_list_totemip,
 		instance->my_trans_memb_list, instance->my_trans_memb_entries);
 	instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_TRANSITIONAL,
 		trans_memb_list_totemip, instance->my_trans_memb_entries,
-		left_list_totemip, left_list_entries,
+		left_list, instance->my_left_memb_entries,
 		0, 0, &instance->my_ring_id);
 		
 // TODO we need to filter to ensure we only deliver those
@@ -1669,8 +1671,6 @@ static void memb_state_gather_enter (
 	struct totemsrp_instance *instance,
 	int gather_from)
 {
-	instance->my_commit_token_seq = SEQNO_START_TOKEN - 1;
-
 	memb_set_merge (
 		&instance->my_id, 1,
 		instance->my_proc_list, &instance->my_proc_list_entries);
@@ -1730,12 +1730,15 @@ static void memb_state_commit_enter (
 
 	old_ring_state_save (instance); 
 
-// ABC
 	memb_state_commit_token_update (instance, commit_token);
 
+	memb_state_commit_token_target_set (instance, commit_token);
+
+	memb_ring_id_set_and_store (instance, &commit_token->ring_id);
+
 	memb_state_commit_token_send (instance, commit_token);
 
-	memb_ring_id_store (instance, commit_token);
+	instance->token_ring_id_seq = instance->my_ring_id.seq;
 
 	poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
 
@@ -1753,8 +1756,6 @@ static void memb_state_commit_enter (
 
 	instance->memb_state = MEMB_STATE_COMMIT;
 
-	instance->my_commit_token_seq = SEQNO_START_TOKEN - 1;
-
 	/*
 	 * reset all flow control variables since we are starting a new ring
 	 */
@@ -1829,7 +1830,7 @@ static void memb_state_recovery_enter (
 			memb_list[i].high_delivered,
 			memb_list[i].received_flg);
 
-// TODO		assert (!totemip_zero_check(&memb_list[i].ring_id.rep));
+		assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
 	}
 	/*
 	 * Determine if any received flag is false
@@ -2045,6 +2046,8 @@ error_iovec:
 	for (j = 0; j < i; j++) {
 		free (message_item.iovec[j].iov_base);
 	}
+	
+	free(message_item.mcast);
 
 error_mcast:
 	hdb_handle_put (&totemsrp_instance_database, handle);
@@ -2295,13 +2298,13 @@ static int orf_token_mcast (
 		 * Delete item from pending queue
 		 */
 		queue_item_remove (mcast_queue);
+
+		/*
+		 * If messages mcasted, deliver any new messages to totempg
+		 */
+		instance->my_high_seq_received = token->seq;
 	}
 
-	/*
-	 * If messages mcasted, deliver any new messages to totempg
-	 */
-	instance->my_high_seq_received = token->seq;
-		
 	update_aru (instance);
 
 	/*
@@ -2423,8 +2426,7 @@ static int orf_token_rtr (
 				/*
 				 * Missing message not found in current retransmit list so add it
 				 */
-				memb_ring_id_copy (
-					&rtr_list[orf_token->rtr_list_entries].ring_id,
+				memb_ring_id_copy (&rtr_list[orf_token->rtr_list_entries].ring_id,
 					&instance->my_ring_id);
 				rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
 				orf_token->rtr_list_entries++;
@@ -2459,7 +2461,6 @@ static void timer_function_token_retransmit_timeout (void *data)
 	case MEMB_STATE_GATHER:
 		break;
 	case MEMB_STATE_COMMIT:
-		break;
 	case MEMB_STATE_OPERATIONAL:
 	case MEMB_STATE_RECOVERY:
 		token_retransmit (instance);
@@ -2525,7 +2526,7 @@ static int token_send (
 		return (0);
 	}
 
-	iovec.iov_base = (char *)orf_token;
+	iovec.iov_base = orf_token;
 	iovec.iov_len = iov_len;
 
 	totemrrp_token_send (instance->totemrrp_handle,
@@ -2556,10 +2557,10 @@ static int token_hold_cancel_send (struct totemsrp_instance *instance)
 	token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid;
 	assert (token_hold_cancel.header.nodeid);
 
-	iovec[0].iov_base = (char *)&token_hold_cancel;
+	iovec[0].iov_base = &token_hold_cancel;
 	iovec[0].iov_len = sizeof (struct token_hold_cancel) -
 		sizeof (struct memb_ring_id);
-	iovec[1].iov_base = (char *)&instance->my_ring_id;
+	iovec[1].iov_base = &instance->my_ring_id;
 	iovec[1].iov_len = sizeof (struct memb_ring_id);
 
 	totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2);
@@ -2611,65 +2612,78 @@ static void memb_state_commit_token_update (
 	struct totemsrp_instance *instance,
 	struct memb_commit_token *commit_token)
 {
-	int memb_index_this;
 	struct srp_addr *addr;
 	struct memb_commit_token_memb_entry *memb_list;
 
 	addr = (struct srp_addr *)commit_token->end_of_commit_token;
 	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
 
-	memb_index_this = (commit_token->memb_index + 1) % commit_token->addr_entries;
-	memb_ring_id_copy (&memb_list[memb_index_this].ring_id,
+	memb_ring_id_copy (&memb_list[commit_token->memb_index].ring_id,
 		&instance->my_old_ring_id);
 	assert (!totemip_zero_check(&instance->my_old_ring_id.rep));
 
-	memb_list[memb_index_this].aru = instance->old_ring_state_aru;
+	memb_list[commit_token->memb_index].aru = instance->old_ring_state_aru;
 	/*
 	 *  TODO high delivered is really instance->my_aru, but with safe this
 	 * could change?
 	 */
-	memb_list[memb_index_this].high_delivered = instance->my_high_delivered;
-	memb_list[memb_index_this].received_flg = instance->my_received_flg;
+	memb_list[commit_token->memb_index].high_delivered = instance->my_high_delivered;
+	memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
 
 	commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
+	commit_token->memb_index += 1;
+	assert (commit_token->memb_index <= commit_token->addr_entries);
 	assert (commit_token->header.nodeid);
 }
 
-static int memb_state_commit_token_send (struct totemsrp_instance *instance,
+static void memb_state_commit_token_target_set (
+	struct totemsrp_instance *instance,
+	struct memb_commit_token *commit_token)
+{
+	struct srp_addr *addr;
+	unsigned int i;
+
+	addr = (struct srp_addr *)commit_token->end_of_commit_token;
+
+	for (i = 0; i < instance->totem_config->interface_count; i++) {
+		totemrrp_token_target_set (
+			instance->totemrrp_handle,
+			&addr[commit_token->memb_index %
+				commit_token->addr_entries].addr[i],
+			i);
+	}
+}
+
+static int memb_state_commit_token_send (
+	struct totemsrp_instance *instance,
 	struct memb_commit_token *commit_token)
 {
 	struct iovec iovec;
-	int memb_index_this;
-	int memb_index_next;
 	struct srp_addr *addr;
 	struct memb_commit_token_memb_entry *memb_list;
-	unsigned int i;
 
 	addr = (struct srp_addr *)commit_token->end_of_commit_token;
 	memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
 
 	commit_token->token_seq++;
-	memb_index_this = (commit_token->memb_index + 1) % commit_token->addr_entries;
-	memb_index_next = (memb_index_this + 1) % commit_token->addr_entries;
-	commit_token->memb_index = memb_index_this;
-
-
-	iovec.iov_base = (char *)commit_token;
+	iovec.iov_base = commit_token;
 	iovec.iov_len = sizeof (struct memb_commit_token) +
 		((sizeof (struct srp_addr) +
 			sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
-
-	for (i = 0; i < instance->totem_config->interface_count; i++) {
-		totemrrp_token_target_set (
-			instance->totemrrp_handle,
-			&addr[memb_index_next].addr[i],
-			i);
-	}
+	/*
+	 * Make a copy for retransmission if necessary
+	 */
+	memcpy (instance->orf_token_retransmit, commit_token, iovec.iov_len);
+	instance->orf_token_retransmit_size = iovec.iov_len;
 
 	totemrrp_token_send (instance->totemrrp_handle,
 		&iovec,
 		1);
 
+	/*
+	 * Request retransmission of the commit token in case it is lost
+	 */
+	reset_token_retransmit_timeout (instance);
 	return (0);
 }
 
@@ -2740,7 +2754,7 @@ static void memb_state_commit_token_create (
 	qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
 		srp_addr_compare);
 
-	commit_token->memb_index = token_memb_entries - 1;
+	commit_token->memb_index = 0;
 	commit_token->addr_entries = token_memb_entries;
 
 	addr = (struct srp_addr *)commit_token->end_of_commit_token;
@@ -2756,6 +2770,7 @@ static void memb_join_message_send (struct totemsrp_instance *instance)
 {
 	struct memb_join memb_join;
 	struct iovec iovec[3];
+	unsigned int iovs;
 
 	memb_join.header.type = MESSAGE_TYPE_MEMB_JOIN;
 	memb_join.header.endian_detector = ENDIAN_LOCAL;
@@ -2763,21 +2778,25 @@ static void memb_join_message_send (struct totemsrp_instance *instance)
 	memb_join.header.nodeid = instance->my_id.addr[0].nodeid;
 	assert (memb_join.header.nodeid);
 
-
 	assert (srp_addr_equal (&instance->my_proc_list[0], &instance->my_proc_list[1]) == 0);
 	memb_join.ring_seq = instance->my_ring_id.seq;
 	memb_join.proc_list_entries = instance->my_proc_list_entries;
 	memb_join.failed_list_entries = instance->my_failed_list_entries;
 	srp_addr_copy (&memb_join.system_from, &instance->my_id);
 		
-	iovec[0].iov_base = (char *)&memb_join;
+	iovec[0].iov_base = &memb_join;
 	iovec[0].iov_len = sizeof (struct memb_join);
-	iovec[1].iov_base = (char *)&instance->my_proc_list;
+	iovec[1].iov_base = &instance->my_proc_list;
 	iovec[1].iov_len = instance->my_proc_list_entries *
 		sizeof (struct srp_addr);
-	iovec[2].iov_base = (char *)&instance->my_failed_list;
-	iovec[2].iov_len = instance->my_failed_list_entries *
-		sizeof (struct srp_addr);
+	if (instance->my_failed_list_entries == 0) {
+		iovs = 2;
+	} else {
+		iovs = 3;
+		iovec[2].iov_base = &instance->my_failed_list;
+		iovec[2].iov_len = instance->my_failed_list_entries *
+			sizeof (struct srp_addr);
+	}
 
 	if (instance->totem_config->send_join_timeout) {
 		usleep (random() % (instance->totem_config->send_join_timeout * 1000));
@@ -2786,7 +2805,7 @@ static void memb_join_message_send (struct totemsrp_instance *instance)
 	totemrrp_mcast_flush_send (
 		instance->totemrrp_handle,
 		iovec,
-		3);
+		iovs);
 }
 
 static void memb_merge_detect_transmit (struct totemsrp_instance *instance) 
@@ -2801,10 +2820,10 @@ static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
 	srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
 	assert (memb_merge_detect.header.nodeid);
 
-	iovec[0].iov_base = (char *)&memb_merge_detect;
+	iovec[0].iov_base = &memb_merge_detect;
 	iovec[0].iov_len = sizeof (struct memb_merge_detect) -
 		sizeof (struct memb_ring_id);
-	iovec[1].iov_base = (char *)&instance->my_ring_id;
+	iovec[1].iov_base = &instance->my_ring_id;
 	iovec[1].iov_len = sizeof (struct memb_ring_id);
 
 	totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2);
@@ -2836,6 +2855,7 @@ static void memb_ring_id_create_or_load (
 		}
 		res = write (fd, &memb_ring_id->seq, sizeof (unsigned long long));
 		assert (res == sizeof (unsigned long long));
+		fsync (fd);
 		close (fd);
 	} else {
 		log_printf (instance->totemsrp_log_level_warning,
@@ -2847,16 +2867,18 @@ static void memb_ring_id_create_or_load (
 	instance->token_ring_id_seq = memb_ring_id->seq;
 }
 
-static void memb_ring_id_store (
+static void memb_ring_id_set_and_store (
 	struct totemsrp_instance *instance,
-	struct memb_commit_token *commit_token)
+	struct memb_ring_id *ring_id)
 {
 	char filename[256];
 	int fd;
 	int res;
 
-	sprintf (filename, "/tmp/ringid_%s",
-		totemip_print (&instance->my_id.addr[0]));
+	memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
+
+	sprintf (filename, "%s/ringid_%s",
+		rundir, totemip_print (&instance->my_id.addr[0]));
 
 	fd = open (filename, O_WRONLY, 0777);
 	if (fd == -1) {
@@ -2865,18 +2887,17 @@ static void memb_ring_id_store (
 	if (fd == -1) {
 		log_printf (instance->totemsrp_log_level_warning,
 			"Couldn't store new ring id %llx to stable storage (%s)\n",
-				commit_token->ring_id.seq, strerror (errno));
+				instance->my_ring_id.seq, strerror (errno));
 		assert (0);
 		return;
 	}
 	log_printf (instance->totemsrp_log_level_notice,
-		"Storing new sequence id for ring %llx\n", commit_token->ring_id.seq);
+		"Storing new sequence id for ring %llx\n", instance->my_ring_id.seq);
 	//assert (fd > 0);
-	res = write (fd, &commit_token->ring_id.seq, sizeof (unsigned long long));
+	res = write (fd, &instance->my_ring_id.seq, sizeof (unsigned long long));
 	assert (res == sizeof (unsigned long long));
+	fsync (fd);
 	close (fd);
-	memcpy (&instance->my_ring_id, &commit_token->ring_id, sizeof (struct memb_ring_id));
-	instance->token_ring_id_seq = instance->my_ring_id.seq;
 }
 
 int totemsrp_callback_token_create (
@@ -3071,6 +3092,7 @@ static void fcc_token_update (
  * Message Handlers
  */
 
+struct timeval tv_old;
 /*
  * message handler called when TOKEN message type received
  */
@@ -3098,9 +3120,10 @@ static int message_handler_orf_token (
 	timersub (&tv_current, &tv_old, &tv_diff);
 	memcpy (&tv_old, &tv_current, sizeof (struct timeval));
 
-	if ((((float)tv_diff.tv_usec) / 100.0) > 5.0) {
-		printf ("OTHERS %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
-	}
+	log_printf (instance->totemsrp_log_level_notice,
+	"Time since last token %0.4f ms\n",
+		(((float)tv_diff.tv_sec) * 1000) + ((float)tv_diff.tv_usec)
+			/ 1000.0);
 #endif
 
 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
@@ -3343,9 +3366,9 @@ static int message_handler_orf_token (
 			gettimeofday (&tv_current, NULL);
 			timersub (&tv_current, &tv_old, &tv_diff);
 			memcpy (&tv_old, &tv_current, sizeof (struct timeval));
-			if ((((float)tv_diff.tv_usec) / 100.0) > 5.0) {
-				printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
-			}
+			log_printf (instance->totemsrp_log_level_notice,
+				"I held %0.4f ms\n",
+				((float)tv_diff.tv_usec) / 1000.0);
 #endif
 			if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
 				messages_deliver_to_app (instance, 0,
@@ -3388,9 +3411,10 @@ static void messages_deliver_to_app (
 	struct sort_queue_item *sort_queue_item_p;
 	unsigned int i;
 	int res;
-	struct mcast *mcast;
+	struct mcast *mcast_in;
+	struct mcast mcast_header;
 	unsigned int range = 0;
-	int endian_conversion_required = 0 ;
+	int endian_conversion_required;
 	unsigned int my_high_delivered_stored = 0;
 
 
@@ -3438,18 +3462,27 @@ static void messages_deliver_to_app (
 
 		sort_queue_item_p = ptr;
 
-		mcast = (struct mcast *)sort_queue_item_p->iovec[0].iov_base;
-		assert (mcast != (struct mcast *)0xdeadbeef);
+		mcast_in = sort_queue_item_p->iovec[0].iov_base;
+		assert (mcast_in != (struct mcast *)0xdeadbeef);
+
+		endian_conversion_required = 0;
+		if (mcast_in->header.endian_detector != ENDIAN_LOCAL) {
+			endian_conversion_required = 1;
+			mcast_endian_convert (mcast_in, &mcast_header);
+		} else {
+			memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
+		}
 
 		/*
 		 * Skip messages not originated in instance->my_deliver_memb
 		 */
 		if (skip &&
-			memb_set_subset (&mcast->system_from,
+			memb_set_subset (&mcast_header.system_from,
 				1,
 				instance->my_deliver_memb_list,
 				instance->my_deliver_memb_entries) == 0) {
-		instance->my_high_delivered = my_high_delivered_stored + i;
+
+			instance->my_high_delivered = my_high_delivered_stored + i;
 
 			continue;
 		}
@@ -3459,12 +3492,7 @@ static void messages_deliver_to_app (
 		 */
 		log_printf (instance->totemsrp_log_level_debug,
 			"Delivering MCAST message with seq %x to pending delivery queue\n",
-			mcast->seq);
-
-		if (mcast->header.endian_detector != ENDIAN_LOCAL) {
-			endian_conversion_required = 1;
-			mcast_endian_convert (mcast, mcast);
-		}
+			mcast_header.seq);
 
 		/*
 		 * Message is locally originated multicast
@@ -3472,7 +3500,7 @@ static void messages_deliver_to_app (
 	 	if (sort_queue_item_p->iov_len > 1 &&
 			sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) {
 			instance->totemsrp_deliver_fn (
-				mcast->header.nodeid,
+				mcast_header.header.nodeid,
 				&sort_queue_item_p->iovec[1],
 				sort_queue_item_p->iov_len - 1,
 				endian_conversion_required);
@@ -3481,7 +3509,7 @@ static void messages_deliver_to_app (
 			sort_queue_item_p->iovec[0].iov_base += sizeof (struct mcast);
 
 			instance->totemsrp_deliver_fn (
-				mcast->header.nodeid,
+				mcast_header.header.nodeid,
 				sort_queue_item_p->iovec,
 				sort_queue_item_p->iov_len,
 				endian_conversion_required);
@@ -3684,7 +3712,7 @@ static int memb_join_process (
 	struct totemsrp_instance *instance,
 	struct memb_join *memb_join)
 {
-	unsigned char *commit_token_storage[32000];
+	unsigned char *commit_token_storage[TOKEN_SIZE_MAX];
 	struct memb_commit_token *my_commit_token =
 		(struct memb_commit_token *)commit_token_storage;
 	struct srp_addr *proc_list;
@@ -3795,7 +3823,8 @@ static void memb_commit_token_endian_convert (struct memb_commit_token *in, stru
 	out->header.endian_detector = ENDIAN_LOCAL;
 	out->header.nodeid = swab32 (in->header.nodeid);
 	out->token_seq = swab32 (in->token_seq);
-	memb_ring_id_copy_endian_convert (&out->ring_id, &in->ring_id);
+	totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
+	out->ring_id.seq = swab64 (in->ring_id.seq);
 	out->retrans_flg = swab32 (in->retrans_flg);
 	out->memb_index = swab32 (in->memb_index);
 	out->addr_entries = swab32 (in->addr_entries);
@@ -3810,8 +3839,11 @@ static void memb_commit_token_endian_convert (struct memb_commit_token *in, stru
 		 */
 		if (in_memb_list[i].ring_id.rep.family != 0) {
 			memb_ring_id_copy_endian_convert (
-			    &out_memb_list[i].ring_id,
-			    &in_memb_list[i].ring_id);
+				&out_memb_list[i].ring_id,
+				&in_memb_list[i].ring_id);
+
+			out_memb_list[i].ring_id.seq =
+				swab64 (in_memb_list[i].ring_id.seq);
 			out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
 			out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
 			out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
@@ -3836,8 +3868,8 @@ static void orf_token_endian_convert (struct orf_token *in, struct orf_token *ou
 	out->retrans_flg = swab32 (in->retrans_flg);
 	out->rtr_list_entries = swab32 (in->rtr_list_entries);
 	for (i = 0; i < out->rtr_list_entries; i++) {
-		memb_ring_id_copy_endian_convert(&out->rtr_list[i].ring_id,
-			&in->rtr_list[i].ring_id);
+		memb_ring_id_copy_endian_convert (&out->rtr_list[i].ring_id,
+			 &in->rtr_list[i].ring_id);
 		out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
 	}
 }
@@ -3849,7 +3881,7 @@ static void mcast_endian_convert (struct mcast *in, struct mcast *out)
 	out->header.nodeid = swab32 (in->header.nodeid);
 	out->seq = swab32 (in->seq);
 	out->this_seqno = swab32 (in->this_seqno);
-	memb_ring_id_copy_endian_convert(&out->ring_id, &in->ring_id);
+	memb_ring_id_copy_endian_convert (&out->ring_id, &in->ring_id);
 	out->node_id = swab32 (in->node_id);
 	out->guarantee = swab32 (in->guarantee);
 	srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
@@ -3945,6 +3977,9 @@ static int message_handler_memb_commit_token (
 	struct srp_addr *addr;
 	struct memb_commit_token_memb_entry *memb_list;
 
+	log_printf (instance->totemsrp_log_level_debug,
+		"got commit token\n");
+
 	if (endian_conversion_needed) {
 		memb_commit_token = memb_commit_token_convert;
 		memb_commit_token_endian_convert (msg, memb_commit_token);
@@ -3954,16 +3989,6 @@ static int message_handler_memb_commit_token (
 	addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
 	memb_list = (struct memb_commit_token_memb_entry *)(addr + memb_commit_token->addr_entries);
 
-	if (sq_lte_compare (memb_commit_token->token_seq,
-		instance->my_commit_token_seq)) {
-		/*
-		 * discard token
-		 */
-		return (0);
-	}
-	instance->my_commit_token_seq = memb_commit_token->token_seq;
-
-
 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
 	if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
 		return (0);
@@ -3991,21 +4016,29 @@ static int message_handler_memb_commit_token (
 			break;
 
 		case MEMB_STATE_COMMIT:
-//			if (memcmp (&memb_commit_token->ring_id, &instance->my_ring_id,
-//				sizeof (struct memb_ring_id)) == 0) {
-			 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq) {
+			/*
+			 * If retransmitted commit tokens are sent on this ring
+			 * filter them out and only enter recovery once the
+			 * commit token has traversed the array.  This is
+			 * determined by :
+		 	 * memb_commit_token->memb_index == memb_commit_token->addr_entries) {
+			 */
+			 if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
+				memb_commit_token->memb_index == memb_commit_token->addr_entries) {
 				memb_state_recovery_enter (instance, memb_commit_token);
 			}
 			break;
 
 		case MEMB_STATE_RECOVERY:
-			log_printf (instance->totemsrp_log_level_notice,
-				"Sending initial ORF token\n");
-
-			// TODO convert instead of initiate
-			orf_token_send_initial (instance);
-			reset_token_timeout (instance); // REVIEWED
-			reset_token_retransmit_timeout (instance); // REVIEWED
+			if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) {
+				log_printf (instance->totemsrp_log_level_notice,
+					"Sending initial ORF token\n");
+
+				// TODO convert instead of initiate
+				orf_token_send_initial (instance);
+				reset_token_timeout (instance); // REVIEWED
+				reset_token_retransmit_timeout (instance); // REVIEWED
+			}
 			break;
 	}
 	return (0);