Просмотр исходного кода

coroipcs: add logging for flow control state changes.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2551 fd59a12c-fef9-0310-b244-a6a79926bd2f
Angus Salkeld 16 лет назад
Родитель
Сommit
ce8046f353
4 измененных файлов с 62 добавлено и 13 удалено
  1. 44 10
      exec/coroipcs.c
  2. 10 0
      exec/main.c
  3. 5 0
      include/corosync/coroipc_ipc.h
  4. 3 3
      lib/coroipcc.c

+ 44 - 10
exec/coroipcs.c

@@ -139,6 +139,7 @@ struct conn_info {
 	unsigned int service;
 	unsigned int service;
 	enum conn_state state;
 	enum conn_state state;
 	int notify_flow_control_enabled;
 	int notify_flow_control_enabled;
+	int flow_control_state;
 	int refcount;
 	int refcount;
 	hdb_handle_t stats_handle;
 	hdb_handle_t stats_handle;
 #if _POSIX_THREAD_PROCESS_SHARED < 1
 #if _POSIX_THREAD_PROCESS_SHARED < 1
@@ -1188,6 +1189,36 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l
 	conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
 	conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
 }
 }
 
 
+/**
+ * simulate the behaviour in coroipcc.c
+ */
+static int flow_control_event_send (struct conn_info *conn_info, char event)
+{
+	int new_fc = 0;
+
+	if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
+		event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
+		new_fc = 1;
+	}
+
+	if (conn_info->flow_control_state != new_fc) {
+		if (new_fc == 1) {
+			log_printf (LOGSYS_LEVEL_INFO, "Enabling flow control for %d, event %d\n",
+				conn_info->client_pid, event);
+		} else {
+			log_printf (LOGSYS_LEVEL_INFO, "Disabling flow control for %d, event %d\n",
+				conn_info->client_pid, event);
+		}
+		conn_info->flow_control_state = new_fc;
+		api->stats_update_value (conn_info->stats_handle, "flow_control",
+			&conn_info->flow_control_state,
+			sizeof(conn_info->flow_control_state));
+		api->stats_increment_value (conn_info->stats_handle, "flow_control_count");
+	}
+
+	return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
+}
+
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 		      int locked)
 		      int locked)
 {
 {
@@ -1197,14 +1228,16 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 #endif
 #endif
 	int res;
 	int res;
 	int i;
 	int i;
-	char buf;
 
 
 	for (i = 0; i < iov_len; i++) {
 	for (i = 0; i < iov_len; i++) {
 		memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
 		memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
 	}
 	}
 
 
-	buf = !list_empty (&conn_info->outq_head);
-	res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+	if (list_empty (&conn_info->outq_head))
+		res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY);
+	else
+		res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY);
+
 	if (res == -1 && errno == EAGAIN) {
 	if (res == -1 && errno == EAGAIN) {
 		if (locked == 0) {
 		if (locked == 0) {
 			pthread_mutex_lock (&conn_info->mutex);
 			pthread_mutex_lock (&conn_info->mutex);
@@ -1244,13 +1277,11 @@ static void outq_flush (struct conn_info *conn_info) {
 	struct outq_item *outq_item;
 	struct outq_item *outq_item;
 	unsigned int bytes_left;
 	unsigned int bytes_left;
 	struct iovec iov;
 	struct iovec iov;
-	char buf;
 	int res;
 	int res;
 
 
 	pthread_mutex_lock (&conn_info->mutex);
 	pthread_mutex_lock (&conn_info->mutex);
 	if (list_empty (&conn_info->outq_head)) {
 	if (list_empty (&conn_info->outq_head)) {
-		buf = 3;
-		res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+		res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR);
 		pthread_mutex_unlock (&conn_info->mutex);
 		pthread_mutex_unlock (&conn_info->mutex);
 		return;
 		return;
 	}
 	}
@@ -1692,9 +1723,13 @@ int coroipcs_handler_dispatch (
 	coroipcs_refcount_inc (conn_info);
 	coroipcs_refcount_inc (conn_info);
 	pthread_mutex_lock (&conn_info->mutex);
 	pthread_mutex_lock (&conn_info->mutex);
 	if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
 	if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
-		buf = !list_empty (&conn_info->outq_head);
+		if (list_empty (&conn_info->outq_head))
+			buf = MESSAGE_RES_OUTQ_EMPTY;
+		else
+			buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
+
 		for (; conn_info->pending_semops;) {
 		for (; conn_info->pending_semops;) {
-			res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+			res = flow_control_event_send (conn_info, buf);
 			if (res == 1) {
 			if (res == 1) {
 				conn_info->pending_semops--;
 				conn_info->pending_semops--;
 			} else {
 			} else {
@@ -1702,8 +1737,7 @@ int coroipcs_handler_dispatch (
 			}
 			}
 		}
 		}
 		if (conn_info->notify_flow_control_enabled) {
 		if (conn_info->notify_flow_control_enabled) {
-			buf = 2;
-			res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+			res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL);
 			if (res == 1) {
 			if (res == 1) {
 				conn_info->notify_flow_control_enabled = 0;
 				conn_info->notify_flow_control_enabled = 0;
 			}
 			}

+ 10 - 0
exec/main.c

@@ -994,6 +994,16 @@ static hdb_handle_t corosync_stats_create_connection (const char* name,
 									&zero_64, sizeof (zero_64),
 									&zero_64, sizeof (zero_64),
 									OBJDB_VALUETYPE_UINT64);
 									OBJDB_VALUETYPE_UINT64);
 
 
+	objdb->object_key_create_typed (object_handle,
+		"flow_control",
+		&zero_32, sizeof (zero_32),
+		OBJDB_VALUETYPE_UINT32);
+
+	objdb->object_key_create_typed (object_handle,
+		"flow_control_count",
+		&zero_64, sizeof (zero_64),
+		OBJDB_VALUETYPE_UINT64);
+
 	return object_handle;
 	return object_handle;
 }
 }
 
 

+ 5 - 0
include/corosync/coroipc_ipc.h

@@ -62,6 +62,11 @@ enum req_init_types {
 #define MESSAGE_REQ_CHANGE_EUID		1
 #define MESSAGE_REQ_CHANGE_EUID		1
 #define MESSAGE_REQ_OUTQ_FLUSH		2
 #define MESSAGE_REQ_OUTQ_FLUSH		2
 
 
+#define MESSAGE_RES_OUTQ_EMPTY         0
+#define MESSAGE_RES_OUTQ_NOT_EMPTY     1
+#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
+#define MESSAGE_RES_OUTQ_FLUSH_NR      3
+
 struct control_buffer {
 struct control_buffer {
 	unsigned int read;
 	unsigned int read;
 	unsigned int write;
 	unsigned int write;

+ 3 - 3
lib/coroipcc.c

@@ -849,7 +849,7 @@ coroipcc_dispatch_get (
 		goto error_put;
 		goto error_put;
 	}
 	}
 	ipc_instance->flow_control_state = 0;
 	ipc_instance->flow_control_state = 0;
-	if (buf == 1 || buf == 2) {
+	if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
 		ipc_instance->flow_control_state = 1;
 		ipc_instance->flow_control_state = 1;
 	}
 	}
 	/*
 	/*
@@ -864,11 +864,11 @@ coroipcc_dispatch_get (
 	 * This is just a notification of flow control starting at the addition
 	 * This is just a notification of flow control starting at the addition
 	 * of a new pending message, not a message to dispatch
 	 * of a new pending message, not a message to dispatch
 	 */
 	 */
-	if (buf == 2) {
+	if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
 		error = CS_ERR_TRY_AGAIN;
 		error = CS_ERR_TRY_AGAIN;
 		goto error_put;
 		goto error_put;
 	}
 	}
-	if (buf == 3) {
+	if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
 		error = CS_ERR_TRY_AGAIN;
 		error = CS_ERR_TRY_AGAIN;
 		goto error_put;
 		goto error_put;
 	}
 	}