Просмотр исходного кода

New event service recovery code implementation for totem and the new
sync services.

(Logical change 1.144)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@514 fd59a12c-fef9-0310-b244-a6a79926bd2f

Mark Haverkamp 21 лет назад
Родитель
Сommit
58cf2b9a20
1 измененных файлов с 479 добавлено и 314 удалено
  1. 479 314
      exec/evt.c

+ 479 - 314
exec/evt.c

@@ -94,6 +94,15 @@ static int evt_initialize(struct conn_info *conn_info, void *msg);
 static int evt_finalize(struct conn_info *conn_info);
 static int evt_exec_init(void);
 
+/*
+ * Recovery sync functions
+ */
+static void evt_sync_init(void);
+static int evt_sync_process(void);
+static void evt_sync_activate(void);
+static void evt_sync_abort(void);
+
+
 static struct libais_handler evt_libais_handlers[] = {
 	{
 	.libais_handler_fn = 	message_handler_req_lib_activatepoll,
@@ -173,7 +182,11 @@ struct service_handler evt_service_handler = {
 	.libais_init_fn				= evt_initialize,
 	.libais_exit_fn				= evt_finalize,
 	.exec_init_fn				= evt_exec_init,
-	.exec_dump_fn				= 0
+	.exec_dump_fn				= 0,
+	.sync_init					= evt_sync_init,
+	.sync_process				= evt_sync_process,
+	.sync_activate				= evt_sync_activate,
+	.sync_abort					= evt_sync_abort
 };
 
 // TODO static totempg_recovery_plug_handle evt_recovery_plug_handle;
@@ -203,35 +216,76 @@ static DECLARE_LIST_INIT(esc_unlinked_head);
 static DECLARE_LIST_INIT(ci_head);
 
 
+/*
+ * Track the state of event service recovery.
+ *
+ *	evt_recovery_complete:			Normal operational mode
+ *
+ *	evt_send_event_id:				Node is sending known last 
+ *									event IDs.
+ *
+ *	evt_send_open_count:			Node is sending its open
+ *									Channel information.
+ *
+ *	evt_wait_open_count_done:		Node is done sending open channel data and
+ *									is waiting for the other nodes to finish.
+ *
+ *	evt_send_retained_events:		Node is sending retained event data.
+ *
+ *	evt_wait_send_retained_events:	Node is waiting for other nodes to 
+ *									finish sending retained event data.
+ */
+enum recovery_phases {
+	evt_recovery_complete,			/* idle; evt_sync_process() returns 0 */
+	evt_send_event_id,				/* entered from evt_sync_init() */
+	evt_send_open_count,			/* entered once event IDs are sent */
+	evt_wait_open_count_done,		/* left by EVT_OPEN_COUNT_DONE handler */
+	evt_send_retained_events,		/* chosen when this node is oldest */
+	evt_wait_send_retained_events	/* left by EVT_CONF_DONE handler */
+};
+
 /*
  * Global variables used by the event service
  *
- * base_id_top:		upper bits of next event ID to assign
- * base_id:			Lower bits of Next event ID to assign
- * my_node_id:		My cluster node id
- * in_cfg_change:	Config change occurred.  Figure out who sends retained evts.
- * 					cleared when retained events have been delivered.
- * total_members:	how many members in this cluster
- * checked_in:		keep track during config change.
- * any_joined:		did any nodes join on this change?
- * recovery_node:	True if we're the recovery node.
- * tok_call_handle:	totempg token callback handle for recovery.
- * next_retained:	pointer to next retained message to send during recovery.
- * next_chan:		pointer to next channel to send during recovery.
+ * base_id_top:			upper bits of next event ID to assign
+ * base_id:				Lower bits of Next event ID to assign
+ * my_node_id:			My cluster node id
+ * total_member_count:	how many members in this cluster
+ * joined_member_count:	How many joined this configuration.
+ * left_member_count:	How many left this configuration.
+ * checked_in:			keep track during config change.
+ * recovery_node:		True if we're the recovery node.
+ * next_retained:		pointer to next retained message to send 
+ * 						during recovery.
+ * next_chan:			pointer to next channel to send during recovery.
+ * recovery_phase:		Indicates what recovery is taking place.
+ * left_member_list:	Members that left this config
+ * joined_member_list:	Members that joined this config
+ * current_member_list:	Total membership this config
+ * add_list:			pointer to joined list used for sending event id
+ * 						recovery data.
+ * add_count:			count of joined members used for sending event id
+ * 						recovery data.
  *
  */
+
 #define BASE_ID_MASK 0xffffffffLL
-static SaEvtEventIdT base_id = 0;
-static SaEvtEventIdT base_id_top = 0;
-static SaClmNodeIdT  my_node_id = 0;
-static int			 in_cfg_change = 0;
-static int			 total_members = 0;
-static int 			 checked_in = 0;
-static int			 any_joined = 0;
-static int			 recovery_node = 0;
-static void 		 *tok_call_handle = 0;
+static SaEvtEventIdT 	base_id = 0;
+static SaEvtEventIdT 	base_id_top = 0;
+static SaClmNodeIdT  	my_node_id = 0;
+static int			 	total_member_count = 0;
+static int				joined_member_count = 0;
+static int				left_member_count = 0;
+static int 			 	checked_in = 0;
+static int			 	recovery_node = 0;
 static struct list_head *next_retained = 0;
 static struct list_head *next_chan = 0;
+static enum recovery_phases recovery_phase = evt_recovery_complete;
+static struct in_addr 	*left_member_list = 0;
+static struct in_addr 	*joined_member_list = 0;
+static struct in_addr 	*current_member_list = 0;
+static struct in_addr 	*add_list = 0;
+static int				add_count = 0;
 
 /*
  * Structure to track pending channel open requests.
@@ -608,13 +662,15 @@ static struct event_svr_channel_instance *create_channel(SaNameT *cn)
 	memset(eci, 0, sizeof(*eci));
 	list_init(&eci->esc_entry);
 	list_init(&eci->esc_open_chans);
-	eci->esc_oc_size = total_members;
-	eci->esc_node_opens = malloc(sizeof(struct open_count) * total_members);
+	eci->esc_oc_size = total_member_count;
+	eci->esc_node_opens = 
+			malloc(sizeof(struct open_count) * total_member_count);
 	if (!eci->esc_node_opens) {
 		free(eci);
 		return 0;
 	}
-	memset(eci->esc_node_opens, 0, sizeof(struct open_count) * total_members);
+	memset(eci->esc_node_opens, 0, 
+			sizeof(struct open_count) * total_member_count);
 	eci->esc_channel_name = *cn;
 	eci->esc_channel_name.value[eci->esc_channel_name.length] = '\0';
 	list_add(&eci->esc_entry, &esc_head);
@@ -629,17 +685,18 @@ static struct event_svr_channel_instance *create_channel(SaNameT *cn)
  */
 static int check_open_size(struct event_svr_channel_instance *eci)
 {
-	if (total_members > eci->esc_oc_size) {
+	if (total_member_count > eci->esc_oc_size) {
 		eci->esc_node_opens = realloc(eci->esc_node_opens, 
-							sizeof(struct open_count) * total_members);
+							sizeof(struct open_count) * total_member_count);
 		if (!eci->esc_node_opens) {
 			log_printf(LOG_LEVEL_WARNING, 
 					"Memory error realloc of node list\n");
 			return -1;
 		}
 		memset(&eci->esc_node_opens[eci->esc_oc_size], 0, 
-			sizeof(struct open_count) * (total_members - eci->esc_oc_size));
-		eci->esc_oc_size = total_members;
+			sizeof(struct open_count) * 
+					(total_member_count - eci->esc_oc_size));
+		eci->esc_oc_size = total_member_count;
 	}
 	return 0;
 }
@@ -835,7 +892,8 @@ static void delete_channel(struct event_svr_channel_instance *eci)
 		/*
 		 * adjust if we're sending open counts on a config change.
 		 */
-		if (in_cfg_change && (&eci->esc_entry == next_chan)) {
+		if ((recovery_phase != evt_recovery_complete) && 
+								(&eci->esc_entry == next_chan)) {
 			next_chan = eci->esc_entry.next;
 		}
 
@@ -1161,165 +1219,6 @@ static struct member_node_data* oldest_node()
 }
 
 
-/*
- * Token callback routine.  Send as many mcasts as possible to distribute
- * retained events on a config change.
- */
-static int send_next_retained(void *data)
-{
-	struct req_evt_chan_command cpkt;
-	struct iovec chn_iovec;
-	struct event_data *evt;
-	int res;
-
-	if (in_cfg_change && recovery_node) {
-		/*
-		 * Process messages.  When we're done, send the done message
-		 * to the nodes.
-		 */
-		for (;next_retained != &retained_list; 
-								next_retained = next_retained->next) {
-			log_printf(LOG_LEVEL_DEBUG, "Sending next retained event\n");
-			evt = list_entry(next_retained, struct event_data, ed_retained);
-			evt->ed_event.led_head.id = MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA;
-			chn_iovec.iov_base = &evt->ed_event;
-			chn_iovec.iov_len = evt->ed_event.led_head.size;
-			res = totempg_mcast(&chn_iovec, 1, TOTEMPG_AGREED);
-
-			if (res != 0) {
-			/*
-			 * Try again later.
-			 */
-				return -1;
-			}
-		}
-		log_printf(RECOVERY_DEBUG, "DONE Sending retained events\n");
-		memset(&cpkt, 0, sizeof(cpkt));
-		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
-		cpkt.chc_head.size = sizeof(cpkt);
-		cpkt.chc_op = EVT_CONF_DONE;
-		chn_iovec.iov_base = &cpkt;
-		chn_iovec.iov_len = cpkt.chc_head.size;
-		res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED);
-	}
-	tok_call_handle = 0;
-	return 0;
-}
-
-/*
- * Send our retained events. If we've been chosen as the recovery node, kick
- * kick off the process of sending retained events.
- */
-static void send_retained()
-{
-	struct req_evt_chan_command cpkt;
-	struct iovec chn_iovec;
-	int res = 0;
-
-	if (list_empty(&retained_list) || !any_joined) {
-		memset(&cpkt, 0, sizeof(cpkt));
-		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
-		cpkt.chc_head.size = sizeof(cpkt);
-		cpkt.chc_op = EVT_CONF_DONE;
-		chn_iovec.iov_base = &cpkt;
-		chn_iovec.iov_len = cpkt.chc_head.size;
-		log_printf(RECOVERY_DEBUG, "No messages to send\n");
-		res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED);
-	} else {
-		log_printf(RECOVERY_DEBUG, 
-					"Start sending retained messages\n");
-		recovery_node = 1;
-		next_retained = retained_list.next;
-// TODO		res = totempg_token_callback_create(&tok_call_handle, send_next_retained,
-//				NULL);
-	}
-	if (res != 0) {
-		log_printf(LOG_LEVEL_ERROR, "ERROR sending evt recovery data\n");
-	}
-}
-
-/*
- * 	Token callback routine.  Send as many mcasts as possible to distribute
- *  open counts on a config change.
- */
-static int send_next_open_count(void *data)
-{
-	struct req_evt_chan_command cpkt;
-	struct iovec chn_iovec;
-	struct event_svr_channel_instance *eci;
-	int res;
-
-	if (in_cfg_change) {
-		/*
-		 * Process messages.  When we're done, send the done message
-		 * to the nodes.
-		 */
-		memset(&cpkt, 0, sizeof(cpkt));
-		for (;next_chan != &esc_head; 
-								next_chan = next_chan->next) {
-			log_printf(RECOVERY_DEBUG, "Sending next open count\n");
-			eci = list_entry(next_chan, struct event_svr_channel_instance, 
-					esc_entry);
-			cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
-			cpkt.chc_head.size = sizeof(cpkt);
-			cpkt.chc_op = EVT_OPEN_COUNT;
-			cpkt.u.chc_set_opens.chc_chan_name = eci->esc_channel_name;
-			cpkt.u.chc_set_opens.chc_open_count = eci->esc_local_opens;
-			chn_iovec.iov_base = &cpkt;
-			chn_iovec.iov_len = cpkt.chc_head.size;
-			res = totempg_mcast(&chn_iovec, 1,TOTEMPG_AGREED);
-
-			if (res != 0) {
-			/*
-			 * Try again later.
-			 */
-				return -1;
-			}
-		}
-		log_printf(RECOVERY_DEBUG, "DONE Sending open counts\n");
-		memset(&cpkt, 0, sizeof(cpkt));
-		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
-		cpkt.chc_head.size = sizeof(cpkt);
-		cpkt.chc_op = EVT_OPEN_COUNT_DONE;
-		chn_iovec.iov_base = &cpkt;
-		chn_iovec.iov_len = cpkt.chc_head.size;
-		res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
-	}
-	tok_call_handle = 0;
-	return 0;
-}
-
-/*
- * kick off the process of sending open channel counts during recovery.
- * Every node does this.
- */
-static void send_open_count()
-{
-	struct req_evt_chan_command cpkt;
-	struct iovec chn_iovec;
-	int res;
-
-	if (list_empty(&esc_head)) {
-		memset(&cpkt, 0, sizeof(cpkt));
-		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
-		cpkt.chc_head.size = sizeof(cpkt);
-		cpkt.chc_op = EVT_OPEN_COUNT_DONE;
-		chn_iovec.iov_base = &cpkt;
-		chn_iovec.iov_len = cpkt.chc_head.size;
-		log_printf(RECOVERY_DEBUG, "No channels to send\n");
-		res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
-	} else {
-		log_printf(RECOVERY_DEBUG, 
-					"Start sending open channel count\n");
-		next_chan = esc_head.next;
-// TODO		res = totempg_token_callback_create(&tok_call_handle, send_next_open_count,
-//				NULL);
-	}
-	if (res != 0) {
-		log_printf(LOG_LEVEL_ERROR, "ERROR sending evt recovery data\n");
-	}
-}
-
 /*
  * keep track of the last event ID from a node.
  * If we get an event ID less than our last, we've already
@@ -1432,7 +1331,7 @@ event_retention_timeout(void *data)
 	 * adjust next_retained if we're in recovery and 
 	 * were in charge of sending retained events.
 	 */
-	if (in_cfg_change && recovery_node) {
+	if (recovery_phase != evt_recovery_complete && recovery_node) {
 		if (next_retained == &edp->ed_retained) {
 			next_retained = edp->ed_retained.next;
 		}
@@ -1958,7 +1857,10 @@ make_local_event(struct lib_event_data *p,
 	ed_size = sizeof(*ed) + p->led_user_data_offset + p->led_user_data_size;
 	ed = malloc(ed_size);
 	if (!ed) {
-			return 0;
+		log_printf(LOG_LEVEL_WARNING, 
+			"Failed to allocate %u bytes for event, offset %u, data size %u\n",
+				ed_size, p->led_user_data_offset, p->led_user_data_size);
+		return 0;
 	}
 	memset(ed, 0, ed_size);
 	list_init(&ed->ed_retained);
@@ -2802,137 +2704,87 @@ static int evt_conf_change(
 			int joined_list_entries,
 		struct memb_ring_id *ring_id)
 {
-	struct in_addr my_node = {SA_CLM_LOCAL_NODE_ID};
-	SaClmClusterNodeT *cn;
-	static int first = 1;
-	struct sockaddr_in *add_list;
-	struct member_node_data *md;
-	int add_count;
-	struct req_evt_chan_command cpkt;
-	struct iovec chn_iovec;
-	int res;
-
-
-	/*  
-	 *  TODO required for open count accounting 
-	 *  until the recovery code is re-enabled.
-	 */
-	total_members = member_list_entries;
-
-	/*
-	 * Set the base event id
-	 */
-	cn = clm_get_by_nodeid(my_node);
-	if (!base_id_top) {
-		log_printf(RECOVERY_DEBUG, "My node ID 0x%x\n", cn->nodeId);
-		my_node_id = cn->nodeId;
-		set_event_id(my_node_id);
-	}
-
-	return (0); // TODO 
-	log_printf(LOG_LEVEL_DEBUG, "Evt conf change %d\n", 
+	log_printf(RECOVERY_DEBUG, "Evt conf change %d\n", 
 			configuration_type);
-	log_printf(LOG_LEVEL_DEBUG, "m %d, j %d, l %d\n", 
+	log_printf(RECOVERY_DEBUG, "m %d, j %d, l %d\n", 
 					member_list_entries,
 					joined_list_entries,
 					left_list_entries);
+
 	/*
-	 * Stop any recovery callbacks in progress.
+	 * TODO: Save transitional membership for selecting representative from
+	 * each partition to send retained events.
 	 */
-	if (tok_call_handle) {
-// TODO		totempg_token_callback_destroy(tok_call_handle);
-		tok_call_handle = 0;
-	}
+
 
 	/*
-	 * Don't seem to be able to tell who joined if we're just coming up. Not all
-	 * nodes show up in the join list.  If this is the first time through,
-	 * choose the members list to use to add nodes, after that use the join
-	 * list.  Always use the left list for removing nodes.
+	 * Save the various membership lists for later processing by
+	 * the synchronization functions.  The left list is only
+	 * valid in the transitional configuration, the joined list is
+	 * only valid in the regular configuration.  Other than for the 
+	 * purposes of delivering retained events from merging partitions, 
+	 * we only care about the final membership from the regular
+	 * configuration.
 	 */
-	if (first) {
-//j			add_list = member_list;
-//			add_count = member_list_entries;
-			first = 0;
-	} else {
-//			add_list = joined_list;
-//			add_count = joined_list_entries;
-	}
+	if (configuration_type == TOTEMPG_CONFIGURATION_TRANSITIONAL) {
 
-	while (add_count--) {
-		/*
-		 * If we've seen this node before, send out the last event ID 
-		 * that we've seen from him.  He will set his base event ID to
-		 * the highest one seen.
-		 */
-		md = evt_find_node(add_list->sin_addr);
-		if (md != NULL) {
-			if (!md->mn_started) {
-				log_printf(RECOVERY_DEBUG, 
-					"end set evt ID %llx to %s\n",
-					md->mn_last_evt_id, inet_ntoa(add_list->sin_addr));
-				md->mn_started = 1;
-				memset(&cpkt, 0, sizeof(cpkt));
-				cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
-				cpkt.chc_head.size = sizeof(cpkt);
-				cpkt.chc_op = EVT_SET_ID_OP;
-				cpkt.u.chc_set_id.chc_addr = add_list->sin_addr;
-				cpkt.u.chc_set_id.chc_last_id = 
-										md->mn_last_evt_id & BASE_ID_MASK;
-				chn_iovec.iov_base = &cpkt;
-				chn_iovec.iov_len = cpkt.chc_head.size;
-				res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
-				if (res != 0) {
-					log_printf(LOG_LEVEL_WARNING, 
-						"Unable to send event id to %s\n", 
-						inet_ntoa(add_list->sin_addr));
-				}
+		left_member_count = left_list_entries;
+
+		if (left_member_list) {
+			free(left_member_list);
+			left_member_list = 0;
+		}
+		if (left_list_entries) {
+			left_member_list = 
+				malloc(sizeof(struct in_addr) * left_list_entries);
+			if (!left_member_list) {
+				/* 
+			 	 * TODO: ERROR
+		 		 */
 			}
+			memcpy(left_member_list, left_list, 
+					sizeof(struct in_addr) * left_list_entries);
 		}
-		add_list++;
 	}
 
-	while (left_list_entries--) {
-// TODO		md = evt_find_node(left_list);
-		if (md == 0) {
-			log_printf(LOG_LEVEL_WARNING, 
-					"Can't find cluster node at %s\n",
-							inet_ntoa(left_list[0]));
-		/*
-		 * Mark this one as down.
-		 */
-		} else {
-			log_printf(RECOVERY_DEBUG, "cluster node at %s down\n",
-							inet_ntoa(left_list[0]));
-			md->mn_started = 0;
-			remove_chan_open_info(md->mn_node_info.nodeId);
-		}
-		left_list++;
-	}
+	if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
 
+		joined_member_count = joined_list_entries;
+		total_member_count = member_list_entries;
 
-	/*
-	 * Notify that a config change happened.  The exec handler will
-	 * then determine what to do.
-	 */
-	if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
-		if (in_cfg_change) {
-			log_printf(LOG_LEVEL_NOTICE, 
-				"Already in config change, Starting over, m %d, c %d\n",
-					total_members, checked_in);
+		if (joined_member_list) {
+			free(joined_member_list);
+			joined_member_list = 0;
+		}
+		if (joined_list_entries) {
+			joined_member_list = 
+				malloc(sizeof(struct in_addr) * joined_list_entries);
+			if (!joined_member_list) {
+				/* 
+			 	 * TODO: ERROR
+		 		 */
+			}
+			memcpy(joined_member_list, joined_list, 
+					sizeof(struct in_addr) * joined_list_entries);
 		}
 
-		in_cfg_change = 1;
-		total_members = member_list_entries;
-		checked_in = 0;
-		any_joined = joined_list_entries;
 
-		/*
-	   	 * Start by updating all the nodes on our
-	 	 * open channel count. Once that is done, proceed to determining who
-	 	 * sends ratained events.  Then we can start normal operation again.
-	 	 */
-		send_open_count();
+		if (current_member_list) {
+			free(current_member_list);
+			current_member_list = 0;
+		}
+		if (member_list_entries) {
+			current_member_list = 
+				malloc(sizeof(struct in_addr) * member_list_entries);
+
+			if (!current_member_list) {
+				/* 
+			 	 * TODO: ERROR
+		 		 */
+			}
+			memcpy(current_member_list, member_list, 
+					sizeof(struct in_addr) * member_list_entries);
+		}
 	}
 
 	return 0;
@@ -3050,7 +2902,7 @@ static int evt_remote_evt(void *msg, struct in_addr source_addr,
 			/*
 			 * Not sure how this can happen...
 			 */
-			log_printf(LOG_LEVEL_NOTICE, "No cluster node data for %s\n",
+			log_printf(LOG_LEVEL_DEBUG, "No cluster node data for %s\n",
 							inet_ntoa(source_addr));
 			errno = ENXIO;
 			return -1;
@@ -3179,7 +3031,7 @@ static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr,
 			"Remote recovery event data received from %s\n",
 					inet_ntoa(source_addr));
 
-	if (!in_cfg_change) {
+	if (recovery_phase == evt_recovery_complete) {
 		log_printf(LOG_LEVEL_NOTICE, 
 				"Received recovery data, not in recovery mode\n");
 		return 0;
@@ -3665,7 +3517,7 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr,
 	 * open so that it can be removed when no one else has it open anymore.
 	 */
 	case EVT_OPEN_COUNT:
-		if (!in_cfg_change) {
+		if (recovery_phase == evt_recovery_complete) {
 			log_printf(LOG_LEVEL_ERROR, 
 				"Evt open count msg from %s, but not in membership change\n",
 				inet_ntoa(source_addr));
@@ -3700,31 +3552,32 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr,
 	 * the current membership, determine who delivers any retained events.
 	 */
 	case EVT_OPEN_COUNT_DONE: {
-		if (!in_cfg_change) {
+		if (recovery_phase == evt_recovery_complete) {
 			log_printf(LOG_LEVEL_ERROR, 
 				"Evt config msg from %s, but not in membership change\n",
 				inet_ntoa(source_addr));
 		}
 		log_printf(RECOVERY_DEBUG, 
 			"Receive EVT_CONF_CHANGE_DONE from %s members %d checked in %d\n",
-				inet_ntoa(source_addr), total_members, checked_in+1);
+				inet_ntoa(source_addr), total_member_count, checked_in+1);
 		if (!mn) {
 			log_printf(RECOVERY_DEBUG, 
 				"NO NODE DATA AVAILABLE FOR %s\n",
 					inet_ntoa(source_addr));
 		}
 
-		if (++checked_in == total_members) {
+		if (++checked_in == total_member_count) {
 			/*
 			 * We're all here, now figure out who should send the
-			 * retained events, if any.
+			 * retained events.
 			 */
 			mn = oldest_node();
 			if (mn->mn_node_info.nodeId == my_node_id) {
 				log_printf(RECOVERY_DEBUG, "I am oldest\n");
-				send_retained();
+				recovery_phase = evt_send_retained_events;
+			} else {
+				recovery_phase = evt_wait_send_retained_events;
 			}
-			
 		}
 		break;
 	}
@@ -3736,8 +3589,8 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr,
 		log_printf(RECOVERY_DEBUG, 
 				"Receive EVT_CONF_DONE from %s\n", 
 				inet_ntoa(source_addr));
-		in_cfg_change = 0;
-// TODO		totempg_recovery_plug_unplug (evt_recovery_plug_handle);
+		recovery_phase = evt_recovery_complete;
+
 #ifdef DUMP_CHAN_INFO
 		dump_all_chans();
 #endif
@@ -3752,6 +3605,318 @@ static int evt_remote_chan_op(void *msg, struct in_addr source_addr,
 
 	return 0;
 }
+
+/*
+ * Set up initial conditions for processing event service
+ * recovery.
+ *
+ * Registered as the .sync_init hook of evt_service_handler.  Works from
+ * the membership data saved by evt_conf_change():
+ *  - assigns the base event ID the first time our node ID is known,
+ *  - marks every node on the left list as down and drops its channel
+ *    open information,
+ *  - resets the recovery cursors (add_list/add_count, next_chan,
+ *    next_retained) and the check-in counter, then enters the
+ *    evt_send_event_id phase.
+ */
+static void evt_sync_init(void)
+{
+	SaClmClusterNodeT *cn;
+	struct member_node_data *md;
+	struct in_addr my_node = {SA_CLM_LOCAL_NODE_ID};
+	int left_list_entries = left_member_count;
+	struct in_addr *left_list = left_member_list;
+
+	log_printf(RECOVERY_DEBUG, "Evt synchronize initialization\n");
+
+	/*
+	 * Set the base event id
+	 */
+	if (!my_node_id) {
+		cn = clm_get_by_nodeid(my_node);
+		if (cn) {
+			log_printf(RECOVERY_DEBUG, "My node ID 0x%x\n", cn->nodeId);
+			my_node_id = cn->nodeId;
+			set_event_id(my_node_id);
+		} else {
+			/*
+			 * The lookup can fail (evt_sync_process checks for it too).
+			 * Leave my_node_id at 0 so the next synchronization
+			 * retries instead of dereferencing a null pointer here.
+			 */
+			log_printf(LOG_LEVEL_WARNING,
+					"No cluster node data for local node\n");
+		}
+	}
+
+	/*
+	 * account for nodes that left the membership
+	 */
+	while (left_list_entries--) {
+		md = evt_find_node(*left_list);
+		if (md == 0) {
+			log_printf(LOG_LEVEL_WARNING, 
+					"Can't find cluster node at %s\n",
+							inet_ntoa(left_list[0]));
+		/*
+		 * Mark this one as down.
+		 */
+		} else {
+			log_printf(RECOVERY_DEBUG, "cluster node at %s down\n",
+							inet_ntoa(left_list[0]));
+			md->mn_started = 0;
+			remove_chan_open_info(md->mn_node_info.nodeId);
+		}
+		left_list++;
+	}
+
+	/*
+	 * set up for recovery processing
+	 */
+	recovery_phase = evt_send_event_id;
+	add_list = joined_member_list;
+	add_count = joined_member_count;
+
+	next_chan = esc_head.next;
+	checked_in = 0;
+
+	next_retained = retained_list.next;
+
+}
+
+/*
+ * Handle event service recovery.  It passes through a number of states to 
+ * finish the recovery.
+ * 
+ * First, the node broadcasts the highest event ID that it has seen for any
+ * joining node.  This helps to make sure that rejoining nodes don't re-use
+ * event IDs that have already been seen.
+ * 
+ * Next, The node broadcasts its open channel information to the other nodes.
+ * This makes sure that any joining nodes have complete data on any channels
+ * already open.
+ *
+ * Once done sending open channel information the node waits in a state for 
+ * the rest of the nodes to finish sending their data.  When the last node
+ * has checked in, then the remote channel operation handler selects the next
+ * state which is evt_send_retained_events if this is the oldest node in the
+ * cluster, or otherwise to evt_wait_send_retained_events to wait for the 
+ * retained events to be sent.  When the retained events have been sent, the
+ * state is changed to evt_recovery_complete and this function exits with
+ * zero to indicate that recovery is done.
+ *
+ * Returns 0 when recovery is complete, and 1 when there is more to do
+ * (including when a multicast failed and must be retried on a later call).
+ * Progress is kept in add_list/add_count, next_chan and next_retained, so
+ * a retry resumes where the previous call stopped.
+ */
+static int evt_sync_process(void)
+{
+
+	log_printf(RECOVERY_DEBUG, "Process Evt synchronization \n");
+
+	switch (recovery_phase) {
+	
+	/*
+	 * Send last known event ID to joining nodes to prevent duplicate 
+	 * event IDs.
+	 */
+	case evt_send_event_id:
+	{
+		struct member_node_data *md;
+		SaClmClusterNodeT *cn;
+		struct req_evt_chan_command cpkt;
+		struct iovec chn_iovec;
+		int res;
+
+		log_printf(RECOVERY_DEBUG, "Send max event ID updates\n");
+		while (add_count) {
+			/*
+			 * If we've seen this node before, send out the last event ID 
+			 * that we've seen from him.  He will set his base event ID to
+			 * the highest one seen.
+			 */
+			md = evt_find_node(*add_list);
+			if (md != NULL) {
+				if (!md->mn_started) {
+					log_printf(RECOVERY_DEBUG, 
+						"end set evt ID %llx to %s\n",
+						md->mn_last_evt_id, inet_ntoa(*add_list));
+					memset(&cpkt, 0, sizeof(cpkt));
+					cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+					cpkt.chc_head.size = sizeof(cpkt);
+					cpkt.chc_op = EVT_SET_ID_OP;
+					cpkt.u.chc_set_id.chc_addr = *add_list;
+					cpkt.u.chc_set_id.chc_last_id = 
+										md->mn_last_evt_id & BASE_ID_MASK;
+					chn_iovec.iov_base = &cpkt;
+					chn_iovec.iov_len = cpkt.chc_head.size;
+					res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
+					if (res != 0) {
+						log_printf(RECOVERY_DEBUG, 
+							"Unable to send event id to %s\n", 
+							inet_ntoa(*add_list));
+						/*
+						 * We'll try again later.  mn_started is still
+						 * clear, so the retry re-sends this node's ID
+						 * instead of silently skipping it.
+						 */
+						return 1;
+					}
+					/*
+					 * Only mark the node started once its event ID
+					 * has actually been queued.
+					 */
+					md->mn_started = 1;
+				}
+
+			} else {
+				/*
+				 * Not seen before, add it to our list of nodes.
+				 */
+				cn = clm_get_by_nodeid(*add_list);
+				if (!cn) {
+					/*
+					 * TODO: Error, shouldn't happen
+					 * NOTE(review): cn is still handed to evt_add_node()
+					 * below; confirm that it tolerates a null pointer.
+					 */
+				}
+				evt_add_node(*add_list, cn);
+			}
+
+			add_list++;
+			add_count--;
+		}
+		recovery_phase = evt_send_open_count;
+		return 1;
+	}
+
+	/*
+	 * Send channel open counts so all members have the same channel open
+	 * counts.
+	 */
+	case evt_send_open_count:
+	{
+		struct req_evt_chan_command cpkt;
+		struct iovec chn_iovec;
+		struct event_svr_channel_instance *eci;
+		int res;
+
+		log_printf(RECOVERY_DEBUG, "Send open count updates\n");
+
+		/*
+		 * Process messages.  When we're done, send the done message
+		 * to the nodes.
+		 */
+		memset(&cpkt, 0, sizeof(cpkt));
+		for (;next_chan != &esc_head; 
+								next_chan = next_chan->next) {
+			log_printf(RECOVERY_DEBUG, "Sending next open count\n");
+			eci = list_entry(next_chan, struct event_svr_channel_instance, 
+					esc_entry);
+			cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+			cpkt.chc_head.size = sizeof(cpkt);
+			cpkt.chc_op = EVT_OPEN_COUNT;
+			cpkt.u.chc_set_opens.chc_chan_name = eci->esc_channel_name;
+			cpkt.u.chc_set_opens.chc_open_count = eci->esc_local_opens;
+			chn_iovec.iov_base = &cpkt;
+			chn_iovec.iov_len = cpkt.chc_head.size;
+			res = totempg_mcast(&chn_iovec, 1,TOTEMPG_AGREED);
+
+			if (res != 0) {
+			/*
+			 * Try again later.
+			 */
+				return 1;
+			}
+		}
+		memset(&cpkt, 0, sizeof(cpkt));
+		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+		cpkt.chc_head.size = sizeof(cpkt);
+		cpkt.chc_op = EVT_OPEN_COUNT_DONE;
+		chn_iovec.iov_base = &cpkt;
+		chn_iovec.iov_len = cpkt.chc_head.size;
+		res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
+		if (res != 0) {
+		/*
+		 * Try again later.
+		 */
+			return 1;
+		}
+		log_printf(RECOVERY_DEBUG, "DONE Sending open counts\n");
+
+		recovery_phase = evt_wait_open_count_done;
+		return 1;
+	}
+
+	/*
+	 * Wait for all nodes to finish sending open updates before proceeding.
+	 * The EVT_OPEN_COUNT_DONE handler will set the state to 
+	 * evt_send_retained_events (oldest node) or
+	 * evt_wait_send_retained_events (all others) to get us out of this.
+	 */
+	case evt_wait_open_count_done:
+	{
+		log_printf(RECOVERY_DEBUG, "Wait for open count done\n");
+		return 1;
+	}
+
+	/*
+	 * If I'm the oldest node, send out retained events so that new nodes
+	 * have all the information.
+	 */
+	case evt_send_retained_events:
+	{
+		struct req_evt_chan_command cpkt;
+		struct iovec chn_iovec;
+		struct event_data *evt;
+		int res;
+
+		log_printf(RECOVERY_DEBUG, "Send retained event updates\n");
+
+		/*
+		 * Process messages.  When we're done, send the done message
+		 * to the nodes.
+		 */
+		for (;next_retained != &retained_list; 
+								next_retained = next_retained->next) {
+			log_printf(LOG_LEVEL_DEBUG, "Sending next retained event\n");
+			evt = list_entry(next_retained, struct event_data, ed_retained);
+			evt->ed_event.led_head.id = MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA;
+			chn_iovec.iov_base = &evt->ed_event;
+			chn_iovec.iov_len = evt->ed_event.led_head.size;
+			res = totempg_mcast(&chn_iovec, 1, TOTEMPG_AGREED);
+
+			if (res != 0) {
+			/*
+			 * Try again later.  Same "more to do" value as every
+			 * other retry path in this function.
+			 */
+				return 1;
+			}
+		}
+		log_printf(RECOVERY_DEBUG, "DONE Sending retained events\n");
+		memset(&cpkt, 0, sizeof(cpkt));
+		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+		cpkt.chc_head.size = sizeof(cpkt);
+		cpkt.chc_op = EVT_CONF_DONE;
+		chn_iovec.iov_base = &cpkt;
+		chn_iovec.iov_len = cpkt.chc_head.size;
+		res = totempg_mcast (&chn_iovec, 1, TOTEMPG_AGREED);
+		if (res != 0) {
+		/*
+		 * Try again later.  Without EVT_CONF_DONE no node ever
+		 * leaves recovery.  next_retained already points at the
+		 * list head, so the retry only re-sends this message.
+		 */
+			return 1;
+		}
+
+		recovery_phase = evt_wait_send_retained_events;
+		return 1;
+	}
+
+	/*
+	 * Wait for send of retained events to finish 
+	 * the EVT_CONF_DONE handler will set the state to 
+	 * evt_recovery_complete to get us out of this.
+	 */
+	case evt_wait_send_retained_events:
+	{
+		log_printf(RECOVERY_DEBUG, "Wait for retained events\n");
+		return 1;
+	}
+
+	case evt_recovery_complete:
+	{
+		log_printf(RECOVERY_DEBUG, "Recovery complete\n");
+		return 0;
+	}
+
+	default:
+		log_printf(LOG_LEVEL_WARNING, "Bad recovery phase state: %u\n",
+				recovery_phase);
+		recovery_phase = evt_recovery_complete;
+		return 0;
+	}
+
+	return 0;
+}
+
+/*
+ * Sync activate hook (registered as .sync_activate in
+ * evt_service_handler).  Not used at this time: it only logs a
+ * debug message.
+ */
+static void evt_sync_activate(void)
+{
+	log_printf(RECOVERY_DEBUG, "Evt synchronize activation\n");
+}
+
+/*
+ * Sync abort hook (registered as .sync_abort in evt_service_handler).
+ * Not used at this time: it only logs a debug message and leaves
+ * recovery_phase and the recovery cursors untouched.
+ */
+static void evt_sync_abort(void)
+{
+	log_printf(RECOVERY_DEBUG, "Abort Evt synchronization\n");
+}
+
 /*
  *	vi: set autoindent tabstop=4 shiftwidth=4 :
  */