|
@@ -63,6 +63,7 @@
|
|
|
LOGSYS_DECLARE_SUBSYS ("SYNC", LOG_INFO);
|
|
LOGSYS_DECLARE_SUBSYS ("SYNC", LOG_INFO);
|
|
|
|
|
|
|
|
#define MESSAGE_REQ_SYNC_BARRIER 0
|
|
#define MESSAGE_REQ_SYNC_BARRIER 0
|
|
|
|
|
+#define MESSAGE_REQ_SYNC_REQUEST 1
|
|
|
|
|
|
|
|
struct barrier_data {
|
|
struct barrier_data {
|
|
|
unsigned int nodeid;
|
|
unsigned int nodeid;
|
|
@@ -84,6 +85,7 @@ static void (*sync_synchronization_completed) (void);
|
|
|
static int sync_recovery_index = 0;
|
|
static int sync_recovery_index = 0;
|
|
|
|
|
|
|
|
static void *sync_callback_token_handle = 0;
|
|
static void *sync_callback_token_handle = 0;
|
|
|
|
|
+static void *sync_request_token_handle;
|
|
|
|
|
|
|
|
static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX];
|
|
static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX];
|
|
|
|
|
|
|
@@ -126,12 +128,27 @@ static struct totempg_group sync_group = {
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
static totempg_groups_handle sync_group_handle;
|
|
static totempg_groups_handle sync_group_handle;
|
|
|
|
|
+static char *service_name;
|
|
|
|
|
+static unsigned int current_members[PROCESSOR_COUNT_MAX];
|
|
|
|
|
+static unsigned int current_members_cnt;
|
|
|
|
|
|
|
|
struct req_exec_sync_barrier_start {
|
|
struct req_exec_sync_barrier_start {
|
|
|
mar_req_header_t header;
|
|
mar_req_header_t header;
|
|
|
struct memb_ring_id ring_id;
|
|
struct memb_ring_id ring_id;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
|
|
+struct sync_request {
|
|
|
|
|
+ uint32_t name_len;
|
|
|
|
|
+ char name[0] __attribute__((aligned(8)));
|
|
|
|
|
+};
|
|
|
|
|
+
|
|
|
|
|
+typedef struct sync_msg {
|
|
|
|
|
+ mar_req_header_t header;
|
|
|
|
|
+ struct memb_ring_id ring_id;
|
|
|
|
|
+ struct sync_request sync_request;
|
|
|
|
|
+} sync_msg_t;
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
/*
|
|
/*
|
|
|
* Send a barrier data structure
|
|
* Send a barrier data structure
|
|
|
*/
|
|
*/
|
|
@@ -362,6 +379,7 @@ static void sync_deliver_fn (
|
|
|
{
|
|
{
|
|
|
struct req_exec_sync_barrier_start *req_exec_sync_barrier_start =
|
|
struct req_exec_sync_barrier_start *req_exec_sync_barrier_start =
|
|
|
(struct req_exec_sync_barrier_start *)iovec[0].iov_base;
|
|
(struct req_exec_sync_barrier_start *)iovec[0].iov_base;
|
|
|
|
|
+ sync_msg_t *msg = (sync_msg_t *)iovec[0].iov_base;
|
|
|
|
|
|
|
|
int i;
|
|
int i;
|
|
|
|
|
|
|
@@ -382,6 +400,36 @@ static void sync_deliver_fn (
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ if (msg->header.id == MESSAGE_REQ_SYNC_REQUEST) {
|
|
|
|
|
+ if (endian_conversion_required) {
|
|
|
|
|
+ swab_mar_uint32_t (&msg->sync_request.name_len);
|
|
|
|
|
+ }
|
|
|
|
|
+ /*
|
|
|
|
|
+ * If there is an ongoing sync, abort it. A requested sync is
|
|
|
|
|
+ * only allowed to abort other requested synchronizations,
|
|
|
|
|
+ * not full synchronizations.
|
|
|
|
|
+ */
|
|
|
|
|
+ if (sync_processing && sync_callbacks.sync_abort) {
|
|
|
|
|
+ sync_callbacks.sync_abort();
|
|
|
|
|
+ sync_callbacks.sync_activate = NULL;
|
|
|
|
|
+ sync_processing = 0;
|
|
|
|
|
+ assert (service_name != NULL);
|
|
|
|
|
+ free (service_name);
|
|
|
|
|
+ service_name = NULL;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ service_name = malloc (msg->sync_request.name_len);
|
|
|
|
|
+ strcpy (service_name, msg->sync_request.name);
|
|
|
|
|
+
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Start requested synchronization
|
|
|
|
|
+ */
|
|
|
|
|
+ sync_primary_callback_fn (current_members, current_members_cnt, 1,
|
|
|
|
|
+ sync_ring_id);
|
|
|
|
|
+
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/*
|
|
/*
|
|
|
* Set completion for source_addr's address
|
|
* Set completion for source_addr's address
|
|
|
*/
|
|
*/
|
|
@@ -451,6 +499,7 @@ static void sync_confchg_fn (
|
|
|
unsigned int *joined_list, int joined_list_entries,
|
|
unsigned int *joined_list, int joined_list_entries,
|
|
|
struct memb_ring_id *ring_id)
|
|
struct memb_ring_id *ring_id)
|
|
|
{
|
|
{
|
|
|
|
|
+ int i;
|
|
|
sync_ring_id = ring_id;
|
|
sync_ring_id = ring_id;
|
|
|
|
|
|
|
|
if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
|
|
if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
|
|
@@ -460,6 +509,14 @@ static void sync_confchg_fn (
|
|
|
sync_callbacks.sync_abort ();
|
|
sync_callbacks.sync_abort ();
|
|
|
sync_callbacks.sync_activate = NULL;
|
|
sync_callbacks.sync_activate = NULL;
|
|
|
}
|
|
}
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Save current members and ring ID for later use
|
|
|
|
|
+ */
|
|
|
|
|
+ for (i = 0; i < member_list_entries; i++) {
|
|
|
|
|
+ current_members[i] = member_list[i];
|
|
|
|
|
+ }
|
|
|
|
|
+ current_members_cnt = member_list_entries;
|
|
|
|
|
+
|
|
|
/*
|
|
/*
|
|
|
* If no virtual synchrony filter configured, then start
|
|
* If no virtual synchrony filter configured, then start
|
|
|
* synchronization process
|
|
* synchronization process
|
|
@@ -472,6 +529,60 @@ static void sync_confchg_fn (
|
|
|
ring_id);
|
|
ring_id);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+/**
|
|
|
|
|
+ * TOTEM callback function used to multicast a sync_request
|
|
|
|
|
+ * message
|
|
|
|
|
+ * @param type
|
|
|
|
|
+ * @param _name
|
|
|
|
|
+ *
|
|
|
|
|
+ * @return int
|
|
|
|
|
+ */
|
|
|
|
|
+static int sync_request_send (
|
|
|
|
|
+ enum totem_callback_token_type type, void *_name)
|
|
|
|
|
+{
|
|
|
|
|
+ int res;
|
|
|
|
|
+ char *name = _name;
|
|
|
|
|
+ sync_msg_t msg;
|
|
|
|
|
+ struct iovec iovec[2];
|
|
|
|
|
+ int name_len;
|
|
|
|
|
+
|
|
|
|
|
+ ENTER("'%s'", name);
|
|
|
|
|
+
|
|
|
|
|
+ name_len = strlen (name) + 1;
|
|
|
|
|
+ msg.header.size = sizeof (msg) + name_len;
|
|
|
|
|
+ msg.header.id = MESSAGE_REQ_SYNC_REQUEST;
|
|
|
|
|
+
|
|
|
|
|
+ if (sync_ring_id == NULL) {
|
|
|
|
|
+ log_printf (LOG_LEVEL_ERROR,
|
|
|
|
|
+ "%s sync_ring_id is NULL.\n", __func__);
|
|
|
|
|
+ return 1;
|
|
|
|
|
+ }
|
|
|
|
|
+ memcpy (&msg.ring_id, sync_ring_id, sizeof (struct memb_ring_id));
|
|
|
|
|
+ msg.sync_request.name_len = name_len;
|
|
|
|
|
+
|
|
|
|
|
+ iovec[0].iov_base = (char *)&msg;
|
|
|
|
|
+ iovec[0].iov_len = sizeof (msg);
|
|
|
|
|
+ iovec[1].iov_base = _name;
|
|
|
|
|
+ iovec[1].iov_len = name_len;
|
|
|
|
|
+
|
|
|
|
|
+ res = totempg_groups_mcast_joined (
|
|
|
|
|
+ sync_group_handle, iovec, 2, TOTEMPG_AGREED);
|
|
|
|
|
+
|
|
|
|
|
+ if (res == 0) {
|
|
|
|
|
+ /*
|
|
|
|
|
+ * We managed to multicast the message so delete the token callback
|
|
|
|
|
+ * for the sync request.
|
|
|
|
|
+ */
|
|
|
|
|
+ totempg_callback_token_destroy (&sync_request_token_handle);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /*
|
|
|
|
|
+ * if we failed to multicast the message, this function will be called
|
|
|
|
|
+ * again.
|
|
|
|
|
+ */
|
|
|
|
|
+
|
|
|
|
|
+ return (0);
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
int sync_in_process (void)
|
|
int sync_in_process (void)
|
|
|
{
|
|
{
|
|
@@ -486,3 +597,28 @@ int sync_primary_designated (void)
|
|
|
return (vsf_iface->primary());
|
|
return (vsf_iface->primary());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * Execute synchronization upon request for the named service
|
|
|
|
|
+ * @param name
|
|
|
|
|
+ *
|
|
|
|
|
+ * @return int
|
|
|
|
|
+ */
|
|
|
|
|
+int sync_request (char *name)
|
|
|
|
|
+{
|
|
|
|
|
+ assert (name != NULL);
|
|
|
|
|
+
|
|
|
|
|
+ ENTER("'%s'", name);
|
|
|
|
|
+
|
|
|
|
|
+ if (sync_processing) {
|
|
|
|
|
+ return -1;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ totempg_callback_token_create (&sync_request_token_handle,
|
|
|
|
|
+ TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */
|
|
|
|
|
+ sync_request_send, name);
|
|
|
|
|
+
|
|
|
|
|
+ LEAVE("");
|
|
|
|
|
+
|
|
|
|
|
+ return 0;
|
|
|
|
|
+}
|