Sfoglia il codice sorgente

Merge trunk revision 3001:
r3001 | sdake | 2010-07-22 03:03:36 +1000 (Thu, 22 Jul 2010) | 3 lines
Fix problem where flow control could lock up ipc under very heavy load in very
rare circumstances.



git-svn-id: http://svn.fedorahosted.org/svn/corosync/branches/flatiron@3003 fd59a12c-fef9-0310-b244-a6a79926bd2f

Angus Salkeld 15 anni fa
parent
commit
d4e36ec7e2
3 ha cambiato i file con 370 aggiunte e 429 eliminazioni
  1. 88 223
      exec/coroipcs.c
  2. 191 8
      include/corosync/coroipc_ipc.h
  3. 91 198
      lib/coroipcc.c

+ 88 - 223
exec/coroipcs.c

@@ -95,6 +95,9 @@
 #define MSG_SEND_LOCKED		0
 #define MSG_SEND_UNLOCKED	1
 
+#define POLL_STATE_IN		1
+#define POLL_STATE_INOUT	2
+
 static struct coroipcs_init_state_v2 *api = NULL;
 
 DECLARE_LIST_INIT (conn_info_list_head);
@@ -141,13 +144,10 @@ struct conn_info {
 	pthread_attr_t thread_attr;
 	unsigned int service;
 	enum conn_state state;
-	int notify_flow_control_enabled;
-	int flow_control_state;
 	int refcount;
 	hdb_handle_t stats_handle;
 #if _POSIX_THREAD_PROCESS_SHARED < 1
 	key_t semkey;
-	int semid;
 #endif
 	unsigned int pending_semops;
 	pthread_mutex_t mutex;
@@ -166,6 +166,7 @@ struct conn_info {
 	unsigned int setup_bytes_read;
 	struct list_head zcb_mapped_list_head;
 	char *sending_allowed_private_data[64];
+	int poll_state;
 };
 
 static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);
@@ -221,34 +222,6 @@ static void dummy_stats_increment_value (
 {
 }
 
-static void sem_post_exit_thread (struct conn_info *conn_info)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
-	int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semop:
-	res = sem_post (&conn_info->control_buffer->sem0);
-	if (res == -1 && errno == EINTR) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	}
-#else
-	sop.sem_num = 0;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (conn_info->semid, &sop, 1);
-	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	}
-#endif
-}
-
 static int
 memory_map (
 	const char *path,
@@ -383,6 +356,34 @@ circular_memory_unmap (void *buf, size_t bytes)
 	return (res);
 }
 
+static void flow_control_state_set (
+	struct conn_info *conn_info,
+	int flow_control_state)
+{
+	if (conn_info->control_buffer->flow_control_enabled == flow_control_state) {
+		return;
+	}
+	if (flow_control_state == 0) {
+		log_printf (LOGSYS_LEVEL_DEBUG,
+			"Disabling flow control for %d\n",
+			conn_info->client_pid);
+	} else
+	if (flow_control_state == 1) {
+		log_printf (LOGSYS_LEVEL_DEBUG,
+			"Enabling flow control for %d\n",
+			conn_info->client_pid);
+	}
+
+
+	conn_info->control_buffer->flow_control_enabled = flow_control_state;
+	api->stats_update_value (conn_info->stats_handle,
+		"flow_control",
+		&flow_control_state,
+		sizeof(flow_control_state));
+	api->stats_increment_value (conn_info->stats_handle,
+		"flow_control_count");
+}
+
 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
 {
 	unsigned int res;
@@ -517,7 +518,7 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 	}
 
 	if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
-		sem_post_exit_thread (conn_info);
+		ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
 		return (0);
 	}
 
@@ -546,11 +547,12 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 	pthread_mutex_unlock (&conn_info->mutex);
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-	sem_destroy (&conn_info->control_buffer->sem0);
-	sem_destroy (&conn_info->control_buffer->sem1);
-	sem_destroy (&conn_info->control_buffer->sem2);
+	sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
+	sem_destroy (&conn_info->control_buffer->sem_request);
+	sem_destroy (&conn_info->control_buffer->sem_response);
+	sem_destroy (&conn_info->control_buffer->sem_dispatch);
 #else
-	semctl (conn_info->semid, 0, IPC_RMID);
+	semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
 #endif
 	/*
 	 * Destroy shared memory segment and semaphore
@@ -653,14 +655,12 @@ static inline void zerocopy_operations_process (
 static void *pthread_ipc_consumer (void *conn)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
 	int res;
 	coroipc_request_header_t *header;
 	coroipc_response_header_t coroipc_response_header;
 	int send_ok;
 	unsigned int new_message;
+	int sem_value = 0;
 
 #if defined(HAVE_PTHREAD_SETSCHEDPARAM) && defined(HAVE_SCHED_GET_PRIORITY_MAX)
 	if (api->sched_policy != 0) {
@@ -670,43 +670,28 @@ static void *pthread_ipc_consumer (void *conn)
 #endif
 
 	for (;;) {
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-retry_semwait:
-		res = sem_wait (&conn_info->control_buffer->sem0);
+		ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
 		if (ipc_thread_active (conn_info) == 0) {
 			coroipcs_refcount_dec (conn_info);
 			pthread_exit (0);
 		}
-		if ((res == -1) && (errno == EINTR)) {
-			api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-			goto retry_semwait;
-		}
-#else
 
-		sop.sem_num = 0;
-		sop.sem_op = -1;
-		sop.sem_flg = 0;
-retry_semop:
-		res = semop (conn_info->semid, &sop, 1);
-		if (ipc_thread_active (conn_info) == 0) {
-			coroipcs_refcount_dec (conn_info);
-			pthread_exit (0);
-		}
-		if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-			api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-			goto retry_semop;
-		} else
-		if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-			coroipcs_refcount_dec (conn_info);
-			pthread_exit (0);
-		}
-#endif
+		outq_flush (conn_info);
 
+		ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, &sem_value);
+		if (sem_value > 0) {
+		
+			res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST);
+		} else {
+			continue;
+		}
+	
 		zerocopy_operations_process (conn_info, &header, &new_message);
 		/*
 		 * There is no new message to process, continue for loop
 		 */
 		if (new_message == 0) {
+printf ("continuing\n");
 			continue;
 		}
 
@@ -738,7 +723,6 @@ retry_semop:
 			/*
 			 * Overload, tell library to retry
 			 */
-			api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
 			coroipc_response_header.size = sizeof (coroipc_response_header_t);
 			coroipc_response_header.id = 0;
 			coroipc_response_header.error = CS_ERR_TRY_AGAIN;
@@ -928,7 +912,7 @@ static void ipc_disconnect (struct conn_info *conn_info)
 	conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
 	pthread_mutex_unlock (&conn_info->mutex);
 
-	sem_post_exit_thread (conn_info);
+	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
 }
 
 static int conn_info_create (int fd)
@@ -945,6 +929,7 @@ static int conn_info_create (int fd)
 	conn_info->client_pid = 0;
 	conn_info->service = SOCKET_SERVICE_INIT;
 	conn_info->state = CONN_STATE_THREAD_INACTIVE;
+	conn_info->poll_state = POLL_STATE_IN;
 	list_init (&conn_info->outq_head);
 	list_init (&conn_info->list);
 	list_init (&conn_info->zcb_mapped_list_head);
@@ -1103,11 +1088,12 @@ void coroipcs_ipc_exit (void)
 		ipc_disconnect (conn_info);
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-		sem_destroy (&conn_info->control_buffer->sem0);
-		sem_destroy (&conn_info->control_buffer->sem1);
-		sem_destroy (&conn_info->control_buffer->sem2);
+		sem_destroy (&conn_info->control_buffer->sem_request_or_flush_or_exit);
+		sem_destroy (&conn_info->control_buffer->sem_request);
+		sem_destroy (&conn_info->control_buffer->sem_response);
+		sem_destroy (&conn_info->control_buffer->sem_dispatch);
 #else
-		semctl (conn_info->semid, 0, IPC_RMID);
+		semctl (conn_info->control_buffer->semid, 0, IPC_RMID);
 #endif
 
 		/*
@@ -1181,33 +1167,11 @@ void *coroipcs_private_data_get (void *conn)
 int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
-	int res;
 
 	memcpy (conn_info->response_buffer, msg, mlen);
 
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	res = sem_post (&conn_info->control_buffer->sem1);
-	if (res == -1) {
-		return (-1);
-	}
-#else
-	sop.sem_num = 1;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (conn_info->semid, &sop, 1);
-	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	} else
-	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-		return (0);
-	}
-#endif
+	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
+
 	api->stats_increment_value (conn_info->stats_handle, "responses");
 	return (0);
 }
@@ -1215,10 +1179,6 @@ retry_semop:
 int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
-	int res;
 	int write_idx = 0;
 	int i;
 
@@ -1228,26 +1188,8 @@ int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned in
 		write_idx += iov[i].iov_len;
 	}
 
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	res = sem_post (&conn_info->control_buffer->sem1);
-	if (res == -1) {
-		return (-1);
-	}
-#else
-	sop.sem_num = 1;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (conn_info->semid, &sop, 1);
-	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	} else
-	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-		return (0);
-	}
-#endif
+	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_RESPONSE);
+
 	api->stats_increment_value (conn_info->stats_handle, "responses");
 	return (0);
 }
@@ -1283,86 +1225,31 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l
 	conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
 }
 
-/**
- * simulate the behaviour in coroipcc.c
- */
-static int flow_control_event_send (struct conn_info *conn_info, char event)
-{
-	int new_fc = 0;
-
-	if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
-		event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-		new_fc = 1;
-	}
-
-	if (conn_info->flow_control_state != new_fc) {
-		if (new_fc == 1) {
-			log_printf (LOGSYS_LEVEL_DEBUG, "Enabling flow control for %d, event %d\n",
-				conn_info->client_pid, event);
-		} else {
-			log_printf (LOGSYS_LEVEL_DEBUG, "Disabling flow control for %d, event %d\n",
-				conn_info->client_pid, event);
-		}
-		conn_info->flow_control_state = new_fc;
-		api->stats_update_value (conn_info->stats_handle, "flow_control",
-			&conn_info->flow_control_state,
-			sizeof(conn_info->flow_control_state));
-		api->stats_increment_value (conn_info->stats_handle, "flow_control_count");
-	}
-
-	return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
-}
-
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 		      int locked)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
 	int res;
 	int i;
+	char buf;
 
 	for (i = 0; i < iov_len; i++) {
 		memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
 	}
 
-	if (list_empty (&conn_info->outq_head))
-		res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_EMPTY);
-	else
-		res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_NOT_EMPTY);
-
-	if (res == -1 && errno == EAGAIN) {
-		if (locked == 0) {
-			pthread_mutex_lock (&conn_info->mutex);
-		}
+	buf = list_empty (&conn_info->outq_head);
+	res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+	if (res != 1) {
 		conn_info->pending_semops += 1;
-		if (locked == 0) {
-			pthread_mutex_unlock (&conn_info->mutex);
+		if (conn_info->poll_state == POLL_STATE_IN) {
+			conn_info->poll_state = POLL_STATE_INOUT;
+			api->poll_dispatch_modify (conn_info->fd,
+				POLLIN|POLLOUT|POLLNVAL);
 		}
-		api->poll_dispatch_modify (conn_info->fd,
-			POLLIN|POLLOUT|POLLNVAL);
-	} else
-	if (res == -1) {
-		ipc_disconnect (conn_info);
-	}
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	res = sem_post (&conn_info->control_buffer->sem2);
-#else
-	sop.sem_num = 2;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (conn_info->semid, &sop, 1);
-	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
-		api->stats_increment_value (conn_info->stats_handle, "sem_retry_count");
-		goto retry_semop;
-	} else
-	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
-		return;
 	}
-#endif
+
+	ipc_sem_post (conn_info->control_buffer, SEMAPHORE_DISPATCH);
+
 	api->stats_increment_value (conn_info->stats_handle, "dispatched");
 }
 
@@ -1371,11 +1258,10 @@ static void outq_flush (struct conn_info *conn_info) {
 	struct outq_item *outq_item;
 	unsigned int bytes_left;
 	struct iovec iov;
-	int res;
 
 	pthread_mutex_lock (&conn_info->mutex);
 	if (list_empty (&conn_info->outq_head)) {
-		res = flow_control_event_send (conn_info, MESSAGE_RES_OUTQ_FLUSH_NR);
+		flow_control_state_set (conn_info, 0);
 		pthread_mutex_unlock (&conn_info->mutex);
 		return;
 	}
@@ -1441,7 +1327,7 @@ retry_recv:
 	semun.buf = &ipc_set;
 
 	for (i = 0; i < 3; i++) {
-		res = semctl (conn_info->semid, 0, IPC_SET, semun);
+		res = semctl (conn_info->control_buffer->semid, 0, IPC_SET, semun);
 		if (res == -1) {
 			return (-1);
 		}
@@ -1471,6 +1357,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
 		bytes_msg += iov[i].iov_len;
 	}
 	if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+		flow_control_state_set (conn_info, 1);
 		outq_item = api->malloc (sizeof (struct outq_item));
 		if (outq_item == NULL) {
 			ipc_disconnect (conn);
@@ -1491,11 +1378,6 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
 		outq_item->mlen = bytes_msg;
 		list_init (&outq_item->list);
 		pthread_mutex_lock (&conn_info->mutex);
-		if (list_empty (&conn_info->outq_head)) {
-			conn_info->notify_flow_control_enabled = 1;
-			api->poll_dispatch_modify (conn_info->fd,
-				POLLIN|POLLOUT|POLLNVAL);
-		}
 		list_add_tail (&outq_item->list, &conn_info->outq_head);
 		pthread_mutex_unlock (&conn_info->mutex);
 		api->stats_increment_value (conn_info->stats_handle, "queue_size");
@@ -1742,11 +1624,10 @@ int coroipcs_handler_dispatch (
 
 		conn_info->service = req_setup->service;
 		conn_info->refcount = 0;
-		conn_info->notify_flow_control_enabled = 0;
 		conn_info->setup_bytes_read = 0;
 
 #if _POSIX_THREAD_PROCESS_SHARED < 1
-		conn_info->semid = semget (conn_info->semkey, 3, 0600);
+		conn_info->control_buffer->semid = semget (conn_info->semkey, 3, 0600);
 #endif
 		conn_info->pending_semops = 0;
 
@@ -1794,9 +1675,6 @@ int coroipcs_handler_dispatch (
 		res = recv (fd, &buf, 1, MSG_NOSIGNAL);
 		if (res == 1) {
 			switch (buf) {
-			case MESSAGE_REQ_OUTQ_FLUSH:
-				outq_flush (conn_info);
-				break;
 			case MESSAGE_REQ_CHANGE_EUID:
 				if (priv_change (conn_info) == -1) {
 					ipc_disconnect (conn_info);
@@ -1820,37 +1698,24 @@ int coroipcs_handler_dispatch (
 		coroipcs_refcount_dec (conn_info);
 	}
 
-	coroipcs_refcount_inc (conn_info);
-	pthread_mutex_lock (&conn_info->mutex);
-	if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
-		if (list_empty (&conn_info->outq_head))
-			buf = MESSAGE_RES_OUTQ_EMPTY;
-		else
-			buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
+	if (revent & POLLOUT) {
+		int psop = conn_info->pending_semops;
+		int i;
 
-		for (; conn_info->pending_semops;) {
-			res = flow_control_event_send (conn_info, buf);
-			if (res == 1) {
-				conn_info->pending_semops--;
+		assert (psop != 0);
+		for (i = 0; i < psop; i++) {
+			res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+			if (res != 1) {
+				return (0);
 			} else {
-				break;
-			}
-		}
-		if (conn_info->notify_flow_control_enabled) {
-			res = flow_control_event_send (conn_info, MESSAGE_RES_ENABLE_FLOWCONTROL);
-			if (res == 1) {
-				conn_info->notify_flow_control_enabled = 0;
+				conn_info->pending_semops -= 1;
 			}
 		}
-		if (conn_info->notify_flow_control_enabled == 0 &&
-			conn_info->pending_semops == 0) {
-
-			api->poll_dispatch_modify (conn_info->fd,
-				POLLIN|POLLNVAL);
+		if (conn_info->poll_state == POLL_STATE_INOUT) {
+			conn_info->poll_state = POLL_STATE_IN;
+			api->poll_dispatch_modify (conn_info->fd, POLLIN|POLLNVAL);
 		}
 	}
-	pthread_mutex_unlock (&conn_info->mutex);
-	coroipcs_refcount_dec (conn_info);
 
 	return (0);
 }

+ 191 - 8
include/corosync/coroipc_ipc.h

@@ -35,6 +35,9 @@
 #define COROIPC_IPC_H_DEFINED
 
 #include <unistd.h>
+#include <poll.h>
+#include <time.h>
+#include "corotypes.h"
 #include "config.h"
 
 /*
@@ -52,28 +55,40 @@
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
 #include <semaphore.h>
+#else
+#include <sys/sem.h>
 #endif
 
+/*
+ * Define sem_wait timeout (real timeout will be (n-1;n) )
+ */
+#define IPC_SEMWAIT_TIMEOUT 2
+
 enum req_init_types {
 	MESSAGE_REQ_RESPONSE_INIT = 0,
 	MESSAGE_REQ_DISPATCH_INIT = 1
 };
 
 #define MESSAGE_REQ_CHANGE_EUID		1
-#define MESSAGE_REQ_OUTQ_FLUSH		2
 
-#define MESSAGE_RES_OUTQ_EMPTY         0
-#define MESSAGE_RES_OUTQ_NOT_EMPTY     1
-#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
-#define MESSAGE_RES_OUTQ_FLUSH_NR      3
+enum ipc_semaphore_identifiers {
+	SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT 	= 0,
+	SEMAPHORE_REQUEST			= 1,
+	SEMAPHORE_RESPONSE			= 2,
+	SEMAPHORE_DISPATCH			= 3
+};
 
 struct control_buffer {
 	unsigned int read;
 	unsigned int write;
+	int flow_control_enabled;
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-	sem_t sem0;
-	sem_t sem1;
-	sem_t sem2;
+	sem_t sem_request_or_flush_or_exit;
+	sem_t sem_response;
+	sem_t sem_dispatch;
+	sem_t sem_request;
+#else
+	int semid;
 #endif
 };
 
@@ -145,4 +160,172 @@ struct coroipcs_zc_header {
 #define ZC_FREE_HEADER		0xFFFFFFFE
 #define ZC_EXECUTE_HEADER	0xFFFFFFFD
 
+static inline cs_error_t
+ipc_sem_wait (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+	struct sembuf sop;
+#else
+	struct timespec timeout;
+	sem_t *sem = NULL;
+#endif
+	int res;
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	switch (sem_id) {
+	case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+		sem = &control_buffer->sem_request_or_flush_or_exit;
+		break;
+	case SEMAPHORE_RESPONSE:
+		sem = &control_buffer->sem_request;
+		break;
+	case SEMAPHORE_DISPATCH:
+		sem = &control_buffer->sem_response;
+		break;
+	case SEMAPHORE_REQUEST:
+		sem = &control_buffer->sem_dispatch;
+		break;
+	}
+
+	timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
+	timeout.tv_nsec = 0;
+
+retry_sem_timedwait:
+	res = sem_timedwait (sem, &timeout);
+	if (res == -1 && errno == ETIMEDOUT) {
+		return (CS_ERR_LIBRARY);
+	} else
+	if (res == -1 && errno == EINTR) {
+		goto retry_sem_timedwait;
+	} else
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#else
+	/*
+	 * Wait for semaphore indicating a new message from server
+	 * to client in queue
+	 */
+	sop.sem_num = sem_id;
+	sop.sem_op = -1;
+	sop.sem_flg = 0;
+
+retry_semop:
+	res = semop (control_buffer->semid, &sop, 1);
+	if (res == -1 && errno == EINTR) {
+		return (CS_ERR_TRY_AGAIN);
+		goto retry_semop;
+	} else
+	if (res == -1 && errno == EACCES) {
+		return (CS_ERR_TRY_AGAIN);
+	} else
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#endif
+	return (CS_OK);
+}
+
+static inline cs_error_t
+ipc_sem_post (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+	struct sembuf sop;
+#else
+	sem_t *sem = NULL;
+#endif
+	int res;
+	
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	switch (sem_id) {
+	case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+		sem = &control_buffer->sem_request_or_flush_or_exit;
+		break;
+	case SEMAPHORE_RESPONSE:
+		sem = &control_buffer->sem_request;
+		break;
+	case SEMAPHORE_DISPATCH:
+		sem = &control_buffer->sem_response;
+		break;
+	case SEMAPHORE_REQUEST:
+		sem = &control_buffer->sem_dispatch;
+		break;
+	}
+
+	res = sem_post (sem);
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#else
+	sop.sem_num = sem_id;
+	sop.sem_op = 1;
+	sop.sem_flg = 0;
+
+retry_semop:
+	res = semop (control_buffer->semid, &sop, 1);
+	if (res == -1 && errno == EINTR) {
+		goto retry_semop;
+	} else
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#endif
+	return (CS_OK);
+}
+
+static inline cs_error_t
+ipc_sem_getvalue (
+	struct control_buffer *control_buffer,
+	enum ipc_semaphore_identifiers sem_id,
+	int *sem_value)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+	struct sembuf sop;
+	int sem_value_hold;
+#else
+	sem_t *sem = NULL;
+#endif
+	
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	switch (sem_id) {
+	case SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT:
+		sem = &control_buffer->sem_request_or_flush_or_exit;
+		break;
+	case SEMAPHORE_RESPONSE:
+		sem = &control_buffer->sem_request;
+		break;
+	case SEMAPHORE_DISPATCH:
+		sem = &control_buffer->sem_response;
+		break;
+	case SEMAPHORE_REQUEST:
+		sem = &control_buffer->sem_dispatch;
+		break;
+	}
+
+	res = sem_getvalue (sem, sem_value);
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#else
+	sop.sem_num = sem_id;
+	sop.sem_op = 1;
+	sop.sem_flg = 0;
+
+retry_semctl:
+	sem_value_hold = semctl (control_buffer->semid, sem_id, GETVAL);
+	if (sem_value_hold == -1 && errno == EINTR) {
+		goto retry_semctl;
+	} else
+	if (sem_value_hold == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+	*sem_value = sem_value_hold;
+#endif
+	return (CS_OK);
+}
+
 #endif /* COROIPC_IPC_H_DEFINED */

+ 91 - 198
lib/coroipcc.c

@@ -72,17 +72,8 @@
 
 #include "util.h"
 
-/*
- * Define sem_wait timeout (real timeout will be (n-1;n) )
- */
-#define IPC_SEMWAIT_TIMEOUT 2
-
 struct ipc_instance {
 	int fd;
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	int semid;
-#endif
-	int flow_control_state;
 	struct control_buffer *control_buffer;
 	char *request_buffer;
 	char *response_buffer;
@@ -117,6 +108,23 @@ static void socket_nosigpipe(int s)
 #define MSG_NOSIGNAL 0
 #endif
 
+static inline int shared_mem_dispatch_bytes_left (struct ipc_instance *context)
+{
+	unsigned int n_read;
+	unsigned int n_write;
+	unsigned int bytes_left;
+
+	n_read = context->control_buffer->read;
+	n_write = context->control_buffer->write;
+
+	if (n_read <= n_write) {
+		bytes_left = context->dispatch_size - n_write + n_read;
+	} else {
+		bytes_left = n_read - n_write;
+	}
+	return (bytes_left);
+}
+
 static cs_error_t
 socket_send (
 	int s,
@@ -238,10 +246,10 @@ res_exit:
 	return (res);
 }
 
-#if _POSIX_THREAD_PROCESS_SHARED < 1
 static int
 priv_change_send (struct ipc_instance *ipc_instance)
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	char buf_req;
 	mar_req_priv_change req_priv_change;
 	unsigned int res;
@@ -268,19 +276,12 @@ priv_change_send (struct ipc_instance *ipc_instance)
 	}
 
 	ipc_instance->euid = req_priv_change.euid;
+#else
+	ipc_instance = NULL;
+#endif
 	return (0);
 }
 
-#if defined(_SEM_SEMUN_UNDEFINED)
-union semun {
-        int val;
-        struct semid_ds *buf;
-        unsigned short int *array;
-        struct seminfo *__buf;
-};
-#endif
-#endif
-
 static int
 circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
 {
@@ -471,10 +472,6 @@ msg_send (
 	const struct iovec *iov,
 	unsigned int iov_len)
 {
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#endif
-
 	int i;
 	int res;
 	int req_buffer_idx = 0;
@@ -490,117 +487,18 @@ msg_send (
 		req_buffer_idx += iov[i].iov_len;
 	}
 
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	res = sem_post (&ipc_instance->control_buffer->sem0);
-	if (res == -1) {
-		return (CS_ERR_LIBRARY);
-	}
-#else 
 	/*
-	 * Signal semaphore #0 indicting a new message from client
+	 * Signal semaphore #3 and #0 indicting a new message from client
 	 * to server request queue
 	 */
-	sop.sem_num = 0;
-	sop.sem_op = 1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (ipc_instance->semid, &sop, 1);
-	if (res == -1 && errno == EINTR) {
-		return (CS_ERR_TRY_AGAIN);
-	} else
-	if (res == -1 && errno == EACCES) {
-		priv_change_send (ipc_instance);
-		goto retry_semop;
-	} else
-	if (res == -1) {
-		return (CS_ERR_LIBRARY);
-	}
-#endif
-	return (CS_OK);
-}
-
-inline static cs_error_t
-ipc_sem_wait (
-	struct ipc_instance *ipc_instance,
-	int sem_num)
-{
-#if _POSIX_THREAD_PROCESS_SHARED < 1
-	struct sembuf sop;
-#else
-	struct timespec timeout;
-	struct pollfd pfd;
-	sem_t *sem = NULL;
-#endif
-	int res;
-
-#if _POSIX_THREAD_PROCESS_SHARED > 0
-	switch (sem_num) {
-	case 0:
-		sem = &ipc_instance->control_buffer->sem0;
-		break;
-	case 1:
-		sem = &ipc_instance->control_buffer->sem1;
-		break;
-	case 2:
-		sem = &ipc_instance->control_buffer->sem2;
-		break;
-	}
-
-retry_semwait:
-	timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
-	timeout.tv_nsec = 0;
-
-	res = sem_timedwait (sem, &timeout);
-	if (res == -1 && errno == ETIMEDOUT) {
-		pfd.fd = ipc_instance->fd;
-		pfd.events = 0;
-
-		res = poll (&pfd, 1, 0);
-
-		if (res == -1 && errno == EINTR) {
-			return (CS_ERR_TRY_AGAIN);
-		} else
-		if (res == -1) {
-			return (CS_ERR_LIBRARY);
-		}
-
-		if (res == 1) {
-			if (pfd.revents == POLLERR || pfd.revents == POLLHUP || pfd.revents == POLLNVAL) {
-				return (CS_ERR_LIBRARY);
-			}
-		}
-
-		goto retry_semwait;
-	} else
-	if (res == -1 && errno == EINTR) {
-		return (CS_ERR_TRY_AGAIN);
-	} else
-	if (res == -1) {
+	res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST);
+	if (res != CS_OK) {
 		return (CS_ERR_LIBRARY);
 	}
-#else
-	/*
-	 * Wait for semaphore indicating a new message from server
-	 * to client in queue
-	 */
-	sop.sem_num = sem_num;
-	sop.sem_op = -1;
-	sop.sem_flg = 0;
-
-retry_semop:
-	res = semop (ipc_instance->semid, &sop, 1);
-	if (res == -1 && errno == EINTR) {
-		return (CS_ERR_TRY_AGAIN);
-	} else
-	if (res == -1 && errno == EACCES) {
-		priv_change_send (ipc_instance);
-		goto retry_semop;
-	} else
-	if (res == -1) {
+	res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+	if (res != CS_OK) {
 		return (CS_ERR_LIBRARY);
 	}
-#endif
 	return (CS_OK);
 }
 
@@ -611,10 +509,17 @@ reply_receive (
 	size_t res_len)
 {
 	coroipc_response_header_t *response_header;
-	cs_error_t err;
+	cs_error_t res;
 
-	if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
-		return (err);
+retry_ipc_sem_wait:
+	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+	if (res != CS_OK) {
+		if (res == CS_ERR_TRY_AGAIN) {
+			priv_change_send (ipc_instance);
+			goto retry_ipc_sem_wait;
+		} else {
+			return (res);
+		}
 	}
 
 	response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
@@ -631,10 +536,17 @@ reply_receive_in_buf (
 	struct ipc_instance *ipc_instance,
 	void **res_msg)
 {
-	cs_error_t err;
+	cs_error_t res;
 
-	if ((err = ipc_sem_wait (ipc_instance, 1)) != CS_OK) {
-		return (err);
+retry_ipc_sem_wait:
+	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+	if (res != CS_OK) {
+		if (res == CS_ERR_TRY_AGAIN) {
+			priv_change_send (ipc_instance);
+			goto retry_ipc_sem_wait;
+		} else {
+			return (res);
+		}
 	}
 
 	*res_msg = (char *)ipc_instance->response_buffer;
@@ -754,18 +666,22 @@ coroipcc_service_connect (
 	}
 
 #if _POSIX_THREAD_PROCESS_SHARED > 0
-	sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
-	sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
-	sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem_request_or_flush_or_exit, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem_request, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem_response, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem_dispatch, 1, 0);
 #else
+{
+	int i;
+
 	/*
 	 * Allocate a semaphore segment
 	 */
 	while (1) {
 		semkey = random();
 		ipc_instance->euid = geteuid ();
-		if ((ipc_instance->semid
-		     = semget (semkey, 3, IPC_CREAT|IPC_EXCL|0600)) != -1) {
+		if ((ipc_instance->control_buffer->semid
+		     = semget (semkey, 4, IPC_CREAT|IPC_EXCL|0600)) != -1) {
 		      break;
 		}
 		/*
@@ -781,18 +697,15 @@ coroipcc_service_connect (
 		}
 	}
 
-	semun.val = 0;
-	sys_res = semctl (ipc_instance->semid, 0, SETVAL, semun);
-	if (sys_res != 0) {
-		res = CS_ERR_LIBRARY;
-		goto error_exit;
-	}
-
-	sys_res = semctl (ipc_instance->semid, 1, SETVAL, semun);
-	if (sys_res != 0) {
-		res = CS_ERR_LIBRARY;
-		goto error_exit;
+	for (i = 0; i < 4; i++) {
+		semun.val = 0;
+		sys_res = semctl (ipc_instance->control_buffer->semid, i, SETVAL, semun);
+		if (sys_res != 0) {
+			res = CS_ERR_LIBRARY;
+			goto error_exit;
+		}
 	}
+}
 #endif
 
 	/*
@@ -822,7 +735,6 @@ coroipcc_service_connect (
 	}
 
 	ipc_instance->fd = request_fd;
-	ipc_instance->flow_control_state = 0;
 
 	if (res_setup.error == CS_ERR_TRY_AGAIN) {
 		res = res_setup.error;
@@ -842,8 +754,8 @@ coroipcc_service_connect (
 
 error_exit:
 #if _POSIX_THREAD_PROCESS_SHARED < 1
-	if (ipc_instance->semid > 0)
-		semctl (ipc_instance->semid, 0, IPC_RMID);
+	if (ipc_instance->control_buffer->semid > 0)
+		semctl (ipc_instance->control_buffer->semid, 0, IPC_RMID);
 #endif
 	memory_unmap (ipc_instance->dispatch_buffer, dispatch_size);
 error_dispatch_buffer:
@@ -893,7 +805,7 @@ coroipcc_dispatch_flow_control_get (
 		return (res);
 	}
 
-	*flow_control_state = ipc_instance->flow_control_state;
+	*flow_control_state = ipc_instance->control_buffer->flow_control_enabled;
 
 	hdb_handle_put (&ipc_hdb, handle);
 	return (res);
@@ -928,10 +840,9 @@ coroipcc_dispatch_get (
 	int poll_events;
 	char buf;
 	struct ipc_instance *ipc_instance;
-	int res;
-	char buf_two = 1;
 	char *data_addr;
 	cs_error_t error = CS_OK;
+	int res;
 
 	error = hdb_error_to_cs (hdb_handle_get (&ipc_hdb, handle, (void **)&ipc_instance));
 	if (error != CS_OK) {
@@ -962,46 +873,21 @@ coroipcc_dispatch_get (
 		goto error_put;
 	}
 
-	res = recv (ipc_instance->fd, &buf, 1, 0);
-	if (res == -1 && errno == EINTR) {
-		error = CS_ERR_TRY_AGAIN;
-		goto error_put;
-	} else
-	if (res == -1) {
-		error = CS_ERR_LIBRARY;
-		goto error_put;
-	} else
-	if (res == 0) {
-		/* Means that the peer closed cleanly the socket. However, it should
-		 * happen only on BSD and Darwing systems since poll() returns a
-		 * POLLHUP event on other systems.
+	error = socket_recv (ipc_instance->fd, &buf, 1);
+	assert (error == CS_OK);
+
+	if (shared_mem_dispatch_bytes_left (ipc_instance) > 500000) {
+		/*
+		 * Notify coroipcs to flush any pending dispatch messages
 		 */
-		error = CS_ERR_LIBRARY;
-		goto error_put;
-	}
-	ipc_instance->flow_control_state = 0;
-	if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-		ipc_instance->flow_control_state = 1;
-	}
-	/*
-	 * Notify executive to flush any pending dispatch messages
-	 */
-	if (ipc_instance->flow_control_state) {
-		buf_two = MESSAGE_REQ_OUTQ_FLUSH;
-		res = socket_send (ipc_instance->fd, &buf_two, 1);
-		assert (res == CS_OK); /* TODO */
-	}
-	/*
-	 * This is just a notification of flow control starting at the addition
-	 * of a new pending message, not a message to dispatch
-	 */
-	if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
-		error = CS_ERR_TRY_AGAIN;
-		goto error_put;
-	}
-	if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
-		error = CS_ERR_TRY_AGAIN;
-		goto error_put;
+		
+		res = ipc_sem_post (ipc_instance->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+		if (res != CS_OK) {
+			error = CS_ERR_LIBRARY;
+			goto error_put;
+		}
+
+
 	}
 
 	data_addr = ipc_instance->dispatch_buffer;
@@ -1030,8 +916,15 @@ coroipcc_dispatch_put (hdb_handle_t handle)
 		return (res);
 	}
 
-	if ((res = ipc_sem_wait (ipc_instance, 2)) != CS_OK) {
-		goto error_exit;
+retry_ipc_sem_wait:
+	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH);
+	if (res != CS_OK) {
+		if (res == CS_ERR_TRY_AGAIN) {
+			priv_change_send (ipc_instance);
+			goto retry_ipc_sem_wait;
+		} else {
+			goto error_exit;
+		}
 	}
 
 	addr = ipc_instance->dispatch_buffer;
@@ -1078,8 +971,8 @@ coroipcc_msg_send_reply_receive (
 	res = reply_receive (ipc_instance, res_msg, res_len);
 
 error_exit:
-	hdb_handle_put (&ipc_hdb, handle);
 	pthread_mutex_unlock (&ipc_instance->mutex);
+	hdb_handle_put (&ipc_hdb, handle);
 
 	return (res);
 }