4
0
Эх сурвалжийг харах

Rework of IPC layer once again

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1096 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 19 жил өмнө
parent
commit
2c42f9e9a5
1 файл өөрчлөгдсөн, 122 нэмэгдсэн, 82 устгасан
  1. 122 82
      exec/ipc.c

+ 122 - 82
exec/ipc.c

@@ -95,7 +95,7 @@ struct outq_item {
 
 enum conn_state {
 	CONN_STATE_ACTIVE,
-	CONN_STATE_DELAYED,
+	CONN_STATE_REQUESTED,
 	CONN_STATE_CLOSED,
 	CONN_STATE_DISCONNECTED
 };
@@ -116,10 +116,11 @@ struct conn_info {
 	int authenticated;	/* Is this connection authenticated? */
 	void *private_data;	/* library connection private data */
 	struct conn_info *conn_info_partner;	/* partner connection dispatch<->response */
-	int should_exit_fn;
+        int (*lib_exit_fn) (void *conn);
 	struct timerlist timerlist;
 	pthread_mutex_t mutex;
-	int exit_thread;
+	pthread_mutex_t *shared_mutex;
+
 };
 
 static void *prioritized_poll_thread (void *conn);
@@ -188,6 +189,7 @@ static int dispatch_init_send_response (
 			error = SA_AIS_OK;
 
 		conn_info->conn_info_partner = (struct conn_info *)req_lib_dispatch_init->conn_info;
+		conn_info->conn_info_partner->shared_mutex = conn_info->shared_mutex;
 
 		msg_conn_info = (struct conn_info *)req_lib_dispatch_init->conn_info;
 		msg_conn_info->conn_info_partner = conn_info;
@@ -210,6 +212,7 @@ static int dispatch_init_send_response (
 				conn_info->conn_info_partner->private_data = NULL;
 			}
 		}
+	}
 
 	res_lib_dispatch_init.header.size = sizeof (mar_res_lib_dispatch_init_t);
 	res_lib_dispatch_init.header.id = MESSAGE_RES_INIT;
@@ -224,11 +227,10 @@ static int dispatch_init_send_response (
 		return (-1);
 	}
 
-	}
-
 	conn_info->state = CONN_STATE_ACTIVE;
-	conn_info->should_exit_fn = 1;
-	ais_service[req_lib_dispatch_init->resdis_header.service]->lib_init_fn (conn_info);
+	conn_info->conn_info_partner->state = CONN_STATE_ACTIVE;
+	conn_info->lib_exit_fn = ais_service[conn_info->service]->lib_exit_fn;
+	ais_service[conn_info->service]->lib_init_fn (conn_info);
 	return (0);
 }
 
@@ -245,6 +247,7 @@ static inline unsigned int conn_info_create (int fd) {
 	}
 
 	memset (conn_info, 0, sizeof (struct conn_info));
+
 	res = queue_init (&conn_info->outq, SIZEQUEUE,
 		sizeof (struct outq_item));
 	if (res != 0) {
@@ -252,22 +255,30 @@ static inline unsigned int conn_info_create (int fd) {
 		return (ENOMEM);
 	}
 	conn_info->inb = malloc (sizeof (char) * SIZEINB);
-	if (conn_info->inb == 0) {
+	if (conn_info->inb == NULL) {
+		queue_free (&conn_info->outq);
+		free (conn_info);
+		return (ENOMEM);
+	}
+	conn_info->shared_mutex = malloc (sizeof (pthread_mutex_t));
+	if (conn_info->shared_mutex == NULL) {
+		free (conn_info->inb);
 		queue_free (&conn_info->outq);
 		free (conn_info);
 		return (ENOMEM);
 	}
 
+	pthread_mutex_init (&conn_info->mutex, NULL);
+	pthread_mutex_init (conn_info->shared_mutex, NULL);
+
 	conn_info->state = CONN_STATE_ACTIVE;
 	conn_info->fd = fd;
 	conn_info->events = POLLIN|POLLNVAL;
 	conn_info->service = SOCKET_SERVICE_INIT;
-	conn_info->exit_thread = 0;
-	pthread_mutex_init (&conn_info->mutex, NULL);
 
 	pthread_attr_init (&conn_info->thread_attr);
 	pthread_attr_setstacksize (&conn_info->thread_attr, 100000);
-	pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_JOINABLE);
+	pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_DETACHED);
 	res = pthread_create (&conn_info->thread, &conn_info->thread_attr,
 		prioritized_poll_thread, conn_info);
 	return (res);
@@ -277,8 +288,6 @@ static void conn_info_destroy (struct conn_info *conn_info)
 {
 	struct outq_item *outq_item;
 
-	close (conn_info->fd);
-
 	/*
 	 * Free the outq queued items
 	 */
@@ -290,7 +299,10 @@ static void conn_info_destroy (struct conn_info *conn_info)
 
 	queue_free (&conn_info->outq);
 	free (conn_info->inb);
-
+	if (conn_info->conn_info_partner) {
+		conn_info->conn_info_partner->conn_info_partner = NULL;
+	}
+	free (conn_info);
 }
 
 static int libais_connection_active (struct conn_info *conn_info)
@@ -298,16 +310,20 @@ static int libais_connection_active (struct conn_info *conn_info)
 	return (conn_info->state == CONN_STATE_ACTIVE);
 }
 
-static void libais_disconnect_delayed (struct conn_info *conn_info)
+static void libais_disconnect_request (struct conn_info *conn_info)
 {
-	conn_info->state = CONN_STATE_DELAYED;
-	conn_info->conn_info_partner->state = CONN_STATE_DELAYED;
+	if (conn_info->state == CONN_STATE_ACTIVE) {
+		conn_info->state = CONN_STATE_REQUESTED;
+		conn_info->conn_info_partner->state = CONN_STATE_REQUESTED;
+	}
 }
 
 static int libais_disconnect (struct conn_info *conn_info)
 {
 	int res = 0;
 
+	assert (conn_info->state != CONN_STATE_ACTIVE);
+
 	if (conn_info->state == CONN_STATE_DISCONNECTED) {
 		assert (0);
 	}
@@ -315,13 +331,13 @@ static int libais_disconnect (struct conn_info *conn_info)
 	/*
 	 * Close active connections
 	 */
-	if (conn_info->state == CONN_STATE_ACTIVE || conn_info->state == CONN_STATE_DELAYED) {
+	if (conn_info->state == CONN_STATE_ACTIVE || conn_info->state == CONN_STATE_REQUESTED) {
 		close (conn_info->fd);
 		conn_info->state = CONN_STATE_CLOSED;
-		if (conn_info->conn_info_partner) {
+//		if (conn_info->conn_info_partner) {
 			close (conn_info->conn_info_partner->fd);
 			conn_info->conn_info_partner->state = CONN_STATE_CLOSED;
-		}
+//		}
 	}
 
 	/*
@@ -329,49 +345,43 @@ static int libais_disconnect (struct conn_info *conn_info)
 	 * one of the connections is closed
 	 */	
 	if (conn_info->state == CONN_STATE_CLOSED) {
-		if (conn_info->should_exit_fn &&
-			ais_service[conn_info->service]->lib_exit_fn) {
-				res = ais_service[conn_info->service]->lib_exit_fn (conn_info);
+		if (conn_info->lib_exit_fn) {
+			res = conn_info->lib_exit_fn (conn_info);
 		}
 		if (res == -1) {
 			return (-1);
 		}
-
-		if (conn_info->conn_info_partner) {
-			if (conn_info->conn_info_partner->should_exit_fn &&
-				ais_service[conn_info->conn_info_partner->service]->lib_exit_fn) {
-					res = ais_service[conn_info->conn_info_partner->service]->lib_exit_fn (conn_info);
-			}
-			if (res == -1) {
-				return (-1);
-			}
+		if (conn_info->conn_info_partner->lib_exit_fn) {
+			res = conn_info->conn_info_partner->lib_exit_fn (conn_info);
+		}
+		if (res == -1) {
+			return (-1);
 		}
-	}
-
-	/*
-	 * Exit other thread if it exists yet
-	 */
-	conn_info->exit_thread = 1;
-	if (conn_info->conn_info_partner) {
-		conn_info->conn_info_partner->exit_thread = 1;
-		pthread_kill (conn_info->conn_info_partner->thread, SIGUSR1);
-		pthread_join (conn_info->conn_info_partner->thread, NULL);
 	}
 	conn_info->state = CONN_STATE_DISCONNECTED;
 	conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTED;
-	conn_info_destroy (conn_info);
-	if (conn_info->conn_info_partner) {
-		conn_info_destroy (conn_info->conn_info_partner);
-	}
+	return (0);
+}
 
-	if (conn_info->private_data) {
-		free (conn_info->private_data);
+static inline void conn_info_mutex_lock (
+	struct conn_info *conn_info,
+	unsigned int service)
+{
+	if (service == SOCKET_SERVICE_INIT) {
+		pthread_mutex_lock (&conn_info->mutex);
+	} else {
+		pthread_mutex_lock (conn_info->shared_mutex);
 	}
-	if (conn_info->conn_info_partner) {
-		free (conn_info->conn_info_partner);
+}
+static inline void conn_info_mutex_unlock (
+	struct conn_info *conn_info,
+	unsigned int service)
+{
+	if (service == SOCKET_SERVICE_INIT) {
+		pthread_mutex_unlock (&conn_info->mutex);
+	} else {
+		pthread_mutex_unlock (conn_info->shared_mutex);
 	}
-	free (conn_info);
-	return (0);
 }
 
 /*
@@ -385,7 +395,9 @@ static void *prioritized_poll_thread (void *conn)
 	int fds;
 	struct sched_param sched_param;
 	int res;
-	int timeout = 1;
+	pthread_mutex_t *rel_mutex;
+	unsigned int service;
+	struct conn_info *cinfo_partner;
 
 	sched_param.sched_priority = 1;
 	res = pthread_setschedparam (conn_info->thread, SCHED_RR, &sched_param);
@@ -393,28 +405,62 @@ static void *prioritized_poll_thread (void *conn)
 	ufd.fd = conn_info->fd;
 	for (;;) {
 retry_poll:
+		service = conn_info->service;
 		ufd.events = conn_info->events;
 		ufd.revents = 0;
-		fds = poll (&ufd, 1, timeout);
-		if (conn_info->exit_thread) {
+		fds = poll (&ufd, 1, -1);
+		
+		conn_info_mutex_lock (conn_info, service);
+		
+		switch (conn_info->state) {
+		case CONN_STATE_REQUESTED:
+		case CONN_STATE_CLOSED:
+			res = libais_disconnect (conn);
+			if (res != 0) {
+				conn_info_mutex_unlock (conn_info, service);
+				goto retry_poll;
+			}
+			break;
+
+		case CONN_STATE_DISCONNECTED:
+			rel_mutex = conn_info->shared_mutex;
+			cinfo_partner = conn_info->conn_info_partner;
+			conn_info_destroy (conn);
+			if (service == SOCKET_SERVICE_INIT) {
+				pthread_mutex_unlock (&conn_info->mutex);
+			} else {
+				pthread_mutex_unlock (rel_mutex);
+			}
+			if (cinfo_partner == NULL) {
+				free (rel_mutex);
+			}
+			pthread_exit (0);
+			/*
+			 * !! NOTE !! this is the exit point for this thread
+			 */
+			break;
+
+		default:
 			break;
 		}
+
 		if (fds == -1) {
+			conn_info_mutex_unlock (conn_info, service);
 			goto retry_poll;
 		}
-		timeout = -1;
+
 		ipc_serialize_lock_fn ();
+
 		if (fds == 1 && ufd.revents) {
-			if ((ufd.revents & (POLLERR|POLLHUP)) ||
-				conn_info->state == CONN_STATE_DELAYED) {
-				res = libais_disconnect (conn_info);
-				if (res != 0) {
-					ipc_serialize_unlock_fn ();
-					continue;
-				} else {
-					break;
-				}
+			if (ufd.revents & (POLLERR|POLLHUP)) {
+
+				libais_disconnect_request (conn_info);
+
+				conn_info_mutex_unlock (conn_info, service);
+				ipc_serialize_unlock_fn ();
+				continue;
 			}
+			
 			if (ufd.revents & POLLOUT) {
 				conn_info_outq_flush (conn_info);
 			}
@@ -423,11 +469,14 @@ retry_poll:
 				libais_deliver (conn_info);
 			}
 		}
+
 		ipc_serialize_unlock_fn ();
+		conn_info_mutex_unlock (conn_info, service);
 	}
 
-	ipc_serialize_unlock_fn ();
-	pthread_exit (0);
+	/*
+	 * This code never reached
+	 */
 	return (0);
 }
 
@@ -452,9 +501,7 @@ static int conn_info_outq_flush (struct conn_info *conn_info) {
 	struct iovec iov_send;
 	char *msg_addr;
 
-	pthread_mutex_lock (&conn_info->mutex);
 	if (!libais_connection_active (conn_info)) {
-		pthread_mutex_unlock (&conn_info->mutex);
 		return (-1);
 	}
 	outq = &conn_info->outq;
@@ -481,12 +528,10 @@ retry_sendmsg:
 			goto retry_sendmsg;
 		}
 		if (res == -1 && errno == EAGAIN) {
-			pthread_mutex_unlock (&conn_info->mutex);
 			return (0);
 		}
 		if (res == -1 && errno == EPIPE) {
-			pthread_mutex_unlock (&conn_info->mutex);
-			libais_disconnect_delayed (conn_info);
+			libais_disconnect_request (conn_info);
 			return (0);
 		}
 		if (res == -1) {
@@ -496,7 +541,6 @@ retry_sendmsg:
 		if (res + conn_info->byte_start != queue_item->mlen) {
 			conn_info->byte_start += res;
 
-			pthread_mutex_unlock (&conn_info->mutex);
 			return (0);
 		}
 
@@ -511,7 +555,6 @@ retry_sendmsg:
 	if (queue_is_empty (outq)) {
 		conn_info->events = POLLIN|POLLNVAL;
 	}
-	pthread_mutex_unlock (&conn_info->mutex);
 	return (0);
 }
 
@@ -591,6 +634,7 @@ retry_recv:
 #ifdef OPENAIS_LINUX
 	if (conn_info->authenticated == 0) {
 		cmsg = CMSG_FIRSTHDR (&msg_recv);
+		assert (cmsg);
 		cred = (struct ucred *)CMSG_DATA (cmsg);
 		if (cred) {
 			if (cred->uid == 0 || cred->gid == g_gid_valid) {
@@ -889,7 +933,6 @@ int openais_conn_send_response (
 	if (!libais_connection_active (conn_info)) {
 		return (-1);
 	}
-	pthread_mutex_lock (&conn_info->mutex);
 	outq = &conn_info->outq;
 
 	msg_send.msg_iov = &iov_send;
@@ -906,8 +949,7 @@ int openais_conn_send_response (
 		 * and report that the outgoing queue is full
 		 */
 		log_printf (LOG_LEVEL_ERROR, "Library queue is full, disconnecting library connection.\n");
-		libais_disconnect_delayed (conn_info);
-		pthread_mutex_unlock (&conn_info->mutex);
+		libais_disconnect_request (conn_info);
 		return (-1);
 	}
 	while (!queue_is_empty (outq)) {
@@ -927,8 +969,7 @@ retry_sendmsg:
 			break; /* outgoing kernel queue full */
 		}
 		if (res == -1 && errno == EPIPE) {
-			libais_disconnect_delayed (conn_info);
-			pthread_mutex_unlock (&conn_info->mutex);
+			libais_disconnect_request (conn_info);
 			return (0);
 		}
 		if (res == -1) {
@@ -952,7 +993,7 @@ retry_sendmsg:
 
 	queue_empty = queue_is_empty (outq);
 	/*
-	 * Send requested message
+	 * Send request message
 	 */
 	if (queue_empty) {
 
@@ -985,7 +1026,7 @@ retry_sendmsg_two:
 		cmsg = malloc (mlen);
 		if (cmsg == 0) {
 			log_printf (LOG_LEVEL_ERROR, "Library queue couldn't allocate a message, disconnecting library connection.\n");
-			libais_disconnect_delayed (conn_info);
+			libais_disconnect_request (conn_info);
 			return (-1);
 		}
 		queue_item_out.msg = cmsg;
@@ -1000,7 +1041,6 @@ retry_sendmsg_two:
 		conn_info->events = POLLIN|POLLOUT|POLLNVAL;
 		pthread_kill (conn_info->thread, SIGUSR1);
 	}
-	pthread_mutex_unlock (&conn_info->mutex);
 	return (0);
 }