Преглед изворни кода

Zero copy feature for IPC transmits. Also integrated into CPG library
service.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2114 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake пре 17 година
родитељ
комит
75c4bc0d71

+ 251 - 45
exec/coroipcs.c

@@ -91,6 +91,12 @@ struct outq_item {
 	struct list_head list;
 	struct list_head list;
 };
 };
 
 
+struct zcb_mapped {
+	struct list_head list;
+	void *addr;
+	size_t size;
+};
+
 #if defined(_SEM_SEMUN_UNDEFINED)
 #if defined(_SEM_SEMUN_UNDEFINED)
 union semun {
 union semun {
 	int val;
 	int val;
@@ -130,6 +136,7 @@ struct conn_info {
 	struct list_head list;
 	struct list_head list;
 	char setup_msg[sizeof (mar_req_setup_t)];
 	char setup_msg[sizeof (mar_req_setup_t)];
 	unsigned int setup_bytes_read;
 	unsigned int setup_bytes_read;
+	struct list_head zcb_mapped_list_head;
 	char *sending_allowed_private_data[64];
 	char *sending_allowed_private_data[64];
 };
 };
 
 
@@ -144,8 +151,84 @@ static void ipc_disconnect (struct conn_info *conn_info);
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 		      int locked);
 		      int locked);
 
 
+static int
+memory_map (const char *path, void **buf, size_t bytes)
+{
+	int fd;
+	void *addr_orig;
+	void *addr;
+	int res;
+ 
+	fd = open (path, O_RDWR, 0600);
+
+	unlink (path);
+
+	res = ftruncate (fd, bytes);
+
+	addr_orig = mmap (NULL, bytes, PROT_NONE,
+		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+ 
+	if (addr_orig == MAP_FAILED) {
+		return (-1);
+	}
+ 
+	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
+		MAP_FIXED | MAP_SHARED, fd, 0);
+ 
+	if (addr != addr_orig) {
+		return (-1);
+	}
+ 
+	res = close (fd);
+	if (res) {
+		return (-1);
+	}
+	*buf = addr_orig;
+	return (0);
+}
+
+static int
+circular_memory_map (const char *path, void **buf, size_t bytes)
+{
+	int fd;
+	void *addr_orig;
+	void *addr;
+	int res;
+ 
+	fd = open (path, O_RDWR, 0600);
+
+	unlink (path);
+
+	res = ftruncate (fd, bytes);
+
+	addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
+		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+ 
+	if (addr_orig == MAP_FAILED) {
+		return (-1);
+	}
+ 
+	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
+		MAP_FIXED | MAP_SHARED, fd, 0);
+ 
+	if (addr != addr_orig) {
+		return (-1);
+	}
+ 
+	addr = mmap (((char *)addr_orig) + bytes,
+                  bytes, PROT_READ | PROT_WRITE,
+                  MAP_FIXED | MAP_SHARED, fd, 0);
+ 
+	res = close (fd);
+	if (res) {
+		return (-1);
+	}
+	*buf = addr_orig;
+	return (0);
+}
+
 static inline int
 static inline int
-coroipcs_circular_memory_unmap (void *buf, size_t bytes)
+circular_memory_unmap (void *buf, size_t bytes)
 {
 {
 	int res;
 	int res;
 
 
@@ -154,6 +237,83 @@ coroipcs_circular_memory_unmap (void *buf, size_t bytes)
 	return (res);
 	return (res);
 }
 }
 
 
+static inline int zcb_free (struct zcb_mapped *zcb_mapped)
+{
+	unsigned int res;
+
+	res = munmap (zcb_mapped->addr, zcb_mapped->size);
+	list_del (&zcb_mapped->list);
+	free (zcb_mapped);
+	return (res);
+}
+
+static inline int zcb_by_addr_free (struct conn_info *conn_info, void *addr)
+{
+	struct list_head *list;
+	struct zcb_mapped *zcb_mapped;
+	unsigned int res = 0;
+
+	for (list = conn_info->zcb_mapped_list_head.next;
+		list != &conn_info->zcb_mapped_list_head; list = list->next) {
+
+		zcb_mapped = list_entry (list, struct zcb_mapped, list);
+
+		if (zcb_mapped->addr == addr) {
+			res = zcb_free (zcb_mapped);
+			break;
+		}
+
+	}
+	return (res);
+}
+
+static inline int zcb_all_free (
+	struct conn_info *conn_info)
+{
+	struct list_head *list;
+	struct zcb_mapped *zcb_mapped;
+
+	for (list = conn_info->zcb_mapped_list_head.next;
+		list != &conn_info->zcb_mapped_list_head;) {
+
+		zcb_mapped = list_entry (list, struct zcb_mapped, list);
+
+		list = list->next;
+
+		zcb_free (zcb_mapped);
+	}
+	return (0);
+}
+
+static inline int zcb_alloc (
+	struct conn_info *conn_info,
+	const char *path_to_file,
+	size_t size,
+	void **addr)
+{
+	struct zcb_mapped *zcb_mapped;
+	unsigned int res;
+
+	zcb_mapped = malloc (sizeof (struct zcb_mapped));
+	if (zcb_mapped == NULL) {
+		return (-1);
+	}
+
+	res = memory_map (
+		path_to_file,
+		addr,
+		size);
+	if (res == -1) {
+		return (-1);
+	}
+
+	list_init (&zcb_mapped->list);
+	zcb_mapped->addr = *addr;
+	zcb_mapped->size = size;
+	list_add_tail (&zcb_mapped->list, &conn_info->zcb_mapped_list_head);
+	return (0);
+}
+
 static int ipc_thread_active (void *conn)
 static int ipc_thread_active (void *conn)
 {
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
 	struct conn_info *conn_info = (struct conn_info *)conn;
@@ -250,7 +410,8 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 		api->free (conn_info->private_data);
 		api->free (conn_info->private_data);
 	}
 	}
 	close (conn_info->fd);
 	close (conn_info->fd);
-	res = coroipcs_circular_memory_unmap (conn_info->dispatch_buffer, DISPATCH_SIZE);
+	res = circular_memory_unmap (conn_info->dispatch_buffer, DISPATCH_SIZE);
+	zcb_all_free (conn_info);
 	api->free (conn_info);
 	api->free (conn_info);
 	api->serialize_unlock ();
 	api->serialize_unlock ();
 	return (-1);
 	return (-1);
@@ -261,6 +422,83 @@ struct res_overlay {
 	char buf[4096];
 	char buf[4096];
 };
 };
 
 
+union u {
+	uint64_t server_addr;
+	void *server_ptr;
+};
+
+static uint64_t void2serveraddr (void *server_ptr)
+{
+	union u u;
+
+	u.server_ptr = server_ptr;
+	return (u.server_addr);
+}
+
+static void *serveraddr2void (uint64_t server_addr)
+{
+	union u u;
+
+	u.server_addr = server_addr;
+	return (u.server_ptr);
+}; 
+
+static inline void zerocopy_operations_process (
+	struct conn_info *conn_info,
+	mar_req_header_t **header_out,
+	unsigned int *new_message)
+{
+	mar_req_header_t *header;
+
+	header = (mar_req_header_t *)conn_info->mem->req_buffer;
+	if (header->id == ZC_ALLOC_HEADER) {
+		mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)header;
+		mar_res_header_t res_header;
+		void *addr = NULL;
+		struct coroipcs_zc_header *zc_header;
+		unsigned int res;
+
+		res = zcb_alloc (conn_info, hdr->path_to_file, hdr->map_size,
+			&addr);
+
+		zc_header = (struct coroipcs_zc_header *)addr;
+		zc_header->server_address = void2serveraddr(addr);
+
+		res_header.size = sizeof (mar_res_header_t);
+		res_header.id = 0;
+		coroipcs_response_send (
+			conn_info, &res_header, 
+			res_header.size);
+		*new_message = 0;
+		return;
+	} else 
+	if (header->id == ZC_FREE_HEADER) {
+		mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)header;
+		mar_res_header_t res_header;
+		void *addr = NULL;
+
+		addr = serveraddr2void (hdr->server_address);
+
+		zcb_by_addr_free (conn_info, addr);
+
+		res_header.size = sizeof (mar_res_header_t);
+		res_header.id = 0;
+		coroipcs_response_send (
+			conn_info, &res_header, 
+			res_header.size);
+
+		*new_message = 0;
+		return;
+	} else 
+	if (header->id == ZC_EXECUTE_HEADER) {
+		mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)header;
+		
+		header = (mar_req_header_t *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
+	}
+	*header_out = header;
+	*new_message = 1;
+}
+
 static void *pthread_ipc_consumer (void *conn)
 static void *pthread_ipc_consumer (void *conn)
 {
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
 	struct conn_info *conn_info = (struct conn_info *)conn;
@@ -269,6 +507,7 @@ static void *pthread_ipc_consumer (void *conn)
 	mar_req_header_t *header;
 	mar_req_header_t *header;
 	struct res_overlay res_overlay;
 	struct res_overlay res_overlay;
 	int send_ok;
 	int send_ok;
+	unsigned int new_message;
 
 
 	if (api->sched_priority != 0) {
 	if (api->sched_priority != 0) {
 		struct sched_param sched_param;
 		struct sched_param sched_param;
@@ -295,9 +534,15 @@ retry_semop:
 			pthread_exit (0);
 			pthread_exit (0);
 		}
 		}
 
 
-		coroipcs_refcount_inc (conn_info);
+		zerocopy_operations_process (conn_info, &header, &new_message);
+		/*
+		 * There is no new message to process, continue for loop
+		 */
+		if (new_message == 0) {
+			continue;
+		}
 
 
-                header = (mar_req_header_t *)conn_info->mem->req_buffer;
+		coroipcs_refcount_inc (conn);
 
 
 		send_ok = api->sending_allowed (conn_info->service,
 		send_ok = api->sending_allowed (conn_info->service,
 			header->id,
 			header->id,
@@ -497,6 +742,7 @@ static int conn_info_create (int fd)
 	conn_info->state = CONN_STATE_THREAD_INACTIVE;
 	conn_info->state = CONN_STATE_THREAD_INACTIVE;
 	list_init (&conn_info->outq_head);
 	list_init (&conn_info->outq_head);
 	list_init (&conn_info->list);
 	list_init (&conn_info->list);
+	list_init (&conn_info->zcb_mapped_list_head);
 	list_add (&conn_info->list, &conn_info_list_head);
 	list_add (&conn_info->list, &conn_info_list_head);
 
 
         api->poll_dispatch_add (fd, conn_info);
         api->poll_dispatch_add (fd, conn_info);
@@ -933,46 +1179,6 @@ retry_accept:
 	return (0);
 	return (0);
 }
 }
 
 
-static int
-coroipcs_memory_map (char *path, void **buf, size_t bytes)
-{
-	int fd;
-	void *addr_orig;
-	void *addr;
-	int res;
- 
-	fd = open (path, O_RDWR, 0600);
-
-	unlink (path);
-
-	res = ftruncate (fd, bytes);
-
-	addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
-		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
- 
-	if (addr_orig == MAP_FAILED) {
-		return (-1);
-	}
- 
-	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
-		MAP_FIXED | MAP_SHARED, fd, 0);
- 
-	if (addr != addr_orig) {
-		return (-1);
-	}
- 
-	addr = mmap (((char *)addr_orig) + bytes,
-                  bytes, PROT_READ | PROT_WRITE,
-                  MAP_FIXED | MAP_SHARED, fd, 0);
- 
-	res = close (fd);
-	if (res) {
-		return (-1);
-	}
-	*buf = addr_orig;
-	return (0);
-}
-
 int coroipcs_handler_dispatch (
 int coroipcs_handler_dispatch (
 	int fd,
 	int fd,
 	int revent,
 	int revent,
@@ -1026,7 +1232,7 @@ int coroipcs_handler_dispatch (
 
 
 		conn_info->shmkey = req_setup->shmkey;
 		conn_info->shmkey = req_setup->shmkey;
 		conn_info->semkey = req_setup->semkey;
 		conn_info->semkey = req_setup->semkey;
-		res = coroipcs_memory_map (
+		res = circular_memory_map (
 			req_setup->dispatch_file,
 			req_setup->dispatch_file,
 			(void *)&conn_info->dispatch_buffer,
 			(void *)&conn_info->dispatch_buffer,
 			DISPATCH_SIZE);
 			DISPATCH_SIZE);

+ 20 - 2
include/corosync/coroipcc.h

@@ -87,8 +87,27 @@ coroipcc_msg_send_reply_receive_in_buf (
 	unsigned int iov_len,
 	unsigned int iov_len,
 	void **res_msg);
 	void **res_msg);
 
 
+extern cs_error_t
+coroipcc_zcb_alloc (
+	void *ipc_context,
+	void **buffer,
+	size_t size,
+        size_t header_size);
+
+extern cs_error_t
+coroipcc_zcb_free (
+	void *ipc_context,
+	void *buffer);
+
+extern cs_error_t
+coroipcc_zcb_msg_send_reply_receive (
+	void *ipc_context,
+	void *msg,
+	void *res_msg,
+	size_t res_len);
+
 /*
 /*
- * This needs to be removed
+ * TODO This needs to be removed
  */
  */
 struct saHandleDatabase {
 struct saHandleDatabase {
 	unsigned int handleCount;
 	unsigned int handleCount;
@@ -115,7 +134,6 @@ static void database_name##_init(void)					\
         saHandleDatabaseLock_init (&(database_name));			\
         saHandleDatabaseLock_init (&(database_name));			\
 }
 }
 
 
-
 extern cs_error_t
 extern cs_error_t
 saHandleCreate (
 saHandleCreate (
 	struct saHandleDatabase *handleDatabase,
 	struct saHandleDatabase *handleDatabase,

+ 15 - 0
include/corosync/cpg.h

@@ -198,4 +198,19 @@ cs_error_t cpg_flow_control_state_get (
 	cpg_handle_t handle,
 	cpg_handle_t handle,
 	cpg_flow_control_state_t *flow_control_enabled);
 	cpg_flow_control_state_t *flow_control_enabled);
 
 
+cs_error_t cpg_zcb_alloc (
+	cpg_handle_t handle,
+	size_t size,
+	void **buffer);
+
+cs_error_t cpg_zcb_free (
+	cpg_handle_t handle,
+	void *buffer);
+
+cs_error_t cpg_zcb_mcast_joined (
+	cpg_handle_t handle,
+	cpg_guarantee_t guarantee,
+	void *msg,
+	size_t msg_len);
+
 #endif /* COROSYNC_CPG_H_DEFINED */
 #endif /* COROSYNC_CPG_H_DEFINED */

+ 25 - 0
include/corosync/ipc_gen.h

@@ -126,6 +126,27 @@ typedef struct {
 	void *conn __attribute__((aligned(8)));
 	void *conn __attribute__((aligned(8)));
 } mar_message_source_t __attribute__((aligned(8)));
 } mar_message_source_t __attribute__((aligned(8)));
 
 
+typedef struct {
+        mar_req_header_t header __attribute__((aligned(8)));
+        size_t map_size __attribute__((aligned(8)));
+        char path_to_file[128] __attribute__((aligned(8)));
+} mar_req_coroipcc_zc_alloc_t __attribute__((aligned(8)));
+
+typedef struct {
+        mar_req_header_t header __attribute__((aligned(8)));
+        size_t map_size __attribute__((aligned(8)));
+	uint64_t server_address __attribute__((aligned(8)));
+} mar_req_coroipcc_zc_free_t __attribute__((aligned(8)));
+
+typedef struct {
+        mar_req_header_t header __attribute__((aligned(8)));
+	uint64_t server_address __attribute__((aligned(8)));
+} mar_req_coroipcc_zc_execute_t __attribute__((aligned(8)));
+
+struct coroipcs_zc_header {
+	int map_size;
+	uint64_t server_address;
+};
 static inline void swab_mar_message_source_t (mar_message_source_t *to_swab)
 static inline void swab_mar_message_source_t (mar_message_source_t *to_swab)
 {
 {
 	swab_mar_uint32_t (&to_swab->nodeid);
 	swab_mar_uint32_t (&to_swab->nodeid);
@@ -137,4 +158,8 @@ static inline void swab_mar_message_source_t (mar_message_source_t *to_swab)
 	to_swab->conn = NULL;
 	to_swab->conn = NULL;
 }
 }
 
 
+#define ZC_ALLOC_HEADER		0xFFFFFFFF
+#define ZC_FREE_HEADER		0xFFFFFFFE
+#define ZC_EXECUTE_HEADER	0xFFFFFFFD
+
 #endif /* IPC_GEN_H_DEFINED */
 #endif /* IPC_GEN_H_DEFINED */

+ 155 - 4
lib/coroipcc.c

@@ -278,7 +278,7 @@ union semun {
 #endif
 #endif
 	
 	
 static int
 static int
-coroipcc_memory_map (char *path, const char *file, void **buf, size_t bytes)
+circular_memory_map (char *path, const char *file, void **buf, size_t bytes)
 {
 {
 	int fd;
 	int fd;
 	void *addr_orig;
 	void *addr_orig;
@@ -325,13 +325,56 @@ coroipcc_memory_map (char *path, const char *file, void **buf, size_t bytes)
 }
 }
  
  
 static void
 static void
-coroipcc_memory_unmap (void *addr, size_t bytes)
+memory_unmap (void *addr, size_t bytes)
 {
 {
 	int res;
 	int res;
  
  
 	res = munmap (addr, bytes);
 	res = munmap (addr, bytes);
 }
 }
 
 
+static int
+memory_map (char *path, const char *file, void **buf, size_t bytes)
+{
+	int fd;
+	void *addr_orig;
+	void *addr;
+	int res;
+
+	sprintf (path, "/dev/shm/%s", file);
+ 
+	fd = mkstemp (path);
+	if (fd == -1) {
+		sprintf (path, "/var/run/%s", file);
+		fd = mkstemp (path);
+		if (fd == -1) {
+			return (-1);
+		}
+	}
+
+	res = ftruncate (fd, bytes);
+
+	addr_orig = mmap (NULL, bytes, PROT_NONE,
+		MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
+ 
+	if (addr_orig == MAP_FAILED) {
+		return (-1);
+	}
+ 
+	addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
+		MAP_FIXED | MAP_SHARED, fd, 0);
+ 
+	if (addr != addr_orig) {
+		return (-1);
+	}
+ 
+	res = close (fd);
+	if (res) {
+		return (-1);
+	}
+	*buf = addr_orig;
+	return (0);
+}
+ 
 cs_error_t
 cs_error_t
 coroipcc_service_connect (
 coroipcc_service_connect (
 	const char *socket_name,
 	const char *socket_name,
@@ -431,7 +474,7 @@ coroipcc_service_connect (
 		goto error_exit;
 		goto error_exit;
 	}
 	}
 
 
-	res = coroipcc_memory_map (dispatch_map_path,
+	res = circular_memory_map (dispatch_map_path,
 		"dispatch_bufer-XXXXXX",
 		"dispatch_bufer-XXXXXX",
 		&ipc_segment->dispatch_buffer, DISPATCH_SIZE);
 		&ipc_segment->dispatch_buffer, DISPATCH_SIZE);
 	strcpy (req_setup.dispatch_file, dispatch_map_path);
 	strcpy (req_setup.dispatch_file, dispatch_map_path);
@@ -481,7 +524,10 @@ coroipcc_service_disconnect (
 	shutdown (ipc_segment->fd, SHUT_RDWR);
 	shutdown (ipc_segment->fd, SHUT_RDWR);
 	close (ipc_segment->fd);
 	close (ipc_segment->fd);
 	shmdt (ipc_segment->shared_memory);
 	shmdt (ipc_segment->shared_memory);
-	coroipcc_memory_unmap (ipc_segment->dispatch_buffer, (DISPATCH_SIZE) << 1);
+	/*
+	 * << 1 (or multiplied by 2) because this is a wrapped memory buffer
+	 */
+	memory_unmap (ipc_segment->dispatch_buffer, (DISPATCH_SIZE) << 1);
 	free (ipc_segment);
 	free (ipc_segment);
 	return (CS_OK);
 	return (CS_OK);
 }
 }
@@ -796,6 +842,111 @@ void saHandleDatabaseLock_init (struct saHandleDatabase *hdb)
 #endif
 #endif
 
 
 
 
+cs_error_t
+coroipcc_zcb_alloc (
+	void *ipc_context,
+	void **buffer,
+	size_t size,
+	size_t header_size)
+{
+	void *buf = NULL;
+	char path[128];
+	unsigned int res;
+	mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
+	mar_res_header_t res_coroipcs_zc_alloc;
+	size_t map_size;
+	struct iovec iovec;
+	struct coroipcs_zc_header *hdr;
+
+	map_size = size + header_size + sizeof (struct coroipcs_zc_header);
+	res = memory_map (path, "cpg_zc-XXXXXX", &buf, size);
+	assert (res != -1);
+
+	req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
+	req_coroipcc_zc_alloc.header.id = ZC_ALLOC_HEADER;
+	req_coroipcc_zc_alloc.map_size = map_size;
+	strcpy (req_coroipcc_zc_alloc.path_to_file, path);
+
+
+	iovec.iov_base = &req_coroipcc_zc_alloc;
+	iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
+
+	res = coroipcc_msg_send_reply_receive (
+		ipc_context,
+		&iovec,
+		1,
+		&res_coroipcs_zc_alloc,
+		sizeof (mar_res_header_t));
+
+	hdr = (struct coroipcs_zc_header *)buf;
+	hdr->map_size = map_size;
+	*buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header);
+	return (CS_OK);
+}
+
+cs_error_t
+coroipcc_zcb_free (
+	void *ipc_context,
+	void *buffer)
+{
+	mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
+	mar_res_header_t res_coroipcs_zc_free;
+	struct iovec iovec;
+	unsigned int res;
+
+	struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header));
+
+	req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
+	req_coroipcc_zc_free.header.id = ZC_FREE_HEADER;
+	req_coroipcc_zc_free.map_size = header->map_size;
+	req_coroipcc_zc_free.server_address = header->server_address;
+
+	iovec.iov_base = &req_coroipcc_zc_free;
+	iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
+
+	res = coroipcc_msg_send_reply_receive (
+		ipc_context,
+		&iovec,
+		1,
+		&res_coroipcs_zc_free,
+		sizeof (mar_res_header_t));
+
+	munmap (header, header->map_size);
+
+	return (CS_OK);
+}
+
+cs_error_t
+coroipcc_zcb_msg_send_reply_receive (
+        void *ipc_context,
+        void *msg,
+        void *res_msg,
+        size_t res_len)
+{
+	mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
+	struct coroipcs_zc_header *hdr;
+	struct iovec iovec;
+	cs_error_t res;
+
+	hdr = (struct coroipcs_zc_header *)(((char *)msg) - sizeof (struct coroipcs_zc_header));
+
+	req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
+	req_coroipcc_zc_execute.header.id = ZC_EXECUTE_HEADER;
+	req_coroipcc_zc_execute.server_address = hdr->server_address;
+
+	iovec.iov_base = &req_coroipcc_zc_execute;
+	iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
+
+	res = coroipcc_msg_send_reply_receive (
+		ipc_context,
+		&iovec,
+		1,
+		res_msg,
+		res_len);
+
+	return (res);
+}
+		
 cs_error_t
 cs_error_t
 saHandleCreate (
 saHandleCreate (
 	struct saHandleDatabase *handleDatabase,
 	struct saHandleDatabase *handleDatabase,

+ 142 - 53
lib/cpg.c

@@ -477,59 +477,6 @@ error_exit:
 	return (error);
 	return (error);
 }
 }
 
 
-cs_error_t cpg_mcast_joined (
-	cpg_handle_t handle,
-	cpg_guarantee_t guarantee,
-	const struct iovec *iovec,
-	unsigned int iov_len)
-{
-	int i;
-	cs_error_t error;
-	struct cpg_inst *cpg_inst;
-	struct iovec iov[64];
-	struct req_lib_cpg_mcast req_lib_cpg_mcast;
-	struct res_lib_cpg_mcast res_lib_cpg_mcast;
-	size_t msg_len = 0;
-
-	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
-	if (error != CS_OK) {
-		return (error);
-	}
-
-	for (i = 0; i < iov_len; i++ ) {
-		msg_len += iovec[i].iov_len;
-	}
-
-	req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
-		msg_len;
-
-	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
-	req_lib_cpg_mcast.guarantee = guarantee;
-	req_lib_cpg_mcast.msglen = msg_len;
-
-	iov[0].iov_base = &req_lib_cpg_mcast;
-	iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
-	memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
-
-	pthread_mutex_lock (&cpg_inst->response_mutex);
-
-	error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov,
-		iov_len + 1, &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast));
-
-	pthread_mutex_unlock (&cpg_inst->response_mutex);
-
-	if (error != CS_OK) {
-		goto error_exit;
-	}
-
-	error = res_lib_cpg_mcast.header.error;
-
-error_exit:
-	saHandleInstancePut (&cpg_handle_t_db, handle);
-
-	return (error);
-}
-
 cs_error_t cpg_membership_get (
 cs_error_t cpg_membership_get (
 	cpg_handle_t handle,
 	cpg_handle_t handle,
 	struct cpg_name *group_name,
 	struct cpg_name *group_name,
@@ -644,4 +591,146 @@ cs_error_t cpg_flow_control_state_get (
 
 
 	return (error);
 	return (error);
 }
 }
+
+cs_error_t cpg_zcb_alloc (
+	cpg_handle_t handle,
+	size_t size,
+	void **buffer)
+{
+	cs_error_t error;
+	struct cpg_inst *cpg_inst;
+
+	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
+	if (error != CS_OK) {
+		return (error);
+	}
+
+	error = coroipcc_zcb_alloc (cpg_inst->ipc_ctx,
+		buffer,
+		size,
+		sizeof (struct req_lib_cpg_mcast));
+
+	saHandleInstancePut (&cpg_handle_t_db, handle);
+	*buffer = ((char *)*buffer) + sizeof (struct req_lib_cpg_mcast);
+
+	return (error);
+}
+
+cs_error_t cpg_zcb_free (
+	cpg_handle_t handle,
+	void *buffer)
+{
+	cs_error_t error;
+	struct cpg_inst *cpg_inst;
+
+	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
+	if (error != CS_OK) {
+		return (error);
+	}
+
+	coroipcc_zcb_free (cpg_inst->ipc_ctx, ((char *)buffer) - sizeof (struct req_lib_cpg_mcast));
+
+	saHandleInstancePut (&cpg_handle_t_db, handle);
+
+	return (error);
+}
+
+cs_error_t cpg_zcb_mcast_joined (
+	cpg_handle_t handle,
+	cpg_guarantee_t guarantee,
+	void *msg,
+	size_t msg_len)
+{
+	cs_error_t error;
+	struct cpg_inst *cpg_inst;
+	struct req_lib_cpg_mcast *req_lib_cpg_mcast;
+	struct res_lib_cpg_mcast res_lib_cpg_mcast;
+
+	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
+	if (error != CS_OK) {
+		return (error);
+	}
+
+	req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
+	req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
+		msg_len;
+
+	req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST;
+	req_lib_cpg_mcast->guarantee = guarantee;
+	req_lib_cpg_mcast->msglen = msg_len;
+
+	pthread_mutex_lock (&cpg_inst->response_mutex);
+
+	error = coroipcc_zcb_msg_send_reply_receive (
+		cpg_inst->ipc_ctx,
+		req_lib_cpg_mcast,
+		&res_lib_cpg_mcast,
+		sizeof (res_lib_cpg_mcast));
+
+	pthread_mutex_unlock (&cpg_inst->response_mutex);
+
+	if (error != CS_OK) {
+		goto error_exit;
+	}
+
+	error = res_lib_cpg_mcast.header.error;
+
+error_exit:
+	saHandleInstancePut (&cpg_handle_t_db, handle);
+
+	return (error);
+}
+
+cs_error_t cpg_mcast_joined (
+	cpg_handle_t handle,
+	cpg_guarantee_t guarantee,
+	const struct iovec *iovec,
+	unsigned int iov_len)
+{
+	int i;
+	cs_error_t error;
+	struct cpg_inst *cpg_inst;
+	struct iovec iov[64];
+	struct req_lib_cpg_mcast req_lib_cpg_mcast;
+	struct res_lib_cpg_mcast res_lib_cpg_mcast;
+	size_t msg_len = 0;
+
+	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
+	if (error != CS_OK) {
+		return (error);
+	}
+
+	for (i = 0; i < iov_len; i++ ) {
+		msg_len += iovec[i].iov_len;
+	}
+
+	req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
+		msg_len;
+
+	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
+	req_lib_cpg_mcast.guarantee = guarantee;
+	req_lib_cpg_mcast.msglen = msg_len;
+
+	iov[0].iov_base = &req_lib_cpg_mcast;
+	iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
+	memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
+
+	pthread_mutex_lock (&cpg_inst->response_mutex);
+
+	error = coroipcc_msg_send_reply_receive (cpg_inst->ipc_ctx, iov,
+		iov_len + 1, &res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast));
+
+	pthread_mutex_unlock (&cpg_inst->response_mutex);
+
+	if (error != CS_OK) {
+		goto error_exit;
+	}
+
+	error = res_lib_cpg_mcast.header.error;
+
+error_exit:
+	saHandleInstancePut (&cpg_handle_t_db, handle);
+
+	return (error);
+}
 /** @} */
 /** @} */

+ 3 - 0
lib/libcfg.versions

@@ -24,6 +24,9 @@ COROSYNC_CFG_0.82 {
 		coroipcc_dispatch_recv;
 		coroipcc_dispatch_recv;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive_in_buf;
 		coroipcc_msg_send_reply_receive_in_buf;
+		coroipcc_zcb_alloc;
+		coroipcc_zcb_free;
+		coroipcc_zcb_msg_send_reply_receive;
 		saHandleCreate;
 		saHandleCreate;
 		saHandleDestroy;
 		saHandleDestroy;
 		saHandleInstanceGet;
 		saHandleInstanceGet;

+ 3 - 0
lib/libconfdb.versions

@@ -30,6 +30,9 @@ COROSYNC_CONFDB_1.0 {
 		coroipcc_dispatch_recv;
 		coroipcc_dispatch_recv;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive_in_buf;
 		coroipcc_msg_send_reply_receive_in_buf;
+		coroipcc_zcb_alloc;
+		coroipcc_zcb_free;
+		coroipcc_zcb_msg_send_reply_receive;
 		saHandleCreate;
 		saHandleCreate;
 		saHandleDestroy;
 		saHandleDestroy;
 		saHandleInstanceGet;
 		saHandleInstanceGet;

+ 5 - 2
lib/libcoroipcc.versions

@@ -1,6 +1,6 @@
-# Version and symbol export for libipcutil.so
+# Version and symbol export for libcoroipcc.so
 
 
-COROSYNC_IPCUTIL_2.0 {
+COROSYNC_COROIPCC_3.0 {
 	global:
 	global:
 		coroipcc_service_connect;
 		coroipcc_service_connect;
 		coroipcc_service_disconnect;
 		coroipcc_service_disconnect;
@@ -9,6 +9,9 @@ COROSYNC_IPCUTIL_2.0 {
 		coroipcc_dispatch_recv;
 		coroipcc_dispatch_recv;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive_in_buf;
 		coroipcc_msg_send_reply_receive_in_buf;
+		coroipcc_zcb_alloc;
+		coroipcc_zcb_free;
+		coroipcc_zcb_msg_send_reply_receive;
 		saHandleCreate;
 		saHandleCreate;
 		saHandleDestroy;
 		saHandleDestroy;
 		saHandleInstanceGet;
 		saHandleInstanceGet;

+ 5 - 0
lib/libcpg.versions

@@ -12,6 +12,8 @@ COROSYNC_CPG_1.0 {
 		cpg_membership_get;
 		cpg_membership_get;
 		cpg_context_get;
 		cpg_context_get;
 		cpg_context_set;
 		cpg_context_set;
+		cpg_zcb_alloc;
+		cpg_zcb_free;
 
 
 	local:
 	local:
 		coroipcc_service_connect;
 		coroipcc_service_connect;
@@ -21,6 +23,9 @@ COROSYNC_CPG_1.0 {
 		coroipcc_dispatch_recv;
 		coroipcc_dispatch_recv;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive_in_buf;
 		coroipcc_msg_send_reply_receive_in_buf;
+		coroipcc_zcb_alloc;
+		coroipcc_zcb_free;
+		coroipcc_zcb_msg_send_reply_receive;
 		saHandleCreate;
 		saHandleCreate;
 		saHandleDestroy;
 		saHandleDestroy;
 		saHandleInstanceGet;
 		saHandleInstanceGet;

+ 3 - 0
lib/libevs.versions

@@ -20,6 +20,9 @@ COROSYNC_EVS_2.0 {
 		coroipcc_dispatch_recv;
 		coroipcc_dispatch_recv;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive_in_buf;
 		coroipcc_msg_send_reply_receive_in_buf;
+		coroipcc_zcb_alloc;
+		coroipcc_zcb_free;
+		coroipcc_zcb_msg_send_reply_receive;
 		saHandleCreate;
 		saHandleCreate;
 		saHandleDestroy;
 		saHandleDestroy;
 		saHandleInstanceGet;
 		saHandleInstanceGet;

+ 3 - 0
lib/libpload.versions

@@ -12,6 +12,9 @@ COROSYNC_PLOAD_1.0 {
 		coroipcc_dispatch_recv;
 		coroipcc_dispatch_recv;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive_in_buf;
 		coroipcc_msg_send_reply_receive_in_buf;
+		coroipcc_zcb_alloc;
+		coroipcc_zcb_free;
+		coroipcc_zcb_msg_send_reply_receive;
 		saHandleCreate;
 		saHandleCreate;
 		saHandleDestroy;
 		saHandleDestroy;
 		saHandleInstanceGet;
 		saHandleInstanceGet;

+ 3 - 0
lib/libquorum.versions

@@ -17,6 +17,9 @@ COROSYNC_QUORUM_1.0 {
 		coroipcc_dispatch_recv;
 		coroipcc_dispatch_recv;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive_in_buf;
 		coroipcc_msg_send_reply_receive_in_buf;
+		coroipcc_zcb_alloc;
+		coroipcc_zcb_free;
+		coroipcc_zcb_msg_send_reply_receive;
 		saHandleCreate;
 		saHandleCreate;
 		saHandleDestroy;
 		saHandleDestroy;
 		saHandleInstanceGet;
 		saHandleInstanceGet;

+ 3 - 0
lib/libvotequorum.versions

@@ -26,6 +26,9 @@ COROSYNC_VOTEQUORUM_1.0 {
 		coroipcc_dispatch_recv;
 		coroipcc_dispatch_recv;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive;
 		coroipcc_msg_send_reply_receive_in_buf;
 		coroipcc_msg_send_reply_receive_in_buf;
+		coroipcc_zcb_alloc;
+		coroipcc_zcb_free;
+		coroipcc_zcb_msg_send_reply_receive;
 		saHandleCreate;
 		saHandleCreate;
 		saHandleDestroy;
 		saHandleDestroy;
 		saHandleInstanceGet;
 		saHandleInstanceGet;

+ 3 - 0
man/Makefile.am

@@ -69,6 +69,9 @@ dist_man_MANS = \
 	cpg_leave.3 \
 	cpg_leave.3 \
 	cpg_local_get.3 \
 	cpg_local_get.3 \
 	cpg_mcast_joined.3 \
 	cpg_mcast_joined.3 \
+	cpg_zcb_mcast_joined.3 \
+	cpg_zcb_alloc.3 \
+	cpg_zcb_free.3 \
 	cpg_membership_get.3 \
 	cpg_membership_get.3 \
 	evs_dispatch.3 \
 	evs_dispatch.3 \
 	evs_fd_get.3 \
 	evs_fd_get.3 \

+ 5 - 2
man/cpg_context_get.3

@@ -56,6 +56,9 @@ The errors are undocumented.
 .BR cpg_join (3),
 .BR cpg_join (3),
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
-.BR cpg_membership_get (3)
-.BR cpg_context_set (3)
+.BR cpg_membership_get (3),
+.BR cpg_context_set (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
 .PP
 .PP

+ 3 - 0
man/cpg_context_set.3

@@ -60,4 +60,7 @@ The errors are undocumented.
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_membership_get (3)
 .BR cpg_membership_get (3)
 .BR cpg_context_get (3)
 .BR cpg_context_get (3)
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
 .PP
 .PP

+ 3 - 0
man/cpg_fd_get.3

@@ -64,4 +64,7 @@ The errors are undocumented.
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_membership_get (3)
 .BR cpg_membership_get (3)
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
 .PP
 .PP

+ 5 - 1
man/cpg_finalize.3

@@ -59,5 +59,9 @@ The errors are undocumented.
 .BR cpg_join (3),
 .BR cpg_join (3),
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
-.BR cpg_membership_get (3)
+.BR cpg_membership_get (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
+
 .PP
 .PP

+ 5 - 2
man/cpg_initialize.3

@@ -167,6 +167,9 @@ The errors are undocumented.
 .BR cpg_join (3),
 .BR cpg_join (3),
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
-.BR cpg_membership_get (3)
-.BR cpg_groups_get (3)
+.BR cpg_membership_get (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
+
 .PP
 .PP

+ 5 - 1
man/cpg_join.3

@@ -100,5 +100,9 @@ Not all errors are documented.
 .BR cpg_dispatch (3),
 .BR cpg_dispatch (3),
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
-.BR cpg_membership_get (3)
+.BR cpg_membership_get (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
+
 .PP
 .PP

+ 5 - 1
man/cpg_leave.3

@@ -65,5 +65,9 @@ The errors are undocumented.
 .BR cpg_dispatch (3),
 .BR cpg_dispatch (3),
 .BR cpg_join (3),
 .BR cpg_join (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
-.BR cpg_membership_get (3)
+.BR cpg_membership_get (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
+
 .PP
 .PP

+ 5 - 1
man/cpg_local_get.3

@@ -60,5 +60,9 @@ The errors are undocumented.
 .BR cpg_dispatch (3),
 .BR cpg_dispatch (3),
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
-.BR cpg_membership_get (3)
+.BR cpg_membership_get (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
+
 .PP
 .PP

+ 5 - 1
man/cpg_mcast_joined.3

@@ -129,5 +129,9 @@ The errors are undocumented.
 .BR cpg_dispatch (3),
 .BR cpg_dispatch (3),
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_join (3),
 .BR cpg_join (3),
-.BR cpg_membership_get (3)
+.BR cpg_membership_get (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
+
 .PP
 .PP

+ 5 - 1
man/cpg_membership_get.3

@@ -69,5 +69,9 @@ The errors are undocumented.
 .BR cpg_dispatch (3),
 .BR cpg_dispatch (3),
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
-.BR cpg_membership_get (3)
+.BR cpg_membership_get (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
+
 .PP
 .PP

+ 10 - 8
man/cpg_overview.8

@@ -1,5 +1,5 @@
 .\"/*
 .\"/*
-.\" * Copyright (c) 2006 Red Hat, Inc.
+.\" * Copyright (c) 2006-2009 Red Hat, Inc.
 .\" *
 .\" *
 .\" * All rights reserved.
 .\" * All rights reserved.
 .\" *
 .\" *
@@ -31,7 +31,7 @@
 .\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 .\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 .\" * THE POSSIBILITY OF SUCH DAMAGE.
 .\" * THE POSSIBILITY OF SUCH DAMAGE.
 .\" */
 .\" */
-.TH CPG_OVERVIEW 8 2006-03-06 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
+.TH CPG_OVERVIEW 8 2009-4-15 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
 .SH OVERVIEW
 .SH OVERVIEW
 The CPG library is delivered with the corosync project.  This library is used
 The CPG library is delivered with the corosync project.  This library is used
 to create distributed applications that operate properly during partitions, merges,
 to create distributed applications that operate properly during partitions, merges,
@@ -45,12 +45,11 @@ The library provides a mechanism to:
 * Deliver configuration changes
 * Deliver configuration changes
 .PP
 .PP
 .SH SECURITY
 .SH SECURITY
-The CPG library encrypts all messages sent over the network using the SOBER-128
-stream cipher.  The EVS library uses HMAC and SHA1 to authenticate all messages.
-The CPG library uses SOBER-128 as a pseudo random number generator.  TheCPG
-library feeds the PRNG using the /dev/random Linux device.
-.SH BUGS
-This software is not yet production, so there may still be some bugs.
+If encryption is enabled in corosync.conf, the CPG library will encrypt and
+authenticate message contents.  Applications must run as the ais user to be
+validated by corosync on IPC connection, otherwise they will be unable to
+access the corosync services.
+
 .SH "SEE ALSO"
 .SH "SEE ALSO"
 .BR cpg_initialize (3),
 .BR cpg_initialize (3),
 .BR cpg_finalize (3),
 .BR cpg_finalize (3),
@@ -60,5 +59,8 @@ This software is not yet production, so there may still be some bugs.
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_membership_get (3)
 .BR cpg_membership_get (3)
+.BR cpg_zcb_alloc (3)
+.BR cpg_zcb_free (3)
+.BR cpg_zcb_mcast_joined (3)
 
 
 .PP
 .PP

+ 80 - 0
man/cpg_zcb_alloc.3

@@ -0,0 +1,80 @@
+.\"/*
+.\" * Copyright (c) 2009 Red Hat, Inc.
+.\" *
+.\" * All rights reserved.
+.\" *
+.\" * Author: Steven Dake <sdake@redhat.com>
+.\" *
+.\" * This software licensed under BSD license, the text of which follows:
+.\" * 
+.\" * Redistribution and use in source and binary forms, with or without
+.\" * modification, are permitted provided that the following conditions are met:
+.\" *
+.\" * - Redistributions of source code must retain the above copyright notice,
+.\" *   this list of conditions and the following disclaimer.
+.\" * - Redistributions in binary form must reproduce the above copyright notice,
+.\" *   this list of conditions and the following disclaimer in the documentation
+.\" *   and/or other materials provided with the distribution.
+.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its
+.\" *   contributors may be used to endorse or promote products derived from this
+.\" *   software without specific prior written permission.
+.\" *
+.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+.\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+.\" * THE POSSIBILITY OF SUCH DAMAGE.
+.\" */
+.TH CPG_ZCB_ALLOC 2009-04-15 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
+.SH NAME
+cpg_zcb_alloc \- Allocates a zero copy buffer
+.B #include <corosync/cpg.h>
+.sp
+.BI "int cpg_zcb_alloc(cpg_handle_t " handle ", size_t " size ", void **" buffer ");
+.SH DESCRIPTION
+The
+.B cpg_zcb_alloc
+function will allocate a zero copy buffer for use with the
+.B cpg_zcb_mcast_joined(3)
+funtion.  This buffer should not be used in another thread while a
+cpg_zcb_mcast_joined operation is taking place on the buffer.  The buffer is
+allocated via operating system mechanisms to avoid copying in the IPC layer.
+
+.PP
+The argument
+.I handle
+describes the handle on which the buffer will be allocated.
+.PP
+The argument
+.I size
+requests a buffer of size be allocated.
+.PP
+The
+.I buffer
+argument is set to the buffer address that is allocated by this operatoin.
+
+.SH RETURN VALUE
+This call returns the CPG_OK value if successful, otherwise an error is returned.
+.PP
+.SH ERRORS
+The errors are undocumented.
+.SH "SEE ALSO"
+.BR cpg_overview (8),
+.BR cpg_initialize (3),
+.BR cpg_finalize (3),
+.BR cpg_fd_get (3),
+.BR cpg_dispatch (3),
+.BR cpg_leave (3),
+.BR cpg_join (3),
+.BR cpg_membership_get (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
+
+.PP

+ 72 - 0
man/cpg_zcb_free.3

@@ -0,0 +1,72 @@
+.\"/*
+.\" * Copyright (c) 2009 Red Hat, Inc.
+.\" *
+.\" * All rights reserved.
+.\" *
+.\" * Author: Steven Dake <sdake@redhat.com>
+.\" *
+.\" * This software licensed under BSD license, the text of which follows:
+.\" * 
+.\" * Redistribution and use in source and binary forms, with or without
+.\" * modification, are permitted provided that the following conditions are met:
+.\" *
+.\" * - Redistributions of source code must retain the above copyright notice,
+.\" *   this list of conditions and the following disclaimer.
+.\" * - Redistributions in binary form must reproduce the above copyright notice,
+.\" *   this list of conditions and the following disclaimer in the documentation
+.\" *   and/or other materials provided with the distribution.
+.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its
+.\" *   contributors may be used to endorse or promote products derived from this
+.\" *   software without specific prior written permission.
+.\" *
+.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+.\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+.\" * THE POSSIBILITY OF SUCH DAMAGE.
+.\" */
+.TH CPG_ZCB_FREE 2009-04-15 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
+.SH NAME
+cpg_zcb_free \- Frees a zero copy buffer
+.B #include <corosync/cpg.h>
+.sp
+.BI "int cpg_zcb_fre(cpg_handle_t " handle ", void *" buffer ");
+.SH DESCRIPTION
+The
+.B cpg_zcb_free
+function will free a zero copy buffer.
+
+.PP
+The argument
+.I handle
+describes the handle on which the buffer will be allocated.
+.PP
+The argument
+.I buffer
+is the zero copy buffer to free.
+
+.SH RETURN VALUE
+This call returns the CPG_OK value if successful, otherwise an error is returned.
+.PP
+.SH ERRORS
+The errors are undocumented.
+.SH "SEE ALSO"
+.BR cpg_overview (8),
+.BR cpg_initialize (3),
+.BR cpg_finalize (3),
+.BR cpg_fd_get (3),
+.BR cpg_dispatch (3),
+.BR cpg_leave (3),
+.BR cpg_join (3),
+.BR cpg_membership_get (3),
+.BR cpg_zcb_alloc (3),
+.BR cpg_zcb_free (3),
+.BR cpg_zcb_mcast_joined (3)
+
+.PP

+ 121 - 0
man/cpg_zcb_mcast_joined.3

@@ -0,0 +1,121 @@
+.\"/*
+.\" * Copyright (c) 2009 Red Hat, Inc.
+.\" *
+.\" * All rights reserved.
+.\" *
+.\" * Author: Steven Dake <sdake@redhat.com>
+.\" *
+.\" * This software licensed under BSD license, the text of which follows:
+.\" * 
+.\" * Redistribution and use in source and binary forms, with or without
+.\" * modification, are permitted provided that the following conditions are met:
+.\" *
+.\" * - Redistributions of source code must retain the above copyright notice,
+.\" *   this list of conditions and the following disclaimer.
+.\" * - Redistributions in binary form must reproduce the above copyright notice,
+.\" *   this list of conditions and the following disclaimer in the documentation
+.\" *   and/or other materials provided with the distribution.
+.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its
+.\" *   contributors may be used to endorse or promote products derived from this
+.\" *   software without specific prior written permission.
+.\" *
+.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+.\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+.\" * THE POSSIBILITY OF SUCH DAMAGE.
+.\" */
+.TH CPG_ZCB_MCAST_JOINED 3 3004-08-31 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
+.SH NAME
+cpg_mcast_joined \- Multicasts a zero copy buffer to all groups joined to a handle
+.SH SYNOPSIS
+.B #include <sys/uio.h>
+.B #include <corosync/cpg.h>
+.sp
+.BI "int cpg_zcb_mcast_joined(cpg_handle_t " handle ", cpg_gurantee_t " guarantee ", const void *" buffer "", int " msg_len ");
+.SH DESCRIPTION
+The
+.B cpg_zcb_mcast_joined
+function will multicast a zero copy buffer message to all the processes that
+have been joined with the
+.B cpg_join(3)
+funtion for the same group name.
+Messages that are sent to any of the groups joined to the parameter
+.I handle
+will be delivered to all subscribed processes in the system.
+.PP
+The argument
+.I guarantee
+requests a delivery guarantee for the message to be sent.  The cpg_guarantee_t type is
+defined by:
+.IP
+.RS
+.ne 18
+.nf
+.ta 4n 30n 33n
+typedef enum {
+        CPG_TYPE_UNORDERED,     /* not implemented */
+        CPG_TYPE_FIFO,          /* same as agreed */
+        CPG_TYPE_AGREED,        /* implemented */
+        CPG_TYPE_SAFE           /* not implemented */
+} cpg_guarantee_t;
+.ta
+.fi
+.RE
+.IP
+.PP
+.PP
+The meanings of the cpg_guarantee_t typedef are:
+.TP
+.B CPG_TYPE_UNORDERED
+Messages are guaranteed to be delivered, but with no particular order.  This 
+mode is unimplemented in the CPG library.
+.TP
+.B CPG_TYPE_FIFO
+Messages are guaranteed to be delivered in first sent first delivery order.
+In fact, this guarantee is equivalent to the CPG_TYPE_AGREED guarantee.
+.TP
+.B CPG_TYPE_AGREED
+All processors must agree on the order of delivery.  If a message is sent
+from two or more processors at about the same time, the delivery will occur
+in the same order to all processors.
+.TP
+.B CPG_TYPE_SAFE
+All processors must agree on the order of delivery.  Further all processors
+must have a copy of the message before any delivery takes place.  This mode is
+unimplemented in the CPG library.
+.PP
+The
+.I msg
+argument describes the zero copy buffer which is used to transmit a message.
+this buffer must be allocated by 
+.B cpg_zcb_alloc(3).
+
+.PP
+The
+.I msg_len
+argument describes the number of bytes to be transmitted in the zero copy buffer.
+
+.SH RETURN VALUE
+This call returns the CPG_OK value if successful, otherwise an error is returned.
+.PP
+.SH ERRORS
+The errors are undocumented.
+.SH "SEE ALSO"
+.BR cpg_overview (8),
+.BR cpg_initialize (3),
+.BR cpg_finalize (3),
+.BR cpg_fd_get (3),
+.BR cpg_dispatch (3),
+.BR cpg_leave (3),
+.BR cpg_join (3),
+.BR cpg_membership_get (3)
+.BR cpg_zcb_alloc (3)
+.BR cpg_zcb_free (3)
+.PP

+ 7 - 1
test/Makefile.am

@@ -35,7 +35,7 @@ INCLUDES       		= -I$(top_builddir)/include -I$(top_srcdir)/include
 
 
 noinst_PROGRAMS		= testevs evsbench evsverify testcpg testcpg2 cpgbench testconfdb	\
 noinst_PROGRAMS		= testevs evsbench evsverify testcpg testcpg2 cpgbench testconfdb	\
 			logsysbench logsysrec testquorum testvotequorum1 testvotequorum2	\
 			logsysbench logsysrec testquorum testvotequorum1 testvotequorum2	\
-			logsys_s logsys_t1 logsys_t2
+			logsys_s logsys_t1 logsys_t2 testcpgzc cpgbenchzc testzcgc
 
 
 testevs_LDADD		= -levs
 testevs_LDADD		= -levs
 testevs_LDFLAGS		= -L../lib
 testevs_LDFLAGS		= -L../lib
@@ -43,6 +43,10 @@ testcpg_LDADD		= -lcpg
 testcpg_LDFLAGS		= -L../lib
 testcpg_LDFLAGS		= -L../lib
 testcpg2_LDADD		= -lcpg
 testcpg2_LDADD		= -lcpg
 testcpg2_LDFLAGS	= -L../lib
 testcpg2_LDFLAGS	= -L../lib
+testcpgzc_LDADD		= -lcpg
+testcpgzc_LDFLAGS	= -L../lib
+testzcgc_LDADD		= -lcpg
+testzcgc_LDFLAGS	= -L../lib
 testconfdb_LDADD	= -lconfdb ../lcr/liblcr.a
 testconfdb_LDADD	= -lconfdb ../lcr/liblcr.a
 testconfdb_LDFLAGS	= -L../lib
 testconfdb_LDFLAGS	= -L../lib
 testquorum_LDADD	= -lquorum
 testquorum_LDADD	= -lquorum
@@ -57,6 +61,8 @@ evsbench_LDADD		= -levs
 evsbench_LDFLAGS	= -L../lib
 evsbench_LDFLAGS	= -L../lib
 cpgbench_LDADD		= -lcpg
 cpgbench_LDADD		= -lcpg
 cpgbench_LDFLAGS	= -L../lib
 cpgbench_LDFLAGS	= -L../lib
+cpgbenchzc_LDADD	= -lcpg
+cpgbenchzc_LDFLAGS	= -L../lib
 logsysbench_LDADD	= -llogsys
 logsysbench_LDADD	= -llogsys
 logsysbench_LDFLAGS	= -L../exec
 logsysbench_LDFLAGS	= -L../exec
 logsysrec_LDADD		= -llogsys
 logsysrec_LDADD		= -llogsys

+ 195 - 0
test/cpgbenchzc.c

@@ -0,0 +1,195 @@
+#include <assert.h>
+/*
+ * Copyright (c) 2006, 2009 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@redhat.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <config.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <corosync/corotypes.h>
+#include <corosync/cpg.h>
+
+#ifdef COROSYNC_SOLARIS
+#define timersub(a, b, result)						\
+    do {								\
+	(result)->tv_sec = (a)->tv_sec - (b)->tv_sec;			\
+	(result)->tv_usec = (a)->tv_usec - (b)->tv_usec;		\
+	if ((result)->tv_usec < 0) {					\
+	    --(result)->tv_sec;						\
+	    (result)->tv_usec += 1000000;				\
+	}								\
+    } while (0)
+#endif
+
+static int alarm_notice;
+
+static void cpg_bm_confchg_fn (
+	cpg_handle_t handle,
+	const struct cpg_name *group_name,
+	const struct cpg_address *member_list, size_t member_list_entries,
+	const struct cpg_address *left_list, size_t left_list_entries,
+	const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+}
+
+static unsigned int write_count;
+
+static void cpg_bm_deliver_fn (
+        cpg_handle_t handle,
+        const struct cpg_name *group_name,
+        uint32_t nodeid,
+        uint32_t pid,
+        const void *msg,
+        size_t msg_len)
+{
+	write_count++;
+}
+
+static cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn 	= cpg_bm_deliver_fn,
+	.cpg_confchg_fn		= cpg_bm_confchg_fn
+};
+
+
+void *data;
+
+static void cpg_benchmark (
+	cpg_handle_t handle,
+	int write_size)
+{
+	struct timeval tv1, tv2, tv_elapsed;
+	unsigned int res;
+	cpg_flow_control_state_t flow_control_state;
+
+	alarm_notice = 0;
+
+	write_count = 0;
+	alarm (10);
+
+	gettimeofday (&tv1, NULL);
+	do {
+		/*
+		 * Test checkpoint write
+		 */
+		cpg_flow_control_state_get (handle, &flow_control_state);
+		if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
+retry:
+			res = cpg_zcb_mcast_joined (handle, CPG_TYPE_AGREED, data, write_size);
+			if (res == CS_ERR_TRY_AGAIN) {
+				goto retry;
+			}
+		}
+		res = cpg_dispatch (handle, CS_DISPATCH_ALL);
+		if (res != CS_OK) {
+			printf ("cpg dispatch returned error %d\n", res);
+			exit (1);
+		}
+	} while (alarm_notice == 0);
+	gettimeofday (&tv2, NULL);
+	timersub (&tv2, &tv1, &tv_elapsed);
+
+	printf ("%5d messages received ", write_count);
+	printf ("%5d bytes per write ", write_size);
+	printf ("%7.3f Seconds runtime ",
+		(tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
+	printf ("%9.3f TP/s ",
+		((float)write_count) /  (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
+	printf ("%7.3f MB/s.\n",
+		((float)write_count) * ((float)write_size) /  ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
+}
+
+static void sigalrm_handler (int num)
+{
+	alarm_notice = 1;
+}
+
+static struct cpg_name group_name = {
+	.value = "cpg_bm",
+	.length = 6
+};
+
+int main (void) {
+	cpg_handle_t handle;
+	unsigned int size;
+	int i;
+	unsigned int res;
+
+
+
+	size = 1000;
+	signal (SIGALRM, sigalrm_handler);
+	res = cpg_initialize (&handle, &callbacks);
+	if (res != CS_OK) {
+		printf ("cpg_initialize failed with result %d\n", res);
+		exit (1);
+	}
+	cpg_zcb_alloc (handle, 500000, &data);
+	if (res != CS_OK) {
+		printf ("cpg_zcb_alloc couldn't allocate zero copy buffer %d\n", res);
+		exit (1);
+	}
+
+	res = cpg_join (handle, &group_name);
+	if (res != CS_OK) {
+		printf ("cpg_join failed with result %d\n", res);
+		exit (1);
+	}
+
+	for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */
+		cpg_benchmark (handle, size);
+		size += 1000;
+	}
+
+	res = cpg_finalize (handle);
+	if (res != CS_OK) {
+		printf ("cpg_join failed with result %d\n", res);
+		exit (1);
+	}
+	return (0);
+}

+ 244 - 0
test/testcpgzc.c

@@ -0,0 +1,244 @@
+/*
+ * Copyright (c) 2006-2009 Red Hat Inc
+ *
+ * All rights reserved.
+ *
+ * Author: Christine Caulfield <ccaulfie@redhat.com>
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <config.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <signal.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <corosync/corotypes.h>
+#include <corosync/cpg.h>
+
+static int quit = 0;
+static int show_ip = 0;
+
+static void print_cpgname (const struct cpg_name *name)
+{
+	int i;
+
+	for (i = 0; i < name->length; i++) {
+		printf ("%c", name->value[i]);
+	}
+}
+
+static void DeliverCallback (
+	cpg_handle_t handle,
+	const struct cpg_name *groupName,
+	uint32_t nodeid,
+	uint32_t pid,
+	const void *msg,
+	size_t msg_len)
+{
+	if (show_ip) {
+		struct in_addr saddr;
+		saddr.s_addr = nodeid;
+		printf("DeliverCallback: message (len=%lu)from node/pid %s/%d: '%s'\n",
+		       (unsigned long int) msg_len,
+		       inet_ntoa(saddr), pid, (const char *)msg);
+	}
+	else {
+		printf("DeliverCallback: message (len=%lu)from node/pid %d/%d: '%s'\n",
+		       (unsigned long int) msg_len, nodeid, pid,
+		       (const char *)msg);
+	}
+}
+
+static void ConfchgCallback (
+	cpg_handle_t handle,
+	const struct cpg_name *groupName,
+	const struct cpg_address *member_list, size_t member_list_entries,
+	const struct cpg_address *left_list, size_t left_list_entries,
+	const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+	int i;
+	struct in_addr saddr;
+
+	printf("\nConfchgCallback: group '");
+	print_cpgname(groupName);
+	printf("'\n");
+	for (i=0; i<joined_list_entries; i++) {
+		if (show_ip) {
+			saddr.s_addr = joined_list[i].nodeid;
+			printf("joined node/pid: %s/%d reason: %d\n",
+			       inet_ntoa (saddr), joined_list[i].pid,
+			       joined_list[i].reason);
+		}
+		else {
+			printf("joined node/pid: %d/%d reason: %d\n",
+			       joined_list[i].nodeid, joined_list[i].pid,
+			       joined_list[i].reason);
+		}
+	}
+
+	for (i=0; i<left_list_entries; i++) {
+		if (show_ip) {
+			saddr.s_addr = left_list[i].nodeid;
+			printf("left node/pid: %s/%d reason: %d\n",
+			       inet_ntoa (saddr), left_list[i].pid,
+			       left_list[i].reason);
+		}
+		else {
+			printf("left node/pid: %d/%d reason: %d\n",
+			       left_list[i].nodeid, left_list[i].pid,
+			       left_list[i].reason);
+		}
+	}
+
+	printf("nodes in group now %lu\n",
+	       (unsigned long int) member_list_entries);
+	for (i=0; i<member_list_entries; i++) {
+		if (show_ip) {
+			saddr.s_addr = member_list[i].nodeid;
+			printf("node/pid: %s/%d\n",
+			       inet_ntoa (saddr), member_list[i].pid);
+		}
+		else {
+			printf("node/pid: %d/%d\n",
+			       member_list[i].nodeid, member_list[i].pid);
+		}
+	}
+
+	/* Is it us??
+	   NOTE: in reality we should also check the nodeid */
+	if (left_list_entries && left_list[0].pid == getpid()) {
+		printf("We have left the building\n");
+		quit = 1;
+	}
+}
+
+static cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn =            DeliverCallback,
+	.cpg_confchg_fn =            ConfchgCallback,
+};
+
+static void sigintr_handler (int signum) __attribute__((__noreturn__));
+static void sigintr_handler (int signum) {
+	exit (0);
+}
+static struct cpg_name group_name;
+
+int main (int argc, char *argv[]) {
+	cpg_handle_t handle;
+	fd_set read_fds;
+	int select_fd;
+	int result;
+	const char *options = "i";
+	int opt;
+	unsigned int nodeid;
+	char *fgets_res;
+	void *buffer;
+
+	while ( (opt = getopt(argc, argv, options)) != -1 ) {
+		switch (opt) {
+		case 'i':
+			show_ip = 1;
+			break;
+		}
+	}
+
+	if (argc > optind) {
+		strcpy(group_name.value, argv[optind]);
+		group_name.length = strlen(argv[optind])+1;
+	}
+	else {
+		strcpy(group_name.value, "GROUP");
+		group_name.length = 6;
+	}
+
+	result = cpg_initialize (&handle, &callbacks);
+	if (result != CS_OK) {
+		printf ("Could not initialize Cluster Process Group API instance error %d\n", result);
+		exit (1);
+	}
+	cpg_zcb_alloc (handle, 8192, &buffer);
+	cpg_zcb_free (handle, buffer);
+	cpg_zcb_alloc (handle, 8192, &buffer);
+
+	result = cpg_local_get (handle, &nodeid);
+	if (result != CS_OK) {
+		printf ("Could not get local node id\n");
+		exit (1);
+	}
+
+	printf ("Local node id is %x\n", nodeid);
+	result = cpg_join(handle, &group_name);
+	if (result != CS_OK) {
+		printf ("Could not join process group, error %d\n", result);
+		exit (1);
+	}
+
+	FD_ZERO (&read_fds);
+	cpg_fd_get(handle, &select_fd);
+	printf ("Type EXIT to finish\n");
+	do {
+		FD_SET (select_fd, &read_fds);
+		FD_SET (STDIN_FILENO, &read_fds);
+		result = select (select_fd + 1, &read_fds, 0, 0, 0);
+		if (result == -1) {
+			perror ("select\n");
+		}
+		if (FD_ISSET (STDIN_FILENO, &read_fds)) {
+			fgets_res = fgets(buffer, sizeof(buffer), stdin);
+			if (fgets_res == NULL) {
+				cpg_leave(handle, &group_name);
+			}
+			if (strncmp(buffer, "EXIT", 4) == 0) {
+				cpg_leave(handle, &group_name);
+			}
+			else {
+				cpg_zcb_mcast_joined (handle, CPG_TYPE_AGREED,
+					buffer, strlen (buffer) + 1);
+			}
+		}
+		if (FD_ISSET (select_fd, &read_fds)) {
+			if (cpg_dispatch (handle, CS_DISPATCH_ALL) != CS_OK)
+				exit(1);
+		}
+	} while (result && !quit);
+
+
+	result = cpg_finalize (handle);
+	printf ("Finalize  result is %d (should be 1)\n", result);
+	return (0);
+}

+ 179 - 0
test/testzcgc.c

@@ -0,0 +1,179 @@
+/*
+ * Copyright (c) 2006-2009 Red Hat Inc
+ *
+ * All rights reserved.
+ *
+ * Author: Christine Caulfield <ccaulfie@redhat.com>
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <config.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <signal.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <corosync/corotypes.h>
+#include <corosync/cpg.h>
+
+static int quit = 0;
+static int show_ip = 0;
+
+static void print_cpgname (const struct cpg_name *name)
+{
+	int i;
+
+	for (i = 0; i < name->length; i++) {
+		printf ("%c", name->value[i]);
+	}
+}
+
+static void DeliverCallback (
+	cpg_handle_t handle,
+	const struct cpg_name *groupName,
+	uint32_t nodeid,
+	uint32_t pid,
+	const void *msg,
+	size_t msg_len)
+{
+	if (show_ip) {
+		struct in_addr saddr;
+		saddr.s_addr = nodeid;
+		printf("DeliverCallback: message (len=%lu)from node/pid %s/%d: '%s'\n",
+		       (unsigned long int) msg_len,
+		       inet_ntoa(saddr), pid, (const char *)msg);
+	}
+	else {
+		printf("DeliverCallback: message (len=%lu)from node/pid %d/%d: '%s'\n",
+		       (unsigned long int) msg_len, nodeid, pid,
+		       (const char *)msg);
+	}
+}
+
+static void ConfchgCallback (
+	cpg_handle_t handle,
+	const struct cpg_name *groupName,
+	const struct cpg_address *member_list, size_t member_list_entries,
+	const struct cpg_address *left_list, size_t left_list_entries,
+	const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+	int i;
+	struct in_addr saddr;
+
+	printf("\nConfchgCallback: group '");
+	print_cpgname(groupName);
+	printf("'\n");
+	for (i=0; i<joined_list_entries; i++) {
+		if (show_ip) {
+			saddr.s_addr = joined_list[i].nodeid;
+			printf("joined node/pid: %s/%d reason: %d\n",
+			       inet_ntoa (saddr), joined_list[i].pid,
+			       joined_list[i].reason);
+		}
+		else {
+			printf("joined node/pid: %d/%d reason: %d\n",
+			       joined_list[i].nodeid, joined_list[i].pid,
+			       joined_list[i].reason);
+		}
+	}
+
+	for (i=0; i<left_list_entries; i++) {
+		if (show_ip) {
+			saddr.s_addr = left_list[i].nodeid;
+			printf("left node/pid: %s/%d reason: %d\n",
+			       inet_ntoa (saddr), left_list[i].pid,
+			       left_list[i].reason);
+		}
+		else {
+			printf("left node/pid: %d/%d reason: %d\n",
+			       left_list[i].nodeid, left_list[i].pid,
+			       left_list[i].reason);
+		}
+	}
+
+	printf("nodes in group now %lu\n",
+	       (unsigned long int) member_list_entries);
+	for (i=0; i<member_list_entries; i++) {
+		if (show_ip) {
+			saddr.s_addr = member_list[i].nodeid;
+			printf("node/pid: %s/%d\n",
+			       inet_ntoa (saddr), member_list[i].pid);
+		}
+		else {
+			printf("node/pid: %d/%d\n",
+			       member_list[i].nodeid, member_list[i].pid);
+		}
+	}
+
+	/* Is it us??
+	   NOTE: in reality we should also check the nodeid */
+	if (left_list_entries && left_list[0].pid == getpid()) {
+		printf("We have left the building\n");
+		quit = 1;
+	}
+}
+
+static cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn =            DeliverCallback,
+	.cpg_confchg_fn =            ConfchgCallback,
+};
+
+static void sigintr_handler (int signum) __attribute__((__noreturn__));
+static void sigintr_handler (int signum) {
+	exit (0);
+}
+static struct cpg_name group_name;
+
+int main (int argc, char *argv[]) {
+	cpg_handle_t handle;
+	int result;
+	void *buffer;
+	unsigned int i;
+
+	strcpy(group_name.value, "GROUP");
+	group_name.length = 6;
+
+	result = cpg_initialize (&handle, &callbacks);
+	if (result != CS_OK) {
+		printf ("Could not initialize Cluster Process Group API instance error %d\n", result);
+		exit (1);
+	}
+	for (i = 0; i < 100; i++) {
+		cpg_zcb_alloc (handle, 1024*1024, &buffer);
+	}
+	return (0);
+}

+ 1 - 1
tools/corosync-pload.c

@@ -72,7 +72,7 @@ int main (void) {
 	result = pload_start (
 	result = pload_start (
 		handle,
 		handle,
 		0, /* code */
 		0, /* code */
-		150000, /* count */
+		1500000, /* count */
 		300); /* size */
 		300); /* size */
 	return (0);
 	return (0);
 }
 }