|
@@ -103,6 +103,7 @@ int totemsrp_brake;
|
|
|
#define TIMEOUT_STATE_GATHER_CONSENSUS 200
|
|
#define TIMEOUT_STATE_GATHER_CONSENSUS 200
|
|
|
#define TIMEOUT_TOKEN 200
|
|
#define TIMEOUT_TOKEN 200
|
|
|
#define TIMEOUT_TOKEN_RETRANSMIT 45
|
|
#define TIMEOUT_TOKEN_RETRANSMIT 45
|
|
|
|
|
+#define TIMEOUT_MERGE_DETECT 200
|
|
|
#define PACKET_SIZE_MAX 2000
|
|
#define PACKET_SIZE_MAX 2000
|
|
|
#define FAIL_TO_RECV_CONST 250
|
|
#define FAIL_TO_RECV_CONST 250
|
|
|
#define SEQNO_UNCHANGED_CONST 20
|
|
#define SEQNO_UNCHANGED_CONST 20
|
|
@@ -193,6 +194,8 @@ static struct memb_ring_id my_old_ring_id;
|
|
|
|
|
|
|
|
static int my_aru_count = 0;
|
|
static int my_aru_count = 0;
|
|
|
|
|
|
|
|
|
|
+static int my_merge_detect_timeout_outstanding = 0;
|
|
|
|
|
+
|
|
|
static int my_last_aru = 0;
|
|
static int my_last_aru = 0;
|
|
|
|
|
|
|
|
static int my_seq_unchanged = 0;
|
|
static int my_seq_unchanged = 0;
|
|
@@ -265,6 +268,8 @@ poll_timer_handle timer_orf_token_timeout = 0;
|
|
|
|
|
|
|
|
poll_timer_handle timer_orf_token_retransmit_timeout = 0;
|
|
poll_timer_handle timer_orf_token_retransmit_timeout = 0;
|
|
|
|
|
|
|
|
|
|
+poll_timer_handle timer_merge_detect_timeout = 0;
|
|
|
|
|
+
|
|
|
poll_timer_handle memb_timer_state_gather_join_timeout = 0;
|
|
poll_timer_handle memb_timer_state_gather_join_timeout = 0;
|
|
|
|
|
|
|
|
poll_timer_handle memb_timer_state_gather_consensus_timeout = 0;
|
|
poll_timer_handle memb_timer_state_gather_consensus_timeout = 0;
|
|
@@ -848,8 +853,10 @@ void memb_set_print (char *string,
|
|
|
|
|
|
|
|
static void timer_function_orf_token_timeout (void *data);
|
|
static void timer_function_orf_token_timeout (void *data);
|
|
|
static void timer_function_token_retransmit_timeout (void *data);
|
|
static void timer_function_token_retransmit_timeout (void *data);
|
|
|
|
|
+static void timer_function_merge_detect_timeout (void *data);
|
|
|
|
|
|
|
|
-void reset_token_retransmit_timeout (void) {
|
|
|
|
|
|
|
+void reset_token_retransmit_timeout (void)
|
|
|
|
|
+{
|
|
|
poll_timer_delete (*totemsrp_poll_handle,
|
|
poll_timer_delete (*totemsrp_poll_handle,
|
|
|
timer_orf_token_retransmit_timeout);
|
|
timer_orf_token_retransmit_timeout);
|
|
|
poll_timer_add (*totemsrp_poll_handle, TIMEOUT_TOKEN_RETRANSMIT, 0,
|
|
poll_timer_add (*totemsrp_poll_handle, TIMEOUT_TOKEN_RETRANSMIT, 0,
|
|
@@ -858,6 +865,21 @@ void reset_token_retransmit_timeout (void) {
|
|
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+void start_merge_detect_timeout (void)
|
|
|
|
|
+{
|
|
|
|
|
+ if (my_merge_detect_timeout_outstanding == 0) {
|
|
|
|
|
+ poll_timer_add (*totemsrp_poll_handle, TIMEOUT_MERGE_DETECT, 0,
|
|
|
|
|
+ timer_function_merge_detect_timeout, &timer_merge_detect_timeout);
|
|
|
|
|
+ my_merge_detect_timeout_outstanding = 1;
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void cancel_merge_detect_timeout (void)
|
|
|
|
|
+{
|
|
|
|
|
+ poll_timer_delete (*totemsrp_poll_handle, timer_merge_detect_timeout);
|
|
|
|
|
+ my_merge_detect_timeout_outstanding = 0;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
/*
|
|
/*
|
|
|
* ring_state_* is used to save and restore the sort queue
|
|
* ring_state_* is used to save and restore the sort queue
|
|
|
* state when a recovery operation fails (and enters gather)
|
|
* state when a recovery operation fails (and enters gather)
|
|
@@ -952,7 +974,7 @@ static void memb_state_consensus_timeout_expired (void)
|
|
|
|
|
|
|
|
static int memb_join_message_send (void);
|
|
static int memb_join_message_send (void);
|
|
|
|
|
|
|
|
-static int memb_merge_detect_send (void);
|
|
|
|
|
|
|
+static int memb_merge_detect_transmit (void);
|
|
|
|
|
|
|
|
/*
|
|
/*
|
|
|
* Timers used for various states of the membership algorithm
|
|
* Timers used for various states of the membership algorithm
|
|
@@ -1221,6 +1243,7 @@ static void memb_state_gather_enter (void)
|
|
|
*/
|
|
*/
|
|
|
cancel_token_retransmit_timeout (); // REVIEWED
|
|
cancel_token_retransmit_timeout (); // REVIEWED
|
|
|
cancel_token_timeout (); // REVIEWED
|
|
cancel_token_timeout (); // REVIEWED
|
|
|
|
|
+ cancel_merge_detect_timeout ();
|
|
|
|
|
|
|
|
memb_consensus_reset ();
|
|
memb_consensus_reset ();
|
|
|
|
|
|
|
@@ -2489,6 +2512,22 @@ struct timeval timeval;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+void timer_function_merge_detect_timeout(void *data)
|
|
|
|
|
+{
|
|
|
|
|
+ my_merge_detect_timeout_outstanding = 0;
|
|
|
|
|
+
|
|
|
|
|
+ switch (memb_state) {
|
|
|
|
|
+ case MEMB_STATE_OPERATIONAL:
|
|
|
|
|
+ if (my_ring_id.rep.s_addr == my_id.sin_addr.s_addr) {
|
|
|
|
|
+ memb_merge_detect_transmit ();
|
|
|
|
|
+ }
|
|
|
|
|
+ break;
|
|
|
|
|
+ case MEMB_STATE_GATHER:
|
|
|
|
|
+ case MEMB_STATE_COMMIT:
|
|
|
|
|
+ case MEMB_STATE_RECOVERY:
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
/*
|
|
/*
|
|
|
* Send orf_token to next member (requires orf_token)
|
|
* Send orf_token to next member (requires orf_token)
|
|
@@ -2728,7 +2767,7 @@ int memb_join_message_send (void)
|
|
|
return (res);
|
|
return (res);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-static int memb_merge_detect_send (void)
|
|
|
|
|
|
|
+static int memb_merge_detect_transmit (void)
|
|
|
{
|
|
{
|
|
|
struct msghdr msghdr;
|
|
struct msghdr msghdr;
|
|
|
struct iovec iovec;
|
|
struct iovec iovec;
|
|
@@ -2994,11 +3033,15 @@ if (random () % 100 < 10) {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Handle merge detection timeout
|
|
|
|
|
+ */
|
|
|
if (token_ref->seq == my_last_seq) {
|
|
if (token_ref->seq == my_last_seq) {
|
|
|
- my_seq_unchanged++;
|
|
|
|
|
|
|
+ start_merge_detect_timeout ();
|
|
|
} else {
|
|
} else {
|
|
|
- my_seq_unchanged = 0;
|
|
|
|
|
|
|
+ cancel_merge_detect_timeout ();
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
my_last_seq = token_ref->seq;
|
|
my_last_seq = token_ref->seq;
|
|
|
|
|
|
|
|
assert (bytes_received >= sizeof (struct orf_token));
|
|
assert (bytes_received >= sizeof (struct orf_token));
|
|
@@ -3201,9 +3244,6 @@ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
|
|
|
reset_token_timeout (); // REVIEWED
|
|
reset_token_timeout (); // REVIEWED
|
|
|
if (forward_token == 0) {
|
|
if (forward_token == 0) {
|
|
|
reset_token_retransmit_timeout (); // REVIEWED
|
|
reset_token_retransmit_timeout (); // REVIEWED
|
|
|
- if (memb_state == MEMB_STATE_OPERATIONAL) {
|
|
|
|
|
- memb_merge_detect_send ();
|
|
|
|
|
- }
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
token_callbacks_execute (TOTEM_CALLBACK_TOKEN_SENT);
|
|
token_callbacks_execute (TOTEM_CALLBACK_TOKEN_SENT);
|