Просмотр исходного кода

Patch to ensure backlogs are never negative which would cause problems with
the flow control algorithm and protocol in general.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1006 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake 20 лет назад
Родитель
Сommit
8f281418a9
1 измененных файлов с 35 добавлено и 16 удалено
  1. 35 16
      exec/totemsrp.c

+ 35 - 16
exec/totemsrp.c

@@ -1,6 +1,6 @@
 /*
  * Copyright (c) 2003-2006 MontaVista Software, Inc.
- * Copyright (c) 2006 RedHat, Inc.
+ * Copyright (c) 2006 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -88,7 +88,6 @@
 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX		500 /* allow 500 messages to be queued */
 #define MAXIOVS					5	
 #define RETRANSMIT_ENTRIES_MAX			30
-#define MISSING_MCAST_WINDOW			128
 
 /*
  * Rollover handling:
@@ -285,8 +284,6 @@ struct totemsrp_instance {
 
 	int fcc_mcast_last;
 
-	int fcc_mcast_current;
-
 	int fcc_remcast_current;
 
 	struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX];
@@ -472,6 +469,8 @@ struct totemsrp_instance {
 	unsigned int my_trc;
 
 	unsigned int my_pbl;
+
+	unsigned int my_cbl;
 };
 
 struct message_handlers {
@@ -1538,6 +1537,7 @@ static void memb_state_commit_enter (
 	 */
 	instance->my_trc = 0;
 	instance->my_pbl = 0;
+	instance->my_cbl = 0;
 	return;
 }
 
@@ -2003,6 +2003,7 @@ static int orf_token_mcast (
 	struct sort_queue_item sort_queue_item;
 	struct sort_queue_item *sort_queue_item_ptr;
 	struct mcast *mcast;
+	unsigned int fcc_mcast_current;
 
 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
 		mcast_queue = &instance->retrans_message_queue;
@@ -2013,7 +2014,7 @@ static int orf_token_mcast (
 		sort_queue = &instance->regular_sort_queue;
 	}
 
-	for (instance->fcc_mcast_current = 0; instance->fcc_mcast_current <= fcc_mcasts_allowed; instance->fcc_mcast_current++) {
+	for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
 		if (queue_is_empty (mcast_queue)) {
 			break;
 		}
@@ -2066,19 +2067,17 @@ static int orf_token_mcast (
 		queue_item_remove (mcast_queue);
 	}
 
-
-	assert (instance->fcc_mcast_current < 100);
-
 	/*
 	 * If messages mcasted, deliver any new messages to totempg
 	 */
 	instance->my_high_seq_received = token->seq;
 		
 	update_aru (instance);
+
 	/*
 	 * Return 1 if more messages are available for single node clusters
 	 */
-	return (instance->fcc_mcast_current);
+	return (fcc_mcast_current);
 }
 
 /*
@@ -2771,13 +2770,15 @@ static int fcc_calculate (
 		transmits_allowed = instance->totem_config->window_size - token->fcc;
 	}
 
+	instance->my_cbl = backlog_get (instance);
+
 	/*
 	 * Only do backlog calculation if there is a backlog otherwise
 	 * we would result in div by zero
 	 */
-	if (token->backlog + backlog_get (instance) - instance->my_pbl) {
+	if (token->backlog + instance->my_cbl - instance->my_pbl) {
 		backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
-			(token->backlog + backlog_get (instance) - instance->my_pbl);
+			(token->backlog + instance->my_cbl - instance->my_pbl);
 		if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
 			transmits_allowed = backlog_calc;
 		}
@@ -2786,15 +2787,35 @@ static int fcc_calculate (
 	return (transmits_allowed);
 }
 
+/*
+ * don't overflow the RTR sort queue
+ */
+static void fcc_rtr_limit (
+	struct totemsrp_instance *instance,
+	struct orf_token *token,
+	int *transmits_allowed)
+{
+	assert ((QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed - instance->totem_config->window_size) >= 0);
+	if (sq_lt_compare (instance->last_released +
+		QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
+		instance->totem_config->window_size,
+
+			token->seq)) {
+
+			*transmits_allowed = 0;
+	}
+}
+
 static void fcc_token_update (
 	struct totemsrp_instance *instance,
 	struct orf_token *token,
 	unsigned int msgs_transmitted)
 {
 	token->fcc += msgs_transmitted - instance->my_trc;
-	token->backlog += backlog_get (instance) - instance->my_pbl;
+	token->backlog += instance->my_cbl - instance->my_pbl;
+	assert (token->backlog >= 0);
 	instance->my_trc = msgs_transmitted;
-	instance->my_pbl = backlog_get (instance);
+	instance->my_pbl = instance->my_cbl;
 }
 
 /*
@@ -2964,9 +2985,7 @@ static int message_handler_orf_token (
 		transmits_allowed = fcc_calculate (instance, token);
 		mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
 
-		if (sq_lt_compare (instance->last_released + MISSING_MCAST_WINDOW, token->seq + TRANSMITS_ALLOWED)) {
-			transmits_allowed = 0;
-		}
+		fcc_rtr_limit (instance, token, &transmits_allowed);
 		mcasted_regular = orf_token_mcast (instance, token, transmits_allowed, system_from);
 		fcc_token_update (instance, token, mcasted_retransmit +
 			mcasted_regular);