Просмотр исходного кода

cpg: fix sync'ing the downlist.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2801 fd59a12c-fef9-0310-b244-a6a79926bd2f
Angus Salkeld 16 лет назад
Родитель
Сommit
8f430ecc8a
1 измененных файлов с 196 добавлено и 74 удалено
  1. 196 74
      services/cpg.c

+ 196 - 74
services/cpg.c

@@ -75,7 +75,8 @@ enum cpg_message_req_types {
 	MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
 	MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
 	MESSAGE_REQ_EXEC_CPG_MCAST = 3,
-	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 4
+	MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
+	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
 };
 
 /*
@@ -127,6 +128,13 @@ enum cpg_sync_state {
 	CPGSYNC_JOINLIST
 };
 
+enum cpg_downlist_state_e {
+       CPG_DOWNLIST_NONE,
+       CPG_DOWNLIST_WAITING_FOR_MESSAGES,
+       CPG_DOWNLIST_APPLYING,
+};
+static enum cpg_downlist_state_e downlist_state;
+static struct list_head downlist_messages_head;
 
 struct cpg_pd {
 	void *conn;
@@ -202,6 +210,10 @@ static void message_handler_req_exec_cpg_mcast (
 	const void *message,
 	unsigned int nodeid);
 
+static void message_handler_req_exec_cpg_downlist_old (
+	const void *message,
+	unsigned int nodeid);
+
 static void message_handler_req_exec_cpg_downlist (
 	const void *message,
 	unsigned int nodeid);
@@ -212,6 +224,8 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
 
 static void exec_cpg_mcast_endian_convert (void *msg);
 
+static void exec_cpg_downlist_endian_convert_old (void *msg);
+
 static void exec_cpg_downlist_endian_convert (void *msg);
 
 static void message_handler_req_lib_cpg_join (void *conn, const void *message);
@@ -246,6 +260,10 @@ static int cpg_exec_send_downlist(void);
 
 static int cpg_exec_send_joinlist(void);
 
+static void downlist_messages_delete (void);
+
+static void downlist_master_choose_and_send (void);
+
 static void cpg_sync_init_v2 (
 	const unsigned int *trans_list,
 	size_t trans_list_entries,
@@ -326,6 +344,10 @@ static struct corosync_exec_handler cpg_exec_engine[] =
 		.exec_endian_convert_fn	= exec_cpg_mcast_endian_convert
 	},
 	{ /* 4 */
+		.exec_handler_fn	= message_handler_req_exec_cpg_downlist_old,
+		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert_old
+	},
+	{ /* 5 */
 		.exec_handler_fn	= message_handler_req_exec_cpg_downlist,
 		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert
 	},
@@ -415,34 +437,30 @@ struct req_exec_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
-struct req_exec_cpg_downlist {
+struct req_exec_cpg_downlist_old {
 	coroipc_request_header_t header __attribute__((aligned(8)));
 	mar_uint32_t left_nodes __attribute__((aligned(8)));
 	mar_uint32_t nodeids[PROCESSOR_COUNT_MAX]  __attribute__((aligned(8)));
 };
 
-static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
+struct req_exec_cpg_downlist {
+	coroipc_request_header_t header __attribute__((aligned(8)));
+	/* merge decisions */
+	mar_uint32_t old_members __attribute__((aligned(8)));
+	/* downlist below */
+	mar_uint32_t left_nodes __attribute__((aligned(8)));
+	mar_uint32_t nodeids[PROCESSOR_COUNT_MAX]  __attribute__((aligned(8)));
+};
 
-static int memb_list_remove_value (unsigned int *list,
-	size_t list_entries, int value)
-{
-	int j;
-	int found = 0;
+struct downlist_msg {
+	mar_uint32_t sender_nodeid;
+	mar_uint32_t old_members __attribute__((aligned(8)));
+	mar_uint32_t left_nodes __attribute__((aligned(8)));
+	mar_uint32_t nodeids[PROCESSOR_COUNT_MAX]  __attribute__((aligned(8)));
+	struct list_head list;
+};
 
-	for (j = 0; j < list_entries; j++) {
-		if (list[j] == value) {
-			/* mark next values to be copied down */
-			found = 1;
-		}
-		else if (found) {
-			list[j-1] = list[j];
-		}
-	}
-	if (found)
-		return (list_entries - 1);
-	else
-		return list_entries;
-}
+static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
 
 static void cpg_sync_init_v2 (
 	const unsigned int *trans_list,
@@ -451,7 +469,6 @@ static void cpg_sync_init_v2 (
 	size_t member_list_entries,
 	const struct memb_ring_id *ring_id)
 {
-	unsigned int lowest_nodeid = 0xffffffff;
 	int entries;
 	int i, j;
 	int found;
@@ -465,6 +482,12 @@ static void cpg_sync_init_v2 (
 	last_sync_ring_id.nodeid = ring_id->rep.nodeid;
 	last_sync_ring_id.seq = ring_id->seq;
 
+	downlist_state = CPG_DOWNLIST_WAITING_FOR_MESSAGES;
+
+	entries = 0;
+	/*
+	 * Determine list of nodeids for downlist message
+	 */
 	for (i = 0; i < my_old_member_list_entries; i++) {
 		found = 0;
 		for (j = 0; j < trans_list_entries; j++) {
@@ -474,35 +497,8 @@ static void cpg_sync_init_v2 (
 			}
 		}
 		if (found == 0) {
-			my_member_list_entries = memb_list_remove_value (
-				my_member_list, my_member_list_entries,
-				my_old_member_list[i]);
-		}
-	}
-
-	for (i = 0; i < my_member_list_entries; i++) {
-		if (my_member_list[i] < lowest_nodeid) {
-			lowest_nodeid = my_member_list[i];
-		}
-	}
-
-	entries = 0;
-	if (lowest_nodeid == api->totem_nodeid_get()) {
-		/*
-		 * Determine list of nodeids for downlist message
-		 */
-		for (i = 0; i < my_old_member_list_entries; i++) {
-			found = 0;
-			for (j = 0; j < trans_list_entries; j++) {
-				if (my_old_member_list[i] == trans_list[j]) {
-					found = 1;
-					break;
-				}
-			}
-			if (found == 0) {
-				g_req_exec_cpg_downlist.nodeids[entries++] =
-					my_old_member_list[i];
-			}
+			g_req_exec_cpg_downlist.nodeids[entries++] =
+				my_old_member_list[i];
 		}
 	}
 	g_req_exec_cpg_downlist.left_nodes = entries;
@@ -531,11 +527,20 @@ static void cpg_sync_activate (void)
 		my_member_list_entries * sizeof (unsigned int));
 	my_old_member_list_entries = my_member_list_entries;
 
+	if (downlist_state == CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
+		downlist_master_choose_and_send ();
+	}
+
+	downlist_messages_delete ();
+	downlist_state = CPG_DOWNLIST_NONE;
+
 	notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
 }
 
 static void cpg_sync_abort (void)
 {
+	downlist_state = CPG_DOWNLIST_NONE;
+	downlist_messages_delete ();
 }
 
 static int notify_lib_totem_membership (
@@ -706,11 +711,101 @@ static int notify_lib_joinlist(
 	return CPG_OK;
 }
 
+static struct downlist_msg* downlist_master_choose (void)
+{
+	struct downlist_msg *cmp;
+	struct downlist_msg *best = NULL;
+	struct list_head *iter;
+
+	for (iter = downlist_messages_head.next;
+		iter != &downlist_messages_head;
+		iter = iter->next) {
+
+		cmp = list_entry(iter, struct downlist_msg, list);
+		if (best == NULL) {
+			best = cmp;
+			continue;
+		}
+
+		if (cmp->old_members < best->old_members) {
+			continue;
+		}
+		else if (cmp->old_members > best->old_members) {
+			best = cmp;
+		}
+		else if (cmp->sender_nodeid < best->sender_nodeid) {
+			best = cmp;
+		}
+
+	}
+	return best;
+}
+
+static void downlist_master_choose_and_send (void)
+{
+	struct downlist_msg *stored_msg;
+	struct list_head *iter;
+	mar_cpg_address_t *left_list = NULL;
+	int i;
+
+	downlist_state = CPG_DOWNLIST_APPLYING;
+
+	stored_msg = downlist_master_choose ();
+	if (!stored_msg) {
+		log_printf (LOGSYS_LEVEL_INFO, "NO chosen downlist");
+		return;
+	}
+
+	log_printf (LOGSYS_LEVEL_INFO, "chosen downlist from node %s",
+		api->totem_ifaces_print(stored_msg->sender_nodeid));
+
+	/* send events */
+	for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
+		struct process_info *pi = list_entry(iter, struct process_info, list);
+		iter = iter->next;
+
+		for (i = 0; i < stored_msg->left_nodes; i++) {
+			if (pi->nodeid == stored_msg->nodeids[i]) {
+				left_list[0].nodeid = pi->nodeid;
+				left_list[0].pid = pi->pid;
+				left_list[0].reason = CONFCHG_CPG_REASON_NODEDOWN;
+
+				notify_lib_joinlist(&pi->group, NULL,
+					0, NULL,
+					1, left_list,
+					MESSAGE_RES_CPG_CONFCHG_CALLBACK);
+				list_del (&pi->list);
+				free (pi);
+				break;
+			}
+		}
+	}
+}
+
+static void downlist_messages_delete (void)
+{
+	struct downlist_msg *stored_msg;
+	struct list_head *iter, *iter_next;
+
+	for (iter = downlist_messages_head.next;
+		iter != &downlist_messages_head;
+		iter = iter_next) {
+
+		iter_next = iter->next;
+
+		stored_msg = list_entry(iter, struct downlist_msg, list);
+		list_del (&stored_msg->list);
+		free (stored_msg);
+	}
+}
+
+
 static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
 {
 #ifdef COROSYNC_SOLARIS
 	logsys_subsys_init();
 #endif
+	list_init (&downlist_messages_head);
 	api = corosync_api;
 	return (0);
 }
@@ -817,12 +912,17 @@ static void exec_cpg_joinlist_endian_convert (void *msg_v)
 	}
 }
 
+static void exec_cpg_downlist_endian_convert_old (void *msg)
+{
+}
+
 static void exec_cpg_downlist_endian_convert (void *msg)
 {
 	struct req_exec_cpg_downlist *req_exec_cpg_downlist = msg;
 	unsigned int i;
 
 	req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
+	req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members);
 
 	for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
 		req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
@@ -908,43 +1008,63 @@ static void do_proc_join(
 			    MESSAGE_RES_CPG_CONFCHG_CALLBACK);
 }
 
-static void message_handler_req_exec_cpg_downlist (
+static void message_handler_req_exec_cpg_downlist_old (
+	const void *message,
+	unsigned int nodeid)
+{
+	log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node %d",
+		nodeid);
+}
+
+static void message_handler_req_exec_cpg_downlist(
 	const void *message,
 	unsigned int nodeid)
 {
 	const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
 	int i;
-	mar_cpg_address_t left_list[1];
 	struct list_head *iter;
+	struct downlist_msg *stored_msg;
+	int found;
 
-	/*
-		FOR OPTIMALIZATION - Make list of lists
-	*/
-
-	log_printf (LOGSYS_LEVEL_DEBUG, "downlist left_list: %d\n", req_exec_cpg_downlist->left_nodes);
+	if (downlist_state != CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
+		log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received in state %d",
+			req_exec_cpg_downlist->left_nodes, downlist_state);
+		return;
+	}
+	else {
+		log_printf (LOGSYS_LEVEL_INFO, "downlist received left_list: %d",
+			req_exec_cpg_downlist->left_nodes);
+	}
 
-	for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
-		struct process_info *pi = list_entry(iter, struct process_info, list);
-		iter = iter->next;
+	stored_msg = malloc (sizeof (struct downlist_msg));
+	stored_msg->sender_nodeid = nodeid;
+	stored_msg->old_members = req_exec_cpg_downlist->old_members;
+	stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
+	memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
+		req_exec_cpg_downlist->left_nodes * sizeof (mar_uint32_t));
+	list_init (&stored_msg->list);
+	list_add (&stored_msg->list, &downlist_messages_head);
 
-		for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
-			if (pi->nodeid == req_exec_cpg_downlist->nodeids[i]) {
-				left_list[0].nodeid = pi->nodeid;
-				left_list[0].pid = pi->pid;
-				left_list[0].reason = CONFCHG_CPG_REASON_NODEDOWN;
+	for (i = 0; i < my_member_list_entries; i++) {
+		found = 0;
+		for (iter = downlist_messages_head.next;
+			iter != &downlist_messages_head;
+			iter = iter->next) {
 
-				notify_lib_joinlist(&pi->group, NULL,
-                                	            0, NULL,
-                                        	    1, left_list,
-	                                            MESSAGE_RES_CPG_CONFCHG_CALLBACK);
-				list_del (&pi->list);
-				free (pi);
-				break;
+			stored_msg = list_entry(iter, struct downlist_msg, list);
+			if (my_member_list[i] == stored_msg->sender_nodeid) {
+				found = 1;
 			}
 		}
+		if (!found) {
+			return;
+		}
 	}
+
+	downlist_master_choose_and_send ();
 }
 
+
 static void message_handler_req_exec_cpg_procjoin (
 	const void *message,
 	unsigned int nodeid)
@@ -1081,6 +1201,8 @@ static int cpg_exec_send_downlist(void)
 	g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
 	g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
 
+	g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
+
 	iov.iov_base = (void *)&g_req_exec_cpg_downlist;
 	iov.iov_len = g_req_exec_cpg_downlist.header.size;