|
|
@@ -1787,85 +1787,97 @@ static void memb_state_recovery_enter (
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
- if (local_received_flg == 0) {
|
|
|
- /*
|
|
|
- * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
|
|
|
- */
|
|
|
- for (i = 0; i < commit_token->addr_entries; i++) {
|
|
|
- if (memb_set_subset (&instance->my_new_memb_list[i], 1,
|
|
|
- instance->my_deliver_memb_list,
|
|
|
- instance->my_deliver_memb_entries) &&
|
|
|
+ if (local_received_flg == 1) {
|
|
|
+ goto no_originate;
|
|
|
+ } /* Else originate messages if we should */
|
|
|
|
|
|
- memcmp (&instance->my_old_ring_id,
|
|
|
- &memb_list[i].ring_id,
|
|
|
- sizeof (struct memb_ring_id)) == 0) {
|
|
|
-
|
|
|
- if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
|
|
|
+ /*
|
|
|
+ * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
|
|
|
+ */
|
|
|
+ for (i = 0; i < commit_token->addr_entries; i++) {
|
|
|
+ if (memb_set_subset (&instance->my_new_memb_list[i], 1,
|
|
|
+ instance->my_deliver_memb_list,
|
|
|
+ instance->my_deliver_memb_entries) &&
|
|
|
|
|
|
- low_ring_aru = memb_list[i].aru;
|
|
|
- }
|
|
|
- if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
|
|
|
- instance->my_high_ring_delivered = memb_list[i].high_delivered;
|
|
|
- }
|
|
|
+ memcmp (&instance->my_old_ring_id,
|
|
|
+ &memb_list[i].ring_id,
|
|
|
+ sizeof (struct memb_ring_id)) == 0) {
|
|
|
+
|
|
|
+ if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
|
|
|
+
|
|
|
+ low_ring_aru = memb_list[i].aru;
|
|
|
+ }
|
|
|
+ if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
|
|
|
+ instance->my_high_ring_delivered = memb_list[i].high_delivered;
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Copy all old ring messages to instance->retrans_message_queue
|
|
|
+ */
|
|
|
+ range = instance->old_ring_state_high_seq_received - low_ring_aru;
|
|
|
+ if (range == 0) {
|
|
|
/*
|
|
|
- * Copy all old ring messages to instance->retrans_message_queue
|
|
|
+ * No messages to copy
|
|
|
*/
|
|
|
- log_printf (instance->totemsrp_log_level_notice,
|
|
|
- "copying all old ring messages from %x-%x.\n",
|
|
|
- low_ring_aru + 1, instance->old_ring_state_high_seq_received);
|
|
|
- strcpy (not_originated, "Not Originated for recovery: ");
|
|
|
- strcpy (is_originated, "Originated for recovery: ");
|
|
|
-
|
|
|
- range = instance->old_ring_state_high_seq_received - low_ring_aru;
|
|
|
- assert (range < 1024);
|
|
|
- for (i = 1; i <= range; i++) {
|
|
|
-
|
|
|
- struct sort_queue_item *sort_queue_item;
|
|
|
- struct message_item message_item;
|
|
|
- void *ptr;
|
|
|
- int res;
|
|
|
-
|
|
|
- sprintf (seqno_string_hex, "%x ", low_ring_aru + i);
|
|
|
- res = sq_item_get (&instance->regular_sort_queue,
|
|
|
- low_ring_aru + i, &ptr);
|
|
|
- if (res != 0) {
|
|
|
- strcat (not_originated, seqno_string_hex);
|
|
|
- continue;
|
|
|
- }
|
|
|
- strcat (is_originated, seqno_string_hex);
|
|
|
- sort_queue_item = ptr;
|
|
|
- assert (sort_queue_item->iov_len > 0);
|
|
|
- assert (sort_queue_item->iov_len <= MAXIOVS);
|
|
|
- messages_originated++;
|
|
|
- memset (&message_item, 0, sizeof (struct message_item));
|
|
|
-// TODO LEAK
|
|
|
- message_item.mcast = malloc (sizeof (struct mcast));
|
|
|
- assert (message_item.mcast);
|
|
|
- memcpy (message_item.mcast, sort_queue_item->iovec[0].iov_base,
|
|
|
- sizeof (struct mcast));
|
|
|
- memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
|
|
|
- sizeof (struct memb_ring_id));
|
|
|
- message_item.mcast->header.encapsulated = 1;
|
|
|
- message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
- assert (message_item.mcast->header.nodeid);
|
|
|
- message_item.iov_len = sort_queue_item->iov_len;
|
|
|
- memcpy (&message_item.iovec, &sort_queue_item->iovec, sizeof (struct iovec) *
|
|
|
- sort_queue_item->iov_len);
|
|
|
- queue_item_add (&instance->retrans_message_queue, &message_item);
|
|
|
- }
|
|
|
- log_printf (instance->totemsrp_log_level_notice,
|
|
|
- "Originated %d messages in RECOVERY.\n", messages_originated);
|
|
|
- strcat (not_originated, "\n");
|
|
|
- strcat (is_originated, "\n");
|
|
|
- log_printf (instance->totemsrp_log_level_notice, is_originated);
|
|
|
- log_printf (instance->totemsrp_log_level_notice, not_originated);
|
|
|
- } else {
|
|
|
- log_printf (instance->totemsrp_log_level_notice,
|
|
|
- "Did not need to originate any messages in recovery.\n");
|
|
|
+ goto no_originate;
|
|
|
+ }
|
|
|
+ assert (range < 1024);
|
|
|
+
|
|
|
+ log_printf (instance->totemsrp_log_level_notice,
|
|
|
+ "copying all old ring messages from %x-%x.\n",
|
|
|
+ low_ring_aru + 1, instance->old_ring_state_high_seq_received);
|
|
|
+ strcpy (not_originated, "Not Originated for recovery: ");
|
|
|
+ strcpy (is_originated, "Originated for recovery: ");
|
|
|
+
|
|
|
+ for (i = 1; i <= range; i++) {
|
|
|
+ struct sort_queue_item *sort_queue_item;
|
|
|
+ struct message_item message_item;
|
|
|
+ void *ptr;
|
|
|
+ int res;
|
|
|
+
|
|
|
+ sprintf (seqno_string_hex, "%x ", low_ring_aru + i);
|
|
|
+ res = sq_item_get (&instance->regular_sort_queue,
|
|
|
+ low_ring_aru + i, &ptr);
|
|
|
+ if (res != 0) {
|
|
|
+ strcat (not_originated, seqno_string_hex);
|
|
|
+ continue;
|
|
|
}
|
|
|
+ strcat (is_originated, seqno_string_hex);
|
|
|
+ sort_queue_item = ptr;
|
|
|
+ assert (sort_queue_item->iov_len > 0);
|
|
|
+ assert (sort_queue_item->iov_len <= MAXIOVS);
|
|
|
+ messages_originated++;
|
|
|
+ memset (&message_item, 0, sizeof (struct message_item));
|
|
|
+// TODO LEAK
|
|
|
+ message_item.mcast = malloc (sizeof (struct mcast));
|
|
|
+ assert (message_item.mcast);
|
|
|
+ memcpy (message_item.mcast, sort_queue_item->iovec[0].iov_base,
|
|
|
+ sizeof (struct mcast));
|
|
|
+ memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
|
|
|
+ sizeof (struct memb_ring_id));
|
|
|
+ message_item.mcast->header.encapsulated = 1;
|
|
|
+ message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
|
|
|
+ assert (message_item.mcast->header.nodeid);
|
|
|
+ message_item.iov_len = sort_queue_item->iov_len;
|
|
|
+ memcpy (&message_item.iovec, &sort_queue_item->iovec,
|
|
|
+ sizeof (struct iovec) * sort_queue_item->iov_len);
|
|
|
+ queue_item_add (&instance->retrans_message_queue, &message_item);
|
|
|
+ }
|
|
|
+ log_printf (instance->totemsrp_log_level_notice,
|
|
|
+ "Originated %d messages in RECOVERY.\n", messages_originated);
|
|
|
+ strcat (not_originated, "\n");
|
|
|
+ strcat (is_originated, "\n");
|
|
|
+ log_printf (instance->totemsrp_log_level_notice, is_originated);
|
|
|
+ log_printf (instance->totemsrp_log_level_notice, not_originated);
|
|
|
+ goto originated;
|
|
|
+
|
|
|
+no_originate:
|
|
|
+ log_printf (instance->totemsrp_log_level_notice,
|
|
|
+ "Did not need to originate any messages in recovery.\n");
|
|
|
|
|
|
+originated:
|
|
|
instance->my_aru = SEQNO_START_MSG;
|
|
|
instance->my_aru_count = 0;
|
|
|
instance->my_seq_unchanged = 0;
|