|
|
@@ -7,7 +7,7 @@
|
|
|
* Author: Steven Dake (sdake@redhat.com)
|
|
|
*
|
|
|
* This software licensed under BSD license, the text of which follows:
|
|
|
- *
|
|
|
+ *
|
|
|
* Redistribution and use in source and binary forms, with or without
|
|
|
* modification, are permitted provided that the following conditions are met:
|
|
|
*
|
|
|
@@ -35,7 +35,7 @@
|
|
|
|
|
|
/*
|
|
|
* The first version of this code was based upon Yair Amir's PhD thesis:
|
|
|
- * http://www.cs.jhu.edu/~yairamir/phd.ps) (ch4,5).
|
|
|
+ * http://www.cs.jhu.edu/~yairamir/phd.ps) (ch4,5).
|
|
|
*
|
|
|
* The current version of totemsrp implements the Totem protocol specified in:
|
|
|
* http://citeseer.ist.psu.edu/amir95totem.html
|
|
|
@@ -87,7 +87,7 @@
|
|
|
#define QUEUE_RTR_ITEMS_SIZE_MAX 256 /* allow 256 retransmit items */
|
|
|
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
|
|
|
#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
|
|
|
-#define MAXIOVS 5
|
|
|
+#define MAXIOVS 5
|
|
|
#define RETRANSMIT_ENTRIES_MAX 30
|
|
|
#define TOKEN_SIZE_MAX 64000 /* bytes */
|
|
|
|
|
|
@@ -110,7 +110,7 @@
|
|
|
|
|
|
/*
|
|
|
* These can be used ot test different rollover points
|
|
|
- * #define SEQNO_START_MSG 0xfffffe00
|
|
|
+ * #define SEQNO_START_MSG 0xfffffe00
|
|
|
* #define SEQNO_START_TOKEN 0xfffffe00
|
|
|
*/
|
|
|
|
|
|
@@ -144,7 +144,7 @@ enum encapsulation_type {
|
|
|
MESSAGE_NOT_ENCAPSULATED = 2
|
|
|
};
|
|
|
|
|
|
-/*
|
|
|
+/*
|
|
|
* New membership algorithm local variables
|
|
|
*/
|
|
|
struct srp_addr {
|
|
|
@@ -213,7 +213,7 @@ struct orf_token {
|
|
|
unsigned int token_seq;
|
|
|
unsigned int aru;
|
|
|
unsigned int aru_addr;
|
|
|
- struct memb_ring_id ring_id;
|
|
|
+ struct memb_ring_id ring_id;
|
|
|
unsigned int backlog;
|
|
|
unsigned int fcc;
|
|
|
int retrans_flg;
|
|
|
@@ -236,7 +236,7 @@ struct memb_join {
|
|
|
*/
|
|
|
} __attribute__((packed));
|
|
|
|
|
|
-
|
|
|
+
|
|
|
struct memb_merge_detect {
|
|
|
struct message_header header;
|
|
|
struct srp_addr system_from;
|
|
|
@@ -368,7 +368,7 @@ struct totemsrp_instance {
|
|
|
int my_retrans_flg_count;
|
|
|
|
|
|
unsigned int my_high_ring_delivered;
|
|
|
-
|
|
|
+
|
|
|
int heartbeat_timeout;
|
|
|
|
|
|
/*
|
|
|
@@ -433,7 +433,7 @@ struct totemsrp_instance {
|
|
|
|
|
|
int totemsrp_subsys_id;
|
|
|
|
|
|
- void (*totemsrp_log_printf) (int subsys,
|
|
|
+ void (*totemsrp_log_printf) (int subsys,
|
|
|
const char *function, const char *file,
|
|
|
int line, unsigned int level,
|
|
|
const char *format, ...)__attribute__((format(printf, 6, 7)));;
|
|
|
@@ -718,7 +718,7 @@ int totemsrp_initialize (
|
|
|
if (rundir == NULL) {
|
|
|
rundir = LOCALSTATEDIR "/lib/corosync";
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
res = mkdir (rundir, 0700);
|
|
|
if (res == -1 && errno != EEXIST) {
|
|
|
goto error_put;
|
|
|
@@ -820,13 +820,13 @@ int totemsrp_initialize (
|
|
|
}
|
|
|
|
|
|
if (instance->use_heartbeat) {
|
|
|
- instance->heartbeat_timeout
|
|
|
- = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
|
|
|
+ instance->heartbeat_timeout
|
|
|
+ = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
|
|
|
+ totem_config->max_network_delay;
|
|
|
|
|
|
if (instance->heartbeat_timeout >= totem_config->token_timeout) {
|
|
|
log_printf (instance->totemsrp_log_level_notice,
|
|
|
- "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)\n",
|
|
|
+ "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)\n",
|
|
|
instance->heartbeat_timeout,
|
|
|
totem_config->token_timeout);
|
|
|
log_printf (instance->totemsrp_log_level_notice,
|
|
|
@@ -840,7 +840,7 @@ int totemsrp_initialize (
|
|
|
"total heartbeat_timeout (%d ms)\n", instance->heartbeat_timeout);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
totemrrp_initialize (
|
|
|
poll_handle,
|
|
|
&instance->totemrrp_handle,
|
|
|
@@ -918,7 +918,7 @@ int totemsrp_ifaces_get (
|
|
|
*iface_count = instance->totem_config->interface_count;
|
|
|
goto finish;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
for (i = 0; i < instance->my_left_memb_entries; i++) {
|
|
|
if (instance->my_left_memb_list[i].addr[0].nodeid == nodeid) {
|
|
|
found = 1;
|
|
|
@@ -1236,7 +1236,7 @@ static void memb_set_merge (
|
|
|
if (srp_addr_equal (&fullset[j], &subset[i])) {
|
|
|
found = 1;
|
|
|
break;
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
if (found == 0) {
|
|
|
srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
|
|
|
@@ -1476,7 +1476,7 @@ static void timer_function_orf_token_timeout (void *data)
|
|
|
"The token was lost in the COMMIT state.\n");
|
|
|
memb_state_gather_enter (instance, 4);
|
|
|
break;
|
|
|
-
|
|
|
+
|
|
|
case MEMB_STATE_RECOVERY:
|
|
|
log_printf (instance->totemsrp_log_level_notice,
|
|
|
"The token was lost in the RECOVERY state.\n");
|
|
|
@@ -1511,7 +1511,7 @@ static void memb_timer_function_state_gather (void *data)
|
|
|
* Restart the join timeout
|
|
|
`*/
|
|
|
poll_timer_delete (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
|
|
|
-
|
|
|
+
|
|
|
poll_timer_add (instance->totemsrp_poll_handle,
|
|
|
instance->totem_config->join_timeout,
|
|
|
(void *)instance,
|
|
|
@@ -1675,7 +1675,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
|
|
|
trans_memb_list_totemip, instance->my_trans_memb_entries,
|
|
|
left_list, instance->my_left_memb_entries,
|
|
|
0, 0, &instance->my_ring_id);
|
|
|
-
|
|
|
+
|
|
|
// TODO we need to filter to ensure we only deliver those
|
|
|
// messages which are part of instance->my_deliver_memb
|
|
|
messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
|
|
|
@@ -1790,7 +1790,7 @@ static void memb_state_commit_enter (
|
|
|
{
|
|
|
ring_save (instance);
|
|
|
|
|
|
- old_ring_state_save (instance);
|
|
|
+ old_ring_state_save (instance);
|
|
|
|
|
|
memb_state_commit_token_update (instance, commit_token);
|
|
|
|
|
|
@@ -1947,7 +1947,7 @@ static void memb_state_recovery_enter (
|
|
|
low_ring_aru + 1, instance->old_ring_state_high_seq_received);
|
|
|
strcpy (not_originated, "Not Originated for recovery: ");
|
|
|
strcpy (is_originated, "Originated for recovery: ");
|
|
|
-
|
|
|
+
|
|
|
for (i = 1; i <= range; i++) {
|
|
|
struct sort_queue_item *sort_queue_item;
|
|
|
struct message_item message_item;
|
|
|
@@ -2046,7 +2046,7 @@ int totemsrp_mcast (
|
|
|
if (res != 0) {
|
|
|
goto error_exit;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if (queue_is_full (&instance->new_message_queue)) {
|
|
|
log_printf (instance->totemsrp_log_level_warning, "queue full\n");
|
|
|
return (-1);
|
|
|
@@ -2104,7 +2104,7 @@ error_iovec:
|
|
|
for (j = 0; j < i; j++) {
|
|
|
free (message_item.iovec[j].iov_base);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
free(message_item.mcast);
|
|
|
|
|
|
error_mcast:
|
|
|
@@ -2142,7 +2142,7 @@ error_exit:
|
|
|
/*
|
|
|
* ORF Token Management
|
|
|
*/
|
|
|
-/*
|
|
|
+/*
|
|
|
* Recast message to mcast group if it is available
|
|
|
*/
|
|
|
static int orf_token_remcast (
|
|
|
@@ -2166,7 +2166,7 @@ static int orf_token_remcast (
|
|
|
log_printf (instance->totemsrp_log_level_debug, "sq not in range\n");
|
|
|
return (-1);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Get RTR item at seq, if not available, return
|
|
|
*/
|
|
|
@@ -2330,9 +2330,9 @@ static int orf_token_mcast (
|
|
|
memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
|
|
|
sort_queue_item.iovec[0].iov_base = message_item->mcast;
|
|
|
sort_queue_item.iovec[0].iov_len = sizeof (struct mcast);
|
|
|
-
|
|
|
+
|
|
|
mcast = sort_queue_item.iovec[0].iov_base;
|
|
|
-
|
|
|
+
|
|
|
memcpy (&sort_queue_item.iovec[1], message_item->iovec,
|
|
|
message_item->iov_len * sizeof (struct iovec));
|
|
|
|
|
|
@@ -2351,7 +2351,7 @@ static int orf_token_mcast (
|
|
|
totemrrp_mcast_noflush_send (instance->totemrrp_handle,
|
|
|
sort_queue_item_ptr->iovec,
|
|
|
sort_queue_item_ptr->iov_len);
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Delete item from pending queue
|
|
|
*/
|
|
|
@@ -2397,7 +2397,7 @@ static int orf_token_rtr (
|
|
|
}
|
|
|
|
|
|
rtr_list = &orf_token->rtr_list[0];
|
|
|
-
|
|
|
+
|
|
|
strcpy (retransmit_msg, "Retransmit List: ");
|
|
|
if (orf_token->rtr_list_entries) {
|
|
|
log_printf (instance->totemsrp_log_level_debug,
|
|
|
@@ -2649,7 +2649,7 @@ static int orf_token_send_initial (struct totemsrp_instance *instance)
|
|
|
orf_token.retrans_flg = 1;
|
|
|
instance->my_set_retrans_flg = 1;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
orf_token.aru = 0;
|
|
|
orf_token.aru = SEQNO_START_MSG - 1;
|
|
|
orf_token.aru_addr = instance->my_id.addr[0].nodeid;
|
|
|
@@ -2800,7 +2800,7 @@ static int memb_lowest_in_config (struct totemsrp_instance *instance)
|
|
|
/*
|
|
|
* find representative by searching for smallest identifier
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
lowest_addr = &token_memb[0].addr[0];
|
|
|
for (i = 1; i < token_memb_entries; i++) {
|
|
|
if (totemip_compare(lowest_addr, &token_memb[i].addr[0]) > 0) {
|
|
|
@@ -2881,7 +2881,7 @@ static void memb_join_message_send (struct totemsrp_instance *instance)
|
|
|
memb_join.proc_list_entries = instance->my_proc_list_entries;
|
|
|
memb_join.failed_list_entries = instance->my_failed_list_entries;
|
|
|
srp_addr_copy (&memb_join.system_from, &instance->my_id);
|
|
|
-
|
|
|
+
|
|
|
iovec[0].iov_base = &memb_join;
|
|
|
iovec[0].iov_len = sizeof (struct memb_join);
|
|
|
iovec[1].iov_base = &instance->my_proc_list;
|
|
|
@@ -2906,7 +2906,7 @@ static void memb_join_message_send (struct totemsrp_instance *instance)
|
|
|
iovs);
|
|
|
}
|
|
|
|
|
|
-static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
|
|
|
+static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
|
|
|
{
|
|
|
struct memb_merge_detect memb_merge_detect;
|
|
|
struct iovec iovec[2];
|
|
|
@@ -2958,7 +2958,7 @@ static void memb_ring_id_create_or_load (
|
|
|
log_printf (instance->totemsrp_log_level_warning,
|
|
|
"Couldn't open %s %s\n", filename, strerror (errno));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
totemip_copy(&memb_ring_id->rep, &instance->my_id.addr[0]);
|
|
|
assert (!totemip_zero_check(&memb_ring_id->rep));
|
|
|
instance->token_ring_id_seq = memb_ring_id->seq;
|
|
|
@@ -3075,7 +3075,7 @@ static void token_callbacks_execute (
|
|
|
default:
|
|
|
assert (0);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
for (list = callback_listhead->next; list != callback_listhead;
|
|
|
list = list_next) {
|
|
|
|
|
|
@@ -3279,7 +3279,7 @@ static int message_handler_orf_token (
|
|
|
forward_token = 1;
|
|
|
if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
|
|
|
if (instance->my_token_held) {
|
|
|
- forward_token = 0;
|
|
|
+ forward_token = 0;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -3310,7 +3310,7 @@ static int message_handler_orf_token (
|
|
|
if ((forward_token)
|
|
|
&& instance->use_heartbeat) {
|
|
|
reset_heartbeat_timeout(instance);
|
|
|
- }
|
|
|
+ }
|
|
|
else {
|
|
|
cancel_heartbeat_timeout(instance);
|
|
|
}
|
|
|
@@ -3340,7 +3340,7 @@ static int message_handler_orf_token (
|
|
|
}
|
|
|
|
|
|
return (0); /* discard token */
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
transmits_allowed = fcc_calculate (instance, token);
|
|
|
mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
|
|
|
@@ -3349,11 +3349,11 @@ static int message_handler_orf_token (
|
|
|
mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
|
|
|
fcc_token_update (instance, token, mcasted_retransmit +
|
|
|
mcasted_regular);
|
|
|
-
|
|
|
+
|
|
|
if (sq_lt_compare (instance->my_aru, token->aru) ||
|
|
|
instance->my_id.addr[0].nodeid == token->aru_addr ||
|
|
|
token->aru_addr == 0) {
|
|
|
-
|
|
|
+
|
|
|
token->aru = instance->my_aru;
|
|
|
if (token->aru == token->seq) {
|
|
|
token->aru_addr = 0;
|
|
|
@@ -3369,7 +3369,7 @@ static int message_handler_orf_token (
|
|
|
|
|
|
if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
|
|
|
token->aru_addr != instance->my_id.addr[0].nodeid) {
|
|
|
-
|
|
|
+
|
|
|
log_printf (instance->totemsrp_log_level_error,
|
|
|
"FAILED TO RECEIVE\n");
|
|
|
// TODO if we fail to receive, it may be possible to end with a gather
|
|
|
@@ -3404,11 +3404,11 @@ static int message_handler_orf_token (
|
|
|
token->retrans_flg = 0;
|
|
|
}
|
|
|
log_printf (instance->totemsrp_log_level_debug,
|
|
|
- "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x\n",
|
|
|
+ "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x\n",
|
|
|
token->retrans_flg, instance->my_set_retrans_flg,
|
|
|
queue_is_empty (&instance->retrans_message_queue),
|
|
|
instance->my_retrans_flg_count, token->aru);
|
|
|
- if (token->retrans_flg == 0) {
|
|
|
+ if (token->retrans_flg == 0) {
|
|
|
instance->my_retrans_flg_count += 1;
|
|
|
} else {
|
|
|
instance->my_retrans_flg_count = 0;
|
|
|
@@ -3444,9 +3444,9 @@ static int message_handler_orf_token (
|
|
|
instance->my_retrans_flg_count = 0;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
totemrrp_send_flush (instance->totemrrp_handle);
|
|
|
- token_send (instance, token, forward_token);
|
|
|
+ token_send (instance, token, forward_token);
|
|
|
|
|
|
#ifdef GIVEINFO
|
|
|
gettimeofday (&tv_current, NULL);
|
|
|
@@ -3619,7 +3619,7 @@ static int message_handler_mcast (
|
|
|
struct sort_queue_item sort_queue_item;
|
|
|
struct sq *sort_queue;
|
|
|
struct mcast mcast_header;
|
|
|
-
|
|
|
+
|
|
|
|
|
|
if (endian_conversion_needed) {
|
|
|
mcast_endian_convert (msg, &mcast_header);
|
|
|
@@ -3698,7 +3698,7 @@ static int message_handler_mcast (
|
|
|
* otherwise free io vectors
|
|
|
*/
|
|
|
if (msg_len > 0 && msg_len < FRAME_SIZE_MAX &&
|
|
|
- sq_in_range (sort_queue, mcast_header.seq) &&
|
|
|
+ sq_in_range (sort_queue, mcast_header.seq) &&
|
|
|
sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
|
|
|
|
|
|
/*
|
|
|
@@ -3714,7 +3714,7 @@ static int message_handler_mcast (
|
|
|
assert (sort_queue_item.iovec[0].iov_len > 0);
|
|
|
assert (sort_queue_item.iovec[0].iov_len < FRAME_SIZE_MAX);
|
|
|
sort_queue_item.iov_len = 1;
|
|
|
-
|
|
|
+
|
|
|
if (sq_lt_compare (instance->my_high_seq_received,
|
|
|
mcast_header.seq)) {
|
|
|
instance->my_high_seq_received = mcast_header.seq;
|
|
|
@@ -3816,12 +3816,12 @@ static int memb_join_process (
|
|
|
instance->my_failed_list_entries)) {
|
|
|
|
|
|
memb_consensus_set (instance, &memb_join->system_from);
|
|
|
-
|
|
|
+
|
|
|
if (memb_consensus_agreed (instance) &&
|
|
|
memb_lowest_in_config (instance)) {
|
|
|
|
|
|
memb_state_commit_token_create (instance, my_commit_token);
|
|
|
-
|
|
|
+
|
|
|
memb_state_commit_enter (instance, my_commit_token);
|
|
|
} else {
|
|
|
return (0);
|
|
|
@@ -4019,7 +4019,7 @@ static int message_handler_memb_join (
|
|
|
case MEMB_STATE_GATHER:
|
|
|
memb_join_process (instance, memb_join);
|
|
|
break;
|
|
|
-
|
|
|
+
|
|
|
case MEMB_STATE_COMMIT:
|
|
|
if (memb_set_subset (&memb_join->system_from,
|
|
|
1,
|
|
|
@@ -4091,7 +4091,7 @@ static int message_handler_memb_commit_token (
|
|
|
memb_set_subtract (sub, &sub_entries,
|
|
|
instance->my_proc_list, instance->my_proc_list_entries,
|
|
|
instance->my_failed_list, instance->my_failed_list_entries);
|
|
|
-
|
|
|
+
|
|
|
if (memb_set_equal (addr,
|
|
|
memb_commit_token->addr_entries,
|
|
|
sub,
|
|
|
@@ -4165,12 +4165,12 @@ void main_deliver_fn (
|
|
|
(unsigned int)msg_len);
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if ((int)message_header->type >= totemsrp_message_handlers.count) {
|
|
|
log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong... ignoring %d.\n", (int)message_header->type);
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Handle incoming message
|
|
|
*/
|