Просмотр исходного кода

sync: Call sync_init of all services at once

This patch solves situation which can happen very rearly:
- Node B is running
- Node A is started and tries to create singleton membership. It also
  initialize service S which tries to send message during initialization
- Just before node A finished move to operational state, it gets
  Node B multicast message so moves to gather state
- Node A and B creates membership and moves to operational state and
  sync is started
- Node A and B receives message sent by node A during initialization of
  service S
- Node A exits before sync of service is finished

In this situation, node B may never execute sync_init for
service S. So node B service S is not aware of existence of node A but
it received message from it.

Similar situation can theoretically also happen during merge.

Solution is to change flow of sync, so now it looks like:

- Build service_list
- Call sync_init for all local services
- Send service_list
- Receive service_list from all members and send barier
- For all services:
  - Receive barier
  - Call sync_activate if this is not first service
  - Call sync_process for next service or finish sync if previous
    this service is the last one
  - Send barier

Signed-off-by: Jan Friesse <jfriesse@redhat.com>
Reviewed-by: Christine Caulfield <ccaulfie@redhat.com>
Jan Friesse 8 лет назад
Родитель
Сommit
154895dfbe
1 измененных файлов с 42 добавлено и 43 удалено
  1. 42 43
      exec/sync.c

+ 42 - 43
exec/sync.c

@@ -64,7 +64,6 @@ LOGSYS_DECLARE_SUBSYS ("SYNC");
 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
 #define MESSAGE_REQ_SYNC_SERVICE_BUILD 1
 
 
 enum sync_process_state {
 enum sync_process_state {
-	INIT,
 	PROCESS,
 	PROCESS,
 	ACTIVATE
 	ACTIVATE
 };
 };
@@ -143,6 +142,8 @@ static int schedwrk_processor (const void *context);
 
 
 static void sync_process_enter (void);
 static void sync_process_enter (void);
 
 
+static void sync_process_call_init (void);
+
 static struct totempg_group sync_group = {
 static struct totempg_group sync_group = {
     .group      = "sync",
     .group      = "sync",
     .group_len  = 4
     .group_len  = 4
@@ -227,15 +228,6 @@ static void sync_barrier_handler (unsigned int nodeid, const void *msg)
 	}
 	}
 }
 }
 
 
-static void dummy_sync_init (
-	const unsigned int *trans_list,
-	size_t trans_list_entries,
-	const unsigned int *member_list,
-	size_t member_list_entries,
-	const struct memb_ring_id *ring_id)
-{
-}
-
 static void dummy_sync_abort (void)
 static void dummy_sync_abort (void)
 {
 {
 }
 }
@@ -281,15 +273,14 @@ static void sync_service_build_handler (unsigned int nodeid, const void *msg)
 			}
 			}
 		}
 		}
 		if (found == 0) {
 		if (found == 0) {
-			my_service_list[my_service_list_entries].state =
-				INIT;
+			my_service_list[my_service_list_entries].state = PROCESS;
 			my_service_list[my_service_list_entries].service_id =
 			my_service_list[my_service_list_entries].service_id =
 				req_exec_service_build_message->service_list[i];
 				req_exec_service_build_message->service_list[i];
 			sprintf (my_service_list[my_service_list_entries].name,
 			sprintf (my_service_list[my_service_list_entries].name,
 				"Unknown External Service (id = %d)\n",
 				"Unknown External Service (id = %d)\n",
 				req_exec_service_build_message->service_list[i]);
 				req_exec_service_build_message->service_list[i]);
 			my_service_list[my_service_list_entries].sync_init =
 			my_service_list[my_service_list_entries].sync_init =
-				dummy_sync_init;
+				NULL;
 			my_service_list[my_service_list_entries].sync_abort =
 			my_service_list[my_service_list_entries].sync_abort =
 				dummy_sync_abort;
 				dummy_sync_abort;
 			my_service_list[my_service_list_entries].sync_process =
 			my_service_list[my_service_list_entries].sync_process =
@@ -316,6 +307,7 @@ static void sync_service_build_handler (unsigned int nodeid, const void *msg)
 		}
 		}
 	}
 	}
 	if (barrier_reached) {
 	if (barrier_reached) {
+		log_printf (LOGSYS_LEVEL_DEBUG, "enter sync process");
 		sync_process_enter ();
 		sync_process_enter ();
 	}
 	}
 }
 }
@@ -379,6 +371,38 @@ static void sync_barrier_enter (void)
 	barrier_message_transmit ();
 	barrier_message_transmit ();
 }
 }
 
 
+static void sync_process_call_init (void)
+{
+	unsigned int old_trans_list[PROCESSOR_COUNT_MAX];
+	size_t old_trans_list_entries = 0;
+	int o, m;
+	int i;
+
+	memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
+		sizeof (unsigned int));
+	old_trans_list_entries = my_trans_list_entries;
+
+	my_trans_list_entries = 0;
+	for (o = 0; o < old_trans_list_entries; o++) {
+		for (m = 0; m < my_member_list_entries; m++) {
+			if (old_trans_list[o] == my_member_list[m]) {
+				my_trans_list[my_trans_list_entries] = my_member_list[m];
+				my_trans_list_entries++;
+				break;
+			}
+		}
+	}
+
+	for (i = 0; i < my_service_list_entries; i++) {
+		if (my_sync_callbacks_retrieve(my_service_list[i].service_id, NULL) != -1) {
+			my_service_list[i].sync_init (my_trans_list,
+				my_trans_list_entries, my_member_list,
+				my_member_list_entries,
+				&my_ring_id);
+		}
+	}
+}
+
 static void sync_process_enter (void)
 static void sync_process_enter (void)
 {
 {
 	int i;
 	int i;
@@ -396,6 +420,7 @@ static void sync_process_enter (void)
 	for (i = 0; i < my_processor_list_entries; i++) {
 	for (i = 0; i < my_processor_list_entries; i++) {
 		my_processor_list[i].received = 0;
 		my_processor_list[i].received = 0;
 	}
 	}
+
 	schedwrk_create (&my_schedwrk_handle,
 	schedwrk_create (&my_schedwrk_handle,
 		schedwrk_processor,
 		schedwrk_processor,
 		NULL);
 		NULL);
@@ -435,7 +460,7 @@ static void sync_servicelist_build_enter (
 		if (sync_callbacks.sync_init == NULL) {
 		if (sync_callbacks.sync_init == NULL) {
 			continue;
 			continue;
 		}
 		}
-		my_service_list[my_service_list_entries].state = INIT;
+		my_service_list[my_service_list_entries].state = PROCESS;
 		my_service_list[my_service_list_entries].service_id = i;
 		my_service_list[my_service_list_entries].service_id = i;
 		strcpy (my_service_list[my_service_list_entries].name,
 		strcpy (my_service_list[my_service_list_entries].name,
 			sync_callbacks.name);
 			sync_callbacks.name);
@@ -453,42 +478,16 @@ static void sync_servicelist_build_enter (
 	service_build.service_list_entries = my_service_list_entries;
 	service_build.service_list_entries = my_service_list_entries;
 
 
 	service_build_message_transmit (&service_build);
 	service_build_message_transmit (&service_build);
+
+	log_printf (LOGSYS_LEVEL_DEBUG, "call init for locally known services");
+	sync_process_call_init ();
 }
 }
 
 
 static int schedwrk_processor (const void *context)
 static int schedwrk_processor (const void *context)
 {
 {
 	int res = 0;
 	int res = 0;
 
 
-	if (my_service_list[my_processing_idx].state == INIT) {
-		unsigned int old_trans_list[PROCESSOR_COUNT_MAX];
-		size_t old_trans_list_entries = 0;
-		int o, m;
-		my_service_list[my_processing_idx].state = PROCESS;
-
-		memcpy (old_trans_list, my_trans_list, my_trans_list_entries *
-			sizeof (unsigned int));
-		old_trans_list_entries = my_trans_list_entries;
-
-		my_trans_list_entries = 0;
-		for (o = 0; o < old_trans_list_entries; o++) {
-			for (m = 0; m < my_member_list_entries; m++) {
-				if (old_trans_list[o] == my_member_list[m]) {
-					my_trans_list[my_trans_list_entries] = my_member_list[m];
-					my_trans_list_entries++;
-					break;
-				}
-			}
-		}
-
-		if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
-			my_service_list[my_processing_idx].sync_init (my_trans_list,
-				my_trans_list_entries, my_member_list,
-				my_member_list_entries,
-				&my_ring_id);
-		}
-	}
 	if (my_service_list[my_processing_idx].state == PROCESS) {
 	if (my_service_list[my_processing_idx].state == PROCESS) {
-		my_service_list[my_processing_idx].state = PROCESS;
 		if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
 		if (my_sync_callbacks_retrieve(my_service_list[my_processing_idx].service_id, NULL) != -1) {
 			res = my_service_list[my_processing_idx].sync_process ();
 			res = my_service_list[my_processing_idx].sync_process ();
 		} else {
 		} else {