Преглед изворни кода

Change shared memory to use mmap() system calls.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2115 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake пре 17 година
родитељ
комит
cee464489f
11 измењених фајлова са 199 додато и 92 уклоњено
  1. 64 25
      exec/coroipcs.c
  2. 4 1
      include/corosync/coroipcc.h
  3. 12 8
      include/corosync/ipc_gen.h
  4. 7 1
      lib/cfg.c
  5. 7 1
      lib/confdb.c
  6. 70 51
      lib/coroipcc.c
  7. 7 1
      lib/cpg.c
  8. 7 1
      lib/evs.c
  9. 7 1
      lib/pload.c
  10. 7 1
      lib/quorum.c
  11. 7 1
      lib/votequorum.c

+ 64 - 25
exec/coroipcs.c

@@ -125,12 +125,17 @@ struct conn_info {
 	int refcount;
 	key_t shmkey;
 	key_t semkey;
-	int shmid;
 	int semid;
 	unsigned int pending_semops;
 	pthread_mutex_t mutex;
-	struct shared_memory *mem;
+	struct control_buffer *control_buffer;
+	char *request_buffer;
+	char *response_buffer;
 	char *dispatch_buffer;
+	size_t control_size;
+	size_t request_size;
+	size_t response_size;
+	size_t dispatch_size;
 	struct list_head outq_head;
 	void *private_data;
 	struct list_head list;
@@ -152,7 +157,10 @@ static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
 		      int locked);
 
 static int
-memory_map (const char *path, void **buf, size_t bytes)
+memory_map (
+	const char *path,
+	size_t bytes,
+	void **buf)
 {
 	int fd;
 	void *addr_orig;
@@ -188,7 +196,10 @@ memory_map (const char *path, void **buf, size_t bytes)
 }
 
 static int
-circular_memory_map (const char *path, void **buf, size_t bytes)
+circular_memory_map (
+	const char *path,
+	size_t bytes,
+	void **buf)
 {
 	int fd;
 	void *addr_orig;
@@ -301,8 +312,8 @@ static inline int zcb_alloc (
 
 	res = memory_map (
 		path_to_file,
-		addr,
-		size);
+		size,
+		addr);
 	if (res == -1) {
 		return (-1);
 	}
@@ -399,8 +410,9 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 	/*
 	 * Destroy shared memory segment and semaphore
 	 */
-	shmdt (conn_info->mem);
-	res = shmctl (conn_info->shmid, IPC_RMID, NULL);
+	res = munmap (conn_info->control_buffer, conn_info->control_size);
+	res = munmap (conn_info->request_buffer, conn_info->request_size);
+	res = munmap (conn_info->response_buffer, conn_info->response_size);
 	semctl (conn_info->semid, 0, IPC_RMID);
 
 	/*
@@ -410,7 +422,7 @@ static inline int conn_info_destroy (struct conn_info *conn_info)
 		api->free (conn_info->private_data);
 	}
 	close (conn_info->fd);
-	res = circular_memory_unmap (conn_info->dispatch_buffer, DISPATCH_SIZE);
+	res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
 	zcb_all_free (conn_info);
 	api->free (conn_info);
 	api->serialize_unlock ();
@@ -450,7 +462,7 @@ static inline void zerocopy_operations_process (
 {
 	mar_req_header_t *header;
 
-	header = (mar_req_header_t *)conn_info->mem->req_buffer;
+	header = (mar_req_header_t *)conn_info->request_buffer;
 	if (header->id == ZC_ALLOC_HEADER) {
 		mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)header;
 		mar_res_header_t res_header;
@@ -816,14 +828,25 @@ void coroipcs_ipc_exit (void)
 {
 	struct list_head *list;
 	struct conn_info *conn_info;
+	unsigned int res;
 
 	for (list = conn_info_list_head.next; list != &conn_info_list_head;
 		list = list->next) {
 
 		conn_info = list_entry (list, struct conn_info, list);
 
-		shmdt (conn_info->mem);
-		shmctl (conn_info->shmid, IPC_RMID, NULL);
+		/*
+		 * Unmap memory segments
+		 */
+		res = munmap (conn_info->control_buffer,
+			conn_info->control_size);
+		res = munmap (conn_info->request_buffer,
+			conn_info->request_size);
+		res = munmap (conn_info->response_buffer,
+			conn_info->response_size);
+		res = circular_memory_unmap (conn_info->dispatch_buffer,
+			conn_info->dispatch_size);
+
 		semctl (conn_info->semid, 0, IPC_RMID);
 	
 		pthread_kill (conn_info->thread, SIGUSR1);
@@ -846,7 +869,7 @@ int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
 	struct sembuf sop;
 	int res;
 
-	memcpy (conn_info->mem->res_buffer, msg, mlen);
+	memcpy (conn_info->response_buffer, msg, mlen);
 	sop.sem_num = 1;
 	sop.sem_op = 1;
 	sop.sem_flg = 0;
@@ -871,7 +894,8 @@ int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned in
 	int i;
 
 	for (i = 0; i < iov_len; i++) {
-		memcpy (&conn_info->mem->res_buffer[write_idx], iov[i].iov_base, iov[i].iov_len);
+		memcpy (&conn_info->response_buffer[write_idx],
+			iov[i].iov_base, iov[i].iov_len);
 		write_idx += iov[i].iov_len;
 	}
 
@@ -896,11 +920,11 @@ static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info)
 	unsigned int n_write;
 	unsigned int bytes_left;
 
-	n_read = conn_info->mem->read;
-	n_write = conn_info->mem->write;
+	n_read = conn_info->control_buffer->read;
+	n_write = conn_info->control_buffer->write;
 
 	if (n_read <= n_write) {
-		bytes_left = DISPATCH_SIZE - n_write + n_read;
+		bytes_left = conn_info->dispatch_size - n_write + n_read;
 	} else {
 		bytes_left = n_read - n_write;
 	}
@@ -911,10 +935,10 @@ static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int l
 {
 	unsigned int write_idx;
 
-	write_idx = conn_info->mem->write;
+	write_idx = conn_info->control_buffer->write;
 
 	memcpy (&conn_info->dispatch_buffer[write_idx], msg, len);
-	conn_info->mem->write = (write_idx + len) % (DISPATCH_SIZE);
+	conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
 }
 
 static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
@@ -1230,21 +1254,36 @@ int coroipcs_handler_dispatch (
 			return (0);
 		}
 
-		conn_info->shmkey = req_setup->shmkey;
 		conn_info->semkey = req_setup->semkey;
+		res = memory_map (
+			req_setup->control_file,
+			req_setup->control_size,
+			(void *)&conn_info->control_buffer);
+		conn_info->control_size = req_setup->control_size;
+
+		res = memory_map (
+			req_setup->request_file,
+			req_setup->request_size,
+			(void *)&conn_info->request_buffer);
+		conn_info->request_size = req_setup->request_size;
+
+		res = memory_map (
+			req_setup->response_file,
+			req_setup->response_size,
+			(void *)&conn_info->response_buffer);
+		conn_info->response_size = req_setup->response_size;
+
 		res = circular_memory_map (
 			req_setup->dispatch_file,
-			(void *)&conn_info->dispatch_buffer,
-			DISPATCH_SIZE);
+			req_setup->dispatch_size,
+			(void *)&conn_info->dispatch_buffer);
+		conn_info->dispatch_size = req_setup->dispatch_size;
 
 		conn_info->service = req_setup->service;
 		conn_info->refcount = 0;
 		conn_info->notify_flow_control_enabled = 0;
 		conn_info->setup_bytes_read = 0;
 
-		conn_info->shmid = shmget (conn_info->shmkey,
-			sizeof (struct shared_memory), 0600);
-		conn_info->mem = shmat (conn_info->shmid, NULL, 0);
 		conn_info->semid = semget (conn_info->semkey, 3, 0600);
 		conn_info->pending_semops = 0;
 

+ 4 - 1
include/corosync/coroipcc.h

@@ -47,7 +47,10 @@
 extern cs_error_t
 coroipcc_service_connect (
 	const char *socket_name,
-	enum service_types service,
+	unsigned int service,
+	size_t request_size,
+	size_t respnse__size,
+	size_t dispatch_size,
 	void **ipc_context);
 
 extern cs_error_t

+ 12 - 8
include/corosync/ipc_gen.h

@@ -64,13 +64,11 @@ enum req_init_types {
 #define MESSAGE_REQ_CHANGE_EUID		1
 #define MESSAGE_REQ_OUTQ_FLUSH		2
 
-#define REQ_SIZE			1000000
-#define RES_SIZE			1000000
-#define DISPATCH_SIZE			8192*128
+#define IPC_REQUEST_SIZE		8192*128
+#define IPC_RESPONSE_SIZE		8192*128
+#define IPC_DISPATCH_SIZE		8192*128
 
-struct shared_memory {
-	unsigned char req_buffer[REQ_SIZE];
-	unsigned char res_buffer[RES_SIZE];
+struct control_buffer {
 	unsigned int read;
 	unsigned int write;
 };
@@ -86,9 +84,15 @@ typedef struct {
 
 typedef struct {
 	int service __attribute__((aligned(8)));
-	unsigned long long shmkey __attribute__((aligned(8)));
 	unsigned long long semkey __attribute__((aligned(8)));
-	char dispatch_file[64]__attribute__((aligned(8)));
+	char control_file[64] __attribute__((aligned(8)));
+	char request_file[64] __attribute__((aligned(8)));
+	char response_file[64] __attribute__((aligned(8)));
+	char dispatch_file[64] __attribute__((aligned(8)));
+	size_t control_size __attribute__((aligned(8)));
+	size_t request_size __attribute__((aligned(8)));
+	size_t response_size __attribute__((aligned(8)));
+	size_t dispatch_size __attribute__((aligned(8)));
 } mar_req_setup_t __attribute__((aligned(8)));
 
 typedef struct {

+ 7 - 1
lib/cfg.c

@@ -105,7 +105,13 @@ corosync_cfg_initialize (
 		goto error_destroy;
 	}
 
-	error = coroipcc_service_connect (IPC_SOCKET_NAME, CFG_SERVICE, &cfg_instance->ipc_ctx);
+	error = coroipcc_service_connect (
+		IPC_SOCKET_NAME,
+		CFG_SERVICE,
+		IPC_REQUEST_SIZE,
+		IPC_RESPONSE_SIZE,
+		IPC_DISPATCH_SIZE,
+		&cfg_instance->ipc_ctx);
 	if (error != CS_OK) {
 		goto error_put_destroy;
 	}

+ 7 - 1
lib/confdb.c

@@ -157,7 +157,13 @@ cs_error_t confdb_initialize (
 		confdb_inst->standalone = 1;
 	}
 	else {
-		error = coroipcc_service_connect (IPC_SOCKET_NAME, CONFDB_SERVICE, &confdb_inst->ipc_ctx);
+		error = coroipcc_service_connect (
+			IPC_SOCKET_NAME,
+			CONFDB_SERVICE,
+			IPC_REQUEST_SIZE,
+			IPC_RESPONSE_SIZE,
+			IPC_DISPATCH_SIZE,
+			&confdb_inst->ipc_ctx);
 	}
 	if (error != CS_OK)
 		goto error_put_destroy;

+ 70 - 51
lib/coroipcc.c

@@ -80,8 +80,14 @@ struct ipc_segment {
 	int shmid;
 	int semid;
 	int flow_control_state;
-	struct shared_memory *shared_memory;
-	void *dispatch_buffer;
+	struct control_buffer *control_buffer;
+	char *request_buffer;
+	char *response_buffer;
+	char *dispatch_buffer;
+	size_t control_size;
+	size_t request_size;
+	size_t response_size;
+	size_t dispatch_size;
 	uid_t euid;
 };
 
@@ -375,22 +381,28 @@ memory_map (char *path, const char *file, void **buf, size_t bytes)
 	return (0);
 }
  
-cs_error_t
+extern cs_error_t
 coroipcc_service_connect (
 	const char *socket_name,
-	enum service_types service,
-	void **shmseg)
+	unsigned int service,
+	size_t request_size,
+	size_t response_size,
+	size_t dispatch_size,
+	void **ipc_context)
+
 {
 	int request_fd;
 	struct sockaddr_un address;
 	cs_error_t error;
 	struct ipc_segment *ipc_segment;
-	key_t shmkey = 0;
 	key_t semkey = 0;
 	int res;
 	mar_req_setup_t req_setup;
 	mar_res_setup_t res_setup;
 	union semun semun;
+	char control_map_path[128];
+	char request_map_path[128];
+	char response_map_path[128];
 	char dispatch_map_path[128];
 
 	res_setup.error = CS_ERR_LIBRARY;
@@ -425,21 +437,6 @@ coroipcc_service_connect (
 	}
 	bzero (ipc_segment, sizeof (struct ipc_segment));
 
-	/*
-	 * Allocate a shared memory segment
-	 */
-	while (1) {
-		shmkey = random();
-		if ((ipc_segment->shmid
-		     = shmget (shmkey, sizeof (struct shared_memory),
-			       IPC_CREAT|IPC_EXCL|0600)) != -1) {
-			break;
-		}
-		if (errno != EEXIST) {
-			goto error_exit;
-		}
-	}
-
 	/*
 	 * Allocate a semaphore segment
 	 */
@@ -455,14 +452,6 @@ coroipcc_service_connect (
 		}
 	}
 
-	/*
-	 * Attach to shared memory segment
-	 */
-	ipc_segment->shared_memory = shmat (ipc_segment->shmid, NULL, 0);
-	if (ipc_segment->shared_memory == (void *)-1) {
-		goto error_exit;
-	}
-	
 	semun.val = 0;
 	res = semctl (ipc_segment->semid, 0, SETVAL, semun);
 	if (res != 0) {
@@ -474,14 +463,43 @@ coroipcc_service_connect (
 		goto error_exit;
 	}
 
-	res = circular_memory_map (dispatch_map_path,
-		"dispatch_bufer-XXXXXX",
-		&ipc_segment->dispatch_buffer, DISPATCH_SIZE);
-	strcpy (req_setup.dispatch_file, dispatch_map_path);
-	req_setup.shmkey = shmkey;
-	req_setup.semkey = semkey;
+	res = memory_map (
+		control_map_path,
+		"control_buffer-XXXXXX",
+		(void *)&ipc_segment->control_buffer,
+		8192);
+
+	res = memory_map (
+		request_map_path,
+		"request_buffer-XXXXXX",
+		(void *)&ipc_segment->request_buffer,
+		request_size);
+
+	res = memory_map (
+		response_map_path,
+		"response_buffer-XXXXXX",
+		(void *)&ipc_segment->response_buffer,
+		response_size);
+
+	res = circular_memory_map (
+		dispatch_map_path,
+		"dispatch_buffer-XXXXXX",
+		(void *)&ipc_segment->dispatch_buffer,
+		dispatch_size);
 
+	/*
+	 * Initialize IPC setup message
+	 */
 	req_setup.service = service;
+	strcpy (req_setup.control_file, control_map_path);
+	strcpy (req_setup.request_file, request_map_path);
+	strcpy (req_setup.response_file, response_map_path);
+	strcpy (req_setup.dispatch_file, dispatch_map_path);
+	req_setup.control_size = 8192;
+	req_setup.request_size = request_size;
+	req_setup.response_size = response_size;
+	req_setup.dispatch_size = dispatch_size;
+	req_setup.semkey = semkey;
 
 	error = coroipcc_send (request_fd, &req_setup, sizeof (mar_req_setup_t));
 	if (error != 0) {
@@ -494,22 +512,21 @@ coroipcc_service_connect (
 
 	ipc_segment->fd = request_fd;
 	ipc_segment->flow_control_state = 0;
-	*shmseg = ipc_segment;
 
-	/*
-	 * Something go wrong with server
-	 * Cleanup all
-	 */
 	if (res_setup.error == CS_ERR_TRY_AGAIN) {
 		goto error_exit;
 	}
 
+	ipc_segment->control_size = 8192;
+	ipc_segment->request_size = request_size;
+	ipc_segment->response_size = response_size;
+	ipc_segment->dispatch_size = dispatch_size;
+
+	*ipc_context = ipc_segment;
 	return (res_setup.error);
 
 error_exit:
 	close (request_fd);
-	if (ipc_segment->shmid > 0)
-		shmctl (ipc_segment->shmid, IPC_RMID, NULL);
 	if (ipc_segment->semid > 0)
 		semctl (ipc_segment->semid, 0, IPC_RMID);
 	return (res_setup.error);
@@ -523,11 +540,13 @@ coroipcc_service_disconnect (
 
 	shutdown (ipc_segment->fd, SHUT_RDWR);
 	close (ipc_segment->fd);
-	shmdt (ipc_segment->shared_memory);
 	/*
 	 * << 1 (or multiplied by 2) because this is a wrapped memory buffer
 	 */
-	memory_unmap (ipc_segment->dispatch_buffer, (DISPATCH_SIZE) << 1);
+	memory_unmap (ipc_segment->control_buffer, ipc_segment->control_size);
+	memory_unmap (ipc_segment->request_buffer, ipc_segment->request_size);
+	memory_unmap (ipc_segment->response_buffer, ipc_segment->response_size);
+	memory_unmap (ipc_segment->dispatch_buffer, (ipc_segment->dispatch_size) << 1);
 	free (ipc_segment);
 	return (CS_OK);
 }
@@ -614,7 +633,7 @@ retry_recv:
 
 	data_addr = ipc_segment->dispatch_buffer;
 
-	data_addr = &data_addr[ipc_segment->shared_memory->read];
+	data_addr = &data_addr[ipc_segment->control_buffer->read];
 
 	*data = (void *)data_addr;
 	return (1);
@@ -648,10 +667,10 @@ retry_semop:
 
 	addr = ipc_segment->dispatch_buffer;
 
-	read_idx = ipc_segment->shared_memory->read;
+	read_idx = ipc_segment->control_buffer->read;
 	header = (mar_res_header_t *) &addr[read_idx];
-	ipc_segment->shared_memory->read =
-		(read_idx + header->size) % (DISPATCH_SIZE);
+	ipc_segment->control_buffer->read =
+		(read_idx + header->size) % ipc_segment->dispatch_size;
 	return (0);
 }
 
@@ -668,7 +687,7 @@ coroipcc_msg_send (
 	int req_buffer_idx = 0;
 
 	for (i = 0; i < iov_len; i++) {
-		memcpy (&ipc_segment->shared_memory->req_buffer[req_buffer_idx],
+		memcpy (&ipc_segment->request_buffer[req_buffer_idx],
 			iov[i].iov_base,
 			iov[i].iov_len);
 		req_buffer_idx += iov[i].iov_len;
@@ -726,7 +745,7 @@ retry_semop:
 		return (CS_ERR_LIBRARY);
 	}
 
-	memcpy (res_msg, ipc_segment->shared_memory->res_buffer, res_len);
+	memcpy (res_msg, ipc_segment->response_buffer, res_len);
 	return (CS_OK);
 }
 
@@ -760,7 +779,7 @@ retry_semop:
 		return (CS_ERR_LIBRARY);
 	}
 
-	*res_msg = (char *)ipc_segment->shared_memory->res_buffer;
+	*res_msg = (char *)ipc_segment->response_buffer;
 	return (CS_OK);
 }
 

+ 7 - 1
lib/cpg.c

@@ -102,7 +102,13 @@ cs_error_t cpg_initialize (
 		goto error_destroy;
 	}
 
-	error = coroipcc_service_connect (IPC_SOCKET_NAME, CPG_SERVICE, &cpg_inst->ipc_ctx);
+	error = coroipcc_service_connect (
+		IPC_SOCKET_NAME,
+		CPG_SERVICE,
+		IPC_REQUEST_SIZE,
+		IPC_RESPONSE_SIZE,
+		IPC_DISPATCH_SIZE,
+		&cpg_inst->ipc_ctx);
 	if (error != CS_OK) {
 		goto error_put_destroy;
 	}

+ 7 - 1
lib/evs.c

@@ -110,7 +110,13 @@ evs_error_t evs_initialize (
 		goto error_destroy;
 	}
 
-	error = coroipcc_service_connect (IPC_SOCKET_NAME, EVS_SERVICE, &evs_inst->ipc_ctx);
+	error = coroipcc_service_connect (
+		IPC_SOCKET_NAME,
+		EVS_SERVICE,
+		IPC_REQUEST_SIZE,
+		IPC_RESPONSE_SIZE,
+		IPC_DISPATCH_SIZE,
+		&evs_inst->ipc_ctx);
 	if (error != EVS_OK) {
 		goto error_put_destroy;
 	}

+ 7 - 1
lib/pload.c

@@ -101,7 +101,13 @@ unsigned int pload_initialize (
 		goto error_destroy;
 	}
 
-	error = coroipcc_service_connect (IPC_SOCKET_NAME, PLOAD_SERVICE, &pload_inst->ipc_ctx);
+	error = coroipcc_service_connect (
+		IPC_SOCKET_NAME,
+		PLOAD_SERVICE,
+		IPC_REQUEST_SIZE,
+		IPC_RESPONSE_SIZE,
+		IPC_DISPATCH_SIZE,
+		&pload_inst->ipc_ctx);
 	if (error != CS_OK) {
 		goto error_put_destroy;
 	}

+ 7 - 1
lib/quorum.c

@@ -92,7 +92,13 @@ cs_error_t quorum_initialize (
 		goto error_destroy;
 	}
 
-	error = coroipcc_service_connect (IPC_SOCKET_NAME, QUORUM_SERVICE, &quorum_inst->ipc_ctx);
+	error = coroipcc_service_connect (
+		IPC_SOCKET_NAME,
+		QUORUM_SERVICE,
+		IPC_REQUEST_SIZE,
+		IPC_RESPONSE_SIZE,
+		IPC_DISPATCH_SIZE,
+		&quorum_inst->ipc_ctx);
 	if (error != CS_OK) {
 		goto error_put_destroy;
 	}

+ 7 - 1
lib/votequorum.c

@@ -92,7 +92,13 @@ cs_error_t votequorum_initialize (
 		goto error_destroy;
 	}
 
-	error = coroipcc_service_connect (IPC_SOCKET_NAME, VOTEQUORUM_SERVICE, &votequorum_inst->ipc_ctx);
+	error = coroipcc_service_connect (
+		IPC_SOCKET_NAME,
+		VOTEQUORUM_SERVICE,
+		IPC_REQUEST_SIZE,
+		IPC_RESPONSE_SIZE,
+		IPC_DISPATCH_SIZE,
+		 &votequorum_inst->ipc_ctx);
 	if (error != CS_OK) {
 		goto error_put_destroy;
 	}