|
|
@@ -968,9 +968,11 @@ void deliver_messages_from_recovery_to_regular (void)
|
|
|
void *ptr;
|
|
|
struct mcast *mcast;
|
|
|
|
|
|
+printf ("recovery to regular %d-%d\n", 1, my_aru);
|
|
|
/*
|
|
|
* Move messages from recovery to regular sort queue
|
|
|
*/
|
|
|
+// TODO: should i be initialized to 0 or 1?
|
|
|
for (i = 1; i <= my_aru; i++) {
|
|
|
res = sq_item_get (&recovery_sort_queue, i, &ptr);
|
|
|
if (res != 0) {
|
|
|
@@ -1016,6 +1018,8 @@ static void memb_state_operational_enter (void)
|
|
|
|
|
|
deliver_messages_from_recovery_to_regular ();
|
|
|
|
|
|
+ printf ("Delivering to app %d to %d\n",
|
|
|
+ my_old_high_seq_delivered, my_high_ring_delivered);
|
|
|
messages_deliver_to_app (0, &my_old_high_seq_delivered, my_high_ring_delivered);
|
|
|
|
|
|
/*
|
|
|
@@ -1071,6 +1075,8 @@ static void memb_state_operational_enter (void)
|
|
|
my_failed_list_entries = 0;
|
|
|
// TODO the recovery messages are leaked
|
|
|
|
|
|
+ my_old_high_seq_delivered = 0;
|
|
|
+
|
|
|
totemsrp_log_printf (totemsrp_log_level_notice, "entering OPERATIONAL state.\n");
|
|
|
memb_state = MEMB_STATE_OPERATIONAL;
|
|
|
return;
|
|
|
@@ -1273,7 +1279,9 @@ j++;
|
|
|
my_seq_unchanged = 0;
|
|
|
my_high_seq_received = 0;
|
|
|
my_install_seq = 0;
|
|
|
- my_old_high_seq_delivered = my_high_seq_delivered;
|
|
|
+ if (my_old_high_seq_delivered == 0) {
|
|
|
+ my_old_high_seq_delivered = my_high_seq_delivered;
|
|
|
+ }
|
|
|
|
|
|
totemsrp_log_printf (totemsrp_log_level_notice, "entering RECOVERY state.\n");
|
|
|
reset_token_timeout (); // REVIEWED
|
|
|
@@ -2424,7 +2432,14 @@ static void memb_ring_id_store (
|
|
|
|
|
|
fd = open (filename, O_WRONLY, 0777);
|
|
|
if (fd == -1) {
|
|
|
- printf ("Couldn't store the ring id %s\n", strerror (errno));
|
|
|
+ fd = open (filename, O_CREAT|O_RDWR, 0777);
|
|
|
+ }
|
|
|
+ if (fd == -1) {
|
|
|
+ totemsrp_log_printf (totemsrp_log_level_notice,
|
|
|
+ "Couldn't store new ring id %llx to stable storage (%s)\n",
|
|
|
+ commit_token->ring_id.seq, strerror (errno));
|
|
|
+ assert (0);
|
|
|
+ return;
|
|
|
}
|
|
|
totemsrp_log_printf (totemsrp_log_level_notice,
|
|
|
"Storing new sequence id for ring %d\n", commit_token->ring_id.seq);
|
|
|
@@ -2557,6 +2572,8 @@ static int message_handler_orf_token (
|
|
|
int forward_token;
|
|
|
int mcasted;
|
|
|
int last_aru;
|
|
|
+ int low_water;
|
|
|
+
|
|
|
#ifdef GIVEINFO
|
|
|
struct timeval tv_current;
|
|
|
struct timeval tv_diff;
|
|
|
@@ -2713,8 +2730,12 @@ if (random () % 100 < 10) {
|
|
|
* has recovered all messages it can recover
|
|
|
* (ie: its retrans queue is empty)
|
|
|
*/
|
|
|
+ low_water = my_aru;
|
|
|
+ if (low_water > my_last_aru) {
|
|
|
+ low_water = my_last_aru;
|
|
|
+ }
|
|
|
if (queue_is_empty (&retrans_message_queue) == 0 ||
|
|
|
- my_aru != my_high_seq_received) {
|
|
|
+ low_water != my_high_seq_received) {
|
|
|
|
|
|
if (token->retrans_flg == 0) {
|
|
|
token->retrans_flg = 1;
|
|
|
@@ -2724,8 +2745,10 @@ if (random () % 100 < 10) {
|
|
|
if (token->retrans_flg == 1 && my_set_retrans_flg) {
|
|
|
token->retrans_flg = 0;
|
|
|
}
|
|
|
-printf ("token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d\n",
|
|
|
- token->retrans_flg, my_set_retrans_flg, queue_is_empty (&retrans_message_queue), my_retrans_flg_count);
|
|
|
+printf ("token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, low_water %d aru %d\n",
|
|
|
+ token->retrans_flg, my_set_retrans_flg,
|
|
|
+ queue_is_empty (&retrans_message_queue), my_retrans_flg_count,
|
|
|
+ low_water, token->aru);
|
|
|
if (token->retrans_flg == 0) {
|
|
|
my_retrans_flg_count += 1;
|
|
|
} else {
|
|
|
@@ -2744,10 +2767,14 @@ my_high_seq_received);
|
|
|
}
|
|
|
if (my_retrans_flg_count >= 3 && token->aru >= my_install_seq) {
|
|
|
my_rotation_counter += 1;
|
|
|
+ } else {
|
|
|
+ my_rotation_counter = 0;
|
|
|
}
|
|
|
if (my_rotation_counter == 2) {
|
|
|
-printf ("retrans flag count %d token aru %d install seq %d aru %d %d\n", my_retrans_flg_count,
|
|
|
- token->aru, my_install_seq, my_aru, token->seq);
|
|
|
+ printf ("retrans flag count %d token aru %d install seq %d aru %d %d\n",
|
|
|
+ my_retrans_flg_count, token->aru, my_install_seq,
|
|
|
+ my_aru, token->seq);
|
|
|
+
|
|
|
memb_state_operational_enter ();
|
|
|
my_rotation_counter = 0;
|
|
|
my_retrans_flg_count = 0;
|
|
|
@@ -3236,7 +3263,9 @@ if (random()%100 < 10) {
|
|
|
break;
|
|
|
|
|
|
case MEMB_STATE_COMMIT:
|
|
|
- if (memb_commit_token->ring_id.seq == my_ring_id.seq) {
|
|
|
+ if (memcmp (&memb_commit_token->ring_id, &my_ring_id,
|
|
|
+ sizeof (struct memb_ring_id)) == 0) {
|
|
|
+// if (memb_commit_token->ring_id.seq == my_ring_id.seq) {
|
|
|
memb_state_recovery_enter (memb_commit_token);
|
|
|
}
|
|
|
break;
|