|
|
@@ -89,6 +89,8 @@ static void (*sync_next_start) (
|
|
|
|
|
|
static int sync_recovery_index = 0;
|
|
|
|
|
|
+static int my_processing_idx = 0;
|
|
|
+
|
|
|
static void *sync_callback_token_handle = 0;
|
|
|
|
|
|
static struct barrier_data barrier_data_confchg[PROCESSOR_COUNT_MAX];
|
|
|
@@ -186,12 +188,16 @@ static void sync_start_init (const struct memb_ring_id *ring_id)
|
|
|
static void sync_service_init (struct memb_ring_id *ring_id)
|
|
|
{
|
|
|
if (sync_callbacks.api_version == 1) {
|
|
|
- sync_callbacks.sync_init_api.sync_init_v1 (my_member_list,
|
|
|
- my_member_list_entries, ring_id);
|
|
|
+ if (sync_callbacks_retrieve(my_processing_idx, NULL) != -1) {
|
|
|
+ sync_callbacks.sync_init_api.sync_init_v1 (my_member_list,
|
|
|
+ my_member_list_entries, ring_id);
|
|
|
+ }
|
|
|
} else {
|
|
|
- sync_callbacks.sync_init_api.sync_init_v2 (my_trans_list,
|
|
|
- my_trans_list_entries,
|
|
|
- my_member_list, my_member_list_entries, ring_id);
|
|
|
+ if (sync_callbacks_retrieve(my_processing_idx, NULL) != -1) {
|
|
|
+ sync_callbacks.sync_init_api.sync_init_v2 (my_trans_list,
|
|
|
+ my_trans_list_entries,
|
|
|
+ my_member_list, my_member_list_entries, ring_id);
|
|
|
+ }
|
|
|
}
|
|
|
totempg_callback_token_destroy (&sync_callback_token_handle);
|
|
|
|
|
|
@@ -237,6 +243,7 @@ static void sync_callbacks_load (void)
|
|
|
sync_processing = 0;
|
|
|
break;
|
|
|
}
|
|
|
+ my_processing_idx = sync_recovery_index;
|
|
|
sync_recovery_index += 1;
|
|
|
if (sync_callbacks.sync_init_api.sync_init_v1) {
|
|
|
break;
|
|
|
@@ -263,7 +270,11 @@ static int sync_service_process (enum totem_callback_token_type type,
|
|
|
* If process returns 0, then its time to activate
|
|
|
* and start the next service's synchronization
|
|
|
*/
|
|
|
- res = sync_callbacks.sync_process ();
|
|
|
+ if (sync_callbacks_retrieve(my_processing_idx, NULL) != -1) {
|
|
|
+ res = sync_callbacks.sync_process ();
|
|
|
+ } else {
|
|
|
+ res = 0;
|
|
|
+ }
|
|
|
if (res != 0) {
|
|
|
return (0);
|
|
|
}
|
|
|
@@ -341,6 +352,7 @@ static void sync_primary_callback_fn (
|
|
|
totempg_callback_token_destroy (&sync_callback_token_handle);
|
|
|
|
|
|
sync_recovery_index = 0;
|
|
|
+ my_processing_idx = 0;
|
|
|
memset (&barrier_data_confchg, 0, sizeof (barrier_data_confchg));
|
|
|
for (i = 0; i < view_list_entries; i++) {
|
|
|
barrier_data_confchg[i].nodeid = view_list[i];
|
|
|
@@ -426,7 +438,9 @@ static void sync_deliver_fn (
|
|
|
* This sync is complete so activate and start next service sync
|
|
|
*/
|
|
|
if (barrier_completed && sync_callbacks.sync_activate) {
|
|
|
- sync_callbacks.sync_activate ();
|
|
|
+ if (sync_callbacks_retrieve(my_processing_idx, NULL) != -1) {
|
|
|
+ sync_callbacks.sync_activate ();
|
|
|
+ }
|
|
|
|
|
|
log_printf (LOGSYS_LEVEL_DEBUG,
|
|
|
"Committing synchronization for (%s)\n",
|
|
|
@@ -481,7 +495,9 @@ static void sync_confchg_fn (
|
|
|
|
|
|
sync_aborted ();
|
|
|
if (sync_processing && sync_callbacks.sync_abort != NULL) {
|
|
|
- sync_callbacks.sync_abort ();
|
|
|
+ if (sync_callbacks_retrieve(my_processing_idx, NULL) != -1) {
|
|
|
+ sync_callbacks.sync_abort ();
|
|
|
+ }
|
|
|
sync_callbacks.sync_activate = NULL;
|
|
|
}
|
|
|
|