|
|
@@ -1,5 +1,5 @@
|
|
|
/*
|
|
|
- * Copyright (c) 2006-2012 Red Hat, Inc.
|
|
|
+ * Copyright (c) 2006-2015 Red Hat, Inc.
|
|
|
*
|
|
|
* All rights reserved.
|
|
|
*
|
|
|
@@ -83,7 +83,8 @@ enum cpg_message_req_types {
|
|
|
MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
|
|
|
MESSAGE_REQ_EXEC_CPG_MCAST = 3,
|
|
|
MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
|
|
|
- MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
|
|
|
+ MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
|
|
|
+ MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
|
|
|
};
|
|
|
|
|
|
struct zcb_mapped {
|
|
|
@@ -156,6 +157,8 @@ struct cpg_pd {
|
|
|
enum cpd_state cpd_state;
|
|
|
unsigned int flags;
|
|
|
int initial_totem_conf_sent;
|
|
|
+ uint64_t transition_counter; /* These two are used when sending fragmented messages */
|
|
|
+ uint64_t initial_transition_counter;
|
|
|
struct list_head list;
|
|
|
struct list_head iteration_instance_list_head;
|
|
|
struct list_head zcb_mapped_list_head;
|
|
|
@@ -224,6 +227,10 @@ static void message_handler_req_exec_cpg_mcast (
|
|
|
const void *message,
|
|
|
unsigned int nodeid);
|
|
|
|
|
|
+static void message_handler_req_exec_cpg_partial_mcast (
|
|
|
+ const void *message,
|
|
|
+ unsigned int nodeid);
|
|
|
+
|
|
|
static void message_handler_req_exec_cpg_downlist_old (
|
|
|
const void *message,
|
|
|
unsigned int nodeid);
|
|
|
@@ -238,6 +245,8 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
|
|
|
|
|
|
static void exec_cpg_mcast_endian_convert (void *msg);
|
|
|
|
|
|
+static void exec_cpg_partial_mcast_endian_convert (void *msg);
|
|
|
+
|
|
|
static void exec_cpg_downlist_endian_convert_old (void *msg);
|
|
|
|
|
|
static void exec_cpg_downlist_endian_convert (void *msg);
|
|
|
@@ -250,6 +259,8 @@ static void message_handler_req_lib_cpg_finalize (void *conn, const void *messag
|
|
|
|
|
|
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
|
|
|
|
|
|
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
|
|
|
+
|
|
|
static void message_handler_req_lib_cpg_membership (void *conn,
|
|
|
const void *message);
|
|
|
|
|
|
@@ -383,7 +394,10 @@ static struct corosync_lib_handler cpg_lib_engine[] =
|
|
|
.lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
|
|
|
.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
|
|
|
},
|
|
|
-
|
|
|
+ { /* 12 */
|
|
|
+ .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
|
|
|
+ .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
|
|
|
+ },
|
|
|
|
|
|
};
|
|
|
|
|
|
@@ -413,6 +427,10 @@ static struct corosync_exec_handler cpg_exec_engine[] =
|
|
|
.exec_handler_fn = message_handler_req_exec_cpg_downlist,
|
|
|
.exec_endian_convert_fn = exec_cpg_downlist_endian_convert
|
|
|
},
|
|
|
+ { /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
|
|
|
+ .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
|
|
|
+ .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
|
|
|
+ },
|
|
|
};
|
|
|
|
|
|
struct corosync_service_engine cpg_service_engine = {
|
|
|
@@ -457,6 +475,17 @@ struct req_exec_cpg_mcast {
|
|
|
mar_uint8_t message[] __attribute__((aligned(8)));
|
|
|
};
|
|
|
|
|
|
+struct req_exec_cpg_partial_mcast {
|
|
|
+ struct qb_ipc_request_header header __attribute__((aligned(8)));
|
|
|
+ mar_cpg_name_t group_name __attribute__((aligned(8)));
|
|
|
+ mar_uint32_t msglen __attribute__((aligned(8)));
|
|
|
+ mar_uint32_t fraglen __attribute__((aligned(8)));
|
|
|
+ mar_uint32_t pid __attribute__((aligned(8)));
|
|
|
+ mar_uint32_t type __attribute__((aligned(8)));
|
|
|
+ mar_message_source_t source __attribute__((aligned(8)));
|
|
|
+ mar_uint8_t message[] __attribute__((aligned(8)));
|
|
|
+};
|
|
|
+
|
|
|
struct req_exec_cpg_downlist_old {
|
|
|
struct qb_ipc_request_header header __attribute__((aligned(8)));
|
|
|
mar_uint32_t left_nodes __attribute__((aligned(8)));
|
|
|
@@ -740,6 +769,7 @@ static int notify_lib_joinlist(
|
|
|
cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
|
|
|
|
|
|
api->ipc_dispatch_send (cpd->conn, buf, size);
|
|
|
+ cpd->transition_counter++;
|
|
|
}
|
|
|
if (left_list_entries) {
|
|
|
if (left_list[0].pid == cpd->pid &&
|
|
|
@@ -1186,6 +1216,19 @@ static void exec_cpg_mcast_endian_convert (void *msg)
|
|
|
swab_mar_message_source_t (&req_exec_cpg_mcast->source);
|
|
|
}
|
|
|
|
|
|
+static void exec_cpg_partial_mcast_endian_convert (void *msg)
|
|
|
+{
|
|
|
+ struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg;
|
|
|
+
|
|
|
+ swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
|
|
|
+ swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
|
|
|
+ req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
|
|
|
+ req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
|
|
|
+ req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
|
|
|
+ req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
|
|
|
+ swab_mar_message_source_t (&req_exec_cpg_mcast->source);
|
|
|
+}
|
|
|
+
|
|
|
static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
|
|
|
struct list_head *iter;
|
|
|
|
|
|
@@ -1453,6 +1496,68 @@ static void message_handler_req_exec_cpg_mcast (
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+static void message_handler_req_exec_cpg_partial_mcast (
|
|
|
+ const void *message,
|
|
|
+ unsigned int nodeid)
|
|
|
+{
|
|
|
+ const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
|
|
|
+ struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
|
|
|
+ int msglen = req_exec_cpg_mcast->fraglen;
|
|
|
+ struct list_head *iter, *pi_iter;
|
|
|
+ struct cpg_pd *cpd;
|
|
|
+ struct iovec iovec[2];
|
|
|
+ int known_node = 0;
|
|
|
+
|
|
|
+ log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
|
|
|
+
|
|
|
+ res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
|
|
|
+ res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
|
|
|
+ res_lib_cpg_mcast.fraglen = msglen;
|
|
|
+ res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
|
|
|
+ res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
|
|
|
+ res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
|
|
|
+ res_lib_cpg_mcast.nodeid = nodeid;
|
|
|
+
|
|
|
+ memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
|
|
|
+ sizeof(mar_cpg_name_t));
|
|
|
+ iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
|
|
|
+ iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
|
|
|
+
|
|
|
+ iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
|
|
|
+ iovec[1].iov_len = msglen;
|
|
|
+
|
|
|
+ for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
|
|
|
+ cpd = list_entry(iter, struct cpg_pd, list);
|
|
|
+ iter = iter->next;
|
|
|
+
|
|
|
+ if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
|
|
|
+ && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
|
|
|
+
|
|
|
+ if (!known_node) {
|
|
|
+ /* Try to find, if we know the node */
|
|
|
+ for (pi_iter = process_info_list_head.next;
|
|
|
+ pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
|
|
|
+
|
|
|
+ struct process_info *pi = list_entry (pi_iter, struct process_info, list);
|
|
|
+
|
|
|
+ if (pi->nodeid == nodeid &&
|
|
|
+ mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
|
|
|
+ known_node = 1;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!known_node) {
|
|
|
+ log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
|
|
|
+ return ;
|
|
|
+ }
|
|
|
+
|
|
|
+ api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
|
|
|
static int cpg_exec_send_downlist(void)
|
|
|
{
|
|
|
@@ -1864,6 +1969,77 @@ static void message_handler_req_lib_cpg_zc_free (
|
|
|
res_header.size);
|
|
|
}
|
|
|
|
|
|
+/* Fragmented mcast message from the library */
|
|
|
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
|
|
|
+{
|
|
|
+ const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
|
|
|
+ struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
|
|
|
+ mar_cpg_name_t group_name = cpd->group_name;
|
|
|
+
|
|
|
+ struct iovec req_exec_cpg_iovec[2];
|
|
|
+ struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
|
|
|
+ struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
|
|
|
+ int msglen = req_lib_cpg_mcast->fraglen;
|
|
|
+ int result;
|
|
|
+ cs_error_t error = CS_ERR_NOT_EXIST;
|
|
|
+
|
|
|
+ log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
|
|
|
+ log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
|
|
|
+
|
|
|
+ switch (cpd->cpd_state) {
|
|
|
+ case CPD_STATE_UNJOINED:
|
|
|
+ error = CS_ERR_NOT_EXIST;
|
|
|
+ break;
|
|
|
+ case CPD_STATE_LEAVE_STARTED:
|
|
|
+ error = CS_ERR_NOT_EXIST;
|
|
|
+ break;
|
|
|
+ case CPD_STATE_JOIN_STARTED:
|
|
|
+ error = CS_OK;
|
|
|
+ break;
|
|
|
+ case CPD_STATE_JOIN_COMPLETED:
|
|
|
+ error = CS_OK;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
|
|
|
+ res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
|
|
|
+
|
|
|
+ if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
|
|
|
+ cpd->initial_transition_counter = cpd->transition_counter;
|
|
|
+ }
|
|
|
+ if (cpd->transition_counter != cpd->initial_transition_counter) {
|
|
|
+ error = CS_ERR_INTERRUPT;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (error == CS_OK) {
|
|
|
+ req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
|
|
|
+ req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
|
|
|
+ MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
|
|
|
+ req_exec_cpg_mcast.pid = cpd->pid;
|
|
|
+ req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
|
|
|
+ req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
|
|
|
+ req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
|
|
|
+ api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
|
|
|
+ memcpy(&req_exec_cpg_mcast.group_name, &group_name,
|
|
|
+ sizeof(mar_cpg_name_t));
|
|
|
+
|
|
|
+ req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
|
|
|
+ req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
|
|
|
+ req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
|
|
|
+ req_exec_cpg_iovec[1].iov_len = msglen;
|
|
|
+
|
|
|
+ result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
|
|
|
+ assert(result == 0);
|
|
|
+ } else {
|
|
|
+ log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
|
|
|
+ conn, group_name.value, cpd->cpd_state, error);
|
|
|
+ }
|
|
|
+
|
|
|
+ res_lib_cpg_partial_send.header.error = error;
|
|
|
+ api->ipc_response_send (conn, &res_lib_cpg_partial_send,
|
|
|
+ sizeof (res_lib_cpg_partial_send));
|
|
|
+}
|
|
|
+
|
|
|
/* Mcast message from the library */
|
|
|
static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
|
|
|
{
|