|
|
@@ -1,5 +1,6 @@
|
|
|
/*
|
|
|
- * Copyright (c) 2003-2005 MontaVista Software, Inc.
|
|
|
+ * Copyright (c) 2003-2006 MontaVista Software, Inc.
|
|
|
+ * Copyright (c) 2006 RedHat, Inc.
|
|
|
*
|
|
|
* All rights reserved.
|
|
|
*
|
|
|
@@ -81,10 +82,10 @@
|
|
|
|
|
|
#include "crypto.h"
|
|
|
|
|
|
-#define LOCALHOST_IP inet_addr("127.0.0.1")
|
|
|
+#define LOCALHOST_IP inet_addr("127.0.0.1")
|
|
|
#define QUEUE_RTR_ITEMS_SIZE_MAX 256 /* allow 512 retransmit items */
|
|
|
-#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
|
|
|
-#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
|
|
|
+#define RETRANS_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
|
|
|
+#define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
|
|
|
#define MAXIOVS 5
|
|
|
#define RETRANSMIT_ENTRIES_MAX 30
|
|
|
#define MISSING_MCAST_WINDOW 128
|
|
|
@@ -117,8 +118,8 @@
|
|
|
#define ENDIAN_LOCAL 0xff22
|
|
|
|
|
|
enum message_type {
|
|
|
- MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
|
|
|
- MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
|
|
|
+ MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
|
|
|
+ MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
|
|
|
MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
|
|
|
MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
|
|
|
MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
|
|
|
@@ -186,7 +187,8 @@ struct orf_token {
|
|
|
unsigned int aru;
|
|
|
struct totem_ip_address aru_addr;
|
|
|
struct memb_ring_id ring_id;
|
|
|
- short int fcc;
|
|
|
+ unsigned int backlog;
|
|
|
+ unsigned int fcc;
|
|
|
int retrans_flg;
|
|
|
int rtr_list_entries;
|
|
|
struct rtr_item rtr_list[0];
|
|
|
@@ -446,7 +448,11 @@ struct totemsrp_instance {
|
|
|
|
|
|
struct totem_config *totem_config;
|
|
|
|
|
|
- int use_heartbeat;
|
|
|
+ unsigned int use_heartbeat;
|
|
|
+
|
|
|
+ unsigned int my_trc;
|
|
|
+
|
|
|
+ unsigned int my_pbl;
|
|
|
};
|
|
|
|
|
|
struct message_handlers {
|
|
|
@@ -676,8 +682,14 @@ int totemsrp_initialize (
|
|
|
totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
|
|
|
instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
|
|
|
"seqno unchanged const (%d rotations) Maximum network MTU %d\n", totem_config->seqno_unchanged_const, totem_config->net_mtu);
|
|
|
+
|
|
|
+ instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
|
|
|
+ "window size per rotation (%d messages) maximum messages per rotation (%d messages)\n",
|
|
|
+ totem_config->window_size, totem_config->max_messages);
|
|
|
+
|
|
|
instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
|
|
|
"send threads (%d threads)\n", totem_config->threads);
|
|
|
+
|
|
|
instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
|
|
|
"heartbeat_failures_allowed (%d)\n", totem_config->heartbeat_failures_allowed);
|
|
|
instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
|
|
|
@@ -1497,6 +1509,11 @@ static void memb_state_commit_enter (
|
|
|
|
|
|
instance->memb_state = MEMB_STATE_COMMIT;
|
|
|
|
|
|
+ /*
|
|
|
+ * reset all flow control variables since we are starting a new ring
|
|
|
+ */
|
|
|
+ instance->my_trc = 0;
|
|
|
+ instance->my_pbl = 0;
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -1980,7 +1997,7 @@ static int orf_token_mcast (
|
|
|
sort_queue = &instance->regular_sort_queue;
|
|
|
}
|
|
|
|
|
|
- for (instance->fcc_mcast_current = 0; instance->fcc_mcast_current < fcc_mcasts_allowed; instance->fcc_mcast_current++) {
|
|
|
+ for (instance->fcc_mcast_current = 0; instance->fcc_mcast_current <= fcc_mcasts_allowed; instance->fcc_mcast_current++) {
|
|
|
if (queue_is_empty (mcast_queue)) {
|
|
|
break;
|
|
|
}
|
|
|
@@ -2329,6 +2346,7 @@ static int orf_token_send_initial (struct totemsrp_instance *instance)
|
|
|
|
|
|
memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
|
|
|
orf_token.fcc = 0;
|
|
|
+ orf_token.backlog = 0;
|
|
|
|
|
|
orf_token.rtr_list_entries = 0;
|
|
|
|
|
|
@@ -2673,6 +2691,61 @@ static void token_callbacks_execute (
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/*
|
|
|
+ * Flow control functions
|
|
|
+ */
|
|
|
+/*
+ * Return the backlog for the instance's current membership state:
+ * the queue_used() value of new_message_queue while operational, of
+ * retrans_message_queue while in recovery, and 0 in any other state.
+ */
+static unsigned int backlog_get (struct totemsrp_instance *instance)
+{
+	switch (instance->memb_state) {
+	case MEMB_STATE_OPERATIONAL:
+		return (queue_used (&instance->new_message_queue));
+	case MEMB_STATE_RECOVERY:
+		return (queue_used (&instance->retrans_message_queue));
+	default:
+		/* no queue is consulted outside these two states */
+		return (0);
+	}
+}
|
|
|
+
|
|
|
+/*
+ * Compute how many messages may be transmitted on this token visit.
+ *
+ * The result starts at the configured max_messages and is lowered to:
+ *  - the room left in the window (window_size minus token->fcc), and
+ *  - (window_size * my_pbl) divided by the token backlog adjusted by
+ *    this instance's backlog change since its last visit, whenever
+ *    that quotient is non-zero and smaller than the current limit.
+ *
+ * instance: supplies totem_config (max_messages, window_size) and my_pbl.
+ * token:    supplies the fcc and backlog counters; it is not modified.
+ *
+ * Returns the number of transmits allowed.
+ */
+static int fcc_calculate (
+	struct totemsrp_instance *instance,
+	struct orf_token *token)
+{
+	unsigned int window_size = instance->totem_config->window_size;
+	unsigned int transmits_allowed = instance->totem_config->max_messages;
+	unsigned int window_remaining;
+	unsigned int backlog_total;
+	unsigned int backlog_calc;
+
+	/*
+	 * token->fcc is unsigned, so subtracting it from a smaller
+	 * window_size would wrap around to a huge value and the clamp
+	 * below would never trigger.  Treat a window that is already
+	 * used up (or exceeded) as having no room left.
+	 */
+	window_remaining = (token->fcc < window_size) ?
+		window_size - token->fcc : 0;
+	if (transmits_allowed > window_remaining) {
+		transmits_allowed = window_remaining;
+	}
+
+	/*
+	 * Evaluate the divisor once so that the zero test and the
+	 * division below are guaranteed to see the same value.
+	 */
+	backlog_total = token->backlog + backlog_get (instance) - instance->my_pbl;
+
+	/*
+	 * Only do the backlog calculation if there is a backlog, otherwise
+	 * it would divide by zero
+	 */
+	if (backlog_total != 0) {
+		backlog_calc = (window_size * instance->my_pbl) / backlog_total;
+		if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
+			transmits_allowed = backlog_calc;
+		}
+	}
+
+	return (transmits_allowed);
+}
|
|
|
+
|
|
|
+/*
+ * Update the token's flow control counters for this visit and record
+ * this visit's values in the instance for the next one.
+ *
+ * token->fcc and token->backlog are each adjusted by the difference
+ * between this visit's value and the value stored on the previous visit
+ * (my_trc and my_pbl).  The subtractions are unsigned and may wrap, but
+ * the following unsigned addition wraps back, so the sums stay correct.
+ *
+ * instance:         my_trc and my_pbl are read and then overwritten.
+ * token:            fcc and backlog are updated in place.
+ * msgs_transmitted: number of messages sent during this token visit.
+ */
+static void fcc_token_update (
+	struct totemsrp_instance *instance,
+	struct orf_token *token,
+	unsigned int msgs_transmitted)
+{
+	/*
+	 * Take one backlog snapshot so that the delta added to the token
+	 * and the value remembered in my_pbl always agree.
+	 */
+	unsigned int backlog_now = backlog_get (instance);
+
+	token->fcc += msgs_transmitted - instance->my_trc;
+	token->backlog += backlog_now - instance->my_pbl;
+	instance->my_trc = msgs_transmitted;
+	instance->my_pbl = backlog_now;
+}
|
|
|
+
|
|
|
/*
|
|
|
* Message Handlers
|
|
|
*/
|
|
|
@@ -2690,9 +2763,10 @@ static int message_handler_orf_token (
|
|
|
char token_storage[1500];
|
|
|
char token_convert[1500];
|
|
|
struct orf_token *token = NULL;
|
|
|
- int transmits_allowed;
|
|
|
int forward_token;
|
|
|
- int mcasted;
|
|
|
+ unsigned int transmits_allowed;
|
|
|
+ unsigned int mcasted_retransmit;
|
|
|
+ unsigned int mcasted_regular;
|
|
|
unsigned int last_aru;
|
|
|
unsigned int low_water;
|
|
|
|
|
|
@@ -2829,13 +2903,17 @@ static int message_handler_orf_token (
|
|
|
|
|
|
return (0); /* discard token */
|
|
|
}
|
|
|
- transmits_allowed = TRANSMITS_ALLOWED;
|
|
|
- mcasted = orf_token_rtr (instance, token, &transmits_allowed);
|
|
|
+
|
|
|
+ transmits_allowed = fcc_calculate (instance, token);
|
|
|
+ mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
|
|
|
|
|
|
if (sq_lt_compare (instance->last_released + MISSING_MCAST_WINDOW, token->seq + TRANSMITS_ALLOWED)) {
|
|
|
transmits_allowed = 0;
|
|
|
}
|
|
|
- mcasted = orf_token_mcast (instance, token, transmits_allowed, system_from);
|
|
|
+ mcasted_regular = orf_token_mcast (instance, token, transmits_allowed, system_from);
|
|
|
+ fcc_token_update (instance, token, mcasted_retransmit +
|
|
|
+ mcasted_regular);
|
|
|
+
|
|
|
if (sq_lt_compare (instance->my_aru, token->aru) ||
|
|
|
totemip_equal(&instance->my_id, &token->aru_addr) ||
|
|
|
totemip_zero_check(&token->aru_addr)) {
|
|
|
@@ -3409,6 +3487,7 @@ static void orf_token_endian_convert (struct orf_token *in, struct orf_token *ou
|
|
|
totemip_copy_endian_convert(&out->aru_addr, &in->aru_addr);
|
|
|
out->ring_id.seq = swab64 (in->ring_id.seq);
|
|
|
out->fcc = swab32 (in->fcc);
|
|
|
+ out->backlog = swab32 (in->backlog);
|
|
|
out->retrans_flg = swab32 (in->retrans_flg);
|
|
|
out->rtr_list_entries = swab32 (in->rtr_list_entries);
|
|
|
for (i = 0; i < out->rtr_list_entries; i++) {
|