|
@@ -95,6 +95,9 @@
|
|
|
#define MSG_SEND_LOCKED 0
|
|
#define MSG_SEND_LOCKED 0
|
|
|
#define MSG_SEND_UNLOCKED 1
|
|
#define MSG_SEND_UNLOCKED 1
|
|
|
|
|
|
|
|
|
|
+#define POLL_STATE_IN 1
|
|
|
|
|
+#define POLL_STATE_INOUT 2
|
|
|
|
|
+
|
|
|
static struct coroipcs_init_state_v2 *api = NULL;
|
|
static struct coroipcs_init_state_v2 *api = NULL;
|
|
|
|
|
|
|
|
DECLARE_LIST_INIT (conn_info_list_head);
|
|
DECLARE_LIST_INIT (conn_info_list_head);
|
|
@@ -141,13 +144,10 @@ struct conn_info {
|
|
|
pthread_attr_t thread_attr;
|
|
pthread_attr_t thread_attr;
|
|
|
unsigned int service;
|
|
unsigned int service;
|
|
|
enum conn_state state;
|
|
enum conn_state state;
|
|
|
- int notify_flow_control_enabled;
|
|
|
|
|
- int flow_control_state;
|
|
|
|
|
int refcount;
|
|
int refcount;
|
|
|
hdb_handle_t stats_handle;
|
|
hdb_handle_t stats_handle;
|
|
|
#if _POSIX_THREAD_PROCESS_SHARED < 1
|
|
#if _POSIX_THREAD_PROCESS_SHARED < 1
|
|
|
key_t semkey;
|
|
key_t semkey;
|
|
|
- int semid;
|
|
|
|
|
#endif
|
|
#endif
|
|
|
unsigned int pending_semops;
|
|
unsigned int pending_semops;
|
|
|
pthread_mutex_t mutex;
|
|
pthread_mutex_t mutex;
|
|
@@ -166,6 +166,7 @@ struct conn_info {
|
|
|
unsigned int setup_bytes_read;
|
|
unsigned int setup_bytes_read;
|
|
|
struct list_head zcb_mapped_list_head;
|
|
struct list_head zcb_mapped_list_head;
|
|
|
char *sending_allowed_private_data[64];
|
|
char *sending_allowed_private_data[64];
|
|
|
|
|
+ int poll_state;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
|
|
static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
|
|
@@ -221,34 +222,6 @@ static void dummy_stats_increment_value (
|
|
|
{
|
|
{
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-static void sem_post_exit_thread (struct conn_info *conn_info)
|
|
|
|
|
-{
|
|
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED < 1
|
|
|
|
|
- struct sembuf sop;
|
|
|
|
|
-#endif
|
|
|
|
|
- int res;
|
|
|
|
|
-
|
|
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED > 0
|
|
|
|
|
-retry_semop:
|
|
|
|
|
- res = sem_post (&conn_info->control_buffer->sem0);
|
|
|
|
|
- if (res == -1 && errno == EINTR) {
|
|
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
|
|
|
|
|
- goto retry_semop;
|
|
|
|
|
- }
|
|
|
|
|
-#else
|
|
|
|
|
- sop.sem_num = 0;
|
|
|
|
|
- sop.sem_op = 1;
|
|
|
|
|
- sop.sem_flg = 0;
|
|
|
|
|
-
|
|
|
|
|
-retry_semop:
|
|
|
|
|
- res = semop (conn_info->semid, &sop, 1);
|
|
|
|
|
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
|
|
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
|
|
|
|
|
- goto retry_semop;
|
|
|
|
|
- }
|
|
|
|
|
-#endif
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
static int
|
|
static int
|
|
|
memory_map (
|
|
memory_map (
|
|
|
const char *path,
|
|
const char *path,
|
|
@@ -383,6 +356,34 @@ circular_memory_unmap (void *buf, size_t bytes)
|
|
|
return (res);
|
|
return (res);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+static void flow_control_state_set (
|
|
|
|
|
+ struct conn_info *conn_info,
|
|
|
|
|
+ int flow_control_state)
|
|
|
|
|
+{
|
|
|
|
|
+ if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (flow_control_state == 0) {
|
|
|
|
|
+ log_printf (LOGSYS_LEVEL_DEBUG,
|
|
|
|
|
+ "Disabling flow control for %d\n",
|
|
|
|
|
+ conn_info->client_pid);
|
|
|
|
|
+ } else
|
|
|
|
|
+ if (flow_control_state == 1) {
|
|
|
|
|
+ log_printf (LOGSYS_LEVEL_DEBUG,
|
|
|
|
|
+ "Enabling flow control for %d\n",
|
|
|
|
|
+ conn_info->client_pid);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ conn_info->control_buffer->flow_control_enabled = flow_control_state;
|
|
|
|
|
+ api->stats_update_value (conn_info->stats_handle,
|
|
|
|
|
+ "flow_control",
|
|
|
|
|
+ &flow_control_state,
|
|
|
|
|
+ sizeof(flow_control_state));
|
|
|
|
|
+ api->stats_increment_value (conn_info->stats_handle,
|
|
|
|
|
+ "flow_control_count");
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
|
|
static inline int zcb_free (struct zcb_mapped *zcb_mapped)
|
|
|
{
|
|
{
|
|
|
unsigned int res;
|
|
unsigned int res;
|
|
@@ -517,7 +518,7 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
|
|
if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
|
|
|
- sem_post_exit_thread (conn_info);
|
|
|
|
|
|
|
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
|
|
|
return (0);
|
|
return (0);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -546,11 +547,12 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
|
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
|
|
|
|
|
|
#if _POSIX_THREAD_PROCESS_SHARED > 0
|
|
#if _POSIX_THREAD_PROCESS_SHARED > 0
|
|
|
- sem_destroy (&conn_info->control_buffer->sem0);
|
|
|
|
|
- sem_destroy (&conn_info->control_buffer->sem1);
|
|
|
|
|
- sem_destroy (&conn_info->control_buffer->sem2);
|
|
|
|
|
|
|
+ sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
|
|
|
|
|
+ sem_destroy (&conn_info->control_buffer->sem_request);
|
|
|
|
|
+ sem_destroy (&conn_info->control_buffer->sem_response);
|
|
|
|
|
+ sem_destroy (&conn_info->control_buffer->sem_dispatch);
|
|
|
#else
|
|
#else
|
|
|
- semctl (conn_info->semid, 0, IPC_RMID);
|
|
|
|
|
|
|
+ semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
|
|
|
#endif
|
|
#endif
|
|
|
/*
|
|
/*
|
|
|
* Destroy shared memory segment and semaphore
|
|
* Destroy shared memory segment and semaphore
|
|
@@ -653,14 +655,12 @@ static inline void zerocopy_operations_process (
|
|
|
static void *pthread_ipc_consumer (void *conn)
|
|
static void *pthread_ipc_consumer (void *conn)
|
|
|
{
|
|
{
|
|
|
struct conn_info *conn_info = (struct conn_info *)conn;
|
|
struct conn_info *conn_info = (struct conn_info *)conn;
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED < 1
|
|
|
|
|
- struct sembuf sop;
|
|
|
|
|
-#endif
|
|
|
|
|
int res;
|
|
int res;
|
|
|
coroipc_request_header_t *header;
|
|
coroipc_request_header_t *header;
|
|
|
coroipc_response_header_t coroipc_response_header;
|
|
coroipc_response_header_t coroipc_response_header;
|
|
|
int send_ok;
|
|
int send_ok;
|
|
|
unsigned int new_message;
|
|
unsigned int new_message;
|
|
|
|
|
+ int sem_value = 0;
|
|
|
|
|
|
|
|
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
|
|
#if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
|
|
|
if (api->sched_policy != 0) {
|
|
if (api->sched_policy != 0) {
|
|
@@ -670,43 +670,28 @@ static void *pthread_ipc_consumer (void *conn)
|
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
for (;;) {
|
|
for (;;) {
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED > 0
|
|
|
|
|
-retry_semwait:
|
|
|
|
|
- res = sem_wait (&conn_info->control_buffer->sem0);
|
|
|
|
|
|
|
+ ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
|
|
|
if (ipc_thread_active (conn_info) == 0) {
|
|
if (ipc_thread_active (conn_info) == 0) {
|
|
|
coroipcs_refcount_dec (conn_info);
|
|
coroipcs_refcount_dec (conn_info);
|
|
|
pthread_exit (0);
|
|
pthread_exit (0);
|
|
|
}
|
|
}
|
|
|
- if ((res == -1) && (errno == EINTR)) {
|
|
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
|
|
|
|
|
- goto retry_semwait;
|
|
|
|
|
- }
|
|
|
|
|
-#else
|
|
|
|
|
|
|
|
|
|
- sop.sem_num = 0;
|
|
|
|
|
- sop.sem_op = -1;
|
|
|
|
|
- sop.sem_flg = 0;
|
|
|
|
|
-retry_semop:
|
|
|
|
|
- res = semop (conn_info->semid, &sop, 1);
|
|
|
|
|
- if (ipc_thread_active (conn_info) == 0) {
|
|
|
|
|
- coroipcs_refcount_dec (conn_info);
|
|
|
|
|
- pthread_exit (0);
|
|
|
|
|
- }
|
|
|
|
|
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
|
|
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
|
|
|
|
|
- goto retry_semop;
|
|
|
|
|
- } else
|
|
|
|
|
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
|
|
|
|
|
- coroipcs_refcount_dec (conn_info);
|
|
|
|
|
- pthread_exit (0);
|
|
|
|
|
- }
|
|
|
|
|
-#endif
|
|
|
|
|
|
|
+ outq_flush (conn_info);
|
|
|
|
|
|
|
|
|
|
+ ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, &sem_value);
|
|
|
|
|
+ if (sem_value > 0) {
|
|
|
|
|
+
|
|
|
|
|
+ res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
zerocopy_operations_process (conn_info, &header, &new_message);
|
|
zerocopy_operations_process (conn_info, &header, &new_message);
|
|
|
/*
|
|
/*
|
|
|
* There is no new message to process, continue for loop
|
|
* There is no new message to process, continue for loop
|
|
|
*/
|
|
*/
|
|
|
if (new_message == 0) {
|
|
if (new_message == 0) {
|
|
|
|
|
+printf ("continuing\n");
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -738,7 +723,6 @@ retry_semop:
|
|
|
/*
|
|
/*
|
|
|
* Overload, tell library to retry
|
|
* Overload, tell library to retry
|
|
|
*/
|
|
*/
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
|
|
|
|
|
coroipc_response_header.size = sizeof (coroipc_response_header_t);
|
|
coroipc_response_header.size = sizeof (coroipc_response_header_t);
|
|
|
coroipc_response_header.id = 0;
|
|
coroipc_response_header.id = 0;
|
|
|
coroipc_response_header.error = CS_ERR_TRY_AGAIN;
|
|
coroipc_response_header.error = CS_ERR_TRY_AGAIN;
|
|
@@ -928,7 +912,7 @@ static void ipc_disconnect (struct conn_info *conn_info)
|
|
|
conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
|
|
conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
|
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
|
|
|
|
|
|
- sem_post_exit_thread (conn_info);
|
|
|
|
|
|
|
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
static int conn_info_create (int fd)
|
|
static int conn_info_create (int fd)
|
|
@@ -945,6 +929,7 @@ static int conn_info_create (int fd)
|
|
|
conn_info->client_pid = 0;
|
|
conn_info->client_pid = 0;
|
|
|
conn_info->service = SOCKET_SERVICE_INIT;
|
|
conn_info->service = SOCKET_SERVICE_INIT;
|
|
|
conn_info->state = CONN_STATE_THREAD_INACTIVE;
|
|
conn_info->state = CONN_STATE_THREAD_INACTIVE;
|
|
|
|
|
+ conn_info->poll_state = POLL_STATE_IN;
|
|
|
list_init (&conn_info->outq_head);
|
|
list_init (&conn_info->outq_head);
|
|
|
list_init (&conn_info->list);
|
|
list_init (&conn_info->list);
|
|
|
list_init (&conn_info->zcb_mapped_list_head);
|
|
list_init (&conn_info->zcb_mapped_list_head);
|
|
@@ -1103,11 +1088,12 @@ void coroipcs_ipc_exit (void)
|
|
|
ipc_disconnect (conn_info);
|
|
ipc_disconnect (conn_info);
|
|
|
|
|
|
|
|
#if _POSIX_THREAD_PROCESS_SHARED > 0
|
|
#if _POSIX_THREAD_PROCESS_SHARED > 0
|
|
|
- sem_destroy (&conn_info->control_buffer->sem0);
|
|
|
|
|
- sem_destroy (&conn_info->control_buffer->sem1);
|
|
|
|
|
- sem_destroy (&conn_info->control_buffer->sem2);
|
|
|
|
|
|
|
+ sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
|
|
|
|
|
+ sem_destroy (&conn_info->control_buffer->sem_request);
|
|
|
|
|
+ sem_destroy (&conn_info->control_buffer->sem_response);
|
|
|
|
|
+ sem_destroy (&conn_info->control_buffer->sem_dispatch);
|
|
|
#else
|
|
#else
|
|
|
- semctl (conn_info->semid, 0, IPC_RMID);
|
|
|
|
|
|
|
+ semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
|
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
/*
|
|
/*
|
|
@@ -1181,33 +1167,11 @@ void *coroipcs_private_data_get (void *conn)
|
|
|
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
|
|
int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
|
|
|
{
|
|
{
|
|
|
struct conn_info *conn_info = (struct conn_info *)conn;
|
|
struct conn_info *conn_info = (struct conn_info *)conn;
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED < 1
|
|
|
|
|
- struct sembuf sop;
|
|
|
|
|
-#endif
|
|
|
|
|
- int res;
|
|
|
|
|
|
|
|
|
|
memcpy (conn_info->response_buffer, msg, mlen);
|
|
memcpy (conn_info->response_buffer, msg, mlen);
|
|
|
|
|
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED > 0
|
|
|
|
|
- res = sem_post (&conn_info->control_buffer->sem1);
|
|
|
|
|
- if (res == -1) {
|
|
|
|
|
- return (-1);
|
|
|
|
|
- }
|
|
|
|
|
-#else
|
|
|
|
|
- sop.sem_num = 1;
|
|
|
|
|
- sop.sem_op = 1;
|
|
|
|
|
- sop.sem_flg = 0;
|
|
|
|
|
-
|
|
|
|
|
-retry_semop:
|
|
|
|
|
- res = semop (conn_info->semid, &sop, 1);
|
|
|
|
|
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
|
|
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
|
|
|
|
|
- goto retry_semop;
|
|
|
|
|
- } else
|
|
|
|
|
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
|
|
|
|
|
- return (0);
|
|
|
|
|
- }
|
|
|
|
|
-#endif
|
|
|
|
|
|
|
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
|
|
|
|
|
+
|
|
|
api->stats_increment_value (conn_info->stats_handle, "responses");
|
|
api->stats_increment_value (conn_info->stats_handle, "responses");
|
|
|
return (0);
|
|
return (0);
|
|
|
}
|
|
}
|
|
@@ -1215,10 +1179,6 @@ retry_semop:
|
|
|
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
|
|
int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
|
|
|
{
|
|
{
|
|
|
struct conn_info *conn_info = (struct conn_info *)conn;
|
|
struct conn_info *conn_info = (struct conn_info *)conn;
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED < 1
|
|
|
|
|
- struct sembuf sop;
|
|
|
|
|
-#endif
|
|
|
|
|
- int res;
|
|
|
|
|
int write_idx = 0;
|
|
int write_idx = 0;
|
|
|
int i;
|
|
int i;
|
|
|
|
|
|
|
@@ -1228,26 +1188,8 @@ int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned in
|
|
|
write_idx += iov[i].iov_len;
|
|
write_idx += iov[i].iov_len;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED > 0
|
|
|
|
|
- res = sem_post (&conn_info->control_buffer->sem1);
|
|
|
|
|
- if (res == -1) {
|
|
|
|
|
- return (-1);
|
|
|
|
|
- }
|
|
|
|
|
-#else
|
|
|
|
|
- sop.sem_num = 1;
|
|
|
|
|
- sop.sem_op = 1;
|
|
|
|
|
- sop.sem_flg = 0;
|
|
|
|
|
-
|
|
|
|
|
-retry_semop:
|
|
|
|
|
- res = semop (conn_info->semid, &sop, 1);
|
|
|
|
|
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
|
|
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
|
|
|
|
|
- goto retry_semop;
|
|
|
|
|
- } else
|
|
|
|
|
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
|
|
|
|
|
- return (0);
|
|
|
|
|
- }
|
|
|
|
|
-#endif
|
|
|
|
|
|
|
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
|
|
|
|
|
+
|
|
|
api->stats_increment_value (conn_info->stats_handle, "responses");
|
|
api->stats_increment_value (conn_info->stats_handle, "responses");
|
|
|
return (0);
|
|
return (0);
|
|
|
}
|
|
}
|
|
@@ -1283,86 +1225,31 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l
|
|
|
conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
|
|
conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-/**
|
|
|
|
|
- * simulate the behaviour in coroipcc.c
|
|
|
|
|
- */
|
|
|
|
|
-static int flow_control_event_send (struct conn_info *conn_info, char event)
|
|
|
|
|
-{
|
|
|
|
|
- int new_fc = 0;
|
|
|
|
|
-
|
|
|
|
|
- if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
|
|
|
|
|
- event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
|
|
|
|
|
- new_fc = 1;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- if (conn_info->flow_control_state != new_fc) {
|
|
|
|
|
- if (new_fc == 1) {
|
|
|
|
|
- log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control for %d, event %d\n",
|
|
|
|
|
- conn_info->client_pid, event);
|
|
|
|
|
- } else {
|
|
|
|
|
- log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control for %d, event %d\n",
|
|
|
|
|
- conn_info->client_pid, event);
|
|
|
|
|
- }
|
|
|
|
|
- conn_info->flow_control_state = new_fc;
|
|
|
|
|
- api->stats_update_value (conn_info->stats_handle, "flow_control",
|
|
|
|
|
- &conn_info->flow_control_state,
|
|
|
|
|
- sizeof(conn_info->flow_control_state));
|
|
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "flow_control_count");
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
|
|
static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
|
|
|
int locked)
|
|
int locked)
|
|
|
{
|
|
{
|
|
|
struct conn_info *conn_info = (struct conn_info *)conn;
|
|
struct conn_info *conn_info = (struct conn_info *)conn;
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED < 1
|
|
|
|
|
- struct sembuf sop;
|
|
|
|
|
-#endif
|
|
|
|
|
int res;
|
|
int res;
|
|
|
int i;
|
|
int i;
|
|
|
|
|
+ char buf;
|
|
|
|
|
|
|
|
for (i = 0; i < iov_len; i++) {
|
|
for (i = 0; i < iov_len; i++) {
|
|
|
memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
|
|
memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if (list_empty (&conn_info->outq_head))
|
|
|
|
|
- res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY);
|
|
|
|
|
- else
|
|
|
|
|
- res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY);
|
|
|
|
|
-
|
|
|
|
|
- if (res == -1 && errno == EAGAIN) {
|
|
|
|
|
- if (locked == 0) {
|
|
|
|
|
- pthread_mutex_lock (&conn_info->mutex);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ buf = list_empty (&conn_info->outq_head);
|
|
|
|
|
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
|
|
|
|
|
+ if (res != 1) {
|
|
|
conn_info->pending_semops += 1;
|
|
conn_info->pending_semops += 1;
|
|
|
- if (locked == 0) {
|
|
|
|
|
- pthread_mutex_unlock (&conn_info->mutex);
|
|
|
|
|
|
|
+ if (conn_info->poll_state == POLL_STATE_IN) {
|
|
|
|
|
+ conn_info->poll_state = POLL_STATE_INOUT;
|
|
|
|
|
+ api->poll_dispatch_modify (conn_info->fd,
|
|
|
|
|
+ POLLIN|POLLOUT|POLLNVAL);
|
|
|
}
|
|
}
|
|
|
- api->poll_dispatch_modify (conn_info->fd,
|
|
|
|
|
- POLLIN|POLLOUT|POLLNVAL);
|
|
|
|
|
- } else
|
|
|
|
|
- if (res == -1) {
|
|
|
|
|
- ipc_disconnect (conn_info);
|
|
|
|
|
- }
|
|
|
|
|
-#if _POSIX_THREAD_PROCESS_SHARED > 0
|
|
|
|
|
- res = sem_post (&conn_info->control_buffer->sem2);
|
|
|
|
|
-#else
|
|
|
|
|
- sop.sem_num = 2;
|
|
|
|
|
- sop.sem_op = 1;
|
|
|
|
|
- sop.sem_flg = 0;
|
|
|
|
|
-
|
|
|
|
|
-retry_semop:
|
|
|
|
|
- res = semop (conn_info->semid, &sop, 1);
|
|
|
|
|
- if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
|
|
|
|
|
- api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
|
|
|
|
|
- goto retry_semop;
|
|
|
|
|
- } else
|
|
|
|
|
- if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
|
|
|
|
|
- return;
|
|
|
|
|
}
|
|
}
|
|
|
-#endif
|
|
|
|
|
|
|
+
|
|
|
|
|
+ ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
|
|
|
|
|
+
|
|
|
api->stats_increment_value (conn_info->stats_handle, "dispatched");
|
|
api->stats_increment_value (conn_info->stats_handle, "dispatched");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1371,11 +1258,10 @@ static void outq_flush (struct conn_info *conn_info) {
|
|
|
struct outq_item *outq_item;
|
|
struct outq_item *outq_item;
|
|
|
unsigned int bytes_left;
|
|
unsigned int bytes_left;
|
|
|
struct iovec iov;
|
|
struct iovec iov;
|
|
|
- int res;
|
|
|
|
|
|
|
|
|
|
pthread_mutex_lock (&conn_info->mutex);
|
|
pthread_mutex_lock (&conn_info->mutex);
|
|
|
if (list_empty (&conn_info->outq_head)) {
|
|
if (list_empty (&conn_info->outq_head)) {
|
|
|
- res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR);
|
|
|
|
|
|
|
+ flow_control_state_set (conn_info, 0);
|
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
@@ -1441,7 +1327,7 @@ retry_recv:
|
|
|
semun.buf = &ipc_set;
|
|
semun.buf = &ipc_set;
|
|
|
|
|
|
|
|
for (i = 0; i < 3; i++) {
|
|
for (i = 0; i < 3; i++) {
|
|
|
- res = semctl (conn_info->semid, 0, IPC_SET, semun);
|
|
|
|
|
|
|
+ res = semctl (conn_info->control_buffer->semid, 0, IPC_SET, semun);
|
|
|
if (res == -1) {
|
|
if (res == -1) {
|
|
|
return (-1);
|
|
return (-1);
|
|
|
}
|
|
}
|
|
@@ -1471,6 +1357,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
|
|
|
bytes_msg += iov[i].iov_len;
|
|
bytes_msg += iov[i].iov_len;
|
|
|
}
|
|
}
|
|
|
if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
|
|
if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
|
|
|
|
|
+ flow_control_state_set (conn_info, 1);
|
|
|
outq_item = api->malloc (sizeof (struct outq_item));
|
|
outq_item = api->malloc (sizeof (struct outq_item));
|
|
|
if (outq_item == NULL) {
|
|
if (outq_item == NULL) {
|
|
|
ipc_disconnect (conn);
|
|
ipc_disconnect (conn);
|
|
@@ -1491,11 +1378,6 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
|
|
|
outq_item->mlen = bytes_msg;
|
|
outq_item->mlen = bytes_msg;
|
|
|
list_init (&outq_item->list);
|
|
list_init (&outq_item->list);
|
|
|
pthread_mutex_lock (&conn_info->mutex);
|
|
pthread_mutex_lock (&conn_info->mutex);
|
|
|
- if (list_empty (&conn_info->outq_head)) {
|
|
|
|
|
- conn_info->notify_flow_control_enabled = 1;
|
|
|
|
|
- api->poll_dispatch_modify (conn_info->fd,
|
|
|
|
|
- POLLIN|POLLOUT|POLLNVAL);
|
|
|
|
|
- }
|
|
|
|
|
list_add_tail (&outq_item->list, &conn_info->outq_head);
|
|
list_add_tail (&outq_item->list, &conn_info->outq_head);
|
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
pthread_mutex_unlock (&conn_info->mutex);
|
|
|
api->stats_increment_value (conn_info->stats_handle, "queue_size");
|
|
api->stats_increment_value (conn_info->stats_handle, "queue_size");
|
|
@@ -1742,11 +1624,10 @@ int coroipcs_handler_dispatch (
|
|
|
|
|
|
|
|
conn_info->service = req_setup->service;
|
|
conn_info->service = req_setup->service;
|
|
|
conn_info->refcount = 0;
|
|
conn_info->refcount = 0;
|
|
|
- conn_info->notify_flow_control_enabled = 0;
|
|
|
|
|
conn_info->setup_bytes_read = 0;
|
|
conn_info->setup_bytes_read = 0;
|
|
|
|
|
|
|
|
#if _POSIX_THREAD_PROCESS_SHARED < 1
|
|
#if _POSIX_THREAD_PROCESS_SHARED < 1
|
|
|
- conn_info->semid = semget (conn_info->semkey, 3, 0600);
|
|
|
|
|
|
|
+ conn_info->control_buffer->semid = semget (conn_info->semkey, 3, 0600);
|
|
|
#endif
|
|
#endif
|
|
|
conn_info->pending_semops = 0;
|
|
conn_info->pending_semops = 0;
|
|
|
|
|
|
|
@@ -1794,9 +1675,6 @@ int coroipcs_handler_dispatch (
|
|
|
res = recv (fd, &buf, 1, MSG_NOSIGNAL);
|
|
res = recv (fd, &buf, 1, MSG_NOSIGNAL);
|
|
|
if (res == 1) {
|
|
if (res == 1) {
|
|
|
switch (buf) {
|
|
switch (buf) {
|
|
|
- case MESSAGE_REQ_OUTQ_FLUSH:
|
|
|
|
|
- outq_flush (conn_info);
|
|
|
|
|
- break;
|
|
|
|
|
case MESSAGE_REQ_CHANGE_EUID:
|
|
case MESSAGE_REQ_CHANGE_EUID:
|
|
|
if (priv_change (conn_info) == -1) {
|
|
if (priv_change (conn_info) == -1) {
|
|
|
ipc_disconnect (conn_info);
|
|
ipc_disconnect (conn_info);
|
|
@@ -1820,37 +1698,24 @@ int coroipcs_handler_dispatch (
|
|
|
coroipcs_refcount_dec (conn_info);
|
|
coroipcs_refcount_dec (conn_info);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- coroipcs_refcount_inc (conn_info);
|
|
|
|
|
- pthread_mutex_lock (&conn_info->mutex);
|
|
|
|
|
- if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
|
|
|
|
|
- if (list_empty (&conn_info->outq_head))
|
|
|
|
|
- buf = MESSAGE_RES_OUTQ_EMPTY;
|
|
|
|
|
- else
|
|
|
|
|
- buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
|
|
|
|
|
|
|
+ if (revent & POLLOUT) {
|
|
|
|
|
+ int psop = conn_info->pending_semops;
|
|
|
|
|
+ int i;
|
|
|
|
|
|
|
|
- for (; conn_info->pending_semops;) {
|
|
|
|
|
- res = flow_control_event_send (conn_info, buf);
|
|
|
|
|
- if (res == 1) {
|
|
|
|
|
- conn_info->pending_semops--;
|
|
|
|
|
|
|
+ assert (psop != 0);
|
|
|
|
|
+ for (i = 0; i < psop; i++) {
|
|
|
|
|
+ res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
|
|
|
|
|
+ if (res != 1) {
|
|
|
|
|
+ return (0);
|
|
|
} else {
|
|
} else {
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- if (conn_info->notify_flow_control_enabled) {
|
|
|
|
|
- res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL);
|
|
|
|
|
- if (res == 1) {
|
|
|
|
|
- conn_info->notify_flow_control_enabled = 0;
|
|
|
|
|
|
|
+ conn_info->pending_semops -= 1;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- if (conn_info->notify_flow_control_enabled == 0 &&
|
|
|
|
|
- conn_info->pending_semops == 0) {
|
|
|
|
|
-
|
|
|
|
|
- api->poll_dispatch_modify (conn_info->fd,
|
|
|
|
|
- POLLIN|POLLNVAL);
|
|
|
|
|
|
|
+ if (conn_info->poll_state == POLL_STATE_INOUT) {
|
|
|
|
|
+ conn_info->poll_state = POLL_STATE_IN;
|
|
|
|
|
+ api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- pthread_mutex_unlock (&conn_info->mutex);
|
|
|
|
|
- coroipcs_refcount_dec (conn_info);
|
|
|
|
|
|
|
|
|
|
return (0);
|
|
return (0);
|
|
|
}
|
|
}
|