Просмотр исходного кода

- sync_abort is now called if a new configuration change arrives during synchronization

- add a new function, sync_request(), that a user can call to request
synchronization of a specified service.



git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1280 fd59a12c-fef9-0310-b244-a6a79926bd2f
Hans Feldt 19 лет назад
Родитель
Сommit
5796de0f76
2 измененных файлов с 265 добавлено и 85 удалено
  1. 257 85
      exec/sync.c
  2. 8 0
      exec/sync.h

+ 257 - 85
exec/sync.c

@@ -1,9 +1,11 @@
 /*
  * Copyright (c) 2005-2006 MontaVista Software, Inc.
+ * Copyright (c) 2006 Ericsson AB.
+ * Author: Steven Dake (sdake@mvista.com)
+ * Author: Hans Feldt
  *
  * All rights reserved.
  *
- * Author: Steven Dake (sdake@mvista.com)
  *
  * This software licensed under BSD license, the text of which follows:
  * 
@@ -57,8 +59,10 @@
 #include "vsf.h"
 #include "../lcr/lcr_ifact.h"
 #include "print.h"
+#include "util.h"
 
 #define MESSAGE_REQ_SYNC_BARRIER 0
+#define MESSAGE_REQ_SYNC_REQUEST 1
 
 struct barrier_data {
 	unsigned int nodeid;
@@ -69,7 +73,8 @@ static struct memb_ring_id *sync_ring_id;
 
 static int vsf_none = 0;
 
-static int (*sync_callbacks_retrieve) (int sync_id, struct sync_callbacks *callack);
+static int (*sync_callbacks_retrieve) (int sync_id,
+	struct sync_callbacks *callack);
 
 static struct sync_callbacks sync_callbacks;
 
@@ -80,10 +85,7 @@ static void (*sync_synchronization_completed) (void);
 static int sync_recovery_index = 0;
 
 static void *sync_callback_token_handle = 0;
-
-static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX];
-
-static int barrier_data_confchg_entries;
+static void *sync_request_token_handle;
 
 static struct barrier_data barrier_data_process[PROCESSOR_COUNT_MAX];
 
@@ -91,11 +93,13 @@ static struct openais_vsf_iface_ver0 *vsf_iface;
 
 static int sync_barrier_send (struct memb_ring_id *ring_id);
 
-static int sync_start_process (enum totem_callback_token_type type, void *data);
+static int sync_start_process (
+	enum totem_callback_token_type type, void *data);
 
 static void sync_service_init (struct memb_ring_id *ring_id);
 
-static int sync_service_process (enum totem_callback_token_type type, void *data);
+static int sync_service_process (
+	enum totem_callback_token_type type, void *data);
 
 static void sync_deliver_fn (
 	unsigned int nodeid,
@@ -117,52 +121,71 @@ static void sync_primary_callback_fn (
 	struct memb_ring_id *ring_id);
 
 static struct totempg_group sync_group = {
-    .group      = "sync",
-    .group_len  = 4
+	.group      = "sync",
+	.group_len  = 4	/* number of characters in "sync" */
 };
 
 static totempg_groups_handle sync_group_handle;
+static char *service_name;
+static struct memb_ring_id deliver_ring_id;
+static unsigned int current_members[PROCESSOR_COUNT_MAX];
+static unsigned int current_members_cnt;
+
+struct sync_barrier_start {	/* barrier messages carry no payload beyond header and ring_id */
+};
 
-struct req_exec_sync_barrier_start {
+struct sync_request {	/* payload of a MESSAGE_REQ_SYNC_REQUEST message */
+	uint32_t name_len;	/* length of name in bytes, including the terminating NUL */
+	char name[0] __attribute__((aligned(8)));	/* start of the service name; the sender supplies the bytes in a second iovec. NOTE(review): the receiver reads the name through this member, so its offset must equal sizeof (sync_msg_t) - confirm */
+};
+
+typedef struct sync_msg {	/* wire format shared by barrier and sync-request messages */
 	mar_req_header_t header;
 	struct memb_ring_id ring_id;	/* ring the message was sent on; sync_deliver_fn ignores messages from other rings */
-};
+	union {	/* payload, selected by header.id */
+		struct sync_barrier_start sync_barrier_start;
+		struct sync_request sync_request;
+	};
+} sync_msg_t;
 
 /*
  * Multicast a barrier message stamped with ring_id to the sync group; returns the totempg_groups_mcast_joined() result (the caller treats 0 as success)
  */
 static int sync_barrier_send (struct memb_ring_id *ring_id)
 {
-	struct req_exec_sync_barrier_start req_exec_sync_barrier_start;
+	sync_msg_t msg;
 	struct iovec iovec;
 	int res;
 
-	req_exec_sync_barrier_start.header.size = sizeof (struct req_exec_sync_barrier_start);
-	req_exec_sync_barrier_start.header.id = MESSAGE_REQ_SYNC_BARRIER;
+	msg.header.size = sizeof (sync_msg_t);
+	msg.header.id = MESSAGE_REQ_SYNC_BARRIER;	/* sync_deliver_fn uses the id to tell barriers from sync requests */
 
-	memcpy (&req_exec_sync_barrier_start.ring_id, ring_id,
-		sizeof (struct memb_ring_id));
+	memcpy (&msg.ring_id, ring_id, sizeof (struct memb_ring_id));	/* receivers ignore messages whose ring id differs from theirs */
 
-	iovec.iov_base = (char *)&req_exec_sync_barrier_start;
-	iovec.iov_len = sizeof (req_exec_sync_barrier_start);
+	iovec.iov_base = (char *)&msg;
+	iovec.iov_len = sizeof (msg);
 
-	res = totempg_groups_mcast_joined (sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
+	res = totempg_groups_mcast_joined (
+		sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
 
 	return (res);
 }
 
-void sync_start_init (struct memb_ring_id *ring_id)
+static void sync_start_init (struct memb_ring_id *ring_id)
 {
+	ENTER("");
 	totempg_callback_token_create (
 		&sync_callback_token_handle,	/* handle later passed to totempg_callback_token_destroy */
 		TOTEM_CALLBACK_TOKEN_SENT,
 		0, /* don't delete after callback */
 		sync_start_process,	/* callback: sends the barrier message for ring_id */
 		(void *)ring_id);	/* delivered to the callback as its data argument */
+	LEAVE("");
 }
 
 static void sync_service_init (struct memb_ring_id *ring_id)
 {
+	ENTER("");
 	sync_callbacks.sync_init ();
 	totempg_callback_token_destroy (&sync_callback_token_handle);
 
@@ -175,13 +198,16 @@ static void sync_service_init (struct memb_ring_id *ring_id)
 		0, /* don't delete after callback */
 		sync_service_process,
 		(void *)ring_id);
+	LEAVE("");
 }
 
-static int sync_start_process (enum totem_callback_token_type type, void *data)
+static int sync_start_process (
+	enum totem_callback_token_type type, void *data)
 {
 	int res;
 	struct memb_ring_id *ring_id = (struct memb_ring_id *)data;
 
+	ENTER("");
 	res = sync_barrier_send (ring_id);
 	if (res == 0) {
 		/*
@@ -189,13 +215,15 @@ static int sync_start_process (enum totem_callback_token_type type, void *data)
 		 */
 		totempg_callback_token_destroy (&sync_callback_token_handle);
 	}
+	LEAVE("");
 	return (0);
 }
 
-void sync_callbacks_load (void)
+static void sync_callbacks_load (void)
 {
 	int res;
 
+	ENTER("");
 // TODO rewrite this to get rid of the for (;;)
 	for (;;) {
 		res = sync_callbacks_retrieve (sync_recovery_index, &sync_callbacks);
@@ -206,25 +234,33 @@ void sync_callbacks_load (void)
 			sync_processing = 0;
 			break;
 		}
+		if ((service_name != NULL) &&
+			strcmp ((char*)sync_callbacks.name, service_name) != 0) {
+			sync_recovery_index += 1;
+			continue;
+		}
 		sync_recovery_index += 1;
 		if (sync_callbacks.sync_init) {
 			break;
 		}
 	}
+	LEAVE("");
 }
 
-static int sync_service_process (enum totem_callback_token_type type, void *data)
+static int sync_service_process (
+	enum totem_callback_token_type type, void *data)
 {
 	int res;
 	struct memb_ring_id *ring_id = (struct memb_ring_id *)data;
 
-	
+	ENTER("");
+
 	/*
 	 * If process operation not from this ring id, then ignore it and stop
 	 * processing
 	 */
 	if (memcmp (ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) {
-		return (0);
+		goto end;
 	}
 	
 	/*
@@ -233,12 +269,14 @@ static int sync_service_process (enum totem_callback_token_type type, void *data
 	 */
 	res = sync_callbacks.sync_process ();
 	if (res != 0) {
-		return (0);
+		goto end;
 	}
 	totempg_callback_token_destroy (&sync_callback_token_handle);
 
 	sync_start_init (ring_id);
 
+end:
+	LEAVE("");
 	return (0);
 }
 
@@ -314,10 +352,16 @@ static void sync_primary_callback_fn (
 {
 	int i;
 
+	ENTER("");
+
 	if (primary_designated) {
-		log_printf (LOG_LEVEL_NOTICE, "This node is within the primary component and will provide service.\n");
+		log_printf (LOG_LEVEL_NOTICE,
+			"This node is within the primary component and will provide"
+			" service.\n");
 	} else {
-		log_printf (LOG_LEVEL_NOTICE, "This node is within the non-primary component and will NOT provide any services.\n");
+		log_printf (LOG_LEVEL_NOTICE,
+			"This node is within the non-primary component and will NOT"
+			" provide any services.\n");
 		return;
 	}
 
@@ -329,23 +373,14 @@ static void sync_primary_callback_fn (
 	totempg_callback_token_destroy (&sync_callback_token_handle);
 
 	sync_recovery_index = 0;
-	memset (&barrier_data_confchg, 0, sizeof (barrier_data_confchg));
+
 	for (i = 0; i < view_list_entries; i++) {
-		barrier_data_confchg[i].nodeid = view_list[i];
-		barrier_data_confchg[i].completed = 0;
+		barrier_data_process[i].nodeid = view_list[i];
+		barrier_data_process[i].completed = 0;
 	}
-	memcpy (barrier_data_process, barrier_data_confchg,
-		sizeof (barrier_data_confchg));
-	barrier_data_confchg_entries = view_list_entries;
-	sync_start_init (sync_ring_id);
-}
-
-static struct memb_ring_id deliver_ring_id;
 
-void sync_endian_convert (struct req_exec_sync_barrier_start *req_exec_sync_barrier_start)
-{
-	/* XXX no swab on mar_req_header_t? */
-	swab_memb_ring_id_t (&req_exec_sync_barrier_start->ring_id);
+	sync_start_init (sync_ring_id);
+	LEAVE("");
 }
 
 static void sync_deliver_fn (
@@ -354,36 +389,67 @@ static void sync_deliver_fn (
 	int iov_len,
 	int endian_conversion_required)
 {
-	struct req_exec_sync_barrier_start *req_exec_sync_barrier_start =
-		(struct req_exec_sync_barrier_start *)iovec[0].iov_base;
-
 	int i;
+	int barrier_completed;
+	sync_msg_t *msg = iovec[0].iov_base;
+
+	ENTER("type %d, len %d", msg->header.id, iovec[0].iov_len);
 
 	if (endian_conversion_required) {
-		sync_endian_convert (req_exec_sync_barrier_start);
+		swab_mar_req_header_t (&msg->header);
+		swab_memb_ring_id_t (&msg->ring_id);
 	}
 
-	int barrier_completed = 1;
-
-	memcpy (&deliver_ring_id, &req_exec_sync_barrier_start->ring_id,
-		sizeof (struct memb_ring_id));
-
 	/*
-	 * Is this barrier from this configuration, if not, ignore it
+	 * If this message is not from this configuration, ignore it
 	 */
-	if (memcmp (&req_exec_sync_barrier_start->ring_id, sync_ring_id,
+	if (memcmp (&msg->ring_id, sync_ring_id,
 		sizeof (struct memb_ring_id)) != 0) {
-		return;
+		goto end;
 	}
 
+	if (msg->header.id == MESSAGE_REQ_SYNC_REQUEST) {
+		if (endian_conversion_required) {
+			swab_mar_uint32_t (&msg->sync_request.name_len);
+		}	
+		/*
+		 * If there is an ongoing sync, abort it. A requested sync is
+		 * only allowed to abort other requested synchronizations,
+		 * not full synchronizations.
+		 */
+		if (sync_processing && sync_callbacks.sync_abort) {
+			sync_callbacks.sync_abort();
+			sync_callbacks.sync_activate = NULL;
+			sync_processing = 0;
+			assert (service_name != NULL);
+			free (service_name);
+			service_name = NULL;
+		}
+
+		service_name = malloc (msg->sync_request.name_len);
+		strcpy (service_name, msg->sync_request.name);
+
+		/* 
+		 * Start requested synchronization 
+		 */
+		sync_primary_callback_fn (current_members, current_members_cnt,	1,
+			sync_ring_id);
+
+		goto end;
+	}
+
+	barrier_completed = 1;
+
+	memcpy (&deliver_ring_id, &msg->ring_id, sizeof (struct memb_ring_id));
+
 	/*
 	 * Set completion for source_addr's address
 	 */
-	for (i = 0; i < barrier_data_confchg_entries; i++) {
+	for (i = 0; i < current_members_cnt; i++) {
 		if (nodeid == barrier_data_process[i].nodeid) {
 			barrier_data_process[i].completed = 1;
 			log_printf (LOG_LEVEL_DEBUG,
-				"Barrier Start Recieved From %d\n",
+				"Barrier Start Received From %d\n",
 				barrier_data_process[i].nodeid);
 			break;
 		}
@@ -392,37 +458,37 @@ static void sync_deliver_fn (
 	/*
 	 * Test if barrier is complete
 	 */
-	for (i = 0; i < barrier_data_confchg_entries; i++) {
+	for (i = 0; i < current_members_cnt; i++) {
 		log_printf (LOG_LEVEL_DEBUG,
 			"Barrier completion status for nodeid %d = %d. \n", 
 			barrier_data_process[i].nodeid,
 			barrier_data_process[i].completed);
+
 		if (barrier_data_process[i].completed == 0) {
 			barrier_completed = 0;
 		}
 	}
-	if (barrier_completed) {
-		log_printf (LOG_LEVEL_DEBUG,
-			"Synchronization barrier completed\n");
-	}
-	/*
-	 * This sync is complete so activate and start next service sync
-	 */
-	if (barrier_completed && sync_callbacks.sync_activate) {
-		sync_callbacks.sync_activate ();
-	
-		log_printf (LOG_LEVEL_DEBUG,
-			"Committing synchronization for (%s)\n",
-			sync_callbacks.name);
-	
-	}
 
-	/*
-	 * Start synchronization if the barrier has completed
-	 */
 	if (barrier_completed) {
-		memcpy (barrier_data_process, barrier_data_confchg,
-			sizeof (barrier_data_confchg));
+		log_printf (LOG_LEVEL_DEBUG, "Synchronization barrier completed\n");
+		/*
+		 * This sync is complete so activate and start next service sync
+		*/
+		if (sync_callbacks.sync_activate) {
+			log_printf (LOG_LEVEL_DEBUG,
+				"Committing synchronization for (%s)\n",
+				sync_callbacks.name);
+
+			sync_callbacks.sync_activate ();
+		}
+
+		/*
+		 * Start synchronization if the barrier has completed
+		*/
+		for (i = 0; i < current_members_cnt; i++) {
+			barrier_data_process[i].nodeid = current_members[i];
+			barrier_data_process[i].completed = 0;
+		}
 
 		sync_callbacks_load();
 
@@ -434,9 +500,15 @@ static void sync_deliver_fn (
 				"Synchronization actions starting for (%s)\n",
 				sync_callbacks.name);
 			sync_service_init (&deliver_ring_id);
+		} else {
+			if (service_name != NULL) {
+				free (service_name);
+				service_name = NULL;
+			}
 		}
 	}
-	return;
+end:
+	LEAVE("");
 }
 
 static void sync_confchg_fn (
@@ -446,19 +518,94 @@ static void sync_confchg_fn (
 	unsigned int *joined_list, int joined_list_entries,
 	struct memb_ring_id *ring_id)
 {
+	int i;
+
+	ENTER("");
+
+	/*
+	 * Save current members and ring ID for later use
+	 */
+	for (i = 0; i < member_list_entries; i++) {
+		current_members[i] = member_list[i];
+	}
+	current_members_cnt = member_list_entries;
 	sync_ring_id = ring_id;
 
 	/*
-	 * If no virtual synchrony filter configured, then start
-	 * synchronization process
+	 * If no virtual synchrony filter configured.
 	 */
 	if (vsf_none == 1) {
+		/*
+		 * If there is an ongoing synchronization, abort it.
+		 */
+		if (sync_processing && sync_callbacks.sync_abort) {
+			sync_callbacks.sync_abort();
+			sync_callbacks.sync_activate = NULL;
+			sync_processing = 0;
+			if (service_name != NULL) {
+				free (service_name);
+				service_name = NULL;
+			}
+		}
+
+		/*
+		 * Start new synchronization process 
+		 */
 		sync_primary_callback_fn (
-			member_list,
-			member_list_entries,
-			1,
-			ring_id);
+			member_list, member_list_entries,	1, ring_id);
+	}
+	LEAVE("");
+}
+
+/**
+ * Token callback that multicasts a MESSAGE_REQ_SYNC_REQUEST message naming
+ * the service to synchronize; registered by sync_request()
+ * @param type callback token type (not used by this function)
+ * @param _name NUL-terminated service name; sent as trailing data after the fixed-size message
+ *
+ * @return int always 0; on a failed multicast the callback stays registered and is called again
+ */
+static int sync_request_send (
+	enum totem_callback_token_type type, void *_name)
+{
+	int res;
+	char *name = _name;
+	sync_msg_t msg;
+	struct iovec iovec[2];
+	int name_len;
+
+	ENTER("'%s'", name);
+
+	name_len = strlen (name) + 1;	/* include the terminating NUL */
+	msg.header.size = sizeof (msg) + name_len;
+	msg.header.id = MESSAGE_REQ_SYNC_REQUEST;
+
+	memcpy (&msg.ring_id, sync_ring_id,	sizeof (struct memb_ring_id));
+	msg.sync_request.name_len = name_len;
+
+	iovec[0].iov_base = (char *)&msg;	/* fixed-size part of the message */
+	iovec[0].iov_len = sizeof (msg);
+	iovec[1].iov_base = _name;	/* name bytes follow the fixed-size part */
+	iovec[1].iov_len = name_len;
+
+	res = totempg_groups_mcast_joined (
+		sync_group_handle, iovec, 2, TOTEMPG_AGREED);
+
+	if (res == 0) {
+		/*
+		 * We managed to multicast the message so delete the token callback
+		 * for the sync request.
+		 */
+		totempg_callback_token_destroy (&sync_request_token_handle);
 	}
+
+	/*
+	 * If we failed to multicast the message, this function will be called
+	 * again.
+	 */
+
+	LEAVE("");
+	return (0);
 }
 
 int sync_in_process (void)
@@ -474,3 +621,28 @@ int sync_primary_designated (void)
 		return (vsf_iface->primary());
 	}
 }
+
+/**
+ * Request synchronization of the named service by scheduling sync_request_send() as a token callback
+ * @param name service name (must not be NULL); the pointer itself is handed to the callback, so it must stay valid until the request has been sent
+ *
+ * @return int 0 if the request was scheduled, -1 if a synchronization is already in progress
+ */
+int sync_request (char *name)
+{
+	assert (name != NULL);
+
+	ENTER("'%s'", name);
+
+	if (sync_processing) {
+		return -1;	/* NOTE(review): this early return skips the matching LEAVE() */
+	}
+
+	totempg_callback_token_create (&sync_request_token_handle,
+		TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */
+		sync_request_send, name);
+
+	LEAVE("");
+
+	return 0;
+}

+ 8 - 0
exec/sync.h

@@ -56,4 +56,12 @@ int sync_in_process (void);
 
 int sync_primary_designated (void);
 
+/**
+ * Execute synchronization upon request for the named service
+ * @param name service handler name to synchronize; must not be NULL and must stay valid until the request has been sent
+ *
+ * @return int 0 if the request was scheduled, -1 if a synchronization is already in progress
+ */
+extern int sync_request (char *name);
+
 #endif /* SYNC_H_DEFINED */