|
|
@@ -143,6 +143,11 @@ enum message_type {
|
|
|
MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
|
|
|
};
|
|
|
|
|
|
+enum encapsulation_type {
|
|
|
+ MESSAGE_ENCAPSULATED = 1,
|
|
|
+ MESSAGE_NOT_ENCAPSULATED = 2
|
|
|
+};
|
|
|
+
|
|
|
/*
|
|
|
* New membership algorithm local variables
|
|
|
*/
|
|
|
@@ -678,7 +683,6 @@ int totemsrp_initialize (
|
|
|
{
|
|
|
struct totemsrp_instance *instance;
|
|
|
unsigned int res;
|
|
|
-
|
|
|
res = hdb_handle_create (&totemsrp_instance_database,
|
|
|
sizeof (struct totemsrp_instance), handle);
|
|
|
if (res != 0) {
|
|
|
@@ -1917,8 +1921,9 @@ static void memb_state_recovery_enter (
|
|
|
sizeof (struct mcast));
|
|
|
memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
|
|
|
sizeof (struct memb_ring_id));
|
|
|
- message_item.mcast->header.encapsulated = 1;
|
|
|
+ message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
|
|
|
message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
+ message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
|
|
|
assert (message_item.mcast->header.nodeid);
|
|
|
message_item.iov_len = sort_queue_item->iov_len;
|
|
|
memcpy (&message_item.iovec, &sort_queue_item->iovec,
|
|
|
@@ -2013,7 +2018,7 @@ int totemsrp_mcast (
|
|
|
*/
|
|
|
message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
|
|
|
message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
|
|
|
- message_item.mcast->header.encapsulated = 2;
|
|
|
+ message_item.mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED;
|
|
|
message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
assert (message_item.mcast->header.nodeid);
|
|
|
|
|
|
@@ -2584,14 +2589,14 @@ static int orf_token_send_initial (struct totemsrp_instance *instance)
|
|
|
orf_token.token_seq = SEQNO_START_TOKEN;
|
|
|
orf_token.retrans_flg = 1;
|
|
|
instance->my_set_retrans_flg = 1;
|
|
|
-/*
|
|
|
+
|
|
|
if (queue_is_empty (&instance->retrans_message_queue) == 1) {
|
|
|
orf_token.retrans_flg = 0;
|
|
|
+ instance->my_set_retrans_flg = 0;
|
|
|
} else {
|
|
|
orf_token.retrans_flg = 1;
|
|
|
instance->my_set_retrans_flg = 1;
|
|
|
}
|
|
|
-*/
|
|
|
|
|
|
orf_token.aru = 0;
|
|
|
orf_token.aru = SEQNO_START_MSG - 1;
|
|
|
@@ -2627,6 +2632,9 @@ static void memb_state_commit_token_update (
|
|
|
* TODO high delivered is really instance->my_aru, but with safe this
|
|
|
* could change?
|
|
|
*/
|
|
|
+ instance->my_received_flg =
|
|
|
+ (instance->my_aru == instance->my_high_seq_received);
|
|
|
+
|
|
|
memb_list[commit_token->memb_index].high_delivered = instance->my_high_delivered;
|
|
|
memb_list[commit_token->memb_index].received_flg = instance->my_received_flg;
|
|
|
|
|
|
@@ -3110,7 +3118,6 @@ static int message_handler_orf_token (
|
|
|
unsigned int mcasted_retransmit;
|
|
|
unsigned int mcasted_regular;
|
|
|
unsigned int last_aru;
|
|
|
- unsigned int low_water;
|
|
|
|
|
|
#ifdef GIVEINFO
|
|
|
struct timeval tv_current;
|
|
|
@@ -3304,13 +3311,7 @@ static int message_handler_orf_token (
|
|
|
* has recovered all messages it can recover
|
|
|
* (ie: its retrans queue is empty)
|
|
|
*/
|
|
|
- low_water = instance->my_aru;
|
|
|
- if (sq_lt_compare (last_aru, low_water)) {
|
|
|
- low_water = last_aru;
|
|
|
- }
|
|
|
-// TODO is this code right
|
|
|
- if (queue_is_empty (&instance->retrans_message_queue) == 0 ||
|
|
|
- low_water != instance->my_high_seq_received) {
|
|
|
+ if (queue_is_empty (&instance->retrans_message_queue) == 0) {
|
|
|
|
|
|
if (token->retrans_flg == 0) {
|
|
|
token->retrans_flg = 1;
|
|
|
@@ -3321,10 +3322,10 @@ static int message_handler_orf_token (
|
|
|
token->retrans_flg = 0;
|
|
|
}
|
|
|
log_printf (instance->totemsrp_log_level_debug,
|
|
|
- "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, low_water %x aru %x\n",
|
|
|
+ "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x\n",
|
|
|
token->retrans_flg, instance->my_set_retrans_flg,
|
|
|
queue_is_empty (&instance->retrans_message_queue),
|
|
|
- instance->my_retrans_flg_count, low_water, token->aru);
|
|
|
+ instance->my_retrans_flg_count, token->aru);
|
|
|
if (token->retrans_flg == 0) {
|
|
|
instance->my_retrans_flg_count += 1;
|
|
|
} else {
|
|
|
@@ -3336,13 +3337,16 @@ static int message_handler_orf_token (
|
|
|
log_printf (instance->totemsrp_log_level_debug,
|
|
|
"install seq %x aru %x high seq received %x\n",
|
|
|
instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
|
|
|
- if (instance->my_retrans_flg_count >= 2 && instance->my_aru >= instance->my_install_seq && instance->my_received_flg == 0) {
|
|
|
+ if (instance->my_retrans_flg_count >= 2 &&
|
|
|
+ instance->my_received_flg == 0 &&
|
|
|
+ sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
|
|
|
instance->my_received_flg = 1;
|
|
|
instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
|
|
|
memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
|
|
|
sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
|
|
|
}
|
|
|
- if (instance->my_retrans_flg_count >= 3 && token->aru >= instance->my_install_seq) {
|
|
|
+ if (instance->my_retrans_flg_count >= 3 &&
|
|
|
+ sq_lte_compare (instance->my_install_seq, token->aru)) {
|
|
|
instance->my_rotation_counter += 1;
|
|
|
} else {
|
|
|
instance->my_rotation_counter = 0;
|
|
|
@@ -3541,18 +3545,12 @@ static int message_handler_mcast (
|
|
|
memcpy (&mcast_header, msg, sizeof (struct mcast));
|
|
|
}
|
|
|
|
|
|
-/*
|
|
|
- if (mcast_header.header.encapsulated == 1) {
|
|
|
- sort_queue = &instance->recovery_sort_queue;
|
|
|
- } else {
|
|
|
- sort_queue = &instance->regular_sort_queue;
|
|
|
- }
|
|
|
-*/
|
|
|
- if (instance->memb_state == MEMB_STATE_RECOVERY) {
|
|
|
+ if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
|
|
|
sort_queue = &instance->recovery_sort_queue;
|
|
|
} else {
|
|
|
sort_queue = &instance->regular_sort_queue;
|
|
|
}
|
|
|
+
|
|
|
assert (msg_len < FRAME_SIZE_MAX);
|
|
|
|
|
|
#ifdef TEST_DROP_MCAST_PERCENTAGE
|
|
|
@@ -3879,6 +3877,8 @@ static void mcast_endian_convert (struct mcast *in, struct mcast *out)
|
|
|
out->header.type = in->header.type;
|
|
|
out->header.endian_detector = ENDIAN_LOCAL;
|
|
|
out->header.nodeid = swab32 (in->header.nodeid);
|
|
|
+ out->header.encapsulated = in->header.encapsulated;
|
|
|
+
|
|
|
out->seq = swab32 (in->seq);
|
|
|
out->this_seqno = swab32 (in->this_seqno);
|
|
|
memb_ring_id_copy_endian_convert (&out->ring_id, &in->ring_id);
|