Просмотр исходного кода

Event service now reconciles open channels and retained events between active
merging partitions.

(Logical change 1.154)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@541 fd59a12c-fef9-0310-b244-a6a79926bd2f

Mark Haverkamp 21 лет назад
Родитель
Commit
81b83714ec
1 измененных файлов с 194 добавлено и 132 удалено
  1. 194 132
      exec/evt.c

+ 194 - 132
exec/evt.c

@@ -32,6 +32,7 @@
  */
  */
 
 
 #define DUMP_CHAN_INFO
 #define DUMP_CHAN_INFO
+#define RECOVERY_EVENT_DEBUG LOG_LEVEL_DEBUG
 #define RECOVERY_DEBUG LOG_LEVEL_DEBUG
 #define RECOVERY_DEBUG LOG_LEVEL_DEBUG
 #define CHAN_DEL_DEBUG LOG_LEVEL_DEBUG
 #define CHAN_DEL_DEBUG LOG_LEVEL_DEBUG
 #define CHAN_OPEN_DEBUG LOG_LEVEL_DEBUG
 #define CHAN_OPEN_DEBUG LOG_LEVEL_DEBUG
@@ -199,7 +200,6 @@ struct service_handler evt_service_handler = {
 	.sync_abort					= evt_sync_abort
 	.sync_abort					= evt_sync_abort
 };
 };
 
 
-// TODO static totempg_recovery_plug_handle evt_recovery_plug_handle;
 
 
 /* 
 /* 
  * list of all retained events 
  * list of all retained events 
@@ -242,6 +242,8 @@ static DECLARE_LIST_INIT(ci_head);
  *
  *
  *	evt_send_retained_events:		Node is sending retained event data.
  *	evt_send_retained_events:		Node is sending retained event data.
  *
  *
+ *	evt_send_retained_events_done:	Node is sending done message.
+ *
  *	evt_wait_send_retained_events:	Node is waiting for other nodes to 
  *	evt_wait_send_retained_events:	Node is waiting for other nodes to 
  *									finish sending retained event data.
  *									finish sending retained event data.
  */
  */
@@ -251,6 +253,7 @@ enum recovery_phases {
 	evt_send_open_count,
 	evt_send_open_count,
 	evt_wait_open_count_done,
 	evt_wait_open_count_done,
 	evt_send_retained_events,
 	evt_send_retained_events,
+	evt_send_retained_events_done,
 	evt_wait_send_retained_events
 	evt_wait_send_retained_events
 };
 };
 
 
@@ -260,22 +263,27 @@ enum recovery_phases {
  * base_id_top:			upper bits of next event ID to assign
  * base_id_top:			upper bits of next event ID to assign
  * base_id:				Lower bits of Next event ID to assign
  * base_id:				Lower bits of Next event ID to assign
  * my_node_id:			My cluster node id
  * my_node_id:			My cluster node id
- * total_member_count:	how many members in this cluster
- * joined_member_count:	How many joined this configuration.
- * left_member_count:	How many left this configuration.
  * checked_in:			keep track during config change.
  * checked_in:			keep track during config change.
- * recovery_node:		True if we're the recovery node.
+ * recovery_node:		True if we're the recovery node. i.e. the
+ * 						node that sends the retained events.
  * next_retained:		pointer to next retained message to send 
  * next_retained:		pointer to next retained message to send 
  * 						during recovery.
  * 						during recovery.
  * next_chan:			pointer to next channel to send during recovery.
  * next_chan:			pointer to next channel to send during recovery.
  * recovery_phase:		Indicates what recovery is taking place.
  * recovery_phase:		Indicates what recovery is taking place.
+ * left_member_count:	How many left this configuration.
  * left_member_list:	Members that left this config
  * left_member_list:	Members that left this config
+ * joined_member_count:	How many joined this configuration.
  * joined_member_list:	Members that joined this config
  * joined_member_list:	Members that joined this config
+ * total_member_count:	how many members in this cluster
  * current_member_list:	Total membership this config
  * current_member_list:	Total membership this config
- * add_list:			pointer to joined list used for sending event id
+ * trans_member_count:	Node count in transitional membership
+ * trans_member_list:	Total membership from the transitional membership
+ * add_count:			count of joined members used for sending event id
  * 						recovery data.
  * 						recovery data.
- * add_list_count:		count of joined members used for sending event id
+ * add_list:			pointer to joined list used for sending event id
  * 						recovery data.
  * 						recovery data.
+ * processed_open_counts: 	Flag used to coordinate clearing open
+ * 						channel counts for config change recovery.
  *
  *
  */
  */
 
 
@@ -283,19 +291,22 @@ enum recovery_phases {
 static SaEvtEventIdT 	base_id = 0;
 static SaEvtEventIdT 	base_id = 0;
 static SaEvtEventIdT 	base_id_top = 0;
 static SaEvtEventIdT 	base_id_top = 0;
 static SaClmNodeIdT  	my_node_id = 0;
 static SaClmNodeIdT  	my_node_id = 0;
-static int			 	total_member_count = 0;
-static int				joined_member_count = 0;
-static int				left_member_count = 0;
 static int 			 	checked_in = 0;
 static int 			 	checked_in = 0;
 static int			 	recovery_node = 0;
 static int			 	recovery_node = 0;
 static struct list_head *next_retained = 0;
 static struct list_head *next_retained = 0;
 static struct list_head *next_chan = 0;
 static struct list_head *next_chan = 0;
 static enum recovery_phases recovery_phase = evt_recovery_complete;
 static enum recovery_phases recovery_phase = evt_recovery_complete;
+static int				left_member_count = 0;
 static struct in_addr 	*left_member_list = 0;
 static struct in_addr 	*left_member_list = 0;
+static int				joined_member_count = 0;
 static struct in_addr 	*joined_member_list = 0;
 static struct in_addr 	*joined_member_list = 0;
+static int			 	total_member_count = 0;
 static struct in_addr 	*current_member_list = 0;
 static struct in_addr 	*current_member_list = 0;
-static struct in_addr 	*add_list = 0;
+static int				trans_member_count = 0;
+static struct in_addr	*trans_member_list = 0;
 static int				add_count = 0;
 static int				add_count = 0;
+static struct in_addr 	*add_list = 0;
+static int				processed_open_counts = 0;
 
 
 /*
 /*
  * Structure to track pending channel open requests.
  * Structure to track pending channel open requests.
@@ -751,7 +762,7 @@ static void dump_chan_opens(struct event_svr_channel_instance *eci)
 		log_printf(LOG_LEVEL_NOTICE, "Node 0x%x, count %d\n",
 		log_printf(LOG_LEVEL_NOTICE, "Node 0x%x, count %d\n",
 			eci->esc_node_opens[i].oc_node_id, 
 			eci->esc_node_opens[i].oc_node_id, 
 			eci->esc_node_opens[i].oc_open_count);
 			eci->esc_node_opens[i].oc_open_count);
-		}
+	}
 }
 }
 
 
 #ifdef DUMP_CHAN_INFO
 #ifdef DUMP_CHAN_INFO
@@ -771,6 +782,26 @@ static void dump_all_chans()
 }
 }
 #endif
 #endif
 
 
+/*
+ * Scan the list of channels and zero out the open counts
+ */
+static void zero_chan_open_counts()
+{
+	struct list_head *l;
+	struct event_svr_channel_instance *eci;
+	int i;
+
+	for (l = esc_head.next; l != &esc_head; l = l->next) {
+		eci = list_entry(l, struct event_svr_channel_instance, esc_entry);
+		for (i = 0; i < eci->esc_oc_size; i++) {
+			if (eci->esc_node_opens[i].oc_node_id == 0) {
+				break;
+			}
+			eci->esc_node_opens[i].oc_open_count = 0;
+		}
+		eci->esc_total_opens = 0;
+	}
+}
 /*
 /*
  * Replace the current open count for a node with the specified value.
  * Replace the current open count for a node with the specified value.
  */
  */
@@ -786,23 +817,12 @@ static int set_open_count(struct event_svr_channel_instance *eci,
 
 
 	oc = find_open_count(eci, node_id);
 	oc = find_open_count(eci, node_id);
 	if (oc) {
 	if (oc) {
-		if (oc->oc_open_count) {
-			/*
-			 * If the open count wasn't zero, then we already
-			 * knew about this node.  It should never be different than
-			 * what we already had for an open count.
-			 */
-			if (oc->oc_open_count != open_count) {
-				log_printf(LOG_LEVEL_ERROR, 
-						"Channel open count error\n");
-				dump_chan_opens(eci);
-			}
-			return 0;
-		} 
-		log_printf(LOG_LEVEL_DEBUG, 
+		log_printf(RECOVERY_DEBUG, 
 			"Set count: Chan %s for node 0x%x, was %d, now %d\n",
 			"Set count: Chan %s for node 0x%x, was %d, now %d\n",
-			eci->esc_channel_name.value,
-			node_id, eci->esc_node_opens[i].oc_open_count, open_count);
+			eci->esc_channel_name.value, node_id, 
+			oc->oc_open_count, open_count);
+
+		eci->esc_total_opens -= oc->oc_open_count;
 		eci->esc_total_opens += open_count;
 		eci->esc_total_opens += open_count;
 		oc->oc_open_count = open_count;
 		oc->oc_open_count = open_count;
 		return 0;
 		return 0;
@@ -1150,7 +1170,7 @@ evt_add_node(struct in_addr addr, SaClmClusterNodeT *cn)
 	}
 	}
 
 
 	*nlp = malloc(sizeof(struct member_node_data));
 	*nlp = malloc(sizeof(struct member_node_data));
-	if (!nlp) {
+	if (!(*nlp)) {
 			return SA_AIS_ERR_NO_MEMORY;
 			return SA_AIS_ERR_NO_MEMORY;
 	}
 	}
 	nl = *nlp;
 	nl = *nlp;
@@ -1168,50 +1188,25 @@ an_out:
 	return err;
 	return err;
 }
 }
 
 
-#ifdef REMOVE_NODE
-static SaErrorT
-evt_remove_node(struct in_addr addr) 
-{
-	struct member_node_data **nlp;
-	struct member_node_data *nl;
-	SaErrorT err = SA_AIS_ERR_NOT_EXIST;
-
-	nlp = lookup_node(addr);
-
-	if (!nlp) {
-		log_printf(LOG_LEVEL_DEBUG, "remove_node: Got NULL nlp?\n");
-		goto an_out;
-	}
-
-	if (!(*nlp)) {
-		goto an_out;
-	}
-
-	nl = *nlp;
-
-	list_del(&nl->mn_entry);
-	*nlp = nl->mn_next;
-	free(*nlp);
-	err = SA_AIS_OK;
-
-an_out:
-	return err;
-}
-#endif /* REMOVE_NODE */
-
 /*
 /*
  * Find the oldest node in the membership.  This is the one we choose to 
  * Find the oldest node in the membership.  This is the one we choose to 
  * perform some cluster-wide functions like distributing retained events.
  * perform some cluster-wide functions like distributing retained events.
+ * We only check nodes that were in our transitional configuration.  In this 
+ * way there is a recovery node chosen for each original partition in case 
+ * of a merge. 
  */
  */
 static struct member_node_data* oldest_node()
 static struct member_node_data* oldest_node()
 {
 {
-	struct list_head *l;
 	struct member_node_data *mn = 0;
 	struct member_node_data *mn = 0;
 	struct member_node_data *oldest = 0;
 	struct member_node_data *oldest = 0;
+	int i;
 
 
-	for (l = mnd.next; l != &mnd; l = l->next) {
-		mn = list_entry(l, struct member_node_data, mn_entry);
-		if (mn->mn_started == 0) {
+	for (i = 0; i < trans_member_count; i++) {
+		mn = evt_find_node(trans_member_list[i]);
+		if (!mn || (mn->mn_started == 0)) {
+			log_printf(LOG_LEVEL_ERROR, 
+				"Transitional config Node %s not active\n",
+				inet_ntoa(trans_member_list[i]));
 			continue;
 			continue;
 		}
 		}
 		if ((oldest == NULL) || 
 		if ((oldest == NULL) || 
@@ -2721,12 +2716,6 @@ static int evt_conf_change(
 					joined_list_entries,
 					joined_list_entries,
 					left_list_entries);
 					left_list_entries);
 
 
-	/*
-	 * TODO: Save transitional membership for selecting representative from
-	 * each partition to send retained events.
-	 */
-
-
 	/*
 	/*
 	 * Save the various membership lists for later processing by
 	 * Save the various membership lists for later processing by
 	 * the synchronization functions.  The left list is only
 	 * the synchronization functions.  The left list is only
@@ -2739,6 +2728,7 @@ static int evt_conf_change(
 	if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) {
 	if (configuration_type == TOTEM_CONFIGURATION_TRANSITIONAL) {
 
 
 		left_member_count = left_list_entries;
 		left_member_count = left_list_entries;
+		trans_member_count = member_list_entries;
 
 
 		if (left_member_list) {
 		if (left_member_list) {
 			free(left_member_list);
 			free(left_member_list);
@@ -2749,12 +2739,35 @@ static int evt_conf_change(
 				malloc(sizeof(struct in_addr) * left_list_entries);
 				malloc(sizeof(struct in_addr) * left_list_entries);
 			if (!left_member_list) {
 			if (!left_member_list) {
 				/* 
 				/* 
-			 	 * TODO: ERROR
+			 	 * ERROR: No recovery.
 		 		 */
 		 		 */
+				log_printf(LOG_LEVEL_ERROR, 
+						"Config change left list allocation error\n");
+				assert(0);
 			}
 			}
 			memcpy(left_member_list, left_list, 
 			memcpy(left_member_list, left_list, 
 					sizeof(struct in_addr) * left_list_entries);
 					sizeof(struct in_addr) * left_list_entries);
 		}
 		}
+
+		if (trans_member_list) {
+			free(trans_member_list);
+			trans_member_list = 0;
+		}
+		if (member_list_entries) {
+			trans_member_list = 
+				malloc(sizeof(struct in_addr) * member_list_entries);
+
+			if (!trans_member_list) {
+				/* 
+			 	 * ERROR: No recovery.
+		 		 */
+				log_printf(LOG_LEVEL_ERROR, 
+				  "Config change transitional member list allocation error\n");
+				assert(0);
+			}
+			memcpy(trans_member_list, member_list, 
+					sizeof(struct in_addr) * member_list_entries);
+		}
 	}
 	}
 
 
 	if (configuration_type == TOTEM_CONFIGURATION_REGULAR) {
 	if (configuration_type == TOTEM_CONFIGURATION_REGULAR) {
@@ -2771,8 +2784,11 @@ static int evt_conf_change(
 				malloc(sizeof(struct in_addr) * joined_list_entries);
 				malloc(sizeof(struct in_addr) * joined_list_entries);
 			if (!joined_member_list) {
 			if (!joined_member_list) {
 				/* 
 				/* 
-			 	 * TODO: ERROR
+			 	 * ERROR: No recovery.
 		 		 */
 		 		 */
+				log_printf(LOG_LEVEL_ERROR, 
+						"Config change joined list allocation error\n");
+				assert(0);
 			}
 			}
 			memcpy(joined_member_list, joined_list, 
 			memcpy(joined_member_list, joined_list, 
 					sizeof(struct in_addr) * joined_list_entries);
 					sizeof(struct in_addr) * joined_list_entries);
@@ -2789,8 +2805,11 @@ static int evt_conf_change(
 
 
 			if (!current_member_list) {
 			if (!current_member_list) {
 				/* 
 				/* 
-			 	 * TODO: ERROR
+			 	 * ERROR: No recovery.
 		 		 */
 		 		 */
+				log_printf(LOG_LEVEL_ERROR, 
+						"Config change member list allocation error\n");
+				assert(0);
 			}
 			}
 			memcpy(current_member_list, member_list, 
 			memcpy(current_member_list, member_list, 
 					sizeof(struct in_addr) * member_list_entries);
 					sizeof(struct in_addr) * member_list_entries);
@@ -2840,16 +2859,6 @@ static int evt_exec_init(void)
 {
 {
 	log_printf(LOG_LEVEL_DEBUG, "Evt exec init request\n");
 	log_printf(LOG_LEVEL_DEBUG, "Evt exec init request\n");
 
 
-#ifdef TODO
-	int res;
-	res = totempg_recovery_plug_create (&evt_recovery_plug_handle);
-	if (res != 0) {
-		log_printf(LOG_LEVEL_ERROR,
-			"Could not create recovery plug for event service.\n");
-		return (-1);
-	}
-#endif
-
 	/*
 	/*
 	 * Create an event to be sent when we have to drop messages
 	 * Create an event to be sent when we have to drop messages
 	 * for an application.
 	 * for an application.
@@ -2859,7 +2868,6 @@ static int evt_exec_init(void)
 	if (dropped_event == 0) {
 	if (dropped_event == 0) {
 		log_printf(LOG_LEVEL_ERROR, 
 		log_printf(LOG_LEVEL_ERROR, 
 				"Memory Allocation Failure, event service not started\n");
 				"Memory Allocation Failure, event service not started\n");
-// TODO		res = totempg_recovery_plug_destroy (evt_recovery_plug_handle);
 		errno = ENOMEM;
 		errno = ENOMEM;
 		return -1;
 		return -1;
 	}
 	}
@@ -3037,12 +3045,12 @@ static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr,
 
 
 	now = clust_time_now();
 	now = clust_time_now();
 
 
-	log_printf(LOG_LEVEL_DEBUG, 
+	log_printf(RECOVERY_EVENT_DEBUG, 
 			"Remote recovery event data received from %s\n",
 			"Remote recovery event data received from %s\n",
 					inet_ntoa(source_addr));
 					inet_ntoa(source_addr));
 
 
 	if (recovery_phase == evt_recovery_complete) {
 	if (recovery_phase == evt_recovery_complete) {
-		log_printf(LOG_LEVEL_NOTICE, 
+		log_printf(RECOVERY_EVENT_DEBUG, 
 				"Received recovery data, not in recovery mode\n");
 				"Received recovery data, not in recovery mode\n");
 		return 0;
 		return 0;
 	}
 	}
@@ -3051,13 +3059,13 @@ static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr,
 		convert_event(evtpkt);
 		convert_event(evtpkt);
 	}
 	}
 
 
-	log_printf(LOG_LEVEL_DEBUG, 
+	log_printf(RECOVERY_EVENT_DEBUG, 
 			"Processing recovery of retained events\n");
 			"Processing recovery of retained events\n");
 	if (recovery_node) {
 	if (recovery_node) {
-		log_printf(LOG_LEVEL_DEBUG, "This node is the recovery node\n");
+		log_printf(RECOVERY_EVENT_DEBUG, "This node is the recovery node\n");
 	}
 	}
 
 
-	log_printf(LOG_LEVEL_DEBUG, "(1)EVT ID: %llx, Time: %llx\n",
+	log_printf(RECOVERY_EVENT_DEBUG, "(1)EVT ID: %llx, Time: %llx\n",
 			evtpkt->led_event_id, evtpkt->led_retention_time);
 			evtpkt->led_event_id, evtpkt->led_retention_time);
 	/*
 	/*
 	 * Calculate remaining retention time
 	 * Calculate remaining retention time
@@ -3067,7 +3075,7 @@ static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr,
 				evtpkt->led_receive_time, 
 				evtpkt->led_receive_time, 
 				now);
 				now);
 
 
-	log_printf(LOG_LEVEL_DEBUG, 
+	log_printf(RECOVERY_EVENT_DEBUG, 
 			"(2)EVT ID: %llx, ret: %llx, rec: %llx, now: %llx\n",
 			"(2)EVT ID: %llx, ret: %llx, rec: %llx, now: %llx\n",
 			evtpkt->led_event_id, 
 			evtpkt->led_event_id, 
 			evtpkt->led_retention_time, evtpkt->led_receive_time, now);
 			evtpkt->led_retention_time, evtpkt->led_receive_time, now);
@@ -3106,7 +3114,7 @@ static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr,
 		 * know about.
 		 * know about.
 		 */
 		 */
 		if (!eci) {
 		if (!eci) {
-			log_printf(LOG_LEVEL_DEBUG, "Channel %s doesn't exist\n",
+			log_printf(RECOVERY_EVENT_DEBUG, "Channel %s doesn't exist\n",
 				evtpkt->led_chan_name.value);
 				evtpkt->led_chan_name.value);
 			return 0;
 			return 0;
 		}
 		}
@@ -3343,6 +3351,7 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr,
 			log_printf(LOG_LEVEL_WARNING, 
 			log_printf(LOG_LEVEL_WARNING, 
 				"Evt remote channel op: Node data for addr %s is NULL\n",
 				"Evt remote channel op: Node data for addr %s is NULL\n",
 					inet_ntoa(source_addr));
 					inet_ntoa(source_addr));
+			assert(0);
 		} else {
 		} else {
 			evt_add_node(source_addr, cn);
 			evt_add_node(source_addr, cn);
 			mn = evt_find_node(source_addr);
 			mn = evt_find_node(source_addr);
@@ -3502,9 +3511,13 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr,
 	 */
 	 */
 	case EVT_SET_ID_OP: {
 	case EVT_SET_ID_OP: {
 		struct in_addr my_addr;
 		struct in_addr my_addr;
+		int log_level = LOG_LEVEL_DEBUG;
 		my_addr.s_addr = my_node->nodeId;
 		my_addr.s_addr = my_node->nodeId;
-		log_printf(RECOVERY_DEBUG, 
-			"Received Set event ID OP from %x to %llx for %x my addr %x base %llx\n",
+		if (cpkt->u.chc_set_id.chc_addr.s_addr == my_addr.s_addr) {
+			log_level = RECOVERY_DEBUG;
+		}
+		log_printf(log_level, 
+			"Received Set event ID OP from %s to %llx for %x my addr %x base %llx\n",
 					inet_ntoa(source_addr), 
 					inet_ntoa(source_addr), 
 					cpkt->u.chc_set_id.chc_last_id,
 					cpkt->u.chc_set_id.chc_last_id,
 					cpkt->u.chc_set_id.chc_addr.s_addr,
 					cpkt->u.chc_set_id.chc_addr.s_addr,
@@ -3532,6 +3545,15 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr,
 				"Evt open count msg from %s, but not in membership change\n",
 				"Evt open count msg from %s, but not in membership change\n",
 				inet_ntoa(source_addr));
 				inet_ntoa(source_addr));
 		}
 		}
+
+		/*
+		 * Zero out all open counts because we're setting them based
+		 * on each node's local counts.
+		 */
+		if (!processed_open_counts) {
+			zero_chan_open_counts();
+			processed_open_counts = 1;
+		}
 		log_printf(RECOVERY_DEBUG, 
 		log_printf(RECOVERY_DEBUG, 
 				"Open channel count %s is %d for node 0x%x\n",
 				"Open channel count %s is %d for node 0x%x\n",
 				cpkt->u.chc_set_opens.chc_chan_name.value, 
 				cpkt->u.chc_set_opens.chc_chan_name.value, 
@@ -3582,28 +3604,40 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr,
 			 * retained events.
 			 * retained events.
 			 */
 			 */
 			mn = oldest_node();
 			mn = oldest_node();
-			if (mn->mn_node_info.nodeId == my_node_id) {
-				log_printf(RECOVERY_DEBUG, "I am oldest\n");
+			if (mn && mn->mn_node_info.nodeId == my_node_id) {
+				log_printf(RECOVERY_DEBUG, 
+					"I am oldest in my transitional config\n");
+				recovery_node = 1;
 				recovery_phase = evt_send_retained_events;
 				recovery_phase = evt_send_retained_events;
 			} else {
 			} else {
-				recovery_phase = evt_wait_send_retained_events;
+				recovery_phase = evt_send_retained_events_done;
+				recovery_node = 0;
 			}
 			}
+			checked_in = 0;
 		}
 		}
 		break;
 		break;
 	}
 	}
 
 
 	/*
 	/*
-	 * OK, We're done with recovery, continue operations.
+	 * Count up the nodes again.  When all the nodes have responded, we've
+	 * distributed the retained events, so recovery is done and we can
+	 * continue operations.
 	 */
 	 */
 	case EVT_CONF_DONE: {
 	case EVT_CONF_DONE: {
 		log_printf(RECOVERY_DEBUG, 
 		log_printf(RECOVERY_DEBUG, 
-				"Receive EVT_CONF_DONE from %s\n", 
-				inet_ntoa(source_addr));
-		recovery_phase = evt_recovery_complete;
-
+				"Receive EVT_CONF_DONE from %s, members %d checked in %d\n", 
+				inet_ntoa(source_addr),
+					total_member_count, checked_in+1);
+		if (++checked_in == total_member_count) {
+			/*
+			 * All recovery complete, carry on.
+			 */
+			recovery_phase = evt_recovery_complete;
 #ifdef DUMP_CHAN_INFO
 #ifdef DUMP_CHAN_INFO
-		dump_all_chans();
+			dump_all_chans();
 #endif
 #endif
+		}
+
 		break;
 		break;
 	}
 	}
 
 
@@ -3662,17 +3696,32 @@ static void evt_sync_init(void)
 	}
 	}
 
 
 	/*
 	/*
-	 * set up for recovery processing
+	 * set up for recovery processing, first phase:
 	 */
 	 */
 	recovery_phase = evt_send_event_id;
 	recovery_phase = evt_send_event_id;
-	add_list = joined_member_list;
-	add_count = joined_member_count;
+	
+	/*
+	 * List used to distribute last known event IDs.
+	 */
+	add_list = current_member_list;
+	add_count = total_member_count;
+	processed_open_counts = 0;
 
 
+	/*
+	 * List used for distributing open channel counts
+	 */
 	next_chan = esc_head.next;
 	next_chan = esc_head.next;
-	checked_in = 0;
 
 
+	/*
+	 * List used for distributing retained events
+	 */
 	next_retained = retained_list.next;
 	next_retained = retained_list.next;
 
 
+	/*
+	 * Member check in counts for open channel counts and 
+	 * retained events.
+	 */
+	checked_in = 0;
 }
 }
 
 
 /*
 /*
@@ -3724,30 +3773,28 @@ static int evt_sync_process(void)
 			 */
 			 */
 			md = evt_find_node(*add_list);
 			md = evt_find_node(*add_list);
 			if (md != NULL) {
 			if (md != NULL) {
-				if (!md->mn_started) {
+				log_printf(RECOVERY_DEBUG, 
+					"Send set evt ID %llx to %s\n",
+					md->mn_last_evt_id, inet_ntoa(*add_list));
+				md->mn_started = 1;
+				memset(&cpkt, 0, sizeof(cpkt));
+				cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+				cpkt.chc_head.size = sizeof(cpkt);
+				cpkt.chc_op = EVT_SET_ID_OP;
+				cpkt.u.chc_set_id.chc_addr = *add_list;
+				cpkt.u.chc_set_id.chc_last_id = 
+									md->mn_last_evt_id & BASE_ID_MASK;
+				chn_iovec.iov_base = &cpkt;
+				chn_iovec.iov_len = cpkt.chc_head.size;
+				res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
+				if (res != 0) {
 					log_printf(RECOVERY_DEBUG, 
 					log_printf(RECOVERY_DEBUG, 
-						"end set evt ID %llx to %s\n",
-						md->mn_last_evt_id, inet_ntoa(*add_list));
-					md->mn_started = 1;
-					memset(&cpkt, 0, sizeof(cpkt));
-					cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
-					cpkt.chc_head.size = sizeof(cpkt);
-					cpkt.chc_op = EVT_SET_ID_OP;
-					cpkt.u.chc_set_id.chc_addr = *add_list;
-					cpkt.u.chc_set_id.chc_last_id = 
-										md->mn_last_evt_id & BASE_ID_MASK;
-					chn_iovec.iov_base = &cpkt;
-					chn_iovec.iov_len = cpkt.chc_head.size;
-					res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
-					if (res != 0) {
-						log_printf(RECOVERY_DEBUG, 
-							"Unable to send event id to %s\n", 
-							inet_ntoa(*add_list));
-						/*
-						 * We'll try again later.
-						 */
-						return 1;
-					}
+						"Unable to send event id to %s\n", 
+						inet_ntoa(*add_list));
+					/*
+					 * We'll try again later.
+					 */
+					return 1;
 				}
 				}
 
 
 			} else {
 			} else {
@@ -3757,10 +3804,15 @@ static int evt_sync_process(void)
 				cn = clm_get_by_nodeid(*add_list);
 				cn = clm_get_by_nodeid(*add_list);
 				if (!cn) {
 				if (!cn) {
 					/*
 					/*
-					 * TODO: Error, shouldn't happen
+					 * Error: shouldn't happen
 					 */
 					 */
+					log_printf(LOG_LEVEL_ERROR,
+							"recovery error node: %s not found\n",
+							inet_ntoa(*add_list));
+					assert(0);
+				} else {
+					evt_add_node(*add_list, cn);
 				}
 				}
-				evt_add_node(*add_list, cn);
 			}
 			}
 
 
 			add_list++;
 			add_list++;
@@ -3844,7 +3896,6 @@ static int evt_sync_process(void)
 	 */
 	 */
 	case evt_send_retained_events:
 	case evt_send_retained_events:
 	{
 	{
-		struct req_evt_chan_command cpkt;
 		struct iovec chn_iovec;
 		struct iovec chn_iovec;
 		struct event_data *evt;
 		struct event_data *evt;
 		int res;
 		int res;
@@ -3871,6 +3922,17 @@ static int evt_sync_process(void)
 				return -1;
 				return -1;
 			}
 			}
 		}
 		}
+
+		recovery_phase = evt_send_retained_events_done;
+		return 1;
+	}
+
+	case evt_send_retained_events_done:
+	{
+		struct req_evt_chan_command cpkt;
+		struct iovec chn_iovec;
+		int res;
+
 		log_printf(RECOVERY_DEBUG, "DONE Sending retained events\n");
 		log_printf(RECOVERY_DEBUG, "DONE Sending retained events\n");
 		memset(&cpkt, 0, sizeof(cpkt));
 		memset(&cpkt, 0, sizeof(cpkt));
 		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
 		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;