|
|
@@ -369,6 +369,8 @@ struct totemsrp_instance {
|
|
|
*/
|
|
|
struct cs_queue new_message_queue;
|
|
|
|
|
|
+ struct cs_queue new_message_queue_trans;
|
|
|
+
|
|
|
struct cs_queue retrans_message_queue;
|
|
|
|
|
|
struct sq regular_sort_queue;
|
|
|
@@ -502,6 +504,8 @@ struct totemsrp_instance {
|
|
|
uint32_t orf_token_discard;
|
|
|
|
|
|
uint32_t threaded_mode_enabled;
|
|
|
+
|
|
|
+ uint32_t waiting_trans_ack;
|
|
|
|
|
|
void * token_recv_event_handle;
|
|
|
void * token_sent_event_handle;
|
|
|
@@ -679,6 +683,8 @@ static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
|
|
|
instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
|
|
|
|
|
|
instance->my_id.no_addrs = INTERFACE_MAX;
|
|
|
+
|
|
|
+ instance->waiting_trans_ack = 1;
|
|
|
}
|
|
|
|
|
|
static void main_token_seqid_get (
|
|
|
@@ -950,6 +956,10 @@ int totemsrp_initialize (
|
|
|
MESSAGE_QUEUE_MAX,
|
|
|
sizeof (struct message_item), instance->threaded_mode_enabled);
|
|
|
|
|
|
+ cs_queue_init (&instance->new_message_queue_trans,
|
|
|
+ MESSAGE_QUEUE_MAX,
|
|
|
+ sizeof (struct message_item), instance->threaded_mode_enabled);
|
|
|
+
|
|
|
totemsrp_callback_token_create (instance,
|
|
|
&instance->token_recv_event_handle,
|
|
|
TOTEM_CALLBACK_TOKEN_RECEIVED,
|
|
|
@@ -981,6 +991,7 @@ void totemsrp_finalize (
|
|
|
memb_leave_message_send (instance);
|
|
|
totemrrp_finalize (instance->totemrrp_context);
|
|
|
cs_queue_free (&instance->new_message_queue);
|
|
|
+ cs_queue_free (&instance->new_message_queue_trans);
|
|
|
cs_queue_free (&instance->retrans_message_queue);
|
|
|
sq_free (&instance->regular_sort_queue);
|
|
|
sq_free (&instance->recovery_sort_queue);
|
|
|
@@ -1825,6 +1836,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
|
|
|
trans_memb_list_totemip, instance->my_trans_memb_entries,
|
|
|
left_list, instance->my_left_memb_entries,
|
|
|
0, 0, &instance->my_ring_id);
|
|
|
+ instance->waiting_trans_ack = 1;
|
|
|
|
|
|
// TODO we need to filter to ensure we only deliver those
|
|
|
// messages which are part of instance->my_deliver_memb
|
|
|
@@ -2265,8 +2277,15 @@ int totemsrp_mcast (
|
|
|
struct message_item message_item;
|
|
|
char *addr;
|
|
|
unsigned int addr_idx;
|
|
|
+ struct cs_queue *queue_use;
|
|
|
|
|
|
- if (cs_queue_is_full (&instance->new_message_queue)) {
|
|
|
+ if (instance->waiting_trans_ack) {
|
|
|
+ queue_use = &instance->new_message_queue_trans;
|
|
|
+ } else {
|
|
|
+ queue_use = &instance->new_message_queue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cs_queue_is_full (queue_use)) {
|
|
|
log_printf (instance->totemsrp_log_level_debug, "queue full");
|
|
|
return (-1);
|
|
|
}
|
|
|
@@ -2305,7 +2324,7 @@ int totemsrp_mcast (
|
|
|
|
|
|
log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
|
|
|
instance->stats.mcast_tx++;
|
|
|
- cs_queue_item_add (&instance->new_message_queue, &message_item);
|
|
|
+ cs_queue_item_add (queue_use, &message_item);
|
|
|
|
|
|
return (0);
|
|
|
|
|
|
@@ -2320,8 +2339,14 @@ int totemsrp_avail (void *srp_context)
|
|
|
{
|
|
|
struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
|
|
|
int avail;
|
|
|
+ struct cs_queue *queue_use;
|
|
|
|
|
|
- cs_queue_avail (&instance->new_message_queue, &avail);
|
|
|
+ if (instance->waiting_trans_ack) {
|
|
|
+ queue_use = &instance->new_message_queue_trans;
|
|
|
+ } else {
|
|
|
+ queue_use = &instance->new_message_queue;
|
|
|
+ }
|
|
|
+ cs_queue_avail (queue_use, &avail);
|
|
|
|
|
|
return (avail);
|
|
|
}
|
|
|
@@ -2483,7 +2508,12 @@ static int orf_token_mcast (
|
|
|
sort_queue = &instance->recovery_sort_queue;
|
|
|
reset_token_retransmit_timeout (instance); // REVIEWED
|
|
|
} else {
|
|
|
- mcast_queue = &instance->new_message_queue;
|
|
|
+ if (instance->waiting_trans_ack) {
|
|
|
+ mcast_queue = &instance->new_message_queue_trans;
|
|
|
+ } else {
|
|
|
+ mcast_queue = &instance->new_message_queue;
|
|
|
+ }
|
|
|
+
|
|
|
sort_queue = &instance->regular_sort_queue;
|
|
|
}
|
|
|
|
|
|
@@ -3372,13 +3402,20 @@ static void token_callbacks_execute (
|
|
|
static unsigned int backlog_get (struct totemsrp_instance *instance)
|
|
|
{
|
|
|
unsigned int backlog = 0;
|
|
|
+ struct cs_queue *queue_use = NULL;
|
|
|
|
|
|
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
|
|
|
- backlog = cs_queue_used (&instance->new_message_queue);
|
|
|
+ if (instance->waiting_trans_ack) {
|
|
|
+ queue_use = &instance->new_message_queue_trans;
|
|
|
+ } else {
|
|
|
+ queue_use = &instance->new_message_queue;
|
|
|
+ }
|
|
|
} else
|
|
|
if (instance->memb_state == MEMB_STATE_RECOVERY) {
|
|
|
- backlog = cs_queue_used (&instance->retrans_message_queue);
|
|
|
+ queue_use = &instance->retrans_message_queue;
|
|
|
}
|
|
|
+ backlog = cs_queue_used (queue_use);
|
|
|
+
|
|
|
instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
|
|
|
return (backlog);
|
|
|
}
|
|
|
@@ -4572,3 +4609,10 @@ void totemsrp_threaded_mode_enable (void *context)
|
|
|
|
|
|
instance->threaded_mode_enabled = 1;
|
|
|
}
|
|
|
+
|
|
|
+void totemsrp_trans_ack (void *context)
|
|
|
+{
|
|
|
+ struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
|
|
|
+
|
|
|
+ instance->waiting_trans_ack = 0;
|
|
|
+}
|