|
|
@@ -356,12 +356,12 @@ circular_memory_unmap (void *buf, size_t bytes)
|
|
|
return (res);
|
|
|
}
|
|
|
|
|
|
-static void flow_control_state_set (
|
|
|
+static int32_t flow_control_state_set (
|
|
|
struct conn_info *conn_info,
|
|
|
int flow_control_state)
|
|
|
{
|
|
|
if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
|
|
|
- return;
|
|
|
+ return 0;
|
|
|
}
|
|
|
if (flow_control_state == 0) {
|
|
|
log_printf (LOGSYS_LEVEL_DEBUG,
|
|
|
@@ -374,14 +374,18 @@ static void flow_control_state_set (
|
|
|
conn_info->client_pid);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
conn_info->control_buffer->flow_control_enabled = flow_control_state;
|
|
|
- api->stats_update_value (conn_info->stats_handle,
|
|
|
- "flow_control",
|
|
|
- &flow_control_state,
|
|
|
- sizeof(flow_control_state));
|
|
|
- api->stats_increment_value (conn_info->stats_handle,
|
|
|
- "flow_control_count");
|
|
|
+ return 1;
|
|
|
+}
|
|
|
+
|
|
|
+static void flow_control_stats_update (
|
|
|
+ hdb_handle_t stats_handle,
|
|
|
+ int flow_control_state)
|
|
|
+{
|
|
|
+ uint32_t fc_state = flow_control_state;
|
|
|
+ api->stats_update_value (stats_handle, "flow_control",
|
|
|
+ &fc_state, sizeof(fc_state));
|
|
|
+ api->stats_increment_value (stats_handle, "flow_control_count");
|
|
|
}
|
|
|
|
|
|
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
|
|
|
@@ -522,15 +526,15 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
|
|
|
return (0);
|
|
|
}
|
|
|
|
|
|
- api->serialize_lock ();
|
|
|
/*
|
|
|
* Retry library exit function if busy
|
|
|
*/
|
|
|
if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
|
|
|
+ api->serialize_lock ();
|
|
|
res = api->exit_fn_get (conn_info->service) (conn_info);
|
|
|
+ api->serialize_unlock ();
|
|
|
api->stats_destroy_connection (conn_info->stats_handle);
|
|
|
if (res == -1) {
|
|
|
- api->serialize_unlock ();
|
|
|
return (0);
|
|
|
} else {
|
|
|
conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
|
|
|
@@ -540,7 +544,6 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
|
|
|
pthread_mutex_lock (&conn_info->mutex);
|
|
|
if (conn_info->refcount > 0) {
|
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
|
- api->serialize_unlock ();
|
|
|
return (0);
|
|
|
}
|
|
|
list_del (&conn_info->list);
|
|
|
@@ -578,7 +581,6 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
|
|
|
res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
|
|
|
zcb_all_free (conn_info);
|
|
|
api->free (conn_info);
|
|
|
- api->serialize_unlock ();
|
|
|
return (-1);
|
|
|
}
|
|
|
|
|
|
@@ -721,8 +723,8 @@ static void *pthread_ipc_consumer (void *conn)
|
|
|
sizeof (coroipc_response_header_t));
|
|
|
} else
|
|
|
if (send_ok) {
|
|
|
- api->serialize_lock();
|
|
|
api->stats_increment_value (conn_info->stats_handle, "requests");
|
|
|
+ api->serialize_lock();
|
|
|
api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
|
|
|
api->serialize_unlock();
|
|
|
} else {
|
|
|
@@ -1266,8 +1268,6 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
|
|
|
}
|
|
|
|
|
|
ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
|
|
|
-
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "dispatched");
|
|
|
}
|
|
|
|
|
|
static void outq_flush (struct conn_info *conn_info) {
|
|
|
@@ -1275,11 +1275,17 @@ static void outq_flush (struct conn_info *conn_info) {
|
|
|
struct outq_item *outq_item;
|
|
|
unsigned int bytes_left;
|
|
|
struct iovec iov;
|
|
|
+ int32_t q_size_dec = 0;
|
|
|
+ int32_t i;
|
|
|
+ int32_t fc_set;
|
|
|
|
|
|
pthread_mutex_lock (&conn_info->mutex);
|
|
|
if (list_empty (&conn_info->outq_head)) {
|
|
|
- flow_control_state_set (conn_info, 0);
|
|
|
+ fc_set = flow_control_state_set (conn_info, 0);
|
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
|
+ if (fc_set) {
|
|
|
+ flow_control_stats_update (conn_info->stats_handle, 0);
|
|
|
+ }
|
|
|
return;
|
|
|
}
|
|
|
for (list = conn_info->outq_head.next;
|
|
|
@@ -1295,12 +1301,20 @@ static void outq_flush (struct conn_info *conn_info) {
|
|
|
list_del (list);
|
|
|
api->free (iov.iov_base);
|
|
|
api->free (outq_item);
|
|
|
- api->stats_decrement_value (conn_info->stats_handle, "queue_size");
|
|
|
+ q_size_dec++;
|
|
|
} else {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * these need to be sent out of the conn_info->mutex
|
|
|
+ */
|
|
|
+ for (i = 0; i < q_size_dec; i++) {
|
|
|
+ api->stats_decrement_value (conn_info->stats_handle, "queue_size");
|
|
|
+ api->stats_increment_value (conn_info->stats_handle, "dispatched");
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static int priv_change (struct conn_info *conn_info)
|
|
|
@@ -1374,7 +1388,9 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
|
|
|
bytes_msg += iov[i].iov_len;
|
|
|
}
|
|
|
if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
|
|
|
- flow_control_state_set (conn_info, 1);
|
|
|
+ if (flow_control_state_set (conn_info, 1)) {
|
|
|
+ flow_control_stats_update(conn_info->stats_handle, 1);
|
|
|
+ }
|
|
|
outq_item = api->malloc (sizeof (struct outq_item));
|
|
|
if (outq_item == NULL) {
|
|
|
ipc_disconnect (conn);
|
|
|
@@ -1401,6 +1417,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
|
|
|
return;
|
|
|
}
|
|
|
msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
|
|
|
+ api->stats_increment_value (conn_info->stats_handle, "dispatched");
|
|
|
}
|
|
|
|
|
|
void coroipcs_refcount_inc (void *conn)
|