|
|
@@ -277,13 +277,12 @@ struct memb_commit_token {
|
|
|
|
|
|
struct message_item {
|
|
|
struct mcast *mcast;
|
|
|
- struct iovec iovec[MAXIOVS];
|
|
|
- unsigned int iov_len;
|
|
|
+ unsigned int msg_len;
|
|
|
};
|
|
|
|
|
|
struct sort_queue_item {
|
|
|
- struct iovec iovec[MAXIOVS];
|
|
|
- unsigned int iov_len;
|
|
|
+ struct mcast *mcast;
|
|
|
+ unsigned int msg_len;
|
|
|
};
|
|
|
|
|
|
struct orf_token_mcast_thread_state {
|
|
|
@@ -443,23 +442,14 @@ struct totemsrp_instance {
|
|
|
|
|
|
//TODO struct srp_addr next_memb;
|
|
|
|
|
|
- char iov_buffer[FRAME_SIZE_MAX];
|
|
|
-
|
|
|
- struct iovec totemsrp_iov_recv;
|
|
|
-
|
|
|
hdb_handle_t totemsrp_poll_handle;
|
|
|
|
|
|
- /*
|
|
|
- * Function called when new message received
|
|
|
- */
|
|
|
- int (*totemsrp_recv) (char *group, struct iovec *iovec, unsigned int iov_len);
|
|
|
-
|
|
|
struct totem_ip_address mcast_address;
|
|
|
|
|
|
void (*totemsrp_deliver_fn) (
|
|
|
unsigned int nodeid,
|
|
|
- const struct iovec *iovec,
|
|
|
- unsigned int iov_len,
|
|
|
+ const void *msg,
|
|
|
+ unsigned int msg_len,
|
|
|
int endian_conversion_required);
|
|
|
|
|
|
void (*totemsrp_confchg_fn) (
|
|
|
@@ -603,7 +593,7 @@ static void timer_function_merge_detect_timeout (void *data);
|
|
|
void main_deliver_fn (
|
|
|
void *context,
|
|
|
const void *msg,
|
|
|
- size_t msg_len);
|
|
|
+ unsigned int msg_len);
|
|
|
|
|
|
void main_iface_change_fn (
|
|
|
void *context,
|
|
|
@@ -690,8 +680,8 @@ int totemsrp_initialize (
|
|
|
|
|
|
void (*deliver_fn) (
|
|
|
unsigned int nodeid,
|
|
|
- const struct iovec *iovec,
|
|
|
- unsigned int iov_len,
|
|
|
+ const void *msg,
|
|
|
+ unsigned int msg_len,
|
|
|
int endian_conversion_required),
|
|
|
|
|
|
void (*confchg_fn) (
|
|
|
@@ -750,8 +740,6 @@ int totemsrp_initialize (
|
|
|
*/
|
|
|
totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
|
|
|
|
|
|
- memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);
|
|
|
-
|
|
|
/*
|
|
|
* Display totem configuration
|
|
|
*/
|
|
|
@@ -1577,35 +1565,22 @@ static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance
|
|
|
/*
|
|
|
* Convert recovery message into regular message
|
|
|
*/
|
|
|
- if (recovery_message_item->iov_len > 1) {
|
|
|
- mcast = recovery_message_item->iovec[1].iov_base;
|
|
|
- memcpy (®ular_message_item.iovec[0],
|
|
|
- &recovery_message_item->iovec[1],
|
|
|
- sizeof (struct iovec) * recovery_message_item->iov_len);
|
|
|
+ mcast = recovery_message_item->mcast;
|
|
|
+ if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
|
|
|
+ /*
|
|
|
+ * Message is a recovery message encapsulated
|
|
|
+ * in a new ring message
|
|
|
+ */
|
|
|
+ regular_message_item.mcast =
|
|
|
+ (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
|
|
|
+ regular_message_item.msg_len =
|
|
|
+ recovery_message_item->msg_len - sizeof (struct mcast);
|
|
|
+ mcast = regular_message_item.mcast;
|
|
|
} else {
|
|
|
- mcast = recovery_message_item->iovec[0].iov_base;
|
|
|
- if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
|
|
|
- /*
|
|
|
- * Message is a recovery message encapsulated
|
|
|
- * in a new ring message
|
|
|
- */
|
|
|
- regular_message_item.iovec[0].iov_base =
|
|
|
- (char *)recovery_message_item->iovec[0].iov_base + sizeof (struct mcast);
|
|
|
- regular_message_item.iovec[0].iov_len =
|
|
|
- recovery_message_item->iovec[0].iov_len - sizeof (struct mcast);
|
|
|
- regular_message_item.iov_len = 1;
|
|
|
- mcast = regular_message_item.iovec[0].iov_base;
|
|
|
- } else {
|
|
|
- continue; /* TODO this case shouldn't happen */
|
|
|
- /*
|
|
|
- * Message is originated on new ring and not
|
|
|
- * encapsulated
|
|
|
- */
|
|
|
- regular_message_item.iovec[0].iov_base =
|
|
|
- recovery_message_item->iovec[0].iov_base;
|
|
|
- regular_message_item.iovec[0].iov_len =
|
|
|
- recovery_message_item->iovec[0].iov_len;
|
|
|
- }
|
|
|
+ /*
|
|
|
+ * TODO this case shouldn't happen
|
|
|
+ */
|
|
|
+ continue;
|
|
|
}
|
|
|
|
|
|
log_printf (instance->totemsrp_log_level_debug,
|
|
|
@@ -1620,7 +1595,7 @@ static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance
|
|
|
if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
|
|
|
sizeof (struct memb_ring_id)) == 0) {
|
|
|
|
|
|
- regular_message_item.iov_len = recovery_message_item->iov_len;
|
|
|
+ regular_message_item.msg_len = recovery_message_item->msg_len;
|
|
|
res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
|
|
|
if (res == 0) {
|
|
|
sq_item_add (&instance->regular_sort_queue,
|
|
|
@@ -1980,28 +1955,27 @@ static void memb_state_recovery_enter (
|
|
|
low_ring_aru + i, &ptr);
|
|
|
if (res != 0) {
|
|
|
strcat (not_originated, seqno_string_hex);
|
|
|
- continue;
|
|
|
- }
|
|
|
- strcat (is_originated, seqno_string_hex);
|
|
|
- sort_queue_item = ptr;
|
|
|
- assert (sort_queue_item->iov_len > 0);
|
|
|
- assert (sort_queue_item->iov_len <= MAXIOVS);
|
|
|
- messages_originated++;
|
|
|
- memset (&message_item, 0, sizeof (struct message_item));
|
|
|
-// TODO LEAK
|
|
|
- message_item.mcast = malloc (sizeof (struct mcast));
|
|
|
- assert (message_item.mcast);
|
|
|
- message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
|
|
|
- srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
|
|
|
- message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
|
|
|
- message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
- assert (message_item.mcast->header.nodeid);
|
|
|
- message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
|
|
|
- memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
|
|
|
- sizeof (struct memb_ring_id));
|
|
|
- message_item.iov_len = sort_queue_item->iov_len;
|
|
|
- memcpy (&message_item.iovec, &sort_queue_item->iovec,
|
|
|
- sizeof (struct iovec) * sort_queue_item->iov_len);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ strcat (is_originated, seqno_string_hex);
|
|
|
+ sort_queue_item = ptr;
|
|
|
+ messages_originated++;
|
|
|
+ memset (&message_item, 0, sizeof (struct message_item));
|
|
|
+ // TODO LEAK
|
|
|
+ message_item.mcast = malloc (10000);
|
|
|
+ assert (message_item.mcast);
|
|
|
+ message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
|
|
|
+ srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
|
|
|
+ message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
|
|
|
+ message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
+ assert (message_item.mcast->header.nodeid);
|
|
|
+ message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
|
|
|
+ memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
|
|
|
+ sizeof (struct memb_ring_id));
|
|
|
+ message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
|
|
|
+ memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
|
|
|
+ sort_queue_item->mcast,
|
|
|
+ sort_queue_item->msg_len);
|
|
|
queue_item_add (&instance->retrans_message_queue, &message_item);
|
|
|
}
|
|
|
log_printf (instance->totemsrp_log_level_notice,
|
|
|
@@ -2057,9 +2031,10 @@ int totemsrp_mcast (
|
|
|
int guarantee)
|
|
|
{
|
|
|
int i;
|
|
|
- int j;
|
|
|
struct message_item message_item;
|
|
|
struct totemsrp_instance *instance;
|
|
|
+ char *addr;
|
|
|
+ unsigned int addr_idx;
|
|
|
unsigned int res;
|
|
|
|
|
|
res = hdb_handle_get (&totemsrp_instance_database, handle,
|
|
|
@@ -2072,17 +2047,13 @@ int totemsrp_mcast (
|
|
|
log_printf (instance->totemsrp_log_level_warning, "queue full\n");
|
|
|
return (-1);
|
|
|
}
|
|
|
- for (j = 0, i = 0; i < iov_len; i++) {
|
|
|
- j+= iovec[i].iov_len;
|
|
|
- }
|
|
|
|
|
|
memset (&message_item, 0, sizeof (struct message_item));
|
|
|
|
|
|
/*
|
|
|
* Allocate pending item
|
|
|
*/
|
|
|
-// TODO LEAK
|
|
|
- message_item.mcast = malloc (sizeof (struct mcast));
|
|
|
+ message_item.mcast = malloc (10000);
|
|
|
if (message_item.mcast == 0) {
|
|
|
goto error_mcast;
|
|
|
}
|
|
|
@@ -2099,21 +2070,14 @@ int totemsrp_mcast (
|
|
|
message_item.mcast->guarantee = guarantee;
|
|
|
srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
|
|
|
|
|
|
+ addr = (char *)message_item.mcast;
|
|
|
+ addr_idx = sizeof (struct mcast);
|
|
|
for (i = 0; i < iov_len; i++) {
|
|
|
-// TODO LEAK
|
|
|
- message_item.iovec[i].iov_base = malloc (iovec[i].iov_len);
|
|
|
-
|
|
|
- if (message_item.iovec[i].iov_base == 0) {
|
|
|
- goto error_iovec;
|
|
|
- }
|
|
|
-
|
|
|
- memcpy (message_item.iovec[i].iov_base, iovec[i].iov_base,
|
|
|
- iovec[i].iov_len);
|
|
|
-
|
|
|
- message_item.iovec[i].iov_len = iovec[i].iov_len;
|
|
|
+ memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
|
|
|
+ addr_idx += iovec[i].iov_len;
|
|
|
}
|
|
|
|
|
|
- message_item.iov_len = iov_len;
|
|
|
+ message_item.msg_len = addr_idx;
|
|
|
|
|
|
log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue\n");
|
|
|
queue_item_add (&instance->new_message_queue, &message_item);
|
|
|
@@ -2121,13 +2085,6 @@ int totemsrp_mcast (
|
|
|
hdb_handle_put (&totemsrp_instance_database, handle);
|
|
|
return (0);
|
|
|
|
|
|
-error_iovec:
|
|
|
- for (j = 0; j < i; j++) {
|
|
|
- free (message_item.iovec[j].iov_base);
|
|
|
- }
|
|
|
-
|
|
|
- free(message_item.mcast);
|
|
|
-
|
|
|
error_mcast:
|
|
|
hdb_handle_put (&totemsrp_instance_database, handle);
|
|
|
|
|
|
@@ -2198,9 +2155,10 @@ static int orf_token_remcast (
|
|
|
|
|
|
sort_queue_item = ptr;
|
|
|
|
|
|
- totemrrp_mcast_noflush_send (instance->totemrrp_handle,
|
|
|
- sort_queue_item->iovec,
|
|
|
- sort_queue_item->iov_len);
|
|
|
+ totemrrp_mcast_noflush_send (
|
|
|
+ instance->totemrrp_handle,
|
|
|
+ sort_queue_item->mcast,
|
|
|
+ sort_queue_item->msg_len);
|
|
|
|
|
|
return (0);
|
|
|
}
|
|
|
@@ -2214,7 +2172,7 @@ static void messages_free (
|
|
|
unsigned int token_aru)
|
|
|
{
|
|
|
struct sort_queue_item *regular_message;
|
|
|
- unsigned int i, j;
|
|
|
+ unsigned int i;
|
|
|
int res;
|
|
|
int log_release = 0;
|
|
|
unsigned int release_to;
|
|
|
@@ -2248,9 +2206,7 @@ static void messages_free (
|
|
|
instance->last_released + i, &ptr);
|
|
|
if (res == 0) {
|
|
|
regular_message = ptr;
|
|
|
- for (j = 0; j < regular_message->iov_len; j++) {
|
|
|
- free (regular_message->iovec[j].iov_base);
|
|
|
- }
|
|
|
+ free (regular_message->mcast);
|
|
|
}
|
|
|
sq_items_release (&instance->regular_sort_queue,
|
|
|
instance->last_released + i);
|
|
|
@@ -2349,29 +2305,23 @@ static int orf_token_mcast (
|
|
|
* Build IO vector
|
|
|
*/
|
|
|
memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
|
|
|
- sort_queue_item.iovec[0].iov_base = message_item->mcast;
|
|
|
- sort_queue_item.iovec[0].iov_len = sizeof (struct mcast);
|
|
|
-
|
|
|
- mcast = sort_queue_item.iovec[0].iov_base;
|
|
|
+ sort_queue_item.mcast = message_item->mcast;
|
|
|
+ sort_queue_item.msg_len = message_item->msg_len;
|
|
|
|
|
|
- memcpy (&sort_queue_item.iovec[1], message_item->iovec,
|
|
|
- message_item->iov_len * sizeof (struct iovec));
|
|
|
+ mcast = sort_queue_item.mcast;
|
|
|
|
|
|
memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
|
|
|
|
|
|
- sort_queue_item.iov_len = message_item->iov_len + 1;
|
|
|
-
|
|
|
- assert (sort_queue_item.iov_len < 16);
|
|
|
-
|
|
|
/*
|
|
|
* Add message to retransmit queue
|
|
|
*/
|
|
|
sort_queue_item_ptr = sq_item_add (sort_queue,
|
|
|
&sort_queue_item, message_item->mcast->seq);
|
|
|
|
|
|
- totemrrp_mcast_noflush_send (instance->totemrrp_handle,
|
|
|
- sort_queue_item_ptr->iovec,
|
|
|
- sort_queue_item_ptr->iov_len);
|
|
|
+ totemrrp_mcast_noflush_send (
|
|
|
+ instance->totemrrp_handle,
|
|
|
+ message_item->mcast,
|
|
|
+ message_item->msg_len);
|
|
|
|
|
|
/*
|
|
|
* Delete item from pending queue
|
|
|
@@ -2517,14 +2467,9 @@ static int orf_token_rtr (
|
|
|
|
|
|
static void token_retransmit (struct totemsrp_instance *instance)
|
|
|
{
|
|
|
- struct iovec iovec;
|
|
|
-
|
|
|
- iovec.iov_base = instance->orf_token_retransmit;
|
|
|
- iovec.iov_len = instance->orf_token_retransmit_size;
|
|
|
-
|
|
|
totemrrp_token_send (instance->totemrrp_handle,
|
|
|
- &iovec,
|
|
|
- 1);
|
|
|
+ instance->orf_token_retransmit,
|
|
|
+ instance->orf_token_retransmit_size);
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
@@ -2591,13 +2536,14 @@ static int token_send (
|
|
|
struct orf_token *orf_token,
|
|
|
int forward_token)
|
|
|
{
|
|
|
- struct iovec iovec;
|
|
|
int res = 0;
|
|
|
- unsigned int iov_len = sizeof (struct orf_token) +
|
|
|
+ unsigned int orf_token_size;
|
|
|
+
|
|
|
+ orf_token_size = sizeof (struct orf_token) +
|
|
|
(orf_token->rtr_list_entries * sizeof (struct rtr_item));
|
|
|
|
|
|
- memcpy (instance->orf_token_retransmit, orf_token, iov_len);
|
|
|
- instance->orf_token_retransmit_size = iov_len;
|
|
|
+ memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
|
|
|
+ instance->orf_token_retransmit_size = orf_token_size;
|
|
|
orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
assert (orf_token->header.nodeid);
|
|
|
|
|
|
@@ -2605,12 +2551,9 @@ static int token_send (
|
|
|
return (0);
|
|
|
}
|
|
|
|
|
|
- iovec.iov_base = orf_token;
|
|
|
- iovec.iov_len = iov_len;
|
|
|
-
|
|
|
totemrrp_token_send (instance->totemrrp_handle,
|
|
|
- &iovec,
|
|
|
- 1);
|
|
|
+ orf_token,
|
|
|
+ orf_token_size);
|
|
|
|
|
|
return (res);
|
|
|
}
|
|
|
@@ -2618,7 +2561,6 @@ static int token_send (
|
|
|
static int token_hold_cancel_send (struct totemsrp_instance *instance)
|
|
|
{
|
|
|
struct token_hold_cancel token_hold_cancel;
|
|
|
- struct iovec iovec[2];
|
|
|
|
|
|
/*
|
|
|
* Only cancel if the token is currently held
|
|
|
@@ -2634,19 +2576,15 @@ static int token_hold_cancel_send (struct totemsrp_instance *instance)
|
|
|
token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
|
|
|
token_hold_cancel.header.endian_detector = ENDIAN_LOCAL;
|
|
|
token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
+ memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
|
|
|
+ sizeof (struct memb_ring_id));
|
|
|
assert (token_hold_cancel.header.nodeid);
|
|
|
|
|
|
- iovec[0].iov_base = &token_hold_cancel;
|
|
|
- iovec[0].iov_len = sizeof (struct token_hold_cancel) -
|
|
|
- sizeof (struct memb_ring_id);
|
|
|
- iovec[1].iov_base = &instance->my_ring_id;
|
|
|
- iovec[1].iov_len = sizeof (struct memb_ring_id);
|
|
|
-
|
|
|
- totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2);
|
|
|
+ totemrrp_mcast_flush_send (instance->totemrrp_handle, &token_hold_cancel,
|
|
|
+ sizeof (struct token_hold_cancel));
|
|
|
|
|
|
return (0);
|
|
|
}
|
|
|
-//AAA
|
|
|
|
|
|
static int orf_token_send_initial (struct totemsrp_instance *instance)
|
|
|
{
|
|
|
@@ -2777,27 +2715,26 @@ static int memb_state_commit_token_send (
|
|
|
struct totemsrp_instance *instance,
|
|
|
struct memb_commit_token *commit_token)
|
|
|
{
|
|
|
- struct iovec iovec;
|
|
|
struct srp_addr *addr;
|
|
|
struct memb_commit_token_memb_entry *memb_list;
|
|
|
+ unsigned int commit_token_size;
|
|
|
|
|
|
addr = (struct srp_addr *)commit_token->end_of_commit_token;
|
|
|
memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
|
|
|
|
|
|
commit_token->token_seq++;
|
|
|
- iovec.iov_base = commit_token;
|
|
|
- iovec.iov_len = sizeof (struct memb_commit_token) +
|
|
|
+ commit_token_size = sizeof (struct memb_commit_token) +
|
|
|
((sizeof (struct srp_addr) +
|
|
|
sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
|
|
|
/*
|
|
|
* Make a copy for retransmission if necessary
|
|
|
*/
|
|
|
- memcpy (instance->orf_token_retransmit, commit_token, iovec.iov_len);
|
|
|
- instance->orf_token_retransmit_size = iovec.iov_len;
|
|
|
+ memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
|
|
|
+ instance->orf_token_retransmit_size = commit_token_size;
|
|
|
|
|
|
totemrrp_token_send (instance->totemrrp_handle,
|
|
|
- &iovec,
|
|
|
- 1);
|
|
|
+ commit_token,
|
|
|
+ commit_token_size);
|
|
|
|
|
|
/*
|
|
|
* Request retransmission of the commit token in case it is lost
|
|
|
@@ -2887,35 +2824,44 @@ static void memb_state_commit_token_create (
|
|
|
|
|
|
static void memb_join_message_send (struct totemsrp_instance *instance)
|
|
|
{
|
|
|
- struct memb_join memb_join;
|
|
|
- struct iovec iovec[3];
|
|
|
- unsigned int iovs;
|
|
|
+ char memb_join_data[10000];
|
|
|
+ struct memb_join *memb_join = (struct memb_join *)memb_join_data;
|
|
|
+ char *addr;
|
|
|
+ unsigned int addr_idx;
|
|
|
|
|
|
- memb_join.header.type = MESSAGE_TYPE_MEMB_JOIN;
|
|
|
- memb_join.header.endian_detector = ENDIAN_LOCAL;
|
|
|
- memb_join.header.encapsulated = 0;
|
|
|
- memb_join.header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
- assert (memb_join.header.nodeid);
|
|
|
+ memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
|
|
|
+ memb_join->header.endian_detector = ENDIAN_LOCAL;
|
|
|
+ memb_join->header.encapsulated = 0;
|
|
|
+ memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
+ assert (memb_join->header.nodeid);
|
|
|
|
|
|
assert (srp_addr_equal (&instance->my_proc_list[0], &instance->my_proc_list[1]) == 0);
|
|
|
- memb_join.ring_seq = instance->my_ring_id.seq;
|
|
|
- memb_join.proc_list_entries = instance->my_proc_list_entries;
|
|
|
- memb_join.failed_list_entries = instance->my_failed_list_entries;
|
|
|
- srp_addr_copy (&memb_join.system_from, &instance->my_id);
|
|
|
-
|
|
|
- iovec[0].iov_base = &memb_join;
|
|
|
- iovec[0].iov_len = sizeof (struct memb_join);
|
|
|
- iovec[1].iov_base = &instance->my_proc_list;
|
|
|
- iovec[1].iov_len = instance->my_proc_list_entries *
|
|
|
+ memb_join->ring_seq = instance->my_ring_id.seq;
|
|
|
+ memb_join->proc_list_entries = instance->my_proc_list_entries;
|
|
|
+ memb_join->failed_list_entries = instance->my_failed_list_entries;
|
|
|
+ srp_addr_copy (&memb_join->system_from, &instance->my_id);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * This mess adds the joined and failed processor lists into the join
|
|
|
+ * message
|
|
|
+ */
|
|
|
+ addr = (char *)memb_join;
|
|
|
+ addr_idx = sizeof (struct memb_join);
|
|
|
+ memcpy (&addr[addr_idx],
|
|
|
+ instance->my_proc_list,
|
|
|
+ instance->my_proc_list_entries *
|
|
|
+ sizeof (struct srp_addr));
|
|
|
+ addr_idx +=
|
|
|
+ instance->my_proc_list_entries *
|
|
|
sizeof (struct srp_addr);
|
|
|
- if (instance->my_failed_list_entries == 0) {
|
|
|
- iovs = 2;
|
|
|
- } else {
|
|
|
- iovs = 3;
|
|
|
- iovec[2].iov_base = instance->my_failed_list;
|
|
|
- iovec[2].iov_len = instance->my_failed_list_entries *
|
|
|
- sizeof (struct srp_addr);
|
|
|
- }
|
|
|
+ memcpy (&addr[addr_idx],
|
|
|
+ instance->my_failed_list,
|
|
|
+ instance->my_failed_list_entries *
|
|
|
+ sizeof (struct srp_addr));
|
|
|
+ addr_idx +=
|
|
|
+ instance->my_failed_list_entries *
|
|
|
+ sizeof (struct srp_addr);
|
|
|
+
|
|
|
|
|
|
if (instance->totem_config->send_join_timeout) {
|
|
|
usleep (random() % (instance->totem_config->send_join_timeout * 1000));
|
|
|
@@ -2923,29 +2869,26 @@ static void memb_join_message_send (struct totemsrp_instance *instance)
|
|
|
|
|
|
totemrrp_mcast_flush_send (
|
|
|
instance->totemrrp_handle,
|
|
|
- iovec,
|
|
|
- iovs);
|
|
|
+ memb_join,
|
|
|
+ addr_idx);
|
|
|
}
|
|
|
|
|
|
static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
|
|
|
{
|
|
|
struct memb_merge_detect memb_merge_detect;
|
|
|
- struct iovec iovec[2];
|
|
|
|
|
|
memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
|
|
|
memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
|
|
|
memb_merge_detect.header.encapsulated = 0;
|
|
|
memb_merge_detect.header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
|
|
|
+ memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
|
|
|
+ sizeof (struct memb_ring_id));
|
|
|
assert (memb_merge_detect.header.nodeid);
|
|
|
|
|
|
- iovec[0].iov_base = &memb_merge_detect;
|
|
|
- iovec[0].iov_len = sizeof (struct memb_merge_detect) -
|
|
|
- sizeof (struct memb_ring_id);
|
|
|
- iovec[1].iov_base = &instance->my_ring_id;
|
|
|
- iovec[1].iov_len = sizeof (struct memb_ring_id);
|
|
|
-
|
|
|
- totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2);
|
|
|
+ totemrrp_mcast_flush_send (instance->totemrrp_handle,
|
|
|
+ &memb_merge_detect,
|
|
|
+ sizeof (struct memb_merge_detect));
|
|
|
}
|
|
|
|
|
|
static void memb_ring_id_create_or_load (
|
|
|
@@ -3569,7 +3512,7 @@ static void messages_deliver_to_app (
|
|
|
|
|
|
sort_queue_item_p = ptr;
|
|
|
|
|
|
- mcast_in = sort_queue_item_p->iovec[0].iov_base;
|
|
|
+ mcast_in = sort_queue_item_p->mcast;
|
|
|
assert (mcast_in != (struct mcast *)0xdeadbeef);
|
|
|
|
|
|
endian_conversion_required = 0;
|
|
|
@@ -3604,27 +3547,11 @@ static void messages_deliver_to_app (
|
|
|
/*
|
|
|
* Message is locally originated multicast
|
|
|
*/
|
|
|
- if (sort_queue_item_p->iov_len > 1 &&
|
|
|
- sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) {
|
|
|
- instance->totemsrp_deliver_fn (
|
|
|
- mcast_header.header.nodeid,
|
|
|
- &sort_queue_item_p->iovec[1],
|
|
|
- sort_queue_item_p->iov_len - 1,
|
|
|
- endian_conversion_required);
|
|
|
- } else {
|
|
|
- sort_queue_item_p->iovec[0].iov_len -= sizeof (struct mcast);
|
|
|
- sort_queue_item_p->iovec[0].iov_base = (char *)sort_queue_item_p->iovec[0].iov_base + sizeof (struct mcast);
|
|
|
-
|
|
|
- instance->totemsrp_deliver_fn (
|
|
|
- mcast_header.header.nodeid,
|
|
|
- sort_queue_item_p->iovec,
|
|
|
- sort_queue_item_p->iov_len,
|
|
|
- endian_conversion_required);
|
|
|
-
|
|
|
- sort_queue_item_p->iovec[0].iov_len += sizeof (struct mcast);
|
|
|
- sort_queue_item_p->iovec[0].iov_base = (char *)sort_queue_item_p->iovec[0].iov_base - sizeof (struct mcast);
|
|
|
- }
|
|
|
-//TODO instance->stats_delv += 1;
|
|
|
+ instance->totemsrp_deliver_fn (
|
|
|
+ mcast_header.header.nodeid,
|
|
|
+ ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
|
|
|
+ sort_queue_item_p->msg_len - sizeof (struct mcast),
|
|
|
+ endian_conversion_required);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -3726,15 +3653,12 @@ static int message_handler_mcast (
|
|
|
* Allocate new multicast memory block
|
|
|
*/
|
|
|
// TODO LEAK
|
|
|
- sort_queue_item.iovec[0].iov_base = malloc (msg_len);
|
|
|
- if (sort_queue_item.iovec[0].iov_base == 0) {
|
|
|
+ sort_queue_item.mcast = malloc (msg_len);
|
|
|
+ if (sort_queue_item.mcast == NULL) {
|
|
|
return (-1); /* error here is corrected by the algorithm */
|
|
|
}
|
|
|
- memcpy (sort_queue_item.iovec[0].iov_base, msg, msg_len);
|
|
|
- sort_queue_item.iovec[0].iov_len = msg_len;
|
|
|
- assert (sort_queue_item.iovec[0].iov_len > 0);
|
|
|
- assert (sort_queue_item.iovec[0].iov_len < FRAME_SIZE_MAX);
|
|
|
- sort_queue_item.iov_len = 1;
|
|
|
+ memcpy (sort_queue_item.mcast, msg, msg_len);
|
|
|
+ sort_queue_item.msg_len = msg_len;
|
|
|
|
|
|
if (sq_lt_compare (instance->my_high_seq_received,
|
|
|
mcast_header.seq)) {
|
|
|
@@ -4175,7 +4099,7 @@ static int message_handler_token_hold_cancel (
|
|
|
void main_deliver_fn (
|
|
|
void *context,
|
|
|
const void *msg,
|
|
|
- size_t msg_len)
|
|
|
+ unsigned int msg_len)
|
|
|
{
|
|
|
struct totemsrp_instance *instance = context;
|
|
|
const struct message_header *message_header = msg;
|