|
@@ -368,6 +368,8 @@ struct totemsrp_instance {
|
|
|
*/
|
|
*/
|
|
|
struct cs_queue new_message_queue;
|
|
struct cs_queue new_message_queue;
|
|
|
|
|
|
|
|
|
|
+ struct cs_queue new_message_queue_trans;
|
|
|
|
|
+
|
|
|
struct cs_queue retrans_message_queue;
|
|
struct cs_queue retrans_message_queue;
|
|
|
|
|
|
|
|
struct sq regular_sort_queue;
|
|
struct sq regular_sort_queue;
|
|
@@ -497,6 +499,8 @@ struct totemsrp_instance {
|
|
|
|
|
|
|
|
uint32_t orf_token_discard;
|
|
uint32_t orf_token_discard;
|
|
|
|
|
|
|
|
|
|
+ uint32_t waiting_trans_ack;
|
|
|
|
|
+
|
|
|
void * token_recv_event_handle;
|
|
void * token_recv_event_handle;
|
|
|
void * token_sent_event_handle;
|
|
void * token_sent_event_handle;
|
|
|
char commit_token_storage[40000];
|
|
char commit_token_storage[40000];
|
|
@@ -675,6 +679,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
|
|
|
instance->orf_token_discard = 0;
|
|
instance->orf_token_discard = 0;
|
|
|
|
|
|
|
|
instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
|
|
instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
|
|
|
|
|
+
|
|
|
|
|
+ instance->waiting_trans_ack = 1;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
static void main_token_seqid_get (
|
|
static void main_token_seqid_get (
|
|
@@ -945,6 +951,10 @@ int totemsrp_initialize (
|
|
|
MESSAGE_QUEUE_MAX,
|
|
MESSAGE_QUEUE_MAX,
|
|
|
sizeof (struct message_item));
|
|
sizeof (struct message_item));
|
|
|
|
|
|
|
|
|
|
+ cs_queue_init (&instance->new_message_queue_trans,
|
|
|
|
|
+ MESSAGE_QUEUE_MAX,
|
|
|
|
|
+ sizeof (struct message_item));
|
|
|
|
|
+
|
|
|
totemsrp_callback_token_create (instance,
|
|
totemsrp_callback_token_create (instance,
|
|
|
&instance->token_recv_event_handle,
|
|
&instance->token_recv_event_handle,
|
|
|
TOTEM_CALLBACK_TOKEN_RECEIVED,
|
|
TOTEM_CALLBACK_TOKEN_RECEIVED,
|
|
@@ -1771,6 +1781,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
|
|
|
trans_memb_list_totemip, instance->my_trans_memb_entries,
|
|
trans_memb_list_totemip, instance->my_trans_memb_entries,
|
|
|
left_list, instance->my_left_memb_entries,
|
|
left_list, instance->my_left_memb_entries,
|
|
|
0, 0, &instance->my_ring_id);
|
|
0, 0, &instance->my_ring_id);
|
|
|
|
|
+ instance->waiting_trans_ack = 1;
|
|
|
|
|
|
|
|
// TODO we need to filter to ensure we only deliver those
|
|
// TODO we need to filter to ensure we only deliver those
|
|
|
// messages which are part of instance->my_deliver_memb
|
|
// messages which are part of instance->my_deliver_memb
|
|
@@ -2207,8 +2218,15 @@ int totemsrp_mcast (
|
|
|
struct message_item message_item;
|
|
struct message_item message_item;
|
|
|
char *addr;
|
|
char *addr;
|
|
|
unsigned int addr_idx;
|
|
unsigned int addr_idx;
|
|
|
|
|
+ struct cs_queue *queue_use;
|
|
|
|
|
|
|
|
- if (cs_queue_is_full (&instance->new_message_queue)) {
|
|
|
|
|
|
|
+ if (instance->waiting_trans_ack) {
|
|
|
|
|
+ queue_use = &instance->new_message_queue_trans;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ queue_use = &instance->new_message_queue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (cs_queue_is_full (queue_use)) {
|
|
|
log_printf (instance->totemsrp_log_level_debug, "queue full\n");
|
|
log_printf (instance->totemsrp_log_level_debug, "queue full\n");
|
|
|
return (-1);
|
|
return (-1);
|
|
|
}
|
|
}
|
|
@@ -2246,7 +2264,7 @@ int totemsrp_mcast (
|
|
|
|
|
|
|
|
TRACE1 ("mcasted message added to pending queue\n");
|
|
TRACE1 ("mcasted message added to pending queue\n");
|
|
|
instance->stats.mcast_tx++;
|
|
instance->stats.mcast_tx++;
|
|
|
- cs_queue_item_add (&instance->new_message_queue, &message_item);
|
|
|
|
|
|
|
+ cs_queue_item_add (queue_use, &message_item);
|
|
|
|
|
|
|
|
return (0);
|
|
return (0);
|
|
|
|
|
|
|
@@ -2261,8 +2279,14 @@ int totemsrp_avail (void *srp_context)
|
|
|
{
|
|
{
|
|
|
struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
|
|
struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
|
|
|
int avail;
|
|
int avail;
|
|
|
|
|
+ struct cs_queue *queue_use;
|
|
|
|
|
|
|
|
- cs_queue_avail (&instance->new_message_queue, &avail);
|
|
|
|
|
|
|
+ if (instance->waiting_trans_ack) {
|
|
|
|
|
+ queue_use = &instance->new_message_queue_trans;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ queue_use = &instance->new_message_queue;
|
|
|
|
|
+ }
|
|
|
|
|
+ cs_queue_avail (queue_use, &avail);
|
|
|
|
|
|
|
|
return (avail);
|
|
return (avail);
|
|
|
}
|
|
}
|
|
@@ -2423,7 +2447,12 @@ static int orf_token_mcast (
|
|
|
sort_queue = &instance->recovery_sort_queue;
|
|
sort_queue = &instance->recovery_sort_queue;
|
|
|
reset_token_retransmit_timeout (instance); // REVIEWED
|
|
reset_token_retransmit_timeout (instance); // REVIEWED
|
|
|
} else {
|
|
} else {
|
|
|
- mcast_queue = &instance->new_message_queue;
|
|
|
|
|
|
|
+ if (instance->waiting_trans_ack) {
|
|
|
|
|
+ mcast_queue = &instance->new_message_queue_trans;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ mcast_queue = &instance->new_message_queue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
sort_queue = &instance->regular_sort_queue;
|
|
sort_queue = &instance->regular_sort_queue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -3325,13 +3354,22 @@ static void token_callbacks_execute (
|
|
|
static unsigned int backlog_get (struct totemsrp_instance *instance)
|
|
static unsigned int backlog_get (struct totemsrp_instance *instance)
|
|
|
{
|
|
{
|
|
|
unsigned int backlog = 0;
|
|
unsigned int backlog = 0;
|
|
|
|
|
+ struct cs_queue *queue_use = NULL;
|
|
|
|
|
|
|
|
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
|
|
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
|
|
|
backlog = cs_queue_used (&instance->new_message_queue);
|
|
backlog = cs_queue_used (&instance->new_message_queue);
|
|
|
|
|
+ if (instance->waiting_trans_ack) {
|
|
|
|
|
+ queue_use = &instance->new_message_queue_trans;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ queue_use = &instance->new_message_queue;
|
|
|
|
|
+ }
|
|
|
} else
|
|
} else
|
|
|
if (instance->memb_state == MEMB_STATE_RECOVERY) {
|
|
if (instance->memb_state == MEMB_STATE_RECOVERY) {
|
|
|
backlog = cs_queue_used (&instance->retrans_message_queue);
|
|
backlog = cs_queue_used (&instance->retrans_message_queue);
|
|
|
|
|
+ queue_use = &instance->retrans_message_queue;
|
|
|
}
|
|
}
|
|
|
|
|
+ backlog = cs_queue_used (queue_use);
|
|
|
|
|
+
|
|
|
instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
|
|
instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
|
|
|
return (backlog);
|
|
return (backlog);
|
|
|
}
|
|
}
|
|
@@ -4517,3 +4555,10 @@ int totemsrp_member_remove (
|
|
|
|
|
|
|
|
return (res);
|
|
return (res);
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+void totemsrp_trans_ack (void *context)
|
|
|
|
|
+{
|
|
|
|
|
+ struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
|
|
|
|
|
+
|
|
|
|
|
+ instance->waiting_trans_ack = 0;
|
|
|
|
|
+}
|