Selaa lähdekoodia

Properly detect shutdown of corosync process

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@3022 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 15 vuotta sitten
vanhempi
commit
5a3c285fbd
3 muutettua tiedostoa jossa 58 lisäystä ja 22 poistoa
  1. 2 3
      exec/coroipcs.c
  2. 53 16
      include/corosync/coroipc_ipc.h
  3. 3 3
      lib/coroipcc.c

+ 2 - 3
exec/coroipcs.c

@@ -670,7 +670,7 @@ static void *pthread_ipc_consumer (void *conn)
 #endif
 
 	for (;;) {
-		ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT);
+		ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST_OR_FLUSH_OR_EXIT, IPC_SEMWAIT_NOFILE);
 		if (ipc_thread_active (conn_info) == 0) {
 			coroipcs_refcount_dec (conn_info);
 			pthread_exit (0);
@@ -681,7 +681,7 @@ static void *pthread_ipc_consumer (void *conn)
 		ipc_sem_getvalue (conn_info->control_buffer, SEMAPHORE_REQUEST, &sem_value);
 		if (sem_value > 0) {
 		
-			res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST);
+			res = ipc_sem_wait (conn_info->control_buffer, SEMAPHORE_REQUEST, IPC_SEMWAIT_NOFILE);
 		} else {
 			continue;
 		}
@@ -691,7 +691,6 @@ static void *pthread_ipc_consumer (void *conn)
 		 * There is no new message to process, continue for loop
 		 */
 		if (new_message == 0) {
-printf ("continuing\n");
 			continue;
 		}
 

+ 53 - 16
include/corosync/coroipc_ipc.h

@@ -64,6 +64,8 @@
  */
 #define IPC_SEMWAIT_TIMEOUT 2
 
+#define IPC_SEMWAIT_NOFILE 0
+
 enum req_init_types {
 	MESSAGE_REQ_RESPONSE_INIT = 0,
 	MESSAGE_REQ_DISPATCH_INIT = 1
@@ -163,12 +165,14 @@ struct coroipcs_zc_header {
 static inline cs_error_t
 ipc_sem_wait (
 	struct control_buffer *control_buffer,
-	enum ipc_semaphore_identifiers sem_id)
+	enum ipc_semaphore_identifiers sem_id,
+	int fd)
 {
 #if _POSIX_THREAD_PROCESS_SHARED < 1
 	struct sembuf sop;
 #else
 	struct timespec timeout;
+	struct pollfd pfd;
 	sem_t *sem = NULL;
 #endif
 	int res;
@@ -189,25 +193,58 @@ ipc_sem_wait (
 		break;
 	}
 
-	timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
-	timeout.tv_nsec = 0;
+	if (fd == IPC_SEMWAIT_NOFILE) {
+retry_sem_wait:
+		res = sem_wait (sem);
+		if (res == -1 && errno == EINTR) {
+			goto retry_sem_wait;
+		} else
+		if (res == -1) {
+			return (CS_ERR_LIBRARY);
+		}
+	} else { 
+		timeout.tv_sec = time(NULL) + IPC_SEMWAIT_TIMEOUT;
+		timeout.tv_nsec = 0;
 
 retry_sem_timedwait:
-	res = sem_timedwait (sem, &timeout);
-	if (res == -1 && errno == ETIMEDOUT) {
-		return (CS_ERR_LIBRARY);
-	} else
-	if (res == -1 && errno == EINTR) {
-		goto retry_sem_timedwait;
-	} else
-	if (res == -1) {
-		return (CS_ERR_LIBRARY);
+		res = sem_timedwait (sem, &timeout);
+		if (res == -1 && errno == ETIMEDOUT) {
+			pfd.fd = fd;
+			pfd.events = 0;
+
+			/*
+			 * Determine if server has failed (ERR_LIBRARY) or
+			 * is just performing slowly or in configuration change
+			 * (retry sem op)
+			 */
+			 
+retry_poll:
+			res = poll (&pfd, 1, 0);
+			if (res == -1 && errno == EINTR) {
+				goto retry_poll;
+			} else
+			if (res == -1) {
+				return (CS_ERR_LIBRARY);
+			}
+
+			if (res == 1) {
+				if (pfd.revents == POLLERR ||
+					pfd.revents == POLLHUP ||
+					pfd.revents == POLLNVAL) {
+
+					return (CS_ERR_LIBRARY);
+				}
+			}
+                	goto retry_sem_timedwait;
+		} else
+		if (res == -1 && errno == EINTR) {
+			goto retry_sem_timedwait;
+		} else
+		if (res == -1) {
+			return (CS_ERR_LIBRARY);
+		}
 	}
 #else
-	/*
-	 * Wait for semaphore indicating a new message from server
-	 * to client in queue
-	 */
 	sop.sem_num = sem_id;
 	sop.sem_op = -1;
 	sop.sem_flg = 0;

+ 3 - 3
lib/coroipcc.c

@@ -512,7 +512,7 @@ reply_receive (
 	cs_error_t res;
 
 retry_ipc_sem_wait:
-	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE, ipc_instance->fd);
 	if (res != CS_OK) {
 		if (res == CS_ERR_TRY_AGAIN) {
 			priv_change_send (ipc_instance);
@@ -539,7 +539,7 @@ reply_receive_in_buf (
 	cs_error_t res;
 
 retry_ipc_sem_wait:
-	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE);
+	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_RESPONSE, ipc_instance->fd);
 	if (res != CS_OK) {
 		if (res == CS_ERR_TRY_AGAIN) {
 			priv_change_send (ipc_instance);
@@ -917,7 +917,7 @@ coroipcc_dispatch_put (hdb_handle_t handle)
 	}
 
 retry_ipc_sem_wait:
-	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH);
+	res = ipc_sem_wait (ipc_instance->control_buffer, SEMAPHORE_DISPATCH, ipc_instance->fd);
 	if (res != CS_OK) {
 		if (res == CS_ERR_TRY_AGAIN) {
 			priv_change_send (ipc_instance);