Explorar o código

Fix ordering of join messages

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1324 fd59a12c-fef9-0310-b244-a6a79926bd2f
Patrick Caulfield hai 19 anos
pai
achega
2a12de36f2
Modificáronse 1 ficheiros con 91 adicións e 19 borrados
  1. 91 19
      exec/cpg.c

+ 91 - 19
exec/cpg.c

@@ -77,7 +77,8 @@ enum cpg_message_req_types {
 	MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
 	MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
 	MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
-	MESSAGE_REQ_EXEC_CPG_MCAST = 3
+	MESSAGE_REQ_EXEC_CPG_MCAST = 3,
+	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 4
 };
 
 struct removed_group
@@ -146,12 +147,18 @@ static void message_handler_req_exec_cpg_mcast (
 	void *message,
 	unsigned int nodeid);
 
+/* Exec handler for MESSAGE_REQ_EXEC_CPG_DOWNLIST (entry 4 of cpg_exec_service) */
+static void message_handler_req_exec_cpg_downlist (
+	void *message,
+	unsigned int nodeid);
+
 static void exec_cpg_procjoin_endian_convert (void *msg);
 
 static void exec_cpg_joinlist_endian_convert (void *msg);
 
 static void exec_cpg_mcast_endian_convert (void *msg);
 
+/* Endian-conversion hook registered alongside the DOWNLIST exec handler */
+static void exec_cpg_downlist_endian_convert (void *msg);
+
 static void message_handler_req_lib_cpg_join (void *conn, void *message);
 
 static void message_handler_req_lib_cpg_leave (void *conn, void *message);
@@ -233,6 +240,10 @@ static struct openais_exec_handler cpg_exec_service[] =
 		.exec_handler_fn	= message_handler_req_exec_cpg_mcast,
 		.exec_endian_convert_fn	= exec_cpg_mcast_endian_convert
 	},
+	{ /* 4 */
+		.exec_handler_fn	= message_handler_req_exec_cpg_downlist,
+		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert
+	},
 };
 
 struct openais_service_handler cpg_service_handler = {
@@ -311,6 +322,14 @@ struct req_exec_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+/*
+ * Executive message listing the nodes that left the membership.
+ * Built in cpg_confchg_fn and consumed by
+ * message_handler_req_exec_cpg_downlist.  The sender always transmits
+ * sizeof(struct req_exec_cpg_downlist) bytes, whatever left_nodes is.
+ */
+struct req_exec_cpg_downlist {
+	mar_req_header_t header __attribute__((aligned(8)));
+	/* Number of populated entries in nodeids[] */
+	mar_uint32_t left_nodes __attribute__((aligned(8)));
+	/* Node ids of the departed nodes; only the first left_nodes slots are filled */
+	mar_uint32_t nodeids[PROCESSOR_COUNT_MAX]  __attribute__((aligned(8)));
+};
+
+/*
+ * Pending downlist.  cpg_confchg_fn fills it when this node has the lowest
+ * member nodeid, multicasts it once a TOTEM_CONFIGURATION_REGULAR change
+ * arrives, and then resets left_nodes to 0; a non-zero left_nodes therefore
+ * marks a message that is still waiting to be sent.
+ */
+static struct req_exec_cpg_downlist req_exec_cpg_downlist;
+
 /* No-op: the body is empty. */
 static void cpg_sync_init (void)
 {
 }
@@ -585,31 +604,44 @@ static void cpg_confchg_fn (
 	struct memb_ring_id *ring_id)
 {
 	int i;
-	struct list_head removed_list;
-
-	log_printf(LOG_LEVEL_DEBUG, "confchg. joined_list: %d, left_list: %d\n", joined_list_entries, left_list_entries);
+	uint32_t lowest_nodeid = 0xffffff;
+	struct iovec req_exec_cpg_iovec;
 
-	list_init(&removed_list);
+	/* We don't send the library joinlist in here because it can end up
+	   out of order with the rest of the messages (which are totem ordered).
+	   So we get the lowest nodeid to send out a list of left nodes instead.
+	   On receipt of that message, all nodes will then notify their local clients
+	   of the new joinlist */
 
-	/* Remove nodes from joined groups and add removed groups to the list */
-	for (i = 0; i < left_list_entries; i++) {
-		remove_node_from_groups(left_list[i], &removed_list);
-	}
+	if (left_list_entries) {
+		for (i = 0; i < member_list_entries; i++) {
+			if (member_list[i] < lowest_nodeid)
+				lowest_nodeid = member_list[i];
+		}
 
-	if (!list_empty(&removed_list)) {
-		struct list_head *iter, *tmp;
+		log_printf(LOG_LEVEL_DEBUG, "confchg, low nodeid=%d, us = %d\n", lowest_nodeid, this_ip->nodeid);
+		if (lowest_nodeid == this_ip->nodeid) {
 
-		for (iter = removed_list.next, tmp=iter->next; iter != &removed_list; iter = tmp, tmp = iter->next) {
-			struct removed_group *rg = list_entry(iter, struct removed_group, list);
+			req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
+			req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
 
-			notify_lib_joinlist(rg->gi, NULL,
-					    0, NULL,
-					    rg->left_list_entries, rg->left_list,
-					    MESSAGE_RES_CPG_CONFCHG_CALLBACK);
-			rg->gi->rg = NULL;
-			free(rg);
+			req_exec_cpg_downlist.left_nodes = left_list_entries;
+			for (i = 0; i < left_list_entries; i++) {
+				req_exec_cpg_downlist.nodeids[i] = left_list[i];
+			}
+			log_printf(LOG_LEVEL_DEBUG, "confchg, build downlist: %d nodes\n", left_list_entries);
 		}
 	}
+
+	/* Don't send this message until we get the final configuration message */
+	if (configuration_type == TOTEM_CONFIGURATION_REGULAR && req_exec_cpg_downlist.left_nodes) {
+		req_exec_cpg_iovec.iov_base = &req_exec_cpg_downlist;
+		req_exec_cpg_iovec.iov_len = req_exec_cpg_downlist.header.size;
+
+		totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED);
+		req_exec_cpg_downlist.left_nodes = 0;
+		log_printf(LOG_LEVEL_DEBUG, "confchg, sent downlist\n");
+	}
 }
 
 static void cpg_flow_control_state_set_fn (
@@ -645,6 +677,13 @@ static void exec_cpg_joinlist_endian_convert (void *msg)
 	}
 }
 
+/*
+ * Endian-conversion hook for DOWNLIST messages.
+ *
+ * Byte-swaps left_nodes and then every populated nodeids[] entry: the
+ * handler reads both as native integers, so swapping only the count would
+ * leave the node ids in the sender's byte order.  The number of entries
+ * converted is capped at PROCESSOR_COUNT_MAX so that a corrupt count
+ * cannot walk past the end of the array.
+ *
+ * msg - points at a struct req_exec_cpg_downlist, converted in place
+ */
+static void exec_cpg_downlist_endian_convert (void *msg)
+{
+	/* Named "downlist" to avoid shadowing the file-scope req_exec_cpg_downlist */
+	struct req_exec_cpg_downlist *downlist = msg;
+	unsigned int i;
+
+	downlist->left_nodes = swab32(downlist->left_nodes);
+
+	for (i = 0; i < downlist->left_nodes && i < PROCESSOR_COUNT_MAX; i++) {
+		downlist->nodeids[i] = swab32(downlist->nodeids[i]);
+	}
+}
+
 static void exec_cpg_mcast_endian_convert (void *msg)
 {
 	struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)msg;
@@ -710,6 +749,39 @@ local_join:
 			    MESSAGE_RES_CPG_CONFCHG_CALLBACK);
 }
 
+/*
+ * Handle a DOWNLIST message: remove every listed node from the groups and
+ * send a confchg callback for each group that lost members.
+ *
+ * message - points at a struct req_exec_cpg_downlist
+ * nodeid  - id of the sending node (not used here)
+ *
+ * A message whose left_nodes exceeds PROCESSOR_COUNT_MAX is ignored, because
+ * nodeids[] cannot hold that many entries and iterating over it would read
+ * past the end of the message.
+ */
+static void message_handler_req_exec_cpg_downlist (
+	void *message,
+	unsigned int nodeid)
+{
+	/* Named "downlist" to avoid shadowing the file-scope req_exec_cpg_downlist */
+	struct req_exec_cpg_downlist *downlist = message;
+	struct list_head removed_list;
+	unsigned int i;
+
+	log_printf(LOG_LEVEL_DEBUG, "downlist left_list: %d\n", downlist->left_nodes);
+
+	/* The count comes off the wire; validate it before using it as a bound */
+	if (downlist->left_nodes > PROCESSOR_COUNT_MAX) {
+		log_printf(LOG_LEVEL_DEBUG, "downlist ignored: left_nodes exceeds PROCESSOR_COUNT_MAX\n");
+		return;
+	}
+
+	list_init(&removed_list);
+
+	/* Remove nodes from joined groups and add removed groups to the list */
+	for (i = 0; i < downlist->left_nodes; i++) {
+		remove_node_from_groups(downlist->nodeids[i], &removed_list);
+	}
+
+	if (!list_empty(&removed_list)) {
+		struct list_head *iter, *tmp;
+
+		/* tmp is fetched before the body runs because rg is freed inside it */
+		for (iter = removed_list.next, tmp = iter->next; iter != &removed_list; iter = tmp, tmp = iter->next) {
+			struct removed_group *rg = list_entry(iter, struct removed_group, list);
+
+			/* No joiners, no member list: report only the nodes that left */
+			notify_lib_joinlist(rg->gi, NULL,
+					    0, NULL,
+					    rg->left_list_entries, rg->left_list,
+					    MESSAGE_RES_CPG_CONFCHG_CALLBACK);
+			/* Detach the record from its group before releasing it */
+			rg->gi->rg = NULL;
+			free(rg);
+		}
+	}
+}
+
 static void message_handler_req_exec_cpg_procjoin (
 	void *message,
 	unsigned int nodeid)