Explorar o código

Improve behavior of IPC flow control for CPG service during configuration
changes.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1301 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake %!s(int64=19) %!d(string=hai) anos
pai
achega
9db9cb1263
Modificáronse 2 ficheiros con 71 adicións e 31 borrados
  1. 40 8
      exec/flow.c
  2. 31 23
      exec/ipc.c

+ 40 - 8
exec/flow.c

@@ -34,9 +34,11 @@
 
 
 /*
 /*
  * New messages are allowed from the library ONLY when the processor has not
  * New messages are allowed from the library ONLY when the processor has not
- * received a OPENAIS_FLOW_CONTROL_STATE_ENABLED from any processor.  If a OPENAIS_FLOW_CONTROL_STATE_ENABLED
- * message is sent, it must later be cancelled by a OPENAIS_FLOW_CONTROL_STATE_DISABLED
- * message.
+ * received a OPENAIS_FLOW_CONTROL_STATE_ENABLED from any processor.  If a
+ * OPENAIS_FLOW_CONTROL_STATE_ENABLED message is sent, it must later be
+ *  cancelled by a OPENAIS_FLOW_CONTROL_STATE_DISABLED message.  A configuration
+ * change with the flow controlled processor leaving the configuration will
+ * also cancel flow control.
  */
  */
 
 
 #include <stdio.h>
 #include <stdio.h>
@@ -51,8 +53,6 @@
 #include "hdb.h"
 #include "hdb.h"
 #include "../include/list.h"
 #include "../include/list.h"
 
 
-#define OPENAIS_FLOW_CONTROL_ENABLED_SERVICES_MAX 128
-
 struct flow_control_instance {
 struct flow_control_instance {
 	struct list_head list_head;
 	struct list_head list_head;
 	unsigned int service;
 	unsigned int service;
@@ -180,8 +180,10 @@ static void flow_control_confchg_fn (
 	struct memb_ring_id *ring_id)
 	struct memb_ring_id *ring_id)
 {
 {
 	unsigned int i;
 	unsigned int i;
+	unsigned int j;
 	struct flow_control_service *flow_control_service;
 	struct flow_control_service *flow_control_service;
 	struct list_head *list;
 	struct list_head *list;
+	struct flow_control_node_state flow_control_node_state_temp[PROCESSOR_COUNT_MAX];
 
 
 	memcpy (flow_control_member_list, member_list,
 	memcpy (flow_control_member_list, member_list,
 		sizeof (unsigned int) * member_list_entries);
 		sizeof (unsigned int) * member_list_entries);
@@ -193,16 +195,46 @@ static void flow_control_confchg_fn (
 
 
 		flow_control_service = list_entry (list, struct flow_control_service, list_all);
 		flow_control_service = list_entry (list, struct flow_control_service, list_all);
 
 
+		/*
+		 * Generate temporary flow control node state information
+		 */
+		for (i = 0; i < member_list_entries; i++) {
+			flow_control_node_state_temp[i].nodeid = member_list[i];
+			flow_control_node_state_temp[i].flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+
+			/*
+			 * Determine if previous state was set for this processor
+			 * if so keep that setting
+			 */
+			for (j = 0; j < flow_control_service->processor_count; j++) {
+				if (flow_control_service->flow_control_node_state[j].nodeid == member_list[i]) {
+					flow_control_node_state_temp[i].flow_control_state =
+						flow_control_service->flow_control_node_state[j].flow_control_state;
+					break; /* from for */
+				}
+			}
+		}
+
+		/*
+		 * Copy temporary node state information to node state information
+		 */
+		memcpy (flow_control_service->flow_control_node_state,
+			flow_control_node_state_temp,
+			sizeof (struct flow_control_node_state) * member_list_entries);
+
 		/*
 		/*
 		 * Set all of the node ids after a configuration change
 		 * Set all of the node ids after a configuration change
 		 * Turn on all flow control after a configuration change
 		 * Turn on all flow control after a configuration change
 		 */
 		 */
 		flow_control_service->processor_count = flow_control_member_list_entries;
 		flow_control_service->processor_count = flow_control_member_list_entries;
-		flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+		flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
 		for (i = 0; i < member_list_entries; i++) {
 		for (i = 0; i < member_list_entries; i++) {
-			flow_control_service->flow_control_node_state[i].nodeid = member_list[i];
-			flow_control_service->flow_control_node_state[i].flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+			if (flow_control_service->flow_control_node_state[j].flow_control_state == OPENAIS_FLOW_CONTROL_STATE_DISABLED) {
+				flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+				flow_control_service->flow_control_state_set_fn (flow_control_service->context, flow_control_service->flow_control_state);
+			}
 		}
 		}
+
 	}
 	}
 } 
 } 
 /*
 /*

+ 31 - 23
exec/ipc.c

@@ -558,31 +558,53 @@ static void ipc_flow_control (struct conn_info *conn_info)
 {
 {
 	unsigned int entries_used;
 	unsigned int entries_used;
 	unsigned int entries_usedhw;
 	unsigned int entries_usedhw;
+	unsigned int flow_control_local_count;
+	unsigned int fcc;
 
 
+	/*
+	 * Determine FCC variable and printing variables
+	 */
 	entries_used = queue_used (&conn_info->outq);
 	entries_used = queue_used (&conn_info->outq);
-	if (conn_info->flow_control_local_count > entries_used) {
-		entries_used = conn_info->flow_control_local_count;
+	if (conn_info->conn_info_partner &&
+		queue_used (&conn_info->conn_info_partner->outq) > entries_used) {
+		entries_used = queue_used (&conn_info->conn_info_partner->outq);
+	}
+	entries_usedhw = queue_usedhw (&conn_info->outq);
+	if (conn_info->conn_info_partner &&
+		queue_usedhw (&conn_info->conn_info_partner->outq) > entries_used) {
+		entries_usedhw = queue_usedhw (&conn_info->conn_info_partner->outq);
+	}
+	flow_control_local_count = conn_info->flow_control_local_count;
+	if (conn_info->conn_info_partner &&
+		conn_info->conn_info_partner->flow_control_local_count > flow_control_local_count) {
+		flow_control_local_count = conn_info->conn_info_partner->flow_control_local_count;
+	}
+
+	fcc = entries_used;
+	if (flow_control_local_count > fcc) {
+		fcc = flow_control_local_count;
 	}
 	}
 	/*
 	/*
 	 * IPC group-wide flow control
 	 * IPC group-wide flow control
 	 */
 	 */
 	if (conn_info->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) {
 	if (conn_info->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) {
 		if (conn_info->flow_control_enabled == 0 &&
 		if (conn_info->flow_control_enabled == 0 &&
-			((entries_used + FLOW_CONTROL_ENTRIES_ENABLE) > SIZEQUEUE)) {
+			((fcc + FLOW_CONTROL_ENTRIES_ENABLE) > SIZEQUEUE)) {
 
 
-			entries_usedhw = queue_usedhw (&conn_info->outq);
-			log_printf (LOG_LEVEL_NOTICE, "Enabling flow control - HW mark %d of %d %p.\n", entries_usedhw, SIZEQUEUE, &conn_info->outq);
+			log_printf (LOG_LEVEL_NOTICE, "Enabling flow control [%d/%d] - [%d].\n",
+				entries_usedhw, SIZEQUEUE,
+				flow_control_local_count);
 			openais_flow_control_enable (conn_info->flow_control_handle);
 			openais_flow_control_enable (conn_info->flow_control_handle);
 			conn_info->flow_control_enabled = 1;
 			conn_info->flow_control_enabled = 1;
 			conn_info->conn_info_partner->flow_control_enabled = 1;
 			conn_info->conn_info_partner->flow_control_enabled = 1;
 		}
 		}
 		if (conn_info->flow_control_enabled == 1 &&
 		if (conn_info->flow_control_enabled == 1 &&
 
 
-			entries_used <= FLOW_CONTROL_ENTRIES_DISABLE) {
-			entries_usedhw = queue_usedhw (&conn_info->outq);
+			fcc <= FLOW_CONTROL_ENTRIES_DISABLE) {
 
 
-			log_printf (LOG_LEVEL_NOTICE, "Disabling flow control - HW mark [%d/%d].\n",
-				entries_usedhw, SIZEQUEUE);
+			log_printf (LOG_LEVEL_NOTICE, "Disabling flow control [%d/%d] - [%d].\n",
+				entries_usedhw, SIZEQUEUE,
+				flow_control_local_count);
 			openais_flow_control_disable (conn_info->flow_control_handle);
 			openais_flow_control_disable (conn_info->flow_control_handle);
 			conn_info->flow_control_enabled = 0;
 			conn_info->flow_control_enabled = 0;
 			conn_info->conn_info_partner->flow_control_enabled = 0;
 			conn_info->conn_info_partner->flow_control_enabled = 0;
@@ -919,20 +941,6 @@ static void ipc_confchg_fn (
 	unsigned int *joined_list, int joined_list_entries,
 	unsigned int *joined_list, int joined_list_entries,
 	struct memb_ring_id *ring_id)
 	struct memb_ring_id *ring_id)
 {
 {
-	struct conn_info *conn_info;
-	struct list_head *list;
-
-	/*
-	 * Turn on flow control enabled flag for all connections
-	 */
-	for (list = conn_info_list_head.next;
-		list != &conn_info_list_head;
-		list = list->next) {
-
-		conn_info = list_entry (list, struct conn_info, list);
-		conn_info->flow_control_enabled = 1;
-		conn_info->conn_info_partner->flow_control_enabled = 1;
-	}
 }
 }
 
 
 void openais_ipc_init (
 void openais_ipc_init (