Просмотр исходного кода

enhancement 1019
Modify YKD to select primary component and execute the synchronization
operation only in the main partition. In the non-primary partition, no
new requests are allowed - they are all returned with the error code
SA_AIS_ERR_TRY_AGAIN.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@878 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake 20 лет назад
Родитель
Сommit
d920864d61
7 измененных файлов с 117 добавлено и 93 удалено
  1. 7 22
      exec/main.c
  2. 30 31
      exec/main.h
  3. 54 14
      exec/sync.c
  4. 0 10
      exec/sync.h
  5. 11 9
      exec/totempg.c
  6. 13 6
      exec/ykd.c
  7. 2 1
      exec/ykd.h

+ 7 - 22
exec/main.c

@@ -722,10 +722,11 @@ retry_recv:
 				&send_ok_joined_iovec, 1);
 
 			send_ok =
+				(ykd_primary() == 1) && (
 				(ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_NOT_REQUIRED) ||
 				((ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_REQUIRED) &&
 				(send_ok_joined) &&
-				(sync_in_process() == 0));
+				(sync_in_process() == 0)));
 
 			if (send_ok) {
 		//		*prio = 0;
@@ -799,7 +800,7 @@ static int pool_sizes[] = { 0, 0, 0, 0, 0, 4096, 0, 1, 0, /* 256 */
 					1, 1, 1, 1, 1, 1, 1, 1, 1 };
 
 static int (*aisexec_handler_fns[AIS_SERVICE_HANDLER_AISEXEC_FUNCTIONS_MAX]) (void *msg, struct totem_ip_address *source_addr, int endian_conversion_required);
-static int aisexec_handler_fns_count = 1;
+static int aisexec_handler_fns_count = 0;
 
 /*
  * Builds the handler table as an optimization
@@ -811,8 +812,6 @@ static void aisexec_handler_fns_build (void)
 	/*
 	 * Install sync handler function
 	 */
-	aisexec_handler_fns[0] = sync_deliver_fn;
-
 	for (i = 0; i < AIS_SERVICE_HANDLERS_COUNT; i++) {
 		for (j = 0; j < ais_service_handlers[i]->aisexec_handler_fns_count; j++) {
 			aisexec_handler_fns[aisexec_handler_fns_count++] = 
@@ -885,6 +884,8 @@ static void deliver_fn (
 		endian_conversion_required);
 }
 
+static struct memb_ring_id aisexec_ring_id;
+
 static void confchg_fn (
 	enum totem_configuration_type configuration_type,
 	struct totem_ip_address *member_list, int member_list_entries,
@@ -894,18 +895,12 @@ static void confchg_fn (
 {
 	int i;
 
+	memcpy (&aisexec_ring_id, ring_id, sizeof (struct memb_ring_id));
+
 	if (!totemip_localhost_check(this_ip)) {
 		totemip_copy(&this_non_loopback_ip, this_ip);
 	}
 
-	/*
-	 * Execute configuration change for synchronization service
-	 */
-	sync_confchg_fn (configuration_type,
-		member_list, member_list_entries,
-		left_list, left_list_entries,
-		joined_list, joined_list_entries, ring_id);
-
 	/*
 	 * Call configuration change for all services
 	 */
@@ -1074,14 +1069,6 @@ void message_source_set (struct message_source *source, struct conn_info *conn_i
 
 struct totem_logging_configuration totem_logging_configuration;
 
-void main_primary_callback_fn (
-	struct totem_ip_address *view_list,
-	int view_list_entries,
-	int primary_component)
-{
-	log_printf (LOG_LEVEL_NOTICE, "Primary component is %d\n", primary_component);
-}
-
 int main (int argc, char **argv)
 {
 	int libais_server_fd;
@@ -1175,8 +1162,6 @@ int main (int argc, char **argv)
 		&openais_group,
 		1);
 
-	ykd_init (main_primary_callback_fn);
-
 	this_ip = &openais_config.totem_config.interfaces[0].boundto;
 
 	/*

+ 30 - 31
exec/main.h

@@ -113,37 +113,36 @@ struct conn_info {
 
 
 enum nodeexec_message_types {
-	MESSAGE_REQ_EXEC_SYNC_BARRIER = 0,
-	MESSAGE_REQ_EXEC_EVS_MCAST = 1,
-	MESSAGE_REQ_EXEC_CLM_NODEJOIN = 2,
-	MESSAGE_REQ_EXEC_AMF_COMPONENTREGISTER = 3,
-	MESSAGE_REQ_EXEC_AMF_COMPONENTUNREGISTER = 4,
-	MESSAGE_REQ_EXEC_AMF_ERRORREPORT = 5,
-	MESSAGE_REQ_EXEC_AMF_ERRORCANCELALL = 6,
-	MESSAGE_REQ_EXEC_AMF_READINESSSTATESET = 7,
-	MESSAGE_REQ_EXEC_AMF_HASTATESET = 8,
-	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTOPEN = 9,
-	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE = 10,
-	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTUNLINK = 11,
-	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONSET = 12,
-	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONEXPIRE = 13,
-	MESSAGE_REQ_EXEC_CKPT_SECTIONCREATE = 14,
-	MESSAGE_REQ_EXEC_CKPT_SECTIONDELETE = 15,
-	MESSAGE_REQ_EXEC_CKPT_SECTIONEXPIRATIONTIMESET = 16,
-	MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE = 17,
-	MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE = 18,
-	MESSAGE_REQ_EXEC_CKPT_SECTIONREAD = 19,
-	MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE = 20,
-	MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION = 21,
-	MESSAGE_REQ_EXEC_EVT_EVENTDATA = 22,
-	MESSAGE_REQ_EXEC_EVT_CHANCMD = 23,
-	MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA = 24,
-	MESSAGE_REQ_EXEC_LCK_RESOURCEOPEN = 25,
-	MESSAGE_REQ_EXEC_LCK_RESOURCECLOSE = 26,
-	MESSAGE_REQ_EXEC_LCK_RESOURCELOCK = 27,
-	MESSAGE_REQ_EXEC_LCK_RESOURCEUNLOCK = 28,
-	MESSAGE_REQ_EXEC_LCK_RESOURCELOCKORPHAN = 29,
-	MESSAGE_REQ_EXEC_LCK_LOCKPURGE = 30
+	MESSAGE_REQ_EXEC_EVS_MCAST = 0,
+	MESSAGE_REQ_EXEC_CLM_NODEJOIN = 1,
+	MESSAGE_REQ_EXEC_AMF_COMPONENTREGISTER = 2,
+	MESSAGE_REQ_EXEC_AMF_COMPONENTUNREGISTER = 3,
+	MESSAGE_REQ_EXEC_AMF_ERRORREPORT = 4,
+	MESSAGE_REQ_EXEC_AMF_ERRORCANCELALL = 5,
+	MESSAGE_REQ_EXEC_AMF_READINESSSTATESET = 6,
+	MESSAGE_REQ_EXEC_AMF_HASTATESET = 7,
+	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTOPEN = 8,
+	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE = 9,
+	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTUNLINK = 10,
+	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONSET = 11,
+	MESSAGE_REQ_EXEC_CKPT_CHECKPOINTRETENTIONDURATIONEXPIRE = 12,
+	MESSAGE_REQ_EXEC_CKPT_SECTIONCREATE = 13,
+	MESSAGE_REQ_EXEC_CKPT_SECTIONDELETE = 14,
+	MESSAGE_REQ_EXEC_CKPT_SECTIONEXPIRATIONTIMESET = 15,
+	MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE = 16,
+	MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE = 17,
+	MESSAGE_REQ_EXEC_CKPT_SECTIONREAD = 18,
+	MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE = 19,
+	MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION = 20,
+	MESSAGE_REQ_EXEC_EVT_EVENTDATA = 21,
+	MESSAGE_REQ_EXEC_EVT_CHANCMD = 22,
+	MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA = 23,
+	MESSAGE_REQ_EXEC_LCK_RESOURCEOPEN = 24,
+	MESSAGE_REQ_EXEC_LCK_RESOURCECLOSE = 25,
+	MESSAGE_REQ_EXEC_LCK_RESOURCELOCK = 26,
+	MESSAGE_REQ_EXEC_LCK_RESOURCEUNLOCK = 27,
+	MESSAGE_REQ_EXEC_LCK_RESOURCELOCKORPHAN = 28,
+	MESSAGE_REQ_EXEC_LCK_LOCKPURGE = 29
 };
 
 extern struct totem_ip_address *this_ip;

+ 54 - 14
exec/sync.c

@@ -53,7 +53,7 @@
 #include "main.h"
 #include "sync.h"
 #include "totempg.h"
-#include "totempg.h"
+#include "ykd.h"
 #include "print.h"
 
 #define LOG_SERVICE LOG_SERVICE_SYNC
@@ -91,6 +91,25 @@ static void sync_service_init (struct memb_ring_id *ring_id);
 
 static int sync_service_process (enum totem_callback_token_type type, void *data);
 
+static int sync_deliver_fn (
+	struct totem_ip_address *source_addr,
+	struct iovec *iovec,
+	int iov_len,
+	int endian_conversion_required);
+
+void sync_primary_callback_fn (
+	struct totem_ip_address *view_list,
+	int view_list_entries,
+	int primary_designated,
+	struct memb_ring_id *ring_id);
+
+static struct totempg_group sync_group = {
+    .group      = "sync",
+    .group_len  = 4
+};
+
+static totempg_groups_handle sync_group_handle;
+
 struct req_exec_sync_barrier_start {
 	struct req_header header;
 	struct memb_ring_id ring_id;
@@ -114,7 +133,7 @@ static int sync_barrier_send (struct memb_ring_id *ring_id)
 	iovec.iov_base = (char *)&req_exec_sync_barrier_start;
 	iovec.iov_len = sizeof (req_exec_sync_barrier_start);
 
-	res = totempg_groups_mcast_joined (openais_group_handle, &iovec, 1, TOTEMPG_AGREED);
+	res = totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
 
 	return (res);
 }
@@ -194,24 +213,41 @@ void sync_register (
 	int callback_count,
 	void (*synchronization_completed) (void))
 {
+	totempg_groups_initialize (
+		&sync_group_handle,
+		sync_deliver_fn,
+		NULL);
+
+	totempg_groups_join (
+		sync_group_handle,
+		&sync_group,
+		1);
+
+	ykd_init (sync_primary_callback_fn);
+
 	sync_callbacks = callbacks;
 	sync_callback_count = callback_count;
 	sync_synchronization_completed = synchronization_completed;
 }
 
-void sync_confchg_fn (
-	enum totem_configuration_type configuration_type,
-	struct totem_ip_address *member_list, int member_list_entries,
-	struct totem_ip_address *left_list, int left_list_entries,
-	struct totem_ip_address *joined_list, int joined_list_entries,
+void sync_primary_callback_fn (
+	struct totem_ip_address *view_list,
+	int view_list_entries,
+	int primary_designated,
 	struct memb_ring_id *ring_id)
 {
 	int i;
 
-	if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
+	if (primary_designated) {
+		log_printf (LOG_LEVEL_NOTICE, "This node is within the primary component and will provide service.\n");
+	} else {
+		log_printf (LOG_LEVEL_NOTICE, "This node is within the non-primary component and will NOT provide any services.\n");
 		return;
 	}
 
+	/*
+	 * Execute configuration change for synchronization service
+	 */
 	sync_processing = 1;
 
 	totempg_callback_token_destroy (&sync_callback_token_handle);
@@ -220,23 +256,27 @@ void sync_confchg_fn (
 
 	sync_recovery_index = 0;
 	memset (&barrier_data_confchg, 0, sizeof (barrier_data_confchg));
-	for (i = 0; i < member_list_entries; i++) {
-		totemip_copy(&barrier_data_confchg[i].addr, &member_list[i]);
+	for (i = 0; i < view_list_entries; i++) {
+		totemip_copy(&barrier_data_confchg[i].addr, &view_list[i]);
 		barrier_data_confchg[i].completed = 0;
 	}
 	memcpy (barrier_data_process, barrier_data_confchg,
 		sizeof (barrier_data_confchg));
-	barrier_data_confchg_entries = member_list_entries;
+	barrier_data_confchg_entries = view_list_entries;
 	sync_start_init (ring_id);
 }
 
 static struct memb_ring_id deliver_ring_id;
 
-int sync_deliver_fn (void *msg, struct totem_ip_address *source_addr,
-	int endian_conversion_needed)
+int sync_deliver_fn (
+	struct totem_ip_address *source_addr,
+	struct iovec *iovec,
+	int iov_len,
+	int endian_conversion_required)
+
 {
 	struct req_exec_sync_barrier_start *req_exec_sync_barrier_start =
-		(struct req_exec_sync_barrier_start *)msg;
+		(struct req_exec_sync_barrier_start *)iovec[0].iov_base;
 
 	int i;
 

+ 0 - 10
exec/sync.h

@@ -51,16 +51,6 @@ void sync_register (
 	int callback_count,
 	void (*synchronization_completed) (void));
 
-void sync_confchg_fn (
-	enum totem_configuration_type configuration_type,
-	struct totem_ip_address *member_list, int member_list_entries,
-	struct totem_ip_address *left_list, int left_list_entries,
-	struct totem_ip_address *joined_list, int joined_list_entries,
-	struct memb_ring_id *ring_id);
-
-int sync_deliver_fn (void *msg, struct totem_ip_address *source_addr,
-	int endian_conversion_needed);
-
 int sync_in_process (void);
 
 #endif /* SYNC_H_DEFINED */

+ 11 - 9
exec/totempg.c

@@ -224,15 +224,17 @@ static inline void app_confchg_fn (
 			i, (void *)&instance);
 
 		if (error == SA_OK) {
-			instance->confchg_fn (
-				configuration_type,
-				member_list,
-				member_list_entries,
-				left_list,
-				left_list_entries,
-				joined_list,
-				joined_list_entries,
-				ring_id);
+			if (instance->confchg_fn) {
+				instance->confchg_fn (
+					configuration_type,
+					member_list,
+					member_list_entries,
+					left_list,
+					left_list_entries,
+					joined_list,
+					joined_list_entries,
+					ring_id);
+			}
 
 			saHandleInstancePut (&totempg_groups_instance_database, i);
 		}

+ 13 - 6
exec/ykd.c

@@ -130,10 +130,13 @@ static int ambiguous_sessions_max_entries;
 
 static int primary_designated = 0;
 
+static struct memb_ring_id ykd_ring_id;
+
 static void (*ykd_primary_callback_fn) (
 	struct totem_ip_address *view_list,
 	int view_list_entries,
-	int primary_designated) = NULL;
+	int primary_designated,
+	struct memb_ring_id *ring_id) = NULL;
 
 void ykd_state_init (void)
 {
@@ -275,8 +278,6 @@ static void ykd_deliver_fn (
 	char *msg_state = iovec->iov_base + sizeof (struct ykd_header);
 	
 #ifdef COMPILE_OUT
-	memcpy (&ykd_ring_id, &req_exec_sync_barrier_start->ring_id,
-	sizeof (struct memb_ring_id));
 
 /*
 	* Is this barrier from this configuration, if not, ignore it
@@ -351,7 +352,8 @@ static void ykd_deliver_fn (
 				ykd_primary_callback_fn (
 					view_list,
 					view_list_entries,
-					primary_designated);
+					primary_designated,
+					&ykd_ring_id);
 
 				memcpy (ykd_state.last_primary.member_list, view_list, sizeof (view_list));
 				ykd_state.last_primary.member_list_entries = view_list_entries;
@@ -375,6 +377,9 @@ static void ykd_confchg_fn (
 	if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
 		return;
 	}
+
+	memcpy (&ykd_ring_id, ring_id, sizeof (struct memb_ring_id));
+
 	if (first_run) {
 		totemip_copy (&ykd_state.last_primary.member_list[0], this_ip);
 		ykd_state.last_primary.member_list_entries = 1;
@@ -392,7 +397,8 @@ static void ykd_confchg_fn (
 	ykd_primary_callback_fn (
 		view_list,
 		view_list_entries,
-		primary_designated);
+		primary_designated,
+		&ykd_ring_id);
 
 	memset (&state_received_confchg, 0, sizeof (state_received_confchg));
 	for (i = 0; i < member_list_entries; i++) {
@@ -417,7 +423,8 @@ int ykd_init (
 	void (*primary_callback_fn) (
 		struct totem_ip_address *view_list,
 		int view_list_entries,
-		int primary_designated))
+		int primary_designated,
+		struct memb_ring_id *ring_id))
 {
 	ykd_primary_callback_fn = primary_callback_fn;
 

+ 2 - 1
exec/ykd.h

@@ -41,7 +41,8 @@ int ykd_init (
     void (*primary_callback_fn) (
 	struct totem_ip_address *view_list,
 	int view_list_entries,
-	int primary_designated));
+	int primary_designated,
+	struct memb_ring_id *ring_id));
 
 /*
  * Returns 1 if we are primary component, 0 if not