Przeglądaj źródła

IPC: place calls to stats functions outside of mutexes

This is to prevent nasty deadlocks between IPC and objdb.

Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
Reviewed-by: Steven Dake <sdake@redhat.com>
Angus Salkeld 15 lat temu
rodzic
commit
746f57d400
1 zmienionych plików z 36 dodań i 19 usunięć
  1. 36 19
      exec/coroipcs.c

+ 36 - 19
exec/coroipcs.c

@@ -356,12 +356,12 @@ circular_memory_unmap (void *buf, size_t bytes)
 	return (res);
 	return (res);
 }
 }
 
 
-static void flow_control_state_set (
+static int32_t flow_control_state_set (
 	struct conn_info *conn_info,
 	struct conn_info *conn_info,
 	int flow_control_state)
 	int flow_control_state)
 {
 {
 	if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
 	if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
-		return;
+		return 0;
 	}
 	}
 	if (flow_control_state == 0) {
 	if (flow_control_state == 0) {
 		log_printf (LOGSYS_LEVEL_DEBUG,
 		log_printf (LOGSYS_LEVEL_DEBUG,
@@ -374,14 +374,18 @@ static void flow_control_state_set (
 			conn_info->client_pid);
 			conn_info->client_pid);
 	}
 	}
 
 
-
 	conn_info->control_buffer->flow_control_enabled = flow_control_state;
 	conn_info->control_buffer->flow_control_enabled = flow_control_state;
-	api->stats_update_value (conn_info->stats_handle,
-		"flow_control",
-		&flow_control_state,
-		sizeof(flow_control_state));
-	api->stats_increment_value (conn_info->stats_handle,
-		"flow_control_count");
+	return 1;
+}
+
+static void flow_control_stats_update (
+	hdb_handle_t stats_handle,
+	int flow_control_state)
+{
+	uint32_t fc_state = flow_control_state;
+	api->stats_update_value (stats_handle, "flow_control",
+				 &fc_state, sizeof(fc_state));
+	api->stats_increment_value (stats_handle, "flow_control_count");
 }
 }
 
 
 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
@@ -522,15 +526,15 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 		return (0);
 		return (0);
 	}
 	}
 
 
-	api->serialize_lock ();
 	/*
 	/*
 	 * Retry library exit function if busy
 	 * Retry library exit function if busy
 	 */
 	 */
 	if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
 	if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
+		api->serialize_lock ();
 		res = api->exit_fn_get (conn_info->service) (conn_info);
 		res = api->exit_fn_get (conn_info->service) (conn_info);
+		api->serialize_unlock ();
 		api->stats_destroy_connection (conn_info->stats_handle);
 		api->stats_destroy_connection (conn_info->stats_handle);
 		if (res == -1) {
 		if (res == -1) {
-			api->serialize_unlock ();
 			return (0);
 			return (0);
 		} else {
 		} else {
 			conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
 			conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
@@ -540,7 +544,6 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 	pthread_mutex_lock (&conn_info->mutex);
 	pthread_mutex_lock (&conn_info->mutex);
 	if (conn_info->refcount > 0) {
 	if (conn_info->refcount > 0) {
 		pthread_mutex_unlock (&conn_info->mutex);
 		pthread_mutex_unlock (&conn_info->mutex);
-		api->serialize_unlock ();
 		return (0);
 		return (0);
 	}
 	}
 	list_del (&conn_info->list);
 	list_del (&conn_info->list);
@@ -578,7 +581,6 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 	res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
 	res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
 	zcb_all_free (conn_info);
 	zcb_all_free (conn_info);
 	api->free (conn_info);
 	api->free (conn_info);
-	api->serialize_unlock ();
 	return (-1);
 	return (-1);
 }
 }
 
 
@@ -721,8 +723,8 @@ static void *pthread_ipc_consumer (void *conn)
 				sizeof (coroipc_response_header_t));
 				sizeof (coroipc_response_header_t));
 		} else 
 		} else 
 		if (send_ok) {
 		if (send_ok) {
-			api->serialize_lock();
 			api->stats_increment_value (conn_info->stats_handle, "requests");
 			api->stats_increment_value (conn_info->stats_handle, "requests");
+			api->serialize_lock();
 			api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
 			api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
 			api->serialize_unlock();
 			api->serialize_unlock();
 		} else {
 		} else {
@@ -1266,8 +1268,6 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 	}
 	}
 
 
 	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
 	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
-
-	api->stats_increment_value (conn_info->stats_handle, "dispatched");
 }
 }
 
 
 static void outq_flush (struct conn_info *conn_info) {
 static void outq_flush (struct conn_info *conn_info) {
@@ -1275,11 +1275,17 @@ static void outq_flush (struct conn_info *conn_info) {
 	struct outq_item *outq_item;
 	struct outq_item *outq_item;
 	unsigned int bytes_left;
 	unsigned int bytes_left;
 	struct iovec iov;
 	struct iovec iov;
+	int32_t q_size_dec = 0;
+	int32_t i;
+	int32_t fc_set;
 
 
 	pthread_mutex_lock (&conn_info->mutex);
 	pthread_mutex_lock (&conn_info->mutex);
 	if (list_empty (&conn_info->outq_head)) {
 	if (list_empty (&conn_info->outq_head)) {
-		flow_control_state_set (conn_info, 0);
+		fc_set = flow_control_state_set (conn_info, 0);
 		pthread_mutex_unlock (&conn_info->mutex);
 		pthread_mutex_unlock (&conn_info->mutex);
+		if (fc_set) {
+			flow_control_stats_update (conn_info->stats_handle, 0);
+		}
 		return;
 		return;
 	}
 	}
 	for (list = conn_info->outq_head.next;
 	for (list = conn_info->outq_head.next;
@@ -1295,12 +1301,20 @@ static void outq_flush (struct conn_info *conn_info) {
 			list_del (list);
 			list_del (list);
 			api->free (iov.iov_base);
 			api->free (iov.iov_base);
 			api->free (outq_item);
 			api->free (outq_item);
-			api->stats_decrement_value (conn_info->stats_handle, "queue_size");
+			q_size_dec++;
 		} else {
 		} else {
 			break;
 			break;
 		}
 		}
 	}
 	}
 	pthread_mutex_unlock (&conn_info->mutex);
 	pthread_mutex_unlock (&conn_info->mutex);
+
+	/*
+	 * these need to be sent out of the conn_info->mutex
+	 */
+	for (i = 0; i < q_size_dec; i++) {
+		api->stats_decrement_value (conn_info->stats_handle, "queue_size");
+		api->stats_increment_value (conn_info->stats_handle, "dispatched");
+	}
 }
 }
 
 
 static int priv_change (struct conn_info *conn_info)
 static int priv_change (struct conn_info *conn_info)
@@ -1374,7 +1388,9 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
 		bytes_msg += iov[i].iov_len;
 		bytes_msg += iov[i].iov_len;
 	}
 	}
 	if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
 	if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
-		flow_control_state_set (conn_info, 1);
+		if (flow_control_state_set (conn_info, 1)) {
+			flow_control_stats_update(conn_info->stats_handle, 1);
+		}
 		outq_item = api->malloc (sizeof (struct outq_item));
 		outq_item = api->malloc (sizeof (struct outq_item));
 		if (outq_item == NULL) {
 		if (outq_item == NULL) {
 			ipc_disconnect (conn);
 			ipc_disconnect (conn);
@@ -1401,6 +1417,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
 		return;
 		return;
 	}
 	}
 	msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
 	msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
+	api->stats_increment_value (conn_info->stats_handle, "dispatched");
 }
 }
 
 
 void coroipcs_refcount_inc (void *conn)
 void coroipcs_refcount_inc (void *conn)