Selaa lähdekoodia

Rework a bit of how lib_exit_fn works so that reference counts may be
used in the exit functions for services such as cpg.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1786 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake 17 vuotta sitten
vanhempi
commit
0d274de4ec
1 muutettua tiedostoa jossa 121 lisäystä ja 73 poistoa
  1. 121 73
      exec/ipc.c

+ 121 - 73
exec/ipc.c

@@ -46,6 +46,7 @@
 #include <sys/un.h>
 #include <sys/un.h>
 #include <sys/time.h>
 #include <sys/time.h>
 #include <sys/resource.h>
 #include <sys/resource.h>
+#include <sys/wait.h>
 #include <netinet/in.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <arpa/inet.h>
 #include <unistd.h>
 #include <unistd.h>
@@ -62,7 +63,6 @@
 
 
 #include <sys/shm.h>
 #include <sys/shm.h>
 #include <sys/sem.h>
 #include <sys/sem.h>
-
 #include <corosync/swab.h>
 #include <corosync/swab.h>
 #include <corosync/corotypes.h>
 #include <corosync/corotypes.h>
 #include <corosync/list.h>
 #include <corosync/list.h>
@@ -87,11 +87,11 @@
 #include <corosync/engine/coroapi.h>
 #include <corosync/engine/coroapi.h>
 #include "service.h"
 #include "service.h"
 
 
-#include "util.h"
-
 LOGSYS_DECLARE_SUBSYS ("IPC", LOG_INFO);
 LOGSYS_DECLARE_SUBSYS ("IPC", LOG_INFO);
 
 
-#ifdef CS_SOLARIS
+#include "util.h"
+
+#ifdef COROSYNC_SOLARIS
 #define MSG_NOSIGNAL 0
 #define MSG_NOSIGNAL 0
 #endif
 #endif
 
 
@@ -119,13 +119,21 @@ union semun {
 };
 };
 #endif
 #endif
 
 
+enum conn_state {
+	CONN_STATE_THREAD_INACTIVE = 0,
+	CONN_STATE_THREAD_ACTIVE = 1,
+	CONN_STATE_THREAD_REQUEST_EXIT = 2,
+	CONN_STATE_THREAD_DESTROYED = 3,
+	CONN_STATE_LIB_EXIT_CALLED = 4,
+	CONN_STATE_DISCONNECT_INACTIVE = 5
+};
+
 struct conn_info {
 struct conn_info {
 	int fd;
 	int fd;
 	pthread_t thread;
 	pthread_t thread;
 	pthread_attr_t thread_attr;
 	pthread_attr_t thread_attr;
 	unsigned int service;
 	unsigned int service;
-	int destroyed;
-	int disconnect_requested;
+	enum conn_state state;
 	int notify_flow_control_enabled;
 	int notify_flow_control_enabled;
 	int refcount;
 	int refcount;
 	key_t shmkey;
 	key_t shmkey;
@@ -151,53 +159,85 @@ static int priv_change (struct conn_info *conn_info);
 
 
 static void ipc_disconnect (struct conn_info *conn_info);
 static void ipc_disconnect (struct conn_info *conn_info);
 
 
-static int ipc_conn_exiting (void *conn)
+static int ipc_thread_active (void *conn)
 {
 {
 	struct conn_info *conn_info = (struct conn_info *)conn;
 	struct conn_info *conn_info = (struct conn_info *)conn;
+	int retval = 0;
 
 
 	pthread_mutex_lock (&conn_info->mutex);
 	pthread_mutex_lock (&conn_info->mutex);
-	if (conn_info->destroyed || conn_info->disconnect_requested) {
-		pthread_mutex_unlock (&conn_info->mutex);
-		return (1);
+	if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
+		retval = 1;
 	}
 	}
 	pthread_mutex_unlock (&conn_info->mutex);
 	pthread_mutex_unlock (&conn_info->mutex);
-	return (0);
+	return (retval);
 }
 }
 
 
+static int ipc_thread_exiting (void *conn)
+{
+	struct conn_info *conn_info = (struct conn_info *)conn;
+	int retval = 1;
+
+	pthread_mutex_lock (&conn_info->mutex);
+	if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
+		retval = 0;
+	} else
+	if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
+		retval = 0;
+	}
+	pthread_mutex_unlock (&conn_info->mutex);
+	return (retval);
+}
 
 
+/*
+ * returns 0 if should be called again, -1 if finished
+ */
 static inline int conn_info_destroy (struct conn_info *conn_info)
 static inline int conn_info_destroy (struct conn_info *conn_info)
 {
 {
-unsigned int res;
+	unsigned int res;
+	void *retval;
 
 
-list_del (&conn_info->list);
+	list_del (&conn_info->list);
 	list_init (&conn_info->list);
 	list_init (&conn_info->list);
 
 
-	if (conn_info->service == SOCKET_SERVICE_INIT) {
+	if (conn_info->state == CONN_STATE_THREAD_REQUEST_EXIT) {
+		res = pthread_join (conn_info->thread, &retval);
+		conn_info->state = CONN_STATE_THREAD_DESTROYED;
+		return (0);
+	}
+
+	if (conn_info->state == CONN_STATE_THREAD_INACTIVE ||
+		conn_info->state == CONN_STATE_DISCONNECT_INACTIVE) {
 		list_del (&conn_info->list);
 		list_del (&conn_info->list);
 		close (conn_info->fd);
 		close (conn_info->fd);
 		free (conn_info);
 		free (conn_info);
+		return (-1);
+	}
+
+	if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
+		pthread_kill (conn_info->thread, SIGUSR1);
 		return (0);
 		return (0);
 	}
 	}
-	if (conn_info->destroyed == 0) {
-		conn_info->destroyed = 1;
-		cs_conn_refcount_dec (conn_info);
+
+	/*
+	 * Retry library exit function if busy
+	 */
+	if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
+		res = ais_service[conn_info->service]->lib_exit_fn (conn_info);
+		if (res == -1) {
+			return (0);
+		} else {
+			conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
+		}
 	}
 	}
 
 
 	pthread_mutex_lock (&conn_info->mutex);
 	pthread_mutex_lock (&conn_info->mutex);
 	if (conn_info->refcount > 0) {
 	if (conn_info->refcount > 0) {
 		pthread_mutex_unlock (&conn_info->mutex);
 		pthread_mutex_unlock (&conn_info->mutex);
-		return (-1);
+		return (0);
 	}
 	}
+	list_del (&conn_info->list);
 	pthread_mutex_unlock (&conn_info->mutex);
 	pthread_mutex_unlock (&conn_info->mutex);
 
 
-	/*
-	 * Retry library exit function if busy
-	 */
-	res = ais_service[conn_info->service]->lib_exit_fn (conn_info);
-	if (res == -1) {
-		return (-1);
-	}
-
 	/*
 	/*
 	 * Destroy shared memory segment and semaphore
 	 * Destroy shared memory segment and semaphore
 	 */
 	 */
@@ -212,9 +252,8 @@ list_del (&conn_info->list);
 		free (conn_info->private_data);
 		free (conn_info->private_data);
 	}
 	}
 	close (conn_info->fd);
 	close (conn_info->fd);
-	list_del (&conn_info->list);
 	free (conn_info);
 	free (conn_info);
-	return (0);
+	return (-1);
 }
 }
 
 
 struct res_overlay {
 struct res_overlay {
@@ -238,7 +277,8 @@ static void *pthread_ipc_consumer (void *conn)
 		sop.sem_op = -1;
 		sop.sem_op = -1;
 		sop.sem_flg = 0;
 		sop.sem_flg = 0;
 retry_semop:
 retry_semop:
-		if (ipc_conn_exiting (conn_info)) {
+		if (ipc_thread_active (conn_info) == 0) {
+			cs_conn_refcount_dec (conn_info);
 			pthread_exit (0);
 			pthread_exit (0);
 		}
 		}
 		res = semop (conn_info->semid, &sop, 1);
 		res = semop (conn_info->semid, &sop, 1);
@@ -246,9 +286,12 @@ retry_semop:
 			goto retry_semop;
 			goto retry_semop;
 		} else
 		} else
 		if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
 		if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
+			cs_conn_refcount_dec (conn_info);
 			pthread_exit (0);
 			pthread_exit (0);
 		}
 		}
 
 
+		cs_conn_refcount_inc (conn_info);
+
 		header = (mar_req_header_t *)conn_info->mem->req_buffer;
 		header = (mar_req_header_t *)conn_info->mem->req_buffer;
 
 
 		send_ok_joined_iovec.iov_base = (char *)header;
 		send_ok_joined_iovec.iov_base = (char *)header;
@@ -277,9 +320,10 @@ retry_semop:
 			cs_response_send (conn_info, &res_overlay, 
 			cs_response_send (conn_info, &res_overlay, 
 				res_overlay.header.size);
 				res_overlay.header.size);
 		}
 		}
+
+		cs_conn_refcount_dec (conn);
 	}
 	}
-	cs_conn_refcount_dec (conn);
-	return (NULL);
+	pthread_exit (0);
 }
 }
 
 
 static int
 static int
@@ -327,7 +371,7 @@ req_setup_recv (
 #endif
 #endif
 
 
 #ifdef PORTABILITY_WORK_TODO
 #ifdef PORTABILITY_WORK_TODO
-#ifdef CS_SOLARIS
+#ifdef COROSYNC_SOLARIS
 	msg_recv.msg_flags = 0;
 	msg_recv.msg_flags = 0;
 	uid_t euid;
 	uid_t euid;
 	gid_t egid;
 	gid_t egid;
@@ -343,7 +387,7 @@ req_setup_recv (
 	}
 	}
 	msg_recv.msg_accrights = 0;
 	msg_recv.msg_accrights = 0;
 	msg_recv.msg_accrightslen = 0;
 	msg_recv.msg_accrightslen = 0;
-#else /* CS_SOLARIS */
+#else /* COROSYNC_SOLARIS */
 
 
 #ifdef HAVE_GETPEERUCRED
 #ifdef HAVE_GETPEERUCRED
 	ucred_t *uc;
 	ucred_t *uc;
@@ -367,7 +411,7 @@ req_setup_recv (
  		"authentication with sockets, continuing "
  		"authentication with sockets, continuing "
  		"with a fake authentication\n");
  		"with a fake authentication\n");
 #endif /* HAVE_GETPEERUCRED */
 #endif /* HAVE_GETPEERUCRED */
-#endif /* CS_SOLARIS */
+#endif /* COROSYNC_SOLARIS */
 
 
 #endif
 #endif
 
 
@@ -386,7 +430,7 @@ retry_recv:
 		return (0);
 		return (0);
 	} else
 	} else
 	if (res == 0) {
 	if (res == 0) {
-#if defined(CS_SOLARIS) || defined(CS_BSD) || defined(CS_DARWIN)
+#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
 		/* On many OS poll never return POLLHUP or POLLERR.
 		/* On many OS poll never return POLLHUP or POLLERR.
 		 * EOF is detected when recvmsg return 0.
 		 * EOF is detected when recvmsg return 0.
 		 */
 		 */
@@ -422,18 +466,6 @@ retry_recv:
 	return (0);
 	return (0);
 }
 }
 
 
-static int poll_handler_connection_destroy(
-	struct conn_info *conn_info)
-{
-	int res;
-	res = conn_info_destroy (conn_info);
-	if (res == -1) {
-		return (0);
-	} else {
-		return (-1);
-	}
-}
-
 static int poll_handler_connection (
 static int poll_handler_connection (
 	hdb_handle_t handle,
 	hdb_handle_t handle,
 	int fd,
 	int fd,
@@ -446,11 +478,16 @@ static int poll_handler_connection (
 	char buf;
 	char buf;
 
 
 
 
+	if (ipc_thread_exiting (conn_info)) {
+		return conn_info_destroy (conn_info);
+	}
+
 	/*
 	/*
-	 * If an error occurs, try to exit if possible
+	 * If an error occurs, request exit
 	 */
 	 */
-	if (ipc_conn_exiting (conn_info) || (revent & (POLLERR|POLLHUP))) {
-		return poll_handler_connection_destroy (conn_info);
+	if (revent & (POLLERR|POLLHUP)) {
+		ipc_disconnect (conn_info);
+		return (0);
 	}
 	}
 
 
 	/*
 	/*
@@ -470,21 +507,20 @@ static int poll_handler_connection (
 			return (0);
 			return (0);
 		}
 		}
 		req_setup_send (conn_info, CS_OK);
 		req_setup_send (conn_info, CS_OK);
-		req_setup = (mar_req_setup_t *)conn_info->setup_msg;
 
 
+		pthread_mutex_init (&conn_info->mutex, NULL);
+		req_setup = (mar_req_setup_t *)conn_info->setup_msg;
 		/*
 		/*
 		 * Is the service registered ?
 		 * Is the service registered ?
 		 */
 		 */
 		if (!ais_service[req_setup->service]) {
 		if (!ais_service[req_setup->service]) {
-			return poll_handler_connection_destroy (conn_info);
+			ipc_disconnect (conn_info);
+			return (0);
 		}
 		}
 
 
-		pthread_mutex_init (&conn_info->mutex, NULL);
 		conn_info->shmkey = req_setup->shmkey;
 		conn_info->shmkey = req_setup->shmkey;
 		conn_info->semkey = req_setup->semkey;
 		conn_info->semkey = req_setup->semkey;
 		conn_info->service = req_setup->service;
 		conn_info->service = req_setup->service;
-		conn_info->destroyed = 0;
-		conn_info->disconnect_requested = 0;
 		conn_info->refcount = 0;
 		conn_info->refcount = 0;
 		conn_info->notify_flow_control_enabled = 0;
 		conn_info->notify_flow_control_enabled = 0;
 		conn_info->setup_bytes_read = 0;
 		conn_info->setup_bytes_read = 0;
@@ -494,7 +530,12 @@ static int poll_handler_connection (
 		conn_info->mem = shmat (conn_info->shmid, NULL, 0);
 		conn_info->mem = shmat (conn_info->shmid, NULL, 0);
 		conn_info->semid = semget (conn_info->semkey, 3, 0600);
 		conn_info->semid = semget (conn_info->semkey, 3, 0600);
 		conn_info->pending_semops = 0;
 		conn_info->pending_semops = 0;
-		conn_info->refcount = 1;
+
+		/*
+		 * ipc thread is the only reference at startup
+		 */
+		conn_info->refcount = 1; 
+		conn_info->state = CONN_STATE_THREAD_ACTIVE;
 
 
 		conn_info->private_data = malloc (ais_service[conn_info->service]->private_data_size);
 		conn_info->private_data = malloc (ais_service[conn_info->service]->private_data_size);
 		memset (conn_info->private_data, 0,
 		memset (conn_info->private_data, 0,
@@ -512,7 +553,7 @@ static int poll_handler_connection (
 		pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
 		pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
 		#endif
 		#endif
 
 
-		pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_DETACHED);
+		pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_JOINABLE);
 		res = pthread_create (&conn_info->thread,
 		res = pthread_create (&conn_info->thread,
 			&conn_info->thread_attr,
 			&conn_info->thread_attr,
 			pthread_ipc_consumer,
 			pthread_ipc_consumer,
@@ -536,28 +577,29 @@ static int poll_handler_connection (
 				break;
 				break;
 			case MESSAGE_REQ_CHANGE_EUID:
 			case MESSAGE_REQ_CHANGE_EUID:
 				if (priv_change (conn_info) == -1) {
 				if (priv_change (conn_info) == -1) {
-					return poll_handler_connection_destroy (conn_info);
+					ipc_disconnect (conn_info);
 				}
 				}
 				break;
 				break;
 			default:
 			default:
 				res = 0;
 				res = 0;
 				break;
 				break;
 			}
 			}
+			cs_conn_refcount_dec (conn_info);
 		}
 		}
-#if defined(CS_SOLARIS) || defined(CS_BSD) || defined(CS_DARWIN)
+#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
 		/* On many OS poll never return POLLHUP or POLLERR.
 		/* On many OS poll never return POLLHUP or POLLERR.
 		 * EOF is detected when recvmsg return 0.
 		 * EOF is detected when recvmsg return 0.
 		 */
 		 */
 		if (res == 0) {
 		if (res == 0) {
-			return poll_handler_connection_destroy (conn_info);
+			ipc_disconnect (conn_info);
+			return (0);
 		}
 		}
-		cs_conn_refcount_dec (conn_info);
 #endif
 #endif
 	}
 	}
 
 
 	cs_conn_refcount_inc (conn_info);
 	cs_conn_refcount_inc (conn_info);
 	pthread_mutex_lock (&conn_info->mutex);
 	pthread_mutex_lock (&conn_info->mutex);
-	if ((conn_info->disconnect_requested == 0) && (revent & POLLOUT)) {
+	if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
 		buf = !list_empty (&conn_info->outq_head);
 		buf = !list_empty (&conn_info->outq_head);
 		for (; conn_info->pending_semops;) {
 		for (; conn_info->pending_semops;) {
 			res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
 			res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
@@ -584,17 +626,24 @@ static int poll_handler_connection (
 	}
 	}
 	pthread_mutex_unlock (&conn_info->mutex);
 	pthread_mutex_unlock (&conn_info->mutex);
 	cs_conn_refcount_dec (conn_info);
 	cs_conn_refcount_dec (conn_info);
+
+	return (0);
 }
 }
 
 
 static void ipc_disconnect (struct conn_info *conn_info)
 static void ipc_disconnect (struct conn_info *conn_info)
 {
 {
+	if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
+		conn_info->state = CONN_STATE_DISCONNECT_INACTIVE;
+		return;
+	}
+	if (conn_info->state != CONN_STATE_THREAD_ACTIVE) {
+		return;
+	}
 	pthread_mutex_lock (&conn_info->mutex);
 	pthread_mutex_lock (&conn_info->mutex);
-	conn_info->disconnect_requested = 1;
+	conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
 	pthread_mutex_unlock (&conn_info->mutex);
 	pthread_mutex_unlock (&conn_info->mutex);
 
 
-	poll_dispatch_modify (corosync_poll_handle,
-		conn_info->fd, POLLOUT|POLLNVAL,
-		poll_handler_connection);
+	pthread_kill (conn_info->thread, SIGUSR1);
 }
 }
 
 
 static int conn_info_create (int fd)
 static int conn_info_create (int fd)
@@ -609,6 +658,7 @@ static int conn_info_create (int fd)
 
 
 	conn_info->fd = fd;
 	conn_info->fd = fd;
 	conn_info->service = SOCKET_SERVICE_INIT;
 	conn_info->service = SOCKET_SERVICE_INIT;
+	conn_info->state = CONN_STATE_THREAD_INACTIVE;
 	list_init (&conn_info->outq_head);
 	list_init (&conn_info->outq_head);
 	list_init (&conn_info->list);
 	list_init (&conn_info->list);
 	list_add (&conn_info->list, &conn_info_list_head);
 	list_add (&conn_info->list, &conn_info_list_head);
@@ -618,7 +668,7 @@ static int conn_info_create (int fd)
 	return (0);
 	return (0);
 }
 }
 
 
-#if defined(COROSYNC_LINUX) || defined(CS_SOLARIS)
+#if defined(COROSYNC_LINUX) || defined(COROSYNC_SOLARIS)
 /* SUN_LEN is broken for abstract namespace
 /* SUN_LEN is broken for abstract namespace
  */
  */
 #define AIS_SUN_LEN(a) sizeof(*(a))
 #define AIS_SUN_LEN(a) sizeof(*(a))
@@ -712,8 +762,7 @@ void message_source_set (
 	source->conn = conn;
 	source->conn = conn;
 }
 }
 
 
-extern void cs_ipc_init (
-	unsigned int gid_valid)
+void cs_ipc_init (unsigned int gid_valid)
 {
 {
 	int libais_server_fd;
 	int libais_server_fd;
 	struct sockaddr_un un_addr;
 	struct sockaddr_un un_addr;
@@ -740,7 +789,7 @@ extern void cs_ipc_init (
 #endif
 #endif
 	memset (&un_addr, 0, sizeof (struct sockaddr_un));
 	memset (&un_addr, 0, sizeof (struct sockaddr_un));
 	un_addr.sun_family = AF_UNIX;
 	un_addr.sun_family = AF_UNIX;
-#if defined(CS_BSD) || defined(CS_DARWIN)
+#if defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
 	un_addr.sun_len = sizeof(struct sockaddr_un);
 	un_addr.sun_len = sizeof(struct sockaddr_un);
 #endif
 #endif
 #if defined(COROSYNC_LINUX)
 #if defined(COROSYNC_LINUX)
@@ -778,7 +827,6 @@ void cs_ipc_exit (void)
 		shmdt (conn_info->mem);
 		shmdt (conn_info->mem);
 		shmctl (conn_info->shmid, IPC_RMID, NULL);
 		shmctl (conn_info->shmid, IPC_RMID, NULL);
 		semctl (conn_info->semid, 0, IPC_RMID);
 		semctl (conn_info->semid, 0, IPC_RMID);
-		conn_info->destroyed = 1;
 	
 	
 		pthread_kill (conn_info->thread, SIGUSR1);
 		pthread_kill (conn_info->thread, SIGUSR1);
 	}
 	}
@@ -980,7 +1028,7 @@ retry_recv:
 	if (res == -1 && errno != EAGAIN) {
 	if (res == -1 && errno != EAGAIN) {
 		return (-1);
 		return (-1);
 	}
 	}
-#if defined(CS_SOLARIS) || defined(CS_BSD) || defined(CS_DARWIN)
+#if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
 	/* Error on socket, EOF is detected when recv return 0
 	/* Error on socket, EOF is detected when recv return 0
 	 */
 	 */
 	if (res == 0) {
 	if (res == 0) {
@@ -1015,7 +1063,7 @@ static void msg_send_or_queue (void *conn, struct iovec *iov, int iov_len)
 	/*
 	/*
 	 * Exit transmission if the connection is dead
 	 * Exit transmission if the connection is dead
 	 */
 	 */
-	if (ipc_conn_exiting (conn)) {
+	if (ipc_thread_active (conn) == 0) {
 		return;
 		return;
 	}
 	}