Ver código fonte

Rework how dispatch functions so service engines work properly.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2079 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 17 anos atrás
pai
commit
0969721db3
12 arquivos alterados com 238 adições e 211 exclusões
  1. 54 20
      exec/coroipcs.c
  2. 17 16
      include/corosync/coroipcc.h
  3. 2 2
      include/corosync/ipc_gen.h
  4. 13 21
      lib/cfg.c
  5. 11 15
      lib/confdb.c
  6. 90 57
      lib/coroipcc.c
  7. 10 17
      lib/cpg.c
  8. 22 33
      lib/evs.c
  9. 9 12
      lib/quorum.c
  10. 10 14
      lib/votequorum.c
  11. 0 1
      test/evsbench.c
  12. 0 3
      test/testevs.c

+ 54 - 20
exec/coroipcs.c

@@ -124,6 +124,7 @@ struct conn_info {
 	unsigned int pending_semops;
 	pthread_mutex_t mutex;
 	struct shared_memory *mem;
+	char *dispatch_buffer;
 	struct list_head outq_head;
 	void *private_data;
 	struct list_head list;
@@ -143,8 +144,6 @@ static void ipc_disconnect (struct conn_info *conn_info);
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 		      int locked);
 
-static int memcpy_dwrap (struct conn_info *conn_info, void *msg, int len);
-
 static int ipc_thread_active (void *conn)
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
@@ -241,6 +240,7 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 		api->free (conn_info->private_data);
 	}
 	close (conn_info->fd);
+	munmap (conn_info->dispatch_buffer, (DISPATCH_SIZE));
 	api->free (conn_info);
 	api->serialize_unlock ();
 	return (-1);
@@ -651,25 +651,14 @@ static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info)
 	return (bytes_left);
 }
 
-static int memcpy_dwrap (struct conn_info *conn_info, void *msg, int len)
+static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int len)
 {
-	char *dest_char = (char *)conn_info->mem->dispatch_buffer;
-	char *src_char = msg;
-	unsigned int first_write;
-	unsigned int second_write;
-
-	first_write = len;
-	second_write = 0;
-	if (len + conn_info->mem->write >= DISPATCH_SIZE) {
-		first_write = DISPATCH_SIZE - conn_info->mem->write;
-		second_write = len - first_write;
-	}
-	memcpy (&dest_char[conn_info->mem->write], src_char, first_write);
-	if (second_write) {
-		memcpy (dest_char, &src_char[first_write], second_write);
-	}
-	conn_info->mem->write = (conn_info->mem->write + len) % DISPATCH_SIZE;
-	return (0);
+	unsigned int write_idx;
+
+	write_idx = conn_info->mem->write;
+
+	memcpy (&conn_info->dispatch_buffer[write_idx], msg, len);
+	conn_info->mem->write = (write_idx + len) % (DISPATCH_SIZE);
 }
 
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
@@ -934,6 +923,46 @@ retry_accept:
 	return (0);
 }
 
+static int
+coroipcs_memory_map (char *path, void **buf, size_t bytes)
+{
+	int fd;
+	void *addr_orig;
+	void *addr;
+	int res;
+ 
+	fd = open (path, O_RDWR, 0600);
+
+	unlink (path);
+
+	res = ftruncate (fd, bytes);
+
+	addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
+		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+ 
+	if (addr_orig == MAP_FAILED) {
+		return (-1);
+	}
+ 
+	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
+		MAP_FIXED | MAP_SHARED, fd, 0);
+ 
+	if (addr != addr_orig) {
+		return (-1);
+	}
+ 
+	addr = mmap (((char *)addr_orig) + bytes,
+                  bytes, PROT_READ | PROT_WRITE,
+                  MAP_FIXED | MAP_SHARED, fd, 0);
+ 
+	res = close (fd);
+	if (res) {
+		return (-1);
+	}
+	*buf = addr_orig;
+	return (0);
+}
+
 int coroipcs_handler_dispatch (
 	int fd,
 	int revent,
@@ -987,6 +1016,11 @@ int coroipcs_handler_dispatch (
 
 		conn_info->shmkey = req_setup->shmkey;
 		conn_info->semkey = req_setup->semkey;
+		res = coroipcs_memory_map (
+			req_setup->dispatch_file,
+			(void *)&conn_info->dispatch_buffer,
+			DISPATCH_SIZE);
+
 		conn_info->service = req_setup->service;
 		conn_info->refcount = 0;
 		conn_info->notify_flow_control_enabled = 0;

+ 17 - 16
include/corosync/coroipcc.h

@@ -67,32 +67,35 @@ struct saHandleDatabase {
 };
 
 
-cs_error_t
+extern cs_error_t
 coroipcc_service_connect (
 	const char *socket_name,
 	enum service_types service,
 	void **ipc_context);
 
-cs_error_t
+extern cs_error_t
 coroipcc_service_disconnect (
 	void *ipc_context);
 
-int
+extern int
 coroipcc_fd_get (
 	void *ipc_context);
 
-int
-coroipcc_dispatch_recv (
+extern int
+coroipcc_dispatch_get (
 	void *ipc_context,
-	void *buf,
-	size_t buflen,
+	void **buf,
 	int timeout);
 
-int
+extern int
+coroipcc_dispatch_put (
+	void *ipc_context);
+
+extern int
 coroipcc_dispatch_flow_control_get (
 	void *ipc_context);
 
-cs_error_t
+extern cs_error_t
 coroipcc_msg_send_reply_receive (
 	void *ipc_context,
 	const struct iovec *iov,
@@ -100,35 +103,33 @@ coroipcc_msg_send_reply_receive (
 	void *res_msg,
 	size_t res_len);
 
-cs_error_t
+extern cs_error_t
 coroipcc_msg_send_reply_receive_in_buf (
 	void *ipc_context,
 	const struct iovec *iov,
 	unsigned int iov_len,
 	void **res_msg);
 
-cs_error_t
+extern cs_error_t
 saHandleCreate (
 	struct saHandleDatabase *handleDatabase,
 	int instanceSize,
 	uint64_t *handleOut);
 
-cs_error_t
+extern cs_error_t
 saHandleDestroy (
 	struct saHandleDatabase *handleDatabase,
 	uint64_t handle);
 
-cs_error_t
+extern cs_error_t
 saHandleInstanceGet (
 	struct saHandleDatabase *handleDatabase,
 	uint64_t handle,
 	void **instance);
 
-cs_error_t
+extern cs_error_t
 saHandleInstancePut (
 	struct saHandleDatabase *handleDatabase,
 	uint64_t handle);
 
-#define offset_of(type,member) (int)(&(((type *)0)->member))
-
 #endif /* COROIPC_H_DEFINED */

+ 2 - 2
include/corosync/ipc_gen.h

@@ -66,12 +66,11 @@ enum req_init_types {
 
 #define REQ_SIZE			1000000
 #define RES_SIZE			1000000
-#define DISPATCH_SIZE			1000000
+#define DISPATCH_SIZE			8192*128
 
 struct shared_memory {
 	unsigned char req_buffer[REQ_SIZE];
 	unsigned char res_buffer[RES_SIZE];
-	unsigned char dispatch_buffer[DISPATCH_SIZE];
 	unsigned int read;
 	unsigned int write;
 };
@@ -89,6 +88,7 @@ typedef struct {
 	int service __attribute__((aligned(8)));
 	unsigned long long shmkey __attribute__((aligned(8)));
 	unsigned long long semkey __attribute__((aligned(8)));
+	char dispatch_file[64]__attribute__((aligned(8)));
 } mar_req_setup_t __attribute__((aligned(8)));
 
 typedef struct {

+ 13 - 21
lib/cfg.c

@@ -56,11 +56,6 @@
 #include <corosync/ipc_cfg.h>
 #include <corosync/coroipcc.h>
 
-struct cfg_res_overlay {
-	mar_res_header_t header;
-	char data[4096];
-};
-
 /*
  * Data structure for instance data
  */
@@ -170,15 +165,8 @@ corosync_cfg_dispatch (
 	int dispatch_avail;
 	struct cfg_instance *cfg_instance;
 	struct res_lib_cfg_testshutdown *res_lib_cfg_testshutdown;
-#ifdef COMPILE_OUT
-	struct res_lib_corosync_healthcheckcallback *res_lib_corosync_healthcheckcallback;
-	struct res_lib_corosync_readinessstatesetcallback *res_lib_corosync_readinessstatesetcallback;
-	struct res_lib_corosync_csisetcallback *res_lib_corosync_csisetcallback;
-	struct res_lib_corosync_csiremovecallback *res_lib_corosync_csiremovecallback;
-	struct res_lib_cfg_statetrackcallback *res_lib_cfg_statetrackcallback;
-#endif
 	corosync_cfg_callbacks_t callbacks;
-	struct cfg_res_overlay dispatch_data;
+	mar_res_header_t *dispatch_data;
 
 	error = saHandleInstanceGet (&cfg_hdb, cfg_handle,
 		(void *)&cfg_instance);
@@ -194,10 +182,12 @@ corosync_cfg_dispatch (
 	}
 
 	do {
-		dispatch_avail = coroipcc_dispatch_recv (cfg_instance->ipc_ctx,
-							 (void *)&dispatch_data,
-							 sizeof (dispatch_data),
-							 timeout);
+		pthread_mutex_lock (&cfg_instance->dispatch_mutex);
+
+		dispatch_avail = coroipcc_dispatch_get (
+			cfg_instance->ipc_ctx,
+			(void **)&dispatch_data,
+			timeout);
 
 		/*
 		 * Handle has been finalized in another thread
@@ -205,7 +195,7 @@ corosync_cfg_dispatch (
 		if (cfg_instance->finalize == 1) {
 			error = CS_OK;
 			pthread_mutex_unlock (&cfg_instance->dispatch_mutex);
-			goto error_unlock;
+			goto error_put;
 		}
 
 		if (dispatch_avail == 0 && dispatch_flags == CS_DISPATCH_ALL) {
@@ -228,18 +218,20 @@ corosync_cfg_dispatch (
 		/*
 		 * Dispatch incoming response
 		 */
-		switch (dispatch_data.header.id) {
+		switch (dispatch_data->id) {
 		case MESSAGE_RES_CFG_TESTSHUTDOWN:
 			if (callbacks.corosync_cfg_shutdown_callback) {
-				res_lib_cfg_testshutdown = (struct res_lib_cfg_testshutdown *)&dispatch_data;
+				res_lib_cfg_testshutdown = (struct res_lib_cfg_testshutdown *)dispatch_data;
 				callbacks.corosync_cfg_shutdown_callback(cfg_handle, res_lib_cfg_testshutdown->flags);
 			}
 			break;
 		default:
+			coroipcc_dispatch_put (cfg_instance->ipc_ctx);
 			error = CS_ERR_LIBRARY;
 			goto error_nounlock;
 			break;
 		}
+		coroipcc_dispatch_put (cfg_instance->ipc_ctx);
 
 		/*
 		 * Determine if more messages should be processed
@@ -255,7 +247,7 @@ corosync_cfg_dispatch (
 		}
 	} while (cont);
 
-error_unlock:
+error_put:
 	(void)saHandleInstancePut (&cfg_hdb, cfg_handle);
 error_nounlock:
 	return (error);

+ 11 - 15
lib/confdb.c

@@ -288,11 +288,6 @@ cs_error_t confdb_context_set (
 	return (CS_OK);
 }
 
-struct confdb_res_overlay {
-	mar_res_header_t header __attribute__((aligned(8)));
-	char data[512000];
-};
-
 cs_error_t confdb_dispatch (
 	confdb_handle_t handle,
 	cs_dispatch_flags_t dispatch_types)
@@ -306,7 +301,7 @@ cs_error_t confdb_dispatch (
 	struct res_lib_confdb_key_change_callback *res_key_changed_pt;
 	struct res_lib_confdb_object_create_callback *res_object_created_pt;
 	struct res_lib_confdb_object_destroy_callback *res_object_destroyed_pt;
-	struct confdb_res_overlay dispatch_data;
+	mar_res_header_t *dispatch_data;
 
 	error = saHandleInstanceGet (&confdb_handle_t_db, handle, (void *)&confdb_inst);
 	if (error != CS_OK) {
@@ -329,11 +324,10 @@ cs_error_t confdb_dispatch (
 	do {
 		pthread_mutex_lock (&confdb_inst->dispatch_mutex);
 
-		dispatch_avail = coroipcc_dispatch_recv (confdb_inst->ipc_ctx,
-							 (void *)&dispatch_data,
-							 sizeof (dispatch_data),
-							 timeout);
-
+		dispatch_avail = coroipcc_dispatch_get (
+			confdb_inst->ipc_ctx,
+			(void **)&dispatch_data,
+			timeout);
 
 		/*
 		 * Handle has been finalized in another thread
@@ -365,9 +359,9 @@ cs_error_t confdb_dispatch (
 		/*
 		 * Dispatch incoming message
 		 */
-		switch (dispatch_data.header.id) {
+		switch (dispatch_data->id) {
 			case MESSAGE_RES_CONFDB_KEY_CHANGE_CALLBACK:
-				res_key_changed_pt = (struct res_lib_confdb_key_change_callback *)&dispatch_data;
+				res_key_changed_pt = (struct res_lib_confdb_key_change_callback *)dispatch_data;
 
 				callbacks.confdb_key_change_notify_fn(handle,
 					res_key_changed_pt->change_type,
@@ -382,7 +376,7 @@ cs_error_t confdb_dispatch (
 				break;
 
 			case MESSAGE_RES_CONFDB_OBJECT_CREATE_CALLBACK:
-				res_object_created_pt = (struct res_lib_confdb_object_create_callback *)&dispatch_data;
+				res_object_created_pt = (struct res_lib_confdb_object_create_callback *)dispatch_data;
 
 				callbacks.confdb_object_create_change_notify_fn(handle,
 					res_object_created_pt->object_handle,
@@ -392,7 +386,7 @@ cs_error_t confdb_dispatch (
 				break;
 
 			case MESSAGE_RES_CONFDB_OBJECT_DESTROY_CALLBACK:
-				res_object_destroyed_pt = (struct res_lib_confdb_object_destroy_callback *)&dispatch_data;
+				res_object_destroyed_pt = (struct res_lib_confdb_object_destroy_callback *)dispatch_data;
 
 				callbacks.confdb_object_delete_change_notify_fn(handle,
 					res_object_destroyed_pt->parent_object_handle,
@@ -401,10 +395,12 @@ cs_error_t confdb_dispatch (
 				break;
 
 			default:
+				coroipcc_dispatch_put (confdb_inst->ipc_ctx);
 				error = CS_ERR_LIBRARY;
 				goto error_noput;
 				break;
 		}
+		coroipcc_dispatch_put (confdb_inst->ipc_ctx);
 
 		/*
 		 * Determine if more messages should be processed

+ 90 - 57
lib/coroipcc.c

@@ -56,6 +56,7 @@
 #include <assert.h>
 #include <sys/shm.h>
 #include <sys/sem.h>
+#include <sys/mman.h>
 
 #include <corosync/corotypes.h>
 #include <corosync/ipc_gen.h>
@@ -80,6 +81,7 @@ struct ipc_segment {
 	int semid;
 	int flow_control_state;
 	struct shared_memory *shared_memory;
+	void *dispatch_buffer;
 	uid_t euid;
 };
 
@@ -275,6 +277,61 @@ union semun {
 };
 #endif
 	
+static int
+coroipcc_memory_map (char *path, const char *file, void **buf, size_t bytes)
+{
+	int fd;
+	void *addr_orig;
+	void *addr;
+	int res;
+
+	sprintf (path, "/dev/shm/%s", file);
+ 
+	fd = mkstemp (path);
+	if (fd == -1) {
+		sprintf (path, "/var/run/%s", file);
+		fd = mkstemp (path);
+		if (fd == -1) {
+			return (-1);
+		}
+	}
+
+	res = ftruncate (fd, bytes);
+
+	addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
+		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+ 
+	if (addr_orig == MAP_FAILED) {
+		return (-1);
+	}
+ 
+	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
+		MAP_FIXED | MAP_SHARED, fd, 0);
+ 
+	if (addr != addr_orig) {
+		return (-1);
+	}
+ 
+	addr = mmap (((char *)addr_orig) + bytes,
+                  bytes, PROT_READ | PROT_WRITE,
+                  MAP_FIXED | MAP_SHARED, fd, 0);
+ 
+	res = close (fd);
+	if (res) {
+		return (-1);
+	}
+	*buf = addr_orig;
+	return (0);
+}
+ 
+static void
+coroipcc_memory_unmap (void *addr, size_t bytes)
+{
+	int res;
+ 
+	res = munmap (addr, bytes);
+}
+
 cs_error_t
 coroipcc_service_connect (
 	const char *socket_name,
@@ -291,6 +348,7 @@ coroipcc_service_connect (
 	mar_req_setup_t req_setup;
 	mar_res_setup_t res_setup;
 	union semun semun;
+	char dispatch_map_path[128];
 
 	res_setup.error = CS_ERR_LIBRARY;
 
@@ -373,8 +431,13 @@ coroipcc_service_connect (
 		goto error_exit;
 	}
 
+	res = coroipcc_memory_map (dispatch_map_path,
+		"dispatch_bufer-XXXXXX",
+		&ipc_segment->dispatch_buffer, DISPATCH_SIZE);
+	strcpy (req_setup.dispatch_file, dispatch_map_path);
 	req_setup.shmkey = shmkey;
 	req_setup.semkey = semkey;
+
 	req_setup.service = service;
 
 	error = coroipcc_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
@@ -418,6 +481,7 @@ coroipcc_service_disconnect (
 	shutdown (ipc_segment->fd, SHUT_RDWR);
 	close (ipc_segment->fd);
 	shmdt (ipc_segment->shared_memory);
+	coroipcc_memory_unmap (ipc_segment->dispatch_buffer, (DISPATCH_SIZE));
 	free (ipc_segment);
 	return (CS_OK);
 }
@@ -431,7 +495,6 @@ coroipcc_dispatch_flow_control_get (
 	return (ipc_segment->flow_control_state);
 }
 
-
 int
 coroipcc_fd_get (void *ipc_ctx)
 {
@@ -440,43 +503,16 @@ coroipcc_fd_get (void *ipc_ctx)
 	return (ipc_segment->fd);
 }
 
-static void memcpy_swrap (void *dest, size_t dest_len,
-			  void *src, int len, unsigned int *n_read)
-{
-	char *dest_chr = (char *)dest;
-	char *src_chr = (char *)src;
-
-	unsigned int first_read;
-	unsigned int second_read;
-
-	first_read = len;
-	second_read = 0;
-
-	if (len + *n_read >= DISPATCH_SIZE) {
-		first_read = DISPATCH_SIZE - *n_read;
-		second_read = (len + *n_read) % DISPATCH_SIZE;
-	}
-	memcpy (dest_chr, &src_chr[*n_read], first_read);
-	if (second_read) {
-		memcpy (&dest_chr[first_read], src_chr,
-			second_read);
-	}
-	*n_read = (*n_read + len) % (DISPATCH_SIZE);
-}
-int original_flow = -1;
-
 int
-coroipcc_dispatch_recv (void *ipc_ctx, void *data, size_t buflen, int timeout)
+coroipcc_dispatch_get (void *ipc_ctx, void **data, int timeout)
 {
 	struct pollfd ufds;
-	struct sembuf sop;
 	int poll_events;
-	mar_res_header_t *header;
 	char buf;
 	struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
 	int res;
-	unsigned int my_read;
 	char buf_two = 1;
+	char *data_addr;
 
 	ufds.fd = ipc_segment->fd;
 	ufds.events = POLLIN;
@@ -530,10 +566,27 @@ retry_recv:
 		return (0);
 	}
 
+	data_addr = ipc_segment->dispatch_buffer;
+
+	data_addr = &data_addr[ipc_segment->shared_memory->read];
+
+	*data = (void *)data_addr;
+	return (1);
+}
+
+int
+coroipcc_dispatch_put (void *ipc_ctx)
+{
+	struct sembuf sop;
+	mar_res_header_t *header;
+	struct ipc_segment *ipc_segment = (struct ipc_segment *)ipc_ctx;
+	int res;
+	char *addr;
+	unsigned int read_idx;
+
 	sop.sem_num = 2;
 	sop.sem_op = -1;
 	sop.sem_flg = 0;
-
 retry_semop:
 	res = semop (ipc_segment->semid, &sop, 1);
 	if (res == -1 && errno == EINTR) {
@@ -547,33 +600,13 @@ retry_semop:
 		return (-1);
 	}
 
-	if (buflen < DISPATCH_SIZE) {
-		return -1;
-	}
-
-	if (ipc_segment->shared_memory->read + sizeof (mar_res_header_t) >= DISPATCH_SIZE) {
-		my_read = ipc_segment->shared_memory->read;
-		memcpy_swrap (data, DISPATCH_SIZE,
-			ipc_segment->shared_memory->dispatch_buffer,
-			sizeof (mar_res_header_t),
-			&ipc_segment->shared_memory->read);
-		header = (mar_res_header_t *)data;
-		memcpy_swrap (
-			(void *)((char *)data + sizeof (mar_res_header_t)),
-			DISPATCH_SIZE,
-			ipc_segment->shared_memory->dispatch_buffer,
-			header->size - sizeof (mar_res_header_t),
-			&ipc_segment->shared_memory->read);
-	} else {
-		header = (mar_res_header_t *)&ipc_segment->shared_memory->dispatch_buffer[ipc_segment->shared_memory->read];
-		memcpy_swrap (
-			data, DISPATCH_SIZE,
-			ipc_segment->shared_memory->dispatch_buffer,
-			header->size,
-			&ipc_segment->shared_memory->read);
-	}
+	addr = ipc_segment->dispatch_buffer;
 
-	return (1);
+	read_idx = ipc_segment->shared_memory->read;
+	header = (mar_res_header_t *) &addr[read_idx];
+	ipc_segment->shared_memory->read =
+		(read_idx + header->size) % (DISPATCH_SIZE);
+	return (0);
 }
 
 static cs_error_t

+ 10 - 17
lib/cpg.c

@@ -222,11 +222,6 @@ cs_error_t cpg_context_set (
 	return (CS_OK);
 }
 
-struct res_overlay {
-	mar_res_header_t header __attribute__((aligned(8)));
-	char data[512000];
-};
-
 cs_error_t cpg_dispatch (
 	cpg_handle_t handle,
 	cs_dispatch_flags_t dispatch_types)
@@ -239,7 +234,7 @@ cs_error_t cpg_dispatch (
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
 	cpg_callbacks_t callbacks;
-	struct res_overlay dispatch_data;
+	mar_res_header_t *dispatch_data;
 	int ignore_dispatch = 0;
 	struct cpg_address member_list[CPG_MEMBERS_MAX];
 	struct cpg_address left_list[CPG_MEMBERS_MAX];
@@ -265,17 +260,13 @@ cs_error_t cpg_dispatch (
 	do {
 		pthread_mutex_lock (&cpg_inst->dispatch_mutex);
 
-		dispatch_avail = coroipcc_dispatch_recv (cpg_inst->ipc_ctx,
-							 (void *)&dispatch_data,
-							 sizeof (dispatch_data),
-							 timeout);
+		dispatch_avail = coroipcc_dispatch_get (
+			cpg_inst->ipc_ctx,
+			(void **)&dispatch_data,
+			timeout);
 
 		pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
 
-		if (error != CS_OK) {
-			goto error_put;
-		}
-
 		if (dispatch_avail == 0 && dispatch_types == CPG_DISPATCH_ALL) {
 			pthread_mutex_unlock (&cpg_inst->dispatch_mutex);
 			break; /* exit do while cont is 1 loop */
@@ -302,9 +293,9 @@ cs_error_t cpg_dispatch (
 		/*
 		 * Dispatch incoming message
 		 */
-		switch (dispatch_data.header.id) {
+		switch (dispatch_data->id) {
 		case MESSAGE_RES_CPG_DELIVER_CALLBACK:
-			res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data;
+			res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
 
 			marshall_from_mar_cpg_name_t (
 				&group_name,
@@ -319,7 +310,7 @@ cs_error_t cpg_dispatch (
 			break;
 
 		case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
-			res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)&dispatch_data;
+			res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
 
 			for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
 				marshall_from_mar_cpg_address_t (&member_list[i],
@@ -353,10 +344,12 @@ cs_error_t cpg_dispatch (
 			break;
 
 		default:
+			coroipcc_dispatch_put (cpg_inst->ipc_ctx);
 			error = CS_ERR_LIBRARY;
 			goto error_put;
 			break;
 		}
+		coroipcc_dispatch_put (cpg_inst->ipc_ctx);
 
 		/*
 		 * Determine if more messages should be processed

+ 22 - 33
lib/evs.c

@@ -65,11 +65,6 @@ struct evs_inst {
 	pthread_mutex_t dispatch_mutex;
 };
 
-struct res_overlay {
-	mar_res_header_t header __attribute__((aligned(8)));
-	char data[512000];
-};
-
 static void evs_instance_destructor (void *instance);
 
 static struct saHandleDatabase evs_handle_t_db = {
@@ -208,7 +203,7 @@ evs_error_t evs_dispatch (
 	struct res_evs_confchg_callback *res_evs_confchg_callback;
 	struct res_evs_deliver_callback *res_evs_deliver_callback;
 	evs_callbacks_t callbacks;
-	struct res_overlay dispatch_data;
+	mar_res_header_t *dispatch_data;
 	int ignore_dispatch = 0;
 
 	error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
@@ -225,35 +220,29 @@ evs_error_t evs_dispatch (
 	}
 
 	do {
-		dispatch_avail = coroipcc_dispatch_recv (evs_inst->ipc_ctx,
-							 (void *)&dispatch_data,
-							 sizeof (dispatch_data),
-							 timeout);
-		if (dispatch_avail == -1) {
-			error = CS_ERR_LIBRARY;
-			goto error_nounlock;
-		}
-			
-
 		pthread_mutex_lock (&evs_inst->dispatch_mutex);
 
-		/*
-		 * Handle has been finalized in another thread
-		 */
-		if (evs_inst->finalize == 1) {
-			error = EVS_OK;
-			pthread_mutex_unlock (&evs_inst->dispatch_mutex);
-			goto error_unlock;
-		}
+		dispatch_avail = coroipcc_dispatch_get (
+			evs_inst->ipc_ctx,
+			(void **)&dispatch_data,
+			timeout);
+
+		pthread_mutex_unlock (&evs_inst->dispatch_mutex);
 
 		if (dispatch_avail == 0 && dispatch_types == EVS_DISPATCH_ALL) {
-			pthread_mutex_unlock (&evs_inst->dispatch_mutex);
 			break; /* exit do while cont is 1 loop */
 		} else 
 		if (dispatch_avail == 0) {
-			pthread_mutex_unlock (&evs_inst->dispatch_mutex);
 			continue; /* next dispatch event */
 		}
+		if (dispatch_avail == -1) {
+			if (evs_inst->finalize == 1) {
+				error = CS_OK;
+			} else {
+				error = CS_ERR_LIBRARY;
+			}
+			goto error_put;
+		}
 
 		/*
 		 * Make copy of callbacks, message data, unlock instance, and call callback
@@ -262,13 +251,12 @@ evs_error_t evs_dispatch (
 		*/
 		memcpy (&callbacks, &evs_inst->callbacks, sizeof (evs_callbacks_t));
 
-		pthread_mutex_unlock (&evs_inst->dispatch_mutex);
 		/*
 		 * Dispatch incoming message
 		 */
-		switch (dispatch_data.header.id) {
+		switch (dispatch_data->id) {
 		case MESSAGE_RES_EVS_DELIVER_CALLBACK:
-			res_evs_deliver_callback = (struct res_evs_deliver_callback *)&dispatch_data;
+			res_evs_deliver_callback = (struct res_evs_deliver_callback *)dispatch_data;
 			callbacks.evs_deliver_fn (
 				res_evs_deliver_callback->local_nodeid,
 				&res_evs_deliver_callback->msg,
@@ -276,7 +264,7 @@ evs_error_t evs_dispatch (
 			break;
 
 		case MESSAGE_RES_EVS_CONFCHG_CALLBACK:
-			res_evs_confchg_callback = (struct res_evs_confchg_callback *)&dispatch_data;
+			res_evs_confchg_callback = (struct res_evs_confchg_callback *)dispatch_data;
 			callbacks.evs_confchg_fn (
 				res_evs_confchg_callback->member_list,
 				res_evs_confchg_callback->member_list_entries,
@@ -287,10 +275,12 @@ evs_error_t evs_dispatch (
 			break;
 
 		default:
+			coroipcc_dispatch_put (evs_inst->ipc_ctx);
 			error = CS_ERR_LIBRARY;
-			goto error_nounlock;
+			goto error_put;
 			break;
 		}
+		coroipcc_dispatch_put (evs_inst->ipc_ctx);
 
 		/*
 		 * Determine if more messages should be processed
@@ -313,9 +303,8 @@ evs_error_t evs_dispatch (
 		}
 	} while (cont);
 
-error_unlock:
+error_put:
 	saHandleInstancePut (&evs_handle_t_db, handle);
-error_nounlock:
 	return (error);
 }
 

+ 9 - 12
lib/quorum.c

@@ -349,11 +349,6 @@ error_exit:
 	return (error);
 }
 
-struct quorum_res_overlay {
-	mar_res_header_t header __attribute__((aligned(8)));
-	char data[512000];
-};
-
 cs_error_t quorum_dispatch (
 	quorum_handle_t handle,
 	cs_dispatch_flags_t dispatch_types)
@@ -364,7 +359,7 @@ cs_error_t quorum_dispatch (
 	int dispatch_avail;
 	struct quorum_inst *quorum_inst;
 	quorum_callbacks_t callbacks;
-	struct quorum_res_overlay dispatch_data;
+	mar_res_header_t *dispatch_data;
 	struct res_lib_quorum_notification *res_lib_quorum_notification;
 
 	if (dispatch_types != CS_DISPATCH_ONE &&
@@ -391,10 +386,10 @@ cs_error_t quorum_dispatch (
 	do {
 		pthread_mutex_lock (&quorum_inst->dispatch_mutex);
 
-		dispatch_avail = coroipcc_dispatch_recv (quorum_inst->ipc_ctx,
-							 (void *)&dispatch_data,
-							 sizeof (dispatch_data),
-							 timeout);
+		dispatch_avail = coroipcc_dispatch_get (
+			quorum_inst->ipc_ctx,
+			(void **)&dispatch_data,
+			timeout);
 
 		/*
 		 * Handle has been finalized in another thread
@@ -424,13 +419,13 @@ cs_error_t quorum_dispatch (
 		/*
 		 * Dispatch incoming message
 		 */
-		switch (dispatch_data.header.id) {
+		switch (dispatch_data->id) {
 
 		case MESSAGE_RES_QUORUM_NOTIFICATION:
 			if (callbacks.quorum_notify_fn == NULL) {
 				continue;
 			}
-			res_lib_quorum_notification = (struct res_lib_quorum_notification *)&dispatch_data;
+			res_lib_quorum_notification = (struct res_lib_quorum_notification *)dispatch_data;
 
 			callbacks.quorum_notify_fn ( handle,
 				res_lib_quorum_notification->quorate,
@@ -440,10 +435,12 @@ cs_error_t quorum_dispatch (
 			break;
 
 		default:
+			coroipcc_dispatch_put (quorum_inst->ipc_ctx);
 			error = CS_ERR_LIBRARY;
 			goto error_put;
 			break;
 		}
+		coroipcc_dispatch_put (quorum_inst->ipc_ctx);
 
 		/*
 		 * Determine if more messages should be processed

+ 10 - 14
lib/votequorum.c

@@ -732,12 +732,6 @@ cs_error_t votequorum_fd_get (
 	return (CS_OK);
 }
 
-
-struct res_overlay {
-	mar_res_header_t header __attribute__((aligned(8)));
-	char data[512000];
-};
-
 cs_error_t votequorum_dispatch (
 	votequorum_handle_t handle,
 	cs_dispatch_flags_t dispatch_types)
@@ -748,7 +742,7 @@ cs_error_t votequorum_dispatch (
 	int dispatch_avail;
 	struct votequorum_inst *votequorum_inst;
 	votequorum_callbacks_t callbacks;
-	struct res_overlay dispatch_data;
+	mar_res_header_t *dispatch_data;
 	struct res_lib_votequorum_notification *res_lib_votequorum_notification;
 	struct res_lib_votequorum_expectedvotes_notification *res_lib_votequorum_expectedvotes_notification;
 
@@ -776,10 +770,10 @@ cs_error_t votequorum_dispatch (
 	do {
 		pthread_mutex_lock (&votequorum_inst->dispatch_mutex);
 
-		dispatch_avail = coroipcc_dispatch_recv (votequorum_inst->ipc_ctx,
-							 (void *)&dispatch_data,
-							 sizeof (dispatch_data),
-							 timeout);
+		dispatch_avail = coroipcc_dispatch_get (
+			votequorum_inst->ipc_ctx,
+			(void **)&dispatch_data,
+			timeout);
 
 		/*
 		 * Handle has been finalized in another thread
@@ -809,13 +803,13 @@ cs_error_t votequorum_dispatch (
 		/*
 		 * Dispatch incoming message
 		 */
-		switch (dispatch_data.header.id) {
+		switch (dispatch_data->id) {
 
 		case MESSAGE_RES_VOTEQUORUM_NOTIFICATION:
 			if (callbacks.votequorum_notify_fn == NULL) {
 				continue;
 			}
-			res_lib_votequorum_notification = (struct res_lib_votequorum_notification *)&dispatch_data;
+			res_lib_votequorum_notification = (struct res_lib_votequorum_notification *)dispatch_data;
 
 			callbacks.votequorum_notify_fn ( handle,
 							 res_lib_votequorum_notification->context,
@@ -829,7 +823,7 @@ cs_error_t votequorum_dispatch (
 			if (callbacks.votequorum_expectedvotes_notify_fn == NULL) {
 				continue;
 			}
-			res_lib_votequorum_expectedvotes_notification = (struct res_lib_votequorum_expectedvotes_notification *)&dispatch_data;
+			res_lib_votequorum_expectedvotes_notification = (struct res_lib_votequorum_expectedvotes_notification *)dispatch_data;
 
 			callbacks.votequorum_expectedvotes_notify_fn ( handle,
 								       res_lib_votequorum_expectedvotes_notification->context,
@@ -837,10 +831,12 @@ cs_error_t votequorum_dispatch (
 			break;
 
 		default:
+			coroipcc_dispatch_put (votequorum_inst->ipc_ctx);
 			error = CS_ERR_LIBRARY;
 			goto error_put;
 			break;
 		}
+		coroipcc_dispatch_put (votequorum_inst->ipc_ctx);
 
 		/*
 		 * Determine if more messages should be processed

+ 0 - 1
test/evsbench.c

@@ -74,7 +74,6 @@ static void evs_deliver_fn (
 	size_t msg_len)
 {
   const char *m = msg;
-  printf ("Delivering message %s\n", m);
 }
 
 static void evs_confchg_fn (

+ 0 - 3
test/testevs.c

@@ -126,8 +126,6 @@ int main (void)
 	printf ("Init result %d\n", result);
 	result = evs_join (handle, groups, 3);
 	printf ("Join result %d\n", result);
-	result = evs_leave (handle, &groups[0], 1);
-	printf ("Leave result %d\n", result);
 	delivery_string = "evs_mcast_joined";
 
 	/*
@@ -145,7 +143,6 @@ try_again_one:
 		result = evs_mcast_joined (handle, EVS_TYPE_AGREED,
 			&iov, 1);
 		if (result == CS_ERR_TRY_AGAIN) {
-//printf ("try again\n");
 			goto try_again_one;
 		}
 		result = evs_dispatch (handle, CS_DISPATCH_ALL);