Просмотр исходного кода

Use unnamed shared posix semaphores on platforms which support them.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2379 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 16 лет назад
Родитель
Сommit
ee7ce5e328
3 измененных файлов с 219 добавлено и 35 удалено
  1. 104 8
      exec/coroipcs.c
  2. 25 0
      include/corosync/coroipc_ipc.h
  3. 90 27
      lib/coroipcc.c

+ 104 - 8
exec/coroipcs.c

@@ -67,7 +67,7 @@
 #include <string.h>
 #include <string.h>
 
 
 #include <sys/shm.h>
 #include <sys/shm.h>
-#include <sys/sem.h>
+
 #include <corosync/corotypes.h>
 #include <corosync/corotypes.h>
 #include <corosync/list.h>
 #include <corosync/list.h>
 
 
@@ -75,6 +75,12 @@
 #include <corosync/coroipcs.h>
 #include <corosync/coroipcs.h>
 #include <corosync/coroipc_ipc.h>
 #include <corosync/coroipc_ipc.h>
 
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+#include <semaphore.h>
+#else
+#include <sys/sem.h>
+#endif
+
 #ifndef MSG_NOSIGNAL
 #ifndef MSG_NOSIGNAL
 #define MSG_NOSIGNAL 0
 #define MSG_NOSIGNAL 0
 #endif
 #endif
@@ -100,6 +106,7 @@ struct zcb_mapped {
 	size_t size;
 	size_t size;
 };
 };
 
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 #if defined(_SEM_SEMUN_UNDEFINED)
 #if defined(_SEM_SEMUN_UNDEFINED)
 union semun {
 union semun {
 	int val;
 	int val;
@@ -108,6 +115,8 @@ union semun {
 	struct seminfo *__buf;
 	struct seminfo *__buf;
 };
 };
 #endif
 #endif
+#endif
+
 
 
 enum conn_state {
 enum conn_state {
 	CONN_STATE_THREAD_INACTIVE = 0,
 	CONN_STATE_THREAD_INACTIVE = 0,
@@ -126,9 +135,10 @@ struct conn_info {
 	enum conn_state state;
 	enum conn_state state;
 	int notify_flow_control_enabled;
 	int notify_flow_control_enabled;
 	int refcount;
 	int refcount;
-	key_t shmkey;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	key_t semkey;
 	key_t semkey;
 	int semid;
 	int semid;
+#endif
 	unsigned int pending_semops;
 	unsigned int pending_semops;
 	pthread_mutex_t mutex;
 	pthread_mutex_t mutex;
 	struct control_buffer *control_buffer;
 	struct control_buffer *control_buffer;
@@ -159,6 +169,32 @@ static void ipc_disconnect (struct conn_info *conn_info);
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 		      int locked);
 		      int locked);
 
 
+static void sem_post_exit_thread (struct conn_info *conn_info)
+{
+#if _POSIX_THREAD_PROCESS_SHARED < 1
+	struct sembuf sop;
+#endif
+	int res;
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semop:
+	res = sem_post (&conn_info->control_buffer->sem0);
+	if (res == -1 && errno == EINTR) {
+		goto retry_semop;
+	}
+#else
+	sop.sem_num = 1;
+	sop.sem_op = 1;
+	sop.sem_flg = 0;
+
+retry_semop:
+	res = semop (conn_info->semid, &sop, 1);
+	if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
+		goto retry_semop;
+	}
+#endif
+}
+
 static int
 static int
 memory_map (
 memory_map (
 	const char *path,
 	const char *path,
@@ -392,7 +428,7 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 	}
 	}
 
 
 	if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
 	if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
-		pthread_kill (conn_info->thread, SIGUSR1);
+		sem_post_exit_thread (conn_info);
 		return (0);
 		return (0);
 	}
 	}
 
 
@@ -419,13 +455,19 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 	list_del (&conn_info->list);
 	list_del (&conn_info->list);
 	pthread_mutex_unlock (&conn_info->mutex);
 	pthread_mutex_unlock (&conn_info->mutex);
 
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	sem_destroy (&conn_info->control_buffer->sem0);
+	sem_destroy (&conn_info->control_buffer->sem1);
+	sem_destroy (&conn_info->control_buffer->sem2);
+#else
+	semctl (conn_info->semid, 0, IPC_RMID);
+#endif
 	/*
 	/*
 	 * Destroy shared memory segment and semaphore
 	 * Destroy shared memory segment and semaphore
 	 */
 	 */
 	res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
 	res = munmap ((void *)conn_info->control_buffer, conn_info->control_size);
 	res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
 	res = munmap ((void *)conn_info->request_buffer, conn_info->request_size);
 	res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
 	res = munmap ((void *)conn_info->response_buffer, conn_info->response_size);
-	semctl (conn_info->semid, 0, IPC_RMID);
 
 
 	/*
 	/*
 	 * Free allocated data needed to retry exiting library IPC connection
 	 * Free allocated data needed to retry exiting library IPC connection
@@ -521,7 +563,9 @@ static inline void zerocopy_operations_process (
 static void *pthread_ipc_consumer (void *conn)
 static void *pthread_ipc_consumer (void *conn)
 {
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
 	struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	struct sembuf sop;
 	struct sembuf sop;
+#endif
 	int res;
 	int res;
 	coroipc_request_header_t *header;
 	coroipc_request_header_t *header;
 	coroipc_response_header_t coroipc_response_header;
 	coroipc_response_header_t coroipc_response_header;
@@ -536,6 +580,18 @@ static void *pthread_ipc_consumer (void *conn)
 #endif
 #endif
 
 
 	for (;;) {
 	for (;;) {
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+		res = sem_wait (&conn_info->control_buffer->sem0);
+		if (ipc_thread_active (conn_info) == 0) {
+			coroipcs_refcount_dec (conn_info);
+			pthread_exit (0);
+		}
+		if ((res == -1) && (errno == EINTR)) {
+			goto retry_semwait;
+		}
+#else
+
 		sop.sem_num = 0;
 		sop.sem_num = 0;
 		sop.sem_op = -1;
 		sop.sem_op = -1;
 		sop.sem_flg = 0;
 		sop.sem_flg = 0;
@@ -552,6 +608,7 @@ retry_semop:
 			coroipcs_refcount_dec (conn_info);
 			coroipcs_refcount_dec (conn_info);
 			pthread_exit (0);
 			pthread_exit (0);
 		}
 		}
+#endif
 
 
 		zerocopy_operations_process (conn_info, &header, &new_message);
 		zerocopy_operations_process (conn_info, &header, &new_message);
 		/*
 		/*
@@ -755,7 +812,7 @@ static void ipc_disconnect (struct conn_info *conn_info)
 	conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
 	conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
 	pthread_mutex_unlock (&conn_info->mutex);
 	pthread_mutex_unlock (&conn_info->mutex);
 
 
-	pthread_kill (conn_info->thread, SIGUSR1);
+	sem_post_exit_thread (conn_info);
 }
 }
 
 
 static int conn_info_create (int fd)
 static int conn_info_create (int fd)
@@ -874,6 +931,14 @@ void coroipcs_ipc_exit (void)
 
 
 		conn_info = list_entry (list, struct conn_info, list);
 		conn_info = list_entry (list, struct conn_info, list);
 
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+		sem_destroy (&conn_info->control_buffer->sem0);
+		sem_destroy (&conn_info->control_buffer->sem1);
+		sem_destroy (&conn_info->control_buffer->sem2);
+#else
+		semctl (conn_info->semid, 0, IPC_RMID);
+#endif
+
 		/*
 		/*
 		 * Unmap memory segments
 		 * Unmap memory segments
 		 */
 		 */
@@ -886,9 +951,7 @@ void coroipcs_ipc_exit (void)
 		res = circular_memory_unmap (conn_info->dispatch_buffer,
 		res = circular_memory_unmap (conn_info->dispatch_buffer,
 			conn_info->dispatch_size);
 			conn_info->dispatch_size);
 
 
-		semctl (conn_info->semid, 0, IPC_RMID);
-
-		pthread_kill (conn_info->thread, SIGUSR1);
+		sem_post_exit_thread (conn_info);
 	}
 	}
 }
 }
 
 
@@ -905,10 +968,19 @@ void *coroipcs_private_data_get (void *conn)
 int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
 int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
 {
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
 	struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	struct sembuf sop;
 	struct sembuf sop;
+#endif
 	int res;
 	int res;
 
 
 	memcpy (conn_info->response_buffer, msg, mlen);
 	memcpy (conn_info->response_buffer, msg, mlen);
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	res = sem_post (&conn_info->control_buffer->sem1);
+	if (res == -1) {
+		return (-1);
+	}
+#else
 	sop.sem_num = 1;
 	sop.sem_num = 1;
 	sop.sem_op = 1;
 	sop.sem_op = 1;
 	sop.sem_flg = 0;
 	sop.sem_flg = 0;
@@ -921,13 +993,16 @@ retry_semop:
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		return (0);
 		return (0);
 	}
 	}
+#endif
 	return (0);
 	return (0);
 }
 }
 
 
 int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
 int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
 {
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
 	struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	struct sembuf sop;
 	struct sembuf sop;
+#endif
 	int res;
 	int res;
 	int write_idx = 0;
 	int write_idx = 0;
 	int i;
 	int i;
@@ -938,6 +1013,12 @@ int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned in
 		write_idx += iov[i].iov_len;
 		write_idx += iov[i].iov_len;
 	}
 	}
 
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	res = sem_post (&conn_info->control_buffer->sem1);
+	if (res == -1) {
+		return (-1);
+	}
+#else
 	sop.sem_num = 1;
 	sop.sem_num = 1;
 	sop.sem_op = 1;
 	sop.sem_op = 1;
 	sop.sem_flg = 0;
 	sop.sem_flg = 0;
@@ -950,6 +1031,7 @@ retry_semop:
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		return (0);
 		return (0);
 	}
 	}
+#endif
 	return (0);
 	return (0);
 }
 }
 
 
@@ -984,7 +1066,9 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 		      int locked)
 		      int locked)
 {
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
 	struct conn_info *conn_info = (struct conn_info *)conn;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	struct sembuf sop;
 	struct sembuf sop;
+#endif
 	int res;
 	int res;
 	int i;
 	int i;
 	char buf;
 	char buf;
@@ -1009,6 +1093,9 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 	if (res == -1) {
 	if (res == -1) {
 		ipc_disconnect (conn_info);
 		ipc_disconnect (conn_info);
 	}
 	}
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	res = sem_post (&conn_info->control_buffer->sem2);
+#else
 	sop.sem_num = 2;
 	sop.sem_num = 2;
 	sop.sem_op = 1;
 	sop.sem_op = 1;
 	sop.sem_flg = 0;
 	sop.sem_flg = 0;
@@ -1021,6 +1108,7 @@ retry_semop:
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 	if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		return;
 		return;
 	}
 	}
+#endif
 }
 }
 
 
 static void outq_flush (struct conn_info *conn_info) {
 static void outq_flush (struct conn_info *conn_info) {
@@ -1062,9 +1150,11 @@ static int priv_change (struct conn_info *conn_info)
 {
 {
 	mar_req_priv_change req_priv_change;
 	mar_req_priv_change req_priv_change;
 	unsigned int res;
 	unsigned int res;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	union semun semun;
 	union semun semun;
 	struct semid_ds ipc_set;
 	struct semid_ds ipc_set;
 	int i;
 	int i;
+#endif
 
 
 retry_recv:
 retry_recv:
 	res = recv (conn_info->fd, &req_priv_change,
 	res = recv (conn_info->fd, &req_priv_change,
@@ -1087,6 +1177,7 @@ retry_recv:
 	}
 	}
 #endif
 #endif
 
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	ipc_set.sem_perm.uid = req_priv_change.euid;
 	ipc_set.sem_perm.uid = req_priv_change.euid;
 	ipc_set.sem_perm.gid = req_priv_change.egid;
 	ipc_set.sem_perm.gid = req_priv_change.egid;
 	ipc_set.sem_perm.mode = 0600;
 	ipc_set.sem_perm.mode = 0600;
@@ -1099,6 +1190,7 @@ retry_recv:
 			return (-1);
 			return (-1);
 		}
 		}
 	}
 	}
+#endif
 	return (0);
 	return (0);
 }
 }
 
 
@@ -1293,7 +1385,9 @@ int coroipcs_handler_dispatch (
 			return (0);
 			return (0);
 		}
 		}
 
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 		conn_info->semkey = req_setup->semkey;
 		conn_info->semkey = req_setup->semkey;
+#endif
 		res = memory_map (
 		res = memory_map (
 			req_setup->control_file,
 			req_setup->control_file,
 			req_setup->control_size,
 			req_setup->control_size,
@@ -1323,7 +1417,9 @@ int coroipcs_handler_dispatch (
 		conn_info->notify_flow_control_enabled = 0;
 		conn_info->notify_flow_control_enabled = 0;
 		conn_info->setup_bytes_read = 0;
 		conn_info->setup_bytes_read = 0;
 
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 		conn_info->semid = semget (conn_info->semkey, 3, 0600);
 		conn_info->semid = semget (conn_info->semkey, 3, 0600);
+#endif
 		conn_info->pending_semops = 0;
 		conn_info->pending_semops = 0;
 
 
 		/*
 		/*

+ 25 - 0
include/corosync/coroipc_ipc.h

@@ -34,6 +34,26 @@
 #ifndef COROIPC_IPC_H_DEFINED
 #ifndef COROIPC_IPC_H_DEFINED
 #define COROIPC_IPC_H_DEFINED
 #define COROIPC_IPC_H_DEFINED
 
 
+#include <unistd.h>
+#include "config.h"
+
+/*
+ * Darwin claims to support process shared synchronization
+ * but it really does not.  The unistd.h header file is wrong.
+ */
+#ifdef COROSYNC_DARWIN
+#undef _POSIX_THREAD_PROCESS_SHARED
+#define _POSIX_THREAD_PROCESS_SHARED -1
+#endif
+
+#ifndef _POSIX_THREAD_PROCESS_SHARED
+#define _POSIX_THREAD_PROCESS_SHARED -1
+#endif
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+#include <semaphore.h>
+#endif
+
 enum req_init_types {
 enum req_init_types {
 	MESSAGE_REQ_RESPONSE_INIT = 0,
 	MESSAGE_REQ_RESPONSE_INIT = 0,
 	MESSAGE_REQ_DISPATCH_INIT = 1
 	MESSAGE_REQ_DISPATCH_INIT = 1
@@ -45,6 +65,11 @@ enum req_init_types {
 struct control_buffer {
 struct control_buffer {
 	unsigned int read;
 	unsigned int read;
 	unsigned int write;
 	unsigned int write;
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	sem_t sem0;
+	sem_t sem1;
+	sem_t sem2;
+#endif
 };
 };
 
 
 enum res_init_types {
 enum res_init_types {

+ 90 - 27
lib/coroipcc.c

@@ -55,7 +55,6 @@
 #include <netinet/in.h>
 #include <netinet/in.h>
 #include <assert.h>
 #include <assert.h>
 #include <sys/shm.h>
 #include <sys/shm.h>
-#include <sys/sem.h>
 #include <sys/mman.h>
 #include <sys/mman.h>
 
 
 #include <corosync/corotypes.h>
 #include <corosync/corotypes.h>
@@ -64,12 +63,19 @@
 #include <corosync/coroipcc.h>
 #include <corosync/coroipcc.h>
 #include <corosync/hdb.h>
 #include <corosync/hdb.h>
 
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+#include <semaphore.h>
+#else
+#include <sys/sem.h>
+#endif
+
 #include "util.h"
 #include "util.h"
 
 
 struct ipc_instance {
 struct ipc_instance {
 	int fd;
 	int fd;
-	int shmid;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	int semid;
 	int semid;
+#endif
 	int flow_control_state;
 	int flow_control_state;
 	struct control_buffer *control_buffer;
 	struct control_buffer *control_buffer;
 	char *request_buffer;
 	char *request_buffer;
@@ -258,6 +264,7 @@ priv_change_send (struct ipc_instance *ipc_instance)
 	return (0);
 	return (0);
 }
 }
 
 
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 #if defined(_SEM_SEMUN_UNDEFINED)
 #if defined(_SEM_SEMUN_UNDEFINED)
 union semun {
 union semun {
         int val;
         int val;
@@ -266,6 +273,7 @@ union semun {
         struct seminfo *__buf;
         struct seminfo *__buf;
 };
 };
 #endif
 #endif
+#endif
 
 
 static int
 static int
 circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
 circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
@@ -391,7 +399,10 @@ msg_send (
 	const struct iovec *iov,
 	const struct iovec *iov,
 	unsigned int iov_len)
 	unsigned int iov_len)
 {
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	struct sembuf sop;
 	struct sembuf sop;
+#endif
+
 	int i;
 	int i;
 	int res;
 	int res;
 	int req_buffer_idx = 0;
 	int req_buffer_idx = 0;
@@ -402,6 +413,13 @@ msg_send (
 			iov[i].iov_len);
 			iov[i].iov_len);
 		req_buffer_idx += iov[i].iov_len;
 		req_buffer_idx += iov[i].iov_len;
 	}
 	}
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	res = sem_post (&ipc_instance->control_buffer->sem0);
+	if (res == -1) {
+		return (CS_ERR_LIBRARY);
+	}
+#else 
 	/*
 	/*
 	 * Signal semaphore #0 indicting a new message from client
 	 * Signal semaphore #0 indicting a new message from client
 	 * to server request queue
 	 * to server request queue
@@ -422,6 +440,7 @@ retry_semop:
 	if (res == -1) {
 	if (res == -1) {
 		return (CS_ERR_LIBRARY);
 		return (CS_ERR_LIBRARY);
 	}
 	}
+#endif
 	return (CS_OK);
 	return (CS_OK);
 }
 }
 
 
@@ -431,10 +450,19 @@ reply_receive (
 	void *res_msg,
 	void *res_msg,
 	size_t res_len)
 	size_t res_len)
 {
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	struct sembuf sop;
 	struct sembuf sop;
+#endif
 	coroipc_response_header_t *response_header;
 	coroipc_response_header_t *response_header;
 	int res;
 	int res;
 
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+	res = sem_wait (&ipc_instance->control_buffer->sem1);
+	if (res == -1 && errno == EINTR) {
+		goto retry_semwait;
+	}
+#else
 	/*
 	/*
 	 * Wait for semaphore #1 indicating a new message from server
 	 * Wait for semaphore #1 indicating a new message from server
 	 * to client in the response queue
 	 * to client in the response queue
@@ -455,6 +483,7 @@ retry_semop:
 	if (res == -1) {
 	if (res == -1) {
 		return (CS_ERR_LIBRARY);
 		return (CS_ERR_LIBRARY);
 	}
 	}
+#endif
 
 
 	response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
 	response_header = (coroipc_response_header_t *)ipc_instance->response_buffer;
 	if (response_header->error == CS_ERR_TRY_AGAIN) {
 	if (response_header->error == CS_ERR_TRY_AGAIN) {
@@ -470,9 +499,18 @@ reply_receive_in_buf (
 	struct ipc_instance *ipc_instance,
 	struct ipc_instance *ipc_instance,
 	void **res_msg)
 	void **res_msg)
 {
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	struct sembuf sop;
 	struct sembuf sop;
+#endif
 	int res;
 	int res;
 
 
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+	res = sem_wait (&ipc_instance->control_buffer->sem1);
+	if (res == -1 && errno == EINTR) {
+		goto retry_semwait;
+	}
+#else
 	/*
 	/*
 	 * Wait for semaphore #1 indicating a new message from server
 	 * Wait for semaphore #1 indicating a new message from server
 	 * to client in the response queue
 	 * to client in the response queue
@@ -493,6 +531,7 @@ retry_semop:
 	if (res == -1) {
 	if (res == -1) {
 		return (CS_ERR_LIBRARY);
 		return (CS_ERR_LIBRARY);
 	}
 	}
+#endif
 
 
 	*res_msg = (char *)ipc_instance->response_buffer;
 	*res_msg = (char *)ipc_instance->response_buffer;
 	return (CS_OK);
 	return (CS_OK);
@@ -515,11 +554,13 @@ coroipcc_service_connect (
 	struct sockaddr_un address;
 	struct sockaddr_un address;
 	cs_error_t res;
 	cs_error_t res;
 	struct ipc_instance *ipc_instance;
 	struct ipc_instance *ipc_instance;
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	key_t semkey = 0;
 	key_t semkey = 0;
+	union semun semun;
+#endif
 	int sys_res;
 	int sys_res;
 	mar_req_setup_t req_setup;
 	mar_req_setup_t req_setup;
 	mar_res_setup_t res_setup;
 	mar_res_setup_t res_setup;
-	union semun semun;
 	char control_map_path[128];
 	char control_map_path[128];
 	char request_map_path[128];
 	char request_map_path[128];
 	char response_map_path[128];
 	char response_map_path[128];
@@ -568,6 +609,36 @@ coroipcc_service_connect (
 		return (CS_ERR_TRY_AGAIN);
 		return (CS_ERR_TRY_AGAIN);
 	}
 	}
 
 
+	res = memory_map (
+		control_map_path,
+		"control_buffer-XXXXXX",
+		(void *)&ipc_instance->control_buffer,
+		8192);
+
+	res = memory_map (
+		request_map_path,
+		"request_buffer-XXXXXX",
+		(void *)&ipc_instance->request_buffer,
+		request_size);
+
+	res = memory_map (
+		response_map_path,
+		"response_buffer-XXXXXX",
+		(void *)&ipc_instance->response_buffer,
+		response_size);
+
+	res = circular_memory_map (
+		dispatch_map_path,
+		"dispatch_buffer-XXXXXX",
+		(void *)&ipc_instance->dispatch_buffer,
+		dispatch_size);
+
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+	sem_init (&ipc_instance->control_buffer->sem0, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem1, 1, 0);
+	sem_init (&ipc_instance->control_buffer->sem2, 1, 0);
+#else
+
 	/*
 	/*
 	 * Allocate a semaphore segment
 	 * Allocate a semaphore segment
 	 */
 	 */
@@ -600,30 +671,7 @@ coroipcc_service_connect (
 	if (res != 0) {
 	if (res != 0) {
 		goto res_exit;
 		goto res_exit;
 	}
 	}
-
-	res = memory_map (
-		control_map_path,
-		"control_buffer-XXXXXX",
-		(void *)&ipc_instance->control_buffer,
-		8192);
-
-	res = memory_map (
-		request_map_path,
-		"request_buffer-XXXXXX",
-		(void *)&ipc_instance->request_buffer,
-		request_size);
-
-	res = memory_map (
-		response_map_path,
-		"response_buffer-XXXXXX",
-		(void *)&ipc_instance->response_buffer,
-		response_size);
-
-	res = circular_memory_map (
-		dispatch_map_path,
-		"dispatch_buffer-XXXXXX",
-		(void *)&ipc_instance->dispatch_buffer,
-		dispatch_size);
+#endif
 
 
 	/*
 	/*
 	 * Initialize IPC setup message
 	 * Initialize IPC setup message
@@ -637,7 +685,10 @@ coroipcc_service_connect (
 	req_setup.request_size = request_size;
 	req_setup.request_size = request_size;
 	req_setup.response_size = response_size;
 	req_setup.response_size = response_size;
 	req_setup.dispatch_size = dispatch_size;
 	req_setup.dispatch_size = dispatch_size;
+
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	req_setup.semkey = semkey;
 	req_setup.semkey = semkey;
+#endif
 
 
 	res = socket_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
 	res = socket_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
 	if (res != CS_OK) {
 	if (res != CS_OK) {
@@ -668,8 +719,10 @@ coroipcc_service_connect (
 
 
 res_exit:
 res_exit:
 	close (request_fd);
 	close (request_fd);
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	if (ipc_instance->semid > 0)
 	if (ipc_instance->semid > 0)
 		semctl (ipc_instance->semid, 0, IPC_RMID);
 		semctl (ipc_instance->semid, 0, IPC_RMID);
+#endif
 	return (res_setup.error);
 	return (res_setup.error);
 }
 }
 
 
@@ -821,7 +874,9 @@ error_put:
 cs_error_t
 cs_error_t
 coroipcc_dispatch_put (hdb_handle_t handle)
 coroipcc_dispatch_put (hdb_handle_t handle)
 {
 {
+#if _POSIX_THREAD_PROCESS_SHARED < 1
 	struct sembuf sop;
 	struct sembuf sop;
+#endif
 	coroipc_response_header_t *header;
 	coroipc_response_header_t *header;
 	struct ipc_instance *ipc_instance;
 	struct ipc_instance *ipc_instance;
 	int res;
 	int res;
@@ -832,6 +887,13 @@ coroipcc_dispatch_put (hdb_handle_t handle)
 	if (res != CS_OK) {
 	if (res != CS_OK) {
 		return (res);
 		return (res);
 	}
 	}
+#if _POSIX_THREAD_PROCESS_SHARED > 0
+retry_semwait:
+	res = sem_wait (&ipc_instance->control_buffer->sem2);
+	if (res == -1 && errno == EINTR) {
+		goto retry_semwait;
+	}
+#else
 	sop.sem_num = 2;
 	sop.sem_num = 2;
 	sop.sem_op = -1;
 	sop.sem_op = -1;
 	sop.sem_flg = 0;
 	sop.sem_flg = 0;
@@ -847,6 +909,7 @@ retry_semop:
 	if (res == -1) {
 	if (res == -1) {
 		return (CS_ERR_LIBRARY);
 		return (CS_ERR_LIBRARY);
 	}
 	}
+#endif
 
 
 	addr = ipc_instance->dispatch_buffer;
 	addr = ipc_instance->dispatch_buffer;