Преглед на файлове

defect 188 missed initial checkin

(Logical change 1.177)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@597 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake преди 21 години
родител
ревизия
37fb3fca52
променени са 8 файла, в които са добавени 778 реда и са изтрити 789 реда
  1. 2 2
      lib/Makefile
  2. 131 167
      lib/amf.c
  3. 101 119
      lib/ckpt.c
  4. 62 78
      lib/clm.c
  5. 86 120
      lib/evs.c
  6. 188 189
      lib/evt.c
  7. 187 110
      lib/util.c
  8. 21 4
      lib/util.h

+ 2 - 2
lib/Makefile

@@ -28,8 +28,8 @@
 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 # THE POSSIBILITY OF SUCH DAMAGE.
 # Production mode flags
-#CFLAGS = -O3 -Wall
-#LDFLAGS = 
+CFLAGS = -O3 -Wall
+LDFLAGS = 
 
 # Debug mode flags
 CFLAGS = -g -DDEBUG -Wall

+ 131 - 167
lib/amf.c

@@ -59,13 +59,14 @@ struct res_overlay {
  * Data structure for instance data
  */
 struct amfInstance {
-	int fd;
+	int response_fd;
+	int dispatch_fd;
 	SaAmfCallbacksT callbacks;
-	struct queue inq;
 	SaNameT compName;
 	int compRegistered;
 	int finalize;
-	pthread_mutex_t mutex;
+	pthread_mutex_t response_mutex;
+	pthread_mutex_t dispatch_mutex;
 };
 
 static void amfHandleInstanceDestructor (void *);
@@ -101,13 +102,13 @@ void amfHandleInstanceDestructor (void *instance)
 {
 	struct amfInstance *amfInstance = (struct amfInstance *)instance;
 
-	if (amfInstance->fd != -1) {
-		shutdown (amfInstance->fd, 0);
-		close (amfInstance->fd);
+	if (amfInstance->response_fd != -1) {
+		shutdown (amfInstance->response_fd, 0);
+		close (amfInstance->response_fd);
 	}
-
-	if (amfInstance->inq.items) {
-		free (amfInstance->inq.items);
+	if (amfInstance->dispatch_fd != -1) {
+		shutdown (amfInstance->dispatch_fd, 0);
+		close (amfInstance->dispatch_fd);
 	}
 }
 
@@ -135,26 +136,21 @@ saAmfInitialize (
 		goto error_destroy;
 	}
 
-	amfInstance->fd = -1;
-	
-	/*
-	 * An inq is needed to store async messages while waiting for a 
-	 * sync response
-	 */
-	error = saQueueInit (&amfInstance->inq, 512, sizeof (void *));
-	if (error != SA_OK) {
-		goto error_put_destroy;
-	}
+	amfInstance->response_fd = -1;
 
-	error = saServiceConnect (&amfInstance->fd, MESSAGE_REQ_AMF_INIT);
+	amfInstance->dispatch_fd = -1;
+	
+	error = saServiceConnectTwo (&amfInstance->response_fd,
+		&amfInstance->dispatch_fd, AMF_SERVICE);
 	if (error != SA_OK) {
 		goto error_put_destroy;
-// this is wrong
 	}
 
 	memcpy (&amfInstance->callbacks, amfCallbacks, sizeof (SaAmfCallbacksT));
 
-	pthread_mutex_init (&amfInstance->mutex, NULL);
+	pthread_mutex_init (&amfInstance->response_mutex, NULL);
+
+	pthread_mutex_init (&amfInstance->dispatch_mutex, NULL);
 
 	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 
@@ -181,7 +177,7 @@ saAmfSelectionObjectGet (
 		return (error);
 	}
 
-	*selectionObject = amfInstance->fd;
+	*selectionObject = amfInstance->dispatch_fd;
 
 	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (SA_OK);
@@ -195,20 +191,15 @@ saAmfDispatch (
 	struct pollfd ufds;
 	int timeout = -1;
 	SaErrorT error;
+	int cont = 1; /* always continue do loop except when set to 0 */
 	int dispatch_avail;
 	struct amfInstance *amfInstance;
-	SaAmfCallbacksT callbacks;
 	struct res_lib_amf_healthcheckcallback *res_lib_amf_healthcheckcallback;
 	struct res_lib_amf_readinessstatesetcallback *res_lib_amf_readinessstatesetcallback;
 	struct res_lib_amf_csisetcallback *res_lib_amf_csisetcallback;
 	struct res_lib_amf_csiremovecallback *res_lib_amf_csiremovecallback;
 	struct res_lib_amf_protectiongrouptrackcallback *res_lib_amf_protectiongrouptrackcallback;
-	struct req_header **queue_msg;
-	struct req_header *msg;
-	int empty;
-	int ignore_dispatch = 0;
-	int cont = 1; /* always continue do loop except when set to 0 */
-	int poll_fd;
+	SaAmfCallbacksT callbacks;
 	struct res_overlay dispatch_data;
 
 	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle,
@@ -225,12 +216,10 @@ saAmfDispatch (
 	}
 
 	do {
-		poll_fd = amfInstance->fd;
-
 		/*
 		 * Read data directly from socket
 		 */
-		ufds.fd = poll_fd;
+		ufds.fd = amfInstance->dispatch_fd;
 		ufds.events = POLLIN;
 		ufds.revents = 0;
 
@@ -239,79 +228,66 @@ saAmfDispatch (
 			goto error_nounlock;
 		}
 
-		pthread_mutex_lock (&amfInstance->mutex);
+		pthread_mutex_lock (&amfInstance->dispatch_mutex);
+
+		error = saPollRetry (&ufds, 1, 0);
+		if (error != SA_OK) {
+			goto error_nounlock;
+		}
 
 		/*
 		 * Handle has been finalized in another thread
 		 */
 		if (amfInstance->finalize == 1) {
 			error = SA_OK;
-			pthread_mutex_unlock (&amfInstance->mutex);
+			pthread_mutex_unlock (&amfInstance->dispatch_mutex);
 			goto error_unlock;
 		}
 
 		dispatch_avail = ufds.revents & POLLIN;
 		if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
-			pthread_mutex_unlock (&amfInstance->mutex);
+			pthread_mutex_unlock (&amfInstance->dispatch_mutex);
 			break; /* exit do while cont is 1 loop */
 		} else
 		if (dispatch_avail == 0) {
-			pthread_mutex_unlock (&amfInstance->mutex);
+			pthread_mutex_unlock (&amfInstance->dispatch_mutex);
 			continue; /* next poll */
 		}
 
-		saQueueIsEmpty(&amfInstance->inq, &empty);
-		if (empty == 0) {
-			/*
-			 * Queue is not empty, read data from queue
-			 */
-			saQueueItemGet (&amfInstance->inq, (void *)&queue_msg);
-			msg = *queue_msg;
-			memcpy (&dispatch_data, msg, msg->size);
-			saQueueItemRemove (&amfInstance->inq);
-			free (msg);
-		} else {
+		if (ufds.revents & POLLIN) {
 			/*
 			 * Queue empty, read response from socket
 			 */
-			error = saRecvRetry (amfInstance->fd, &dispatch_data.header,
+			error = saRecvRetry (amfInstance->dispatch_fd, &dispatch_data.header,
 				sizeof (struct res_header), MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_OK) {
 				goto error_unlock;
 			}
 			if (dispatch_data.header.size > sizeof (struct res_header)) {
-				error = saRecvRetry (amfInstance->fd, &dispatch_data.data,
+				error = saRecvRetry (amfInstance->dispatch_fd, &dispatch_data.data,
 					dispatch_data.header.size - sizeof (struct res_header),
 					MSG_WAITALL | MSG_NOSIGNAL);
 				if (error != SA_OK) {
 					goto error_unlock;
 				}
 			}
+		} else {
+			pthread_mutex_unlock (&amfInstance->dispatch_mutex);
+			continue;
 		}
+
 		/*
 		 * Make copy of callbacks, message data, unlock instance, and call callback
 		 * A risk of this dispatch method is that the callback routines may
 		 * operate at the same time that amfFinalize has been called in another thread.
 		 */
 		memcpy (&callbacks, &amfInstance->callbacks, sizeof (SaAmfCallbacksT));
-		pthread_mutex_unlock (&amfInstance->mutex);
-
+		pthread_mutex_unlock (&amfInstance->dispatch_mutex);
 
 		/*
 		 * Dispatch incoming response
 		 */
 		switch (dispatch_data.header.id) {
-		case MESSAGE_RES_LIB_ACTIVATEPOLL:
-			/*
-			 * This is a do nothing message which the node executive sends
-			 * to activate the file amfHandle in poll when the library has
-			 * queued a message into amfHandle->inq
-			 * The dispatch is ignored for the following two cases:
-			 * 1) setting of timeout to zero for the DISPATCH_ALL case
-			 * 2) expiration of the do loop for the DISPATCH_ONE case
-			 */
-			ignore_dispatch = 1;
-			break;
 
 		case MESSAGE_RES_AMF_HEALTHCHECKCALLBACK:
 			res_lib_amf_healthcheckcallback = (struct res_lib_amf_healthcheckcallback *)&dispatch_data;
@@ -375,16 +351,9 @@ saAmfDispatch (
 		 */
 		switch (dispatchFlags) {
 		case SA_DISPATCH_ONE:
-			if (ignore_dispatch) {
-				ignore_dispatch = 0;
-			} else {
-				cont = 0;
-			}
+			cont = 0;
 			break;
 		case SA_DISPATCH_ALL:
-			if (ignore_dispatch) {
-				ignore_dispatch = 0;
-			}
 			break;
 		case SA_DISPATCH_BLOCKING:
 			break;
@@ -409,22 +378,25 @@ saAmfFinalize (
 		return (error);
 	}
 
-	pthread_mutex_lock (&amfInstance->mutex);
+	pthread_mutex_lock (&amfInstance->dispatch_mutex);
+
+	pthread_mutex_lock (&amfInstance->response_mutex);
 
 	/*
 	 * Another thread has already started finalizing
 	 */
 	if (amfInstance->finalize) {
-		pthread_mutex_unlock (&amfInstance->mutex);
+		pthread_mutex_unlock (&amfInstance->response_mutex);
+		pthread_mutex_unlock (&amfInstance->dispatch_mutex);
 		saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 		return (SA_ERR_BAD_HANDLE);
 	}
 
 	amfInstance->finalize = 1;
 
-	saActivatePoll (amfInstance->fd);
+	pthread_mutex_unlock (&amfInstance->response_mutex);
 
-	pthread_mutex_unlock (&amfInstance->mutex);
+	pthread_mutex_unlock (&amfInstance->dispatch_mutex);
 
 	saHandleDestroy (&amfHandleDatabase, *amfHandle);
 
@@ -458,21 +430,12 @@ saAmfComponentRegister (
 		return (error);
 	}
 
-	pthread_mutex_lock (&amfInstance->mutex);
+	pthread_mutex_lock (&amfInstance->response_mutex);
 
-	error = saSendRetry (amfInstance->fd, &req_lib_amf_componentregister, sizeof (struct req_lib_amf_componentregister), MSG_NOSIGNAL);
-	if (error != SA_OK) {
-		goto error_unlock;
-	}
-
-	/*
-	 * Search for COMPONENTREGISTER responses and queue any
-	 * messages that dont match in this amfHandle's inq.
-	 * This must be done to avoid dropping async messages
-	 * during this sync message retrieval
-	 */
-	error = saRecvQueue (amfInstance->fd, &res_lib_amf_componentregister,
-		&amfInstance->inq, MESSAGE_RES_AMF_COMPONENTREGISTER);
+	error - saSendReceiveReply (amfInstance->response_fd, &req_lib_amf_componentregister,
+		sizeof (struct req_lib_amf_componentregister),
+		&res_lib_amf_componentregister,
+		sizeof (struct res_lib_amf_componentregister));
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
@@ -485,7 +448,7 @@ saAmfComponentRegister (
 	error = res_lib_amf_componentregister.header.error;
 
 error_unlock:
-	pthread_mutex_unlock (&amfInstance->mutex);
+	pthread_mutex_unlock (&amfInstance->response_mutex);
 	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
@@ -515,33 +478,24 @@ saAmfComponentUnregister (
 		return (error);
 	}
 
-	pthread_mutex_lock (&amfInstance->mutex);
+	pthread_mutex_lock (&amfInstance->response_mutex);
 
-	error = saSendRetry (amfInstance->fd, &req_lib_amf_componentunregister,
-		sizeof (struct req_lib_amf_componentunregister), MSG_NOSIGNAL);
+	error - saSendReceiveReply (amfInstance->response_fd,
+		&req_lib_amf_componentunregister,
+		sizeof (struct req_lib_amf_componentunregister),
+		&res_lib_amf_componentunregister,
+		sizeof (struct res_lib_amf_componentunregister));
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
 
-	/*
-	 * Search for COMPONENTUNREGISTER responses and queue any
-	 * messages that dont match in this amfHandle's inq.
-	 * This must be done to avoid dropping async messages
-	 * during this sync message retrieval
-	 */
-	error = saRecvQueue (amfInstance->fd, &res_lib_amf_componentunregister,
-		&amfInstance->inq, MESSAGE_RES_AMF_COMPONENTUNREGISTER);
-	if (error != SA_OK) {
-		goto error_unlock;
-	}
-
-	if (res_lib_amf_componentunregister.header.error == SA_OK) {
+	error = res_lib_amf_componentunregister.header.error;
+	if (error == SA_OK) {
 		amfInstance->compRegistered = 0;
 	}
-	error = res_lib_amf_componentunregister.header.error;
 
 error_unlock:
-	pthread_mutex_unlock (&amfInstance->mutex);
+	pthread_mutex_unlock (&amfInstance->response_mutex);
 	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
@@ -559,18 +513,19 @@ saAmfCompNameGet (
 		return (error);
 	}
 
-	pthread_mutex_lock (&amfInstance->mutex);
+	pthread_mutex_lock (&amfInstance->response_mutex);
 
 	if (amfInstance->compRegistered == 0) {
-		pthread_mutex_unlock (&amfInstance->mutex);
+		pthread_mutex_unlock (&amfInstance->response_mutex);
 		return (SA_ERR_NOT_EXIST);
 	}
 
 	memcpy (compName, &amfInstance->compName, sizeof (SaNameT));
 
-	pthread_mutex_unlock (&amfInstance->mutex);
+	pthread_mutex_unlock (&amfInstance->response_mutex);
 
 	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
+
 	return (SA_OK);
 }
 
@@ -579,12 +534,13 @@ saAmfReadinessStateGet (
 	const SaNameT *compName,
 	SaAmfReadinessStateT *readinessState)
 {
-	int fd;
+	int fd_response;
+	int fd_dispatch;
 	SaErrorT error;
 	struct req_amf_readinessstateget req_amf_readinessstateget;
 	struct res_lib_amf_readinessstateget res_lib_amf_readinessstateget;
 
-	error = saServiceConnect (&fd, MESSAGE_REQ_AMF_INIT);
+	error = saServiceConnectTwo (&fd_response, &fd_dispatch, AMF_SERVICE);
 	if (error != SA_OK) {
 		goto exit_noclose;
 	}
@@ -592,22 +548,22 @@ saAmfReadinessStateGet (
 	req_amf_readinessstateget.header.size = sizeof (struct req_amf_readinessstateget);
 	memcpy (&req_amf_readinessstateget.compName, compName, sizeof (SaNameT));
 
-	error = saSendRetry (fd, &req_amf_readinessstateget,
-		sizeof (struct req_amf_readinessstateget), MSG_NOSIGNAL);
+	error - saSendReceiveReply (fd_response,
+		&req_amf_readinessstateget, sizeof (struct req_amf_readinessstateget),
+		&res_lib_amf_readinessstateget, sizeof (struct res_lib_amf_readinessstateget));
 	if (error != SA_OK) {
 		goto exit_close;
 	}
 
-	error = saRecvRetry (fd, &res_lib_amf_readinessstateget,
-		sizeof (struct res_lib_amf_readinessstateget), MSG_WAITALL | MSG_NOSIGNAL);
+	error = res_lib_amf_readinessstateget.header.error;
 	if (error == SA_OK) {
 		memcpy (readinessState, &res_lib_amf_readinessstateget.readinessState, 
 			sizeof (SaAmfReadinessStateT));
-		error = res_lib_amf_readinessstateget.header.error;
 	}
 		
 exit_close:
-	close (fd);
+	close (fd_response);
+	close (fd_dispatch);
 exit_noclose:
 	return (error);
 }
@@ -618,6 +574,7 @@ saAmfStoppingComplete (
 	SaErrorT error)
 {
 	struct req_amf_stoppingcomplete req_amf_stoppingcomplete;
+	struct res_lib_amf_stoppingcomplete res_lib_amf_stoppingcomplete;
 	int fd;
 	SaErrorT errorResult;
 
@@ -629,11 +586,17 @@ saAmfStoppingComplete (
 	req_amf_stoppingcomplete.header.size = sizeof (struct req_amf_stoppingcomplete);
 	req_amf_stoppingcomplete.invocation = invocation;
 	req_amf_stoppingcomplete.error = error;
-	errorResult = saSendRetry (fd, &req_amf_stoppingcomplete,
-		sizeof (struct req_amf_stoppingcomplete), MSG_NOSIGNAL);
+	error = saSendReceiveReply (fd,
+		&req_amf_stoppingcomplete, sizeof (struct req_amf_stoppingcomplete),
+		&res_lib_amf_stoppingcomplete, sizeof (struct res_lib_amf_stoppingcomplete));
+	if (error != SA_OK) {
+		goto exit_close;
+	}
+	error = res_lib_amf_stoppingcomplete.header.error;
+// TODO executive needs to send reply of stopping complete
 
+exit_close:
 	close (fd);
-
 exit_noclose:
 	return (errorResult);
 }
@@ -658,17 +621,16 @@ saAmfHAStateGet (
 	memcpy (&req_amf_hastateget.compName, compName, sizeof (SaNameT));
 	memcpy (&req_amf_hastateget.csiName, csiName, sizeof (SaNameT));
 
-	error = saSendRetry (fd, &req_amf_hastateget,
-			sizeof (struct req_amf_hastateget), MSG_NOSIGNAL);
+	error = saSendReceiveReply (fd,
+		&req_amf_hastateget, sizeof (struct req_amf_hastateget),
+		&res_lib_amf_hastateget, sizeof (struct res_lib_amf_hastateget));
 	if (error != SA_OK) {
 		goto exit_close;
 	}
 
-	error = saRecvRetry (fd, &res_lib_amf_hastateget,
-		sizeof (struct res_lib_amf_hastateget), MSG_WAITALL | MSG_NOSIGNAL);
+	error = res_lib_amf_hastateget.header.error;
 	if (error == SA_OK) {
 		memcpy (haState, &res_lib_amf_hastateget.haState, sizeof (SaAmfHAStateT));
-		error = res_lib_amf_hastateget.header.error;
 	}
 	
 exit_close:
@@ -702,21 +664,21 @@ saAmfProtectionGroupTrackStart (
 		return (error);
 	}
 
-	pthread_mutex_lock (&amfInstance->mutex);
+	pthread_mutex_lock (&amfInstance->response_mutex);
 
-	error = saSendRetry (amfInstance->fd, &req_amf_protectiongrouptrackstart,
-		sizeof (struct req_amf_protectiongrouptrackstart), MSG_NOSIGNAL);
+	error = saSendReceiveReply (amfInstance->response_fd,
+		&req_amf_protectiongrouptrackstart,
+		sizeof (struct req_amf_protectiongrouptrackstart),
+		&res_lib_amf_protectiongrouptrackstart,
+		sizeof (struct res_lib_amf_protectiongrouptrackstart));
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
 
-	error = saRecvQueue (amfInstance->fd, &res_lib_amf_protectiongrouptrackstart,
-		&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTART);
-
 	error = res_lib_amf_protectiongrouptrackstart.header.error;
 
 error_unlock:
-	pthread_mutex_unlock (&amfInstance->mutex);
+	pthread_mutex_unlock (&amfInstance->response_mutex);
 	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
@@ -740,21 +702,21 @@ saAmfProtectionGroupTrackStop (
 		return (error);
 	}
 
-	pthread_mutex_lock (&amfInstance->mutex);
+	pthread_mutex_lock (&amfInstance->response_mutex);
 
-	error = saSendRetry (amfInstance->fd, &req_amf_protectiongrouptrackstop,
-		sizeof (struct req_amf_protectiongrouptrackstop), MSG_NOSIGNAL);
+	error = saSendReceiveReply (amfInstance->response_fd,
+		&req_amf_protectiongrouptrackstop,
+		sizeof (struct req_amf_protectiongrouptrackstop),
+		&res_lib_amf_protectiongrouptrackstop,
+		sizeof (struct res_lib_amf_protectiongrouptrackstop));
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
 
-	error = saRecvQueue (amfInstance->fd, &res_lib_amf_protectiongrouptrackstop,
-		&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTOP);
-
 	error = res_lib_amf_protectiongrouptrackstop.header.error;
 
 error_unlock:
-	pthread_mutex_unlock (&amfInstance->mutex);
+	pthread_mutex_unlock (&amfInstance->response_mutex);
 	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
@@ -788,14 +750,9 @@ saAmfErrorReport (
 		additionalData, sizeof (SaAmfAdditionalDataT));
 	*/
 
-	error = saSendRetry (fd, &req_lib_amf_errorreport,
-		sizeof (struct req_lib_amf_errorreport), MSG_NOSIGNAL);
-
-	/*
-	 * Get response from executive and respond to user application
-	 */
-	error = saRecvRetry (fd, &res_lib_amf_errorreport,
-		sizeof (struct res_lib_amf_errorreport), MSG_WAITALL | MSG_NOSIGNAL);
+	error = saSendReceiveReply (fd,
+		&req_lib_amf_errorreport, sizeof (struct req_lib_amf_errorreport),
+		&res_lib_amf_errorreport, sizeof (struct res_lib_amf_errorreport));
 	if (error != SA_OK) {
 		goto exit_close;
 	}
@@ -825,14 +782,9 @@ saAmfErrorCancelAll (
 	req_lib_amf_errorcancelall.header.size = sizeof (struct req_lib_amf_errorcancelall);
 	memcpy (&req_lib_amf_errorcancelall.compName, compName, sizeof (SaNameT));
 
-	error = saSendRetry (fd, &req_lib_amf_errorcancelall,
-		sizeof (struct req_lib_amf_errorcancelall), MSG_NOSIGNAL);
-
-	/*
-	 * Get response from executive and respond to user application
-	 */
-	error = saRecvRetry (fd, &res_lib_amf_errorcancelall,
-		sizeof (struct res_lib_amf_errorcancelall), MSG_WAITALL | MSG_NOSIGNAL);
+	error = saSendReceiveReply (fd,
+		&req_lib_amf_errorcancelall, sizeof (struct req_lib_amf_errorcancelall),
+		&res_lib_amf_errorcancelall, sizeof (struct res_lib_amf_errorcancelall));
 	if (error != SA_OK) {
 		goto exit_close;
 	}
@@ -864,19 +816,20 @@ saAmfComponentCapabilityModelGet (
 	req_amf_componentcapabilitymodelget.header.size = sizeof (struct req_amf_componentcapabilitymodelget);
 	memcpy (&req_amf_componentcapabilitymodelget.compName, compName, sizeof (SaNameT));
 
-	error = saSendRetry (fd, &req_amf_componentcapabilitymodelget,
-		sizeof (struct req_amf_componentcapabilitymodelget), MSG_NOSIGNAL);
+	error = saSendReceiveReply (fd,
+		&req_amf_componentcapabilitymodelget,
+		sizeof (struct req_amf_componentcapabilitymodelget),
+		&res_lib_amf_componentcapabilitymodelget,
+		sizeof (struct res_lib_amf_componentcapabilitymodelget));
 	if (error != SA_OK) {
 		goto exit_close;
 	}
+	error = res_lib_amf_componentcapabilitymodelget.header.error;
 
-	error = saRecvRetry (fd, &res_lib_amf_componentcapabilitymodelget,
-		sizeof (struct res_lib_amf_componentcapabilitymodelget), MSG_WAITALL | MSG_NOSIGNAL);
 	if (error == SA_OK) {
 		memcpy (componentCapabilityModel,
 			&res_lib_amf_componentcapabilitymodelget.componentCapabilityModel, 
 			sizeof (SaAmfComponentCapabilityModelT));
-		error = res_lib_amf_componentcapabilitymodelget.header.error;
 	}
 		
 exit_close:
@@ -900,10 +853,12 @@ saAmfResponse (
 	SaErrorT error)
 {
 	struct req_amf_response req_amf_response;
-	int fd;
+	struct res_lib_amf_response res_lib_amf_response;
+	int fd_response;
+	int fd_dispatch;
 	SaErrorT errorResult;
 
-	errorResult = saServiceConnect (&fd, MESSAGE_REQ_AMF_INIT);
+	errorResult = saServiceConnectTwo (&fd_response, &fd_dispatch, AMF_SERVICE);
 	if (errorResult != SA_OK) {
 		goto exit_noclose;
 	}
@@ -911,10 +866,19 @@ saAmfResponse (
 	req_amf_response.header.size = sizeof (struct req_amf_response);
 	req_amf_response.invocation = invocation;
 	req_amf_response.error = error;
-	errorResult = saSendRetry (fd, &req_amf_response,
-		sizeof (struct req_amf_response), MSG_NOSIGNAL);
 
-	close (fd);
+	errorResult = saSendReceiveReply (fd_response,
+		&req_amf_response,
+		sizeof (struct req_amf_response),
+		&res_lib_amf_response,
+		sizeof (struct res_lib_amf_response));
+
+	close (fd_response);
+	close (fd_dispatch);
+
+	if (errorResult == SA_OK) {
+		errorResult = res_lib_amf_response.header.error;
+	}
 
 exit_noclose:
 	return (errorResult);

+ 101 - 119
lib/ckpt.c

@@ -60,28 +60,29 @@ struct message_overlay {
  * Data structure for instance data
  */
 struct ckptInstance {
-	int fd;
-	struct queue inq;
+	int response_fd;
+	int dispatch_fd;
 	SaCkptCallbacksT callbacks;
 	int finalize;
-	pthread_mutex_t mutex;
+	pthread_mutex_t response_mutex;
+	pthread_mutex_t dispatch_mutex;
 };
 
 struct ckptCheckpointInstance {
-	int fd;
+	int response_fd;
 	SaCkptHandleT ckptHandle;
 	SaCkptCheckpointOpenFlagsT checkpointOpenFlags;
 	SaNameT checkpointName;
 	SaUint32T maxSectionIdSize;
-	pthread_mutex_t mutex;
+	pthread_mutex_t response_mutex;
 };
 
 struct ckptSectionIterationInstance {
-	int fd;
+	int response_fd;
 	SaCkptCheckpointHandleT checkpointHandle;
 	struct list_head sectionIdListHead;
 	SaUint32T maxSectionIdSize;
-	pthread_mutex_t mutex;
+	pthread_mutex_t response_mutex;
 };
 
 void ckptHandleInstanceDestructor (void *instance);
@@ -139,12 +140,9 @@ void ckptHandleInstanceDestructor (void *instance)
 {
 struct ckptInstance *ckptInstance = (struct ckptInstance *)instance;
 
-	if (ckptInstance->fd != -1) {
-		shutdown (ckptInstance->fd, 0);
-		close (ckptInstance->fd);
-	}
-	if (ckptInstance->inq.items) {
-		free (ckptInstance->inq.items);
+	if (ckptInstance->response_fd != -1) {
+		shutdown (ckptInstance->response_fd, 0);
+		close (ckptInstance->response_fd);
 	}
 }
 
@@ -157,10 +155,10 @@ void ckptSectionIterationHandleInstanceDestructor (void *instance)
 {
 	struct ckptSectionIterationInstance *ckptSectionIterationInstance = (struct ckptSectionIterationInstance *)instance;
 
-	if (ckptSectionIterationInstance->fd != -1) {
-		shutdown (ckptSectionIterationInstance->fd, 0);
+	if (ckptSectionIterationInstance->response_fd != -1) {
+		shutdown (ckptSectionIterationInstance->response_fd, 0);
 
-		close (ckptSectionIterationInstance->fd);
+		close (ckptSectionIterationInstance->response_fd);
 	}
 }
 
@@ -190,25 +188,17 @@ saCkptInitialize (
 		goto error_destroy;
 	}
 
-	ckptInstance->fd = -1;
-
-	/*
-	 * An inq is needed to store async messages while waiting for a 
-	 * sync response
-	 */
-	error = saQueueInit (&ckptInstance->inq, 512, sizeof (void *));
-	if (error != SA_AIS_OK) {
-		goto error_put_destroy;
-	}
+	ckptInstance->response_fd = -1;
 
-	error = saServiceConnect (&ckptInstance->fd, MESSAGE_REQ_CKPT_INIT);
+	error = saServiceConnectTwo (&ckptInstance->response_fd,
+		&ckptInstance->dispatch_fd, CKPT_SERVICE);
 	if (error != SA_AIS_OK) {
 		goto error_put_destroy;
 	}
 
 	memcpy (&ckptInstance->callbacks, callbacks, sizeof (SaCkptCallbacksT));
 
-	pthread_mutex_init (&ckptInstance->mutex, NULL);
+	pthread_mutex_init (&ckptInstance->response_mutex, NULL);
 
 	saHandleInstancePut (&ckptHandleDatabase, *ckptHandle);
 
@@ -235,7 +225,7 @@ saCkptSelectionObjectGet (
 		return (error);
 	}
 
-	*selectionObject = ckptInstance->fd;
+	*selectionObject = ckptInstance->response_fd;
 
 	saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
 
@@ -276,14 +266,14 @@ saCkptDispatch (
 		 * Read data directly from socket
 		 */
 		FD_ZERO (&read_fds);
-		FD_SET (ckptInstance->fd, &read_fds);
+		FD_SET (ckptInstance->response_fd, &read_fds);
 
-		error = saSelectRetry (ckptInstance->fd + 1, &read_fds, 0, 0, timeout);
+		error = saSelectRetry (ckptInstance->response_fd + 1, &read_fds, 0, 0, timeout);
 		if (error != SA_AIS_OK) {
 			goto error_exit;
 		}
 
-		dispatch_avail = FD_ISSET (ckptInstance->fd, &read_fds);
+		dispatch_avail = FD_ISSET (ckptInstance->response_fd, &read_fds);
 		if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
 			break; /* exit do while cont is 1 loop */
 		} else
@@ -305,12 +295,12 @@ saCkptDispatch (
 			/*
 			 * Queue empty, read response from socket
 			 */
-			error = saRecvRetry (ckptInstance->fd, &ckptInstance->message.header, sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL);
+			error = saRecvRetry (ckptInstance->response_fd, &ckptInstance->message.header, sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_AIS_OK) {
 				goto error_exit;
 			}
 			if (ckptInstance->message.header.size > sizeof (struct req_header)) {
-				error = saRecvRetry (ckptInstance->fd, &ckptInstance->message.data,
+				error = saRecvRetry (ckptInstance->response_fd, &ckptInstance->message.data,
 					ckptInstance->message.header.size - sizeof (struct req_header),
 					MSG_WAITALL | MSG_NOSIGNAL);
 				if (error != SA_AIS_OK) {
@@ -430,22 +420,20 @@ saCkptFinalize (
 		return (error);
 	}
 
-	pthread_mutex_lock (&ckptInstance->mutex);
+	pthread_mutex_lock (&ckptInstance->response_mutex);
 
 	/*
 	 * Another thread has already started finalizing
 	 */
 	if (ckptInstance->finalize) {
-		pthread_mutex_unlock (&ckptInstance->mutex);
+		pthread_mutex_unlock (&ckptInstance->response_mutex);
 		saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
 		return (SA_AIS_ERR_BAD_HANDLE);
 	}
 
 	ckptInstance->finalize = 1;
 
-	saActivatePoll (ckptInstance->fd);
-
-	pthread_mutex_unlock (&ckptInstance->mutex);
+	pthread_mutex_unlock (&ckptInstance->response_mutex);
 
 	saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
 
@@ -487,7 +475,7 @@ saCkptCheckpointOpen (
 		goto error_put_destroy;
 	}
 
-	ckptCheckpointInstance->fd = ckptInstance->fd;
+	ckptCheckpointInstance->response_fd = ckptInstance->response_fd;
 
 	ckptCheckpointInstance->maxSectionIdSize =
 		checkpointCreationAttributes->maxSectionIdSize;
@@ -504,13 +492,13 @@ saCkptCheckpointOpen (
 		sizeof (SaCkptCheckpointCreationAttributesT));
 	req_lib_ckpt_checkpointopen.checkpointOpenFlags = checkpointOpenFlags;
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointopen,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_checkpointopen,
 		sizeof (struct req_lib_ckpt_checkpointopen), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_put_ckpt_destroy;
 	}
 
-	error = saRecvRetry (ckptCheckpointInstance->fd, &res_lib_ckpt_checkpointopen,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd, &res_lib_ckpt_checkpointopen,
 		sizeof (struct res_lib_ckpt_checkpointopen), MSG_WAITALL | MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_put_ckpt_destroy;
@@ -521,7 +509,7 @@ saCkptCheckpointOpen (
 		goto error_put_ckpt_destroy;
 	}
 
-	pthread_mutex_init (&ckptCheckpointInstance->mutex, NULL);
+	pthread_mutex_init (&ckptCheckpointInstance->response_mutex, NULL);
 
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 
@@ -566,12 +554,12 @@ saCkptCheckpointOpenAsync (
 	
 	req_lib_ckpt_checkpointopenasync.checkpointOpenFlags = checkpointOpenFlags;
 
-	pthread_mutex_lock (&ckptInstance->mutex);
+	pthread_mutex_lock (&ckptInstance->response_mutex);
 
-        error = saSendRetry (ckptInstance->fd, &req_lib_ckpt_checkpointopenasync,
+        error = saSendRetry (ckptInstance->response_fd, &req_lib_ckpt_checkpointopenasync,
 		sizeof (struct req_lib_ckpt_checkpointopenasync), MSG_NOSIGNAL);
 
-	pthread_mutex_unlock (&ckptInstance->mutex);
+	pthread_mutex_unlock (&ckptInstance->response_mutex);
 
 	saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
 
@@ -598,13 +586,13 @@ saCkptCheckpointClose (
 	memcpy (&req_lib_ckpt_checkpointclose.checkpointName,
 		&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointclose,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_checkpointclose,
 		sizeof (struct req_lib_ckpt_checkpointclose), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto exit_put;
 	}
 
-	error = saRecvRetry (ckptCheckpointInstance->fd, &res_lib_ckpt_checkpointclose,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd, &res_lib_ckpt_checkpointclose,
 		sizeof (struct res_lib_ckpt_checkpointclose), MSG_WAITALL | MSG_NOSIGNAL);
 
 
@@ -639,13 +627,13 @@ saCkptCheckpointUnlink (
 	memcpy (&req_lib_ckpt_checkpointunlink.checkpointName, checkpointName, sizeof (SaNameT));
 
 
-	error = saSendRetry (ckptInstance->fd, &req_lib_ckpt_checkpointunlink,
+	error = saSendRetry (ckptInstance->response_fd, &req_lib_ckpt_checkpointunlink,
 		sizeof (struct req_lib_ckpt_checkpointunlink), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto exit_put;
 	}
 
-	error = saRecvRetry (ckptInstance->fd, &res_lib_ckpt_checkpointunlink,
+	error = saRecvRetry (ckptInstance->response_fd, &res_lib_ckpt_checkpointunlink,
 		sizeof (struct res_lib_ckpt_checkpointunlink), MSG_WAITALL | MSG_NOSIGNAL);
 
 exit_put:
@@ -680,19 +668,19 @@ saCkptCheckpointRetentionDurationSet (
 	memcpy (&req_lib_ckpt_checkpointretentiondurationset.checkpointName,
 		&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointretentiondurationset, sizeof (struct req_lib_ckpt_checkpointretentiondurationset), MSG_NOSIGNAL);
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_checkpointretentiondurationset, sizeof (struct req_lib_ckpt_checkpointretentiondurationset), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
 	}
 
-	error = saRecvRetry (ckptCheckpointInstance->fd,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd,
 		&res_lib_ckpt_checkpointretentiondurationset,
 		sizeof (struct res_lib_ckpt_checkpointretentiondurationset),
 		MSG_WAITALL | MSG_NOSIGNAL);
 
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 error_exit:
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
@@ -718,20 +706,20 @@ saCkptActiveReplicaSet (
 	req_lib_ckpt_activereplicaset.header.size = sizeof (struct req_lib_ckpt_activereplicaset);
 	req_lib_ckpt_activereplicaset.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_ACTIVEREPLICASET;
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_activereplicaset,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_activereplicaset,
 		sizeof (struct req_lib_ckpt_activereplicaset), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
 	}
 
-	error = saRecvRetry (ckptCheckpointInstance->fd,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd,
 		&res_lib_ckpt_activereplicaset,
 		sizeof (struct res_lib_ckpt_activereplicaset),
 		MSG_WAITALL | MSG_NOSIGNAL);
 
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
 
@@ -761,15 +749,15 @@ saCkptCheckpointStatusGet (
 	memcpy (&req_lib_ckpt_checkpointstatusget.checkpointName,
 		&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointstatusget,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_checkpointstatusget,
 		sizeof (struct req_lib_ckpt_checkpointstatusget), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
 	}
 
-	error = saRecvRetry (ckptCheckpointInstance->fd,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd,
 		&res_lib_ckpt_checkpointstatusget,
 		sizeof (struct res_lib_ckpt_checkpointstatusget),
 		MSG_WAITALL | MSG_NOSIGNAL);
@@ -777,7 +765,7 @@ saCkptCheckpointStatusGet (
 		goto error_exit;
 	}
 
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 	memcpy (checkpointStatus,
 		&res_lib_ckpt_checkpointstatusget.checkpointDescriptor,
@@ -824,9 +812,9 @@ saCkptSectionCreate (
 	memcpy (&req_lib_ckpt_sectioncreate.checkpointName,
 		&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectioncreate,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_sectioncreate,
 		sizeof (struct req_lib_ckpt_sectioncreate), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
@@ -835,24 +823,24 @@ saCkptSectionCreate (
 	/*
 	 * Write section identifier to server
 	 */
-	error = saSendRetry (ckptCheckpointInstance->fd, sectionCreationAttributes->sectionId->id,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, sectionCreationAttributes->sectionId->id,
 		sectionCreationAttributes->sectionId->idLen, MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
 	}
 
-	error = saSendRetry (ckptCheckpointInstance->fd, initialData,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, initialData,
 		initialDataSize, MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
 	}
 
-	error = saRecvRetry (ckptCheckpointInstance->fd,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd,
 		&res_lib_ckpt_sectioncreate,
 		sizeof (struct res_lib_ckpt_sectioncreate),
 		MSG_WAITALL | MSG_NOSIGNAL);
 
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 error_exit:
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
@@ -877,7 +865,7 @@ saCkptSectionDelete (
 		return (error);
 	}
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
 	req_lib_ckpt_sectiondelete.header.size = sizeof (struct req_lib_ckpt_sectiondelete) + sectionId->idLen; 
 	req_lib_ckpt_sectiondelete.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONDELETE;
@@ -886,7 +874,7 @@ saCkptSectionDelete (
 	memcpy (&req_lib_ckpt_sectiondelete.checkpointName,
 		&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectiondelete,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_sectiondelete,
 		sizeof (struct req_lib_ckpt_sectiondelete), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
@@ -895,17 +883,17 @@ saCkptSectionDelete (
 	/*
 	 * Write section identifier to server
 	 */
-	error = saSendRetry (ckptCheckpointInstance->fd, sectionId->id,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, sectionId->id,
 		sectionId->idLen, MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
 	}
-	error = saRecvRetry (ckptCheckpointInstance->fd,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd,
 		&res_lib_ckpt_sectiondelete,
 		sizeof (struct res_lib_ckpt_sectiondelete),
 		MSG_WAITALL | MSG_NOSIGNAL);
 
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 error_exit:
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
@@ -937,9 +925,9 @@ saCkptSectionExpirationTimeSet (
 	memcpy (&req_lib_ckpt_sectionexpirationtimeset.checkpointName,
 		&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectionexpirationtimeset,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_sectionexpirationtimeset,
 		sizeof (struct req_lib_ckpt_sectionexpirationtimeset), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
@@ -949,19 +937,19 @@ saCkptSectionExpirationTimeSet (
 	 * Write section identifier to server
 	 */
 	if (sectionId->idLen) {
-		error = saSendRetry (ckptCheckpointInstance->fd, sectionId->id,
+		error = saSendRetry (ckptCheckpointInstance->response_fd, sectionId->id,
 			sectionId->idLen, MSG_NOSIGNAL);
 		if (error != SA_AIS_OK) {
 			goto error_exit;
 		}
 	}
 
-	error = saRecvRetry (ckptCheckpointInstance->fd,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd,
 		&res_lib_ckpt_sectionexpirationtimeset,
 		sizeof (struct res_lib_ckpt_sectionexpirationtimeset),
 		MSG_WAITALL | MSG_NOSIGNAL);
 
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 error_exit:
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
@@ -1000,11 +988,11 @@ saCkptSectionIterationInitialize (
 		goto error_destroy;
 	}
 
-	ckptSectionIterationInstance->fd = ckptCheckpointInstance->fd;
+	ckptSectionIterationInstance->response_fd = ckptCheckpointInstance->response_fd;
 
 	ckptSectionIterationInstance->checkpointHandle = checkpointHandle;
 
-	pthread_mutex_init (&ckptSectionIterationInstance->mutex, NULL);
+	pthread_mutex_init (&ckptSectionIterationInstance->response_mutex, NULL);
 
 	/*
 	 * Setup section id list for iterator next
@@ -1014,12 +1002,6 @@ saCkptSectionIterationInitialize (
 	ckptSectionIterationInstance->maxSectionIdSize =
 		ckptCheckpointInstance->maxSectionIdSize;
 
-	error = saServiceConnect (&ckptSectionIterationInstance->fd,	
-		MESSAGE_REQ_CKPT_INIT);
-	if (error != SA_AIS_OK) {
-		goto error_put_destroy;
-	}
-
 	req_lib_ckpt_sectioniteratorinitialize.header.size = sizeof (struct req_lib_ckpt_sectioniteratorinitialize); 
 	req_lib_ckpt_sectioniteratorinitialize.header.id = MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE;
 	req_lib_ckpt_sectioniteratorinitialize.sectionsChosen = sectionsChosen;
@@ -1027,9 +1009,9 @@ saCkptSectionIterationInitialize (
 	memcpy (&req_lib_ckpt_sectioniteratorinitialize.checkpointName,
 		&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
 
-	pthread_mutex_lock (&ckptSectionIterationInstance->mutex);
+	pthread_mutex_lock (&ckptSectionIterationInstance->response_mutex);
 
-	error = saSendRetry (ckptSectionIterationInstance->fd,
+	error = saSendRetry (ckptSectionIterationInstance->response_fd,
 		&req_lib_ckpt_sectioniteratorinitialize,
 		sizeof (struct req_lib_ckpt_sectioniteratorinitialize),
 		MSG_NOSIGNAL);
@@ -1038,12 +1020,12 @@ saCkptSectionIterationInitialize (
 		goto error_put_destroy;
 	}
 
-	error = saRecvRetry (ckptSectionIterationInstance->fd,
+	error = saRecvRetry (ckptSectionIterationInstance->response_fd,
 		&res_lib_ckpt_sectioniteratorinitialize,
 		sizeof (struct res_lib_ckpt_sectioniteratorinitialize),
 		MSG_WAITALL | MSG_NOSIGNAL);
 
-	pthread_mutex_unlock (&ckptSectionIterationInstance->mutex);
+	pthread_mutex_unlock (&ckptSectionIterationInstance->response_mutex);
 
 	saHandleInstancePut (&ckptSectionIterationHandleDatabase, *sectionIterationHandle);
 
@@ -1093,9 +1075,9 @@ saCkptSectionIterationNext (
 	req_lib_ckpt_sectioniteratornext.header.size = sizeof (struct req_lib_ckpt_sectioniteratornext); 
 	req_lib_ckpt_sectioniteratornext.header.id = MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT;
 
-	pthread_mutex_lock (&ckptSectionIterationInstance->mutex);
+	pthread_mutex_lock (&ckptSectionIterationInstance->response_mutex);
 
-	error = saSendRetry (ckptSectionIterationInstance->fd,
+	error = saSendRetry (ckptSectionIterationInstance->response_fd,
 		&req_lib_ckpt_sectioniteratornext,
 		sizeof (struct req_lib_ckpt_sectioniteratornext), MSG_NOSIGNAL);
 
@@ -1103,7 +1085,7 @@ saCkptSectionIterationNext (
 		goto error_put;
 	}
 
-	error = saRecvRetry (ckptSectionIterationInstance->fd, &res_lib_ckpt_sectioniteratornext,
+	error = saRecvRetry (ckptSectionIterationInstance->response_fd, &res_lib_ckpt_sectioniteratornext,
 		sizeof (struct res_lib_ckpt_sectioniteratornext), MSG_WAITALL | MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_put;
@@ -1116,7 +1098,7 @@ saCkptSectionIterationNext (
 	sectionDescriptor->sectionId.id = &iteratorSectionIdListEntry->data[0];
 	
 	if ((res_lib_ckpt_sectioniteratornext.header.size - sizeof (struct res_lib_ckpt_sectioniteratornext)) > 0) {
-		error = saRecvRetry (ckptSectionIterationInstance->fd,
+		error = saRecvRetry (ckptSectionIterationInstance->response_fd,
 			sectionDescriptor->sectionId.id,
 			res_lib_ckpt_sectioniteratornext.header.size -
 				sizeof (struct res_lib_ckpt_sectioniteratornext),
@@ -1132,7 +1114,7 @@ saCkptSectionIterationNext (
 	}
 
 error_put:
-	pthread_mutex_unlock (&ckptSectionIterationInstance->mutex);
+	pthread_mutex_unlock (&ckptSectionIterationInstance->response_mutex);
 error_put_nounlock:
 	saHandleInstancePut (&ckptSectionIterationHandleDatabase, sectionIterationHandle);
 error_exit:
@@ -1209,7 +1191,7 @@ saCkptCheckpointWrite (
 	}
 	req_lib_ckpt_sectionwrite.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONWRITE;
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
 	for (i = 0; i < numberOfElements; i++) {
 
@@ -1231,7 +1213,7 @@ saCkptCheckpointWrite (
 		iov[2].iov_base = ioVector[i].dataBuffer;
 		iov[2].iov_len = ioVector[i].dataSize;
 
-		error = saSendMsgRetry (ckptCheckpointInstance->fd,
+		error = saSendMsgRetry (ckptCheckpointInstance->response_fd,
 			iov,
 			3);
 		if (error != SA_AIS_OK) {
@@ -1241,7 +1223,7 @@ saCkptCheckpointWrite (
 		/*
 		 * Receive response
 		 */
-		error = saRecvRetry (ckptCheckpointInstance->fd, &res_lib_ckpt_sectionwrite,
+		error = saRecvRetry (ckptCheckpointInstance->response_fd, &res_lib_ckpt_sectionwrite,
 			sizeof (struct res_lib_ckpt_sectionwrite), MSG_WAITALL | MSG_NOSIGNAL);
 		if (error != SA_AIS_OK) {
 			goto error_exit;
@@ -1263,7 +1245,7 @@ saCkptCheckpointWrite (
 	}
 
 error_exit:
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 error_put:
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
@@ -1300,33 +1282,33 @@ saCkptSectionOverwrite (
 	memcpy (&req_lib_ckpt_sectionoverwrite.checkpointName,
 		&ckptCheckpointInstance->checkpointName, sizeof (SaNameT));
 	
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_sectionoverwrite,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_sectionoverwrite,
 		sizeof (struct req_lib_ckpt_sectionoverwrite), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
 	}
 
 	if (sectionId->idLen) {
-		error = saSendRetry (ckptCheckpointInstance->fd, sectionId->id,
+		error = saSendRetry (ckptCheckpointInstance->response_fd, sectionId->id,
 			sectionId->idLen, MSG_NOSIGNAL);
 		if (error != SA_AIS_OK) {
 			goto error_exit;
 		}
 	}
-	error = saSendRetry (ckptCheckpointInstance->fd, dataBuffer, dataSize, MSG_NOSIGNAL);
+	error = saSendRetry (ckptCheckpointInstance->response_fd, dataBuffer, dataSize, MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
 		goto error_exit;
 	}
 
-	error = saRecvRetry (ckptCheckpointInstance->fd,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd,
 		&res_lib_ckpt_sectionoverwrite,
 		sizeof (struct res_lib_ckpt_sectionoverwrite),
 		MSG_WAITALL | MSG_NOSIGNAL);
 
 error_exit:
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
 
@@ -1360,7 +1342,7 @@ saCkptCheckpointRead (
 
 	req_lib_ckpt_sectionread.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONREAD;
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
 	for (i = 0; i < numberOfElements; i++) {
 		req_lib_ckpt_sectionread.header.size = sizeof (struct req_lib_ckpt_sectionread) +
@@ -1378,14 +1360,14 @@ saCkptCheckpointRead (
 		iov[1].iov_base = ioVector[i].sectionId.id;
 		iov[1].iov_len = ioVector[i].sectionId.idLen;
 
-		error = saSendMsgRetry (ckptCheckpointInstance->fd,
+		error = saSendMsgRetry (ckptCheckpointInstance->response_fd,
 			iov,
 			2);
 
 		/*
 		 * Receive response header
 		 */
-		error = saRecvRetry (ckptCheckpointInstance->fd, &res_lib_ckpt_sectionread,
+		error = saRecvRetry (ckptCheckpointInstance->response_fd, &res_lib_ckpt_sectionread,
 			sizeof (struct res_lib_ckpt_sectionread), MSG_WAITALL | MSG_NOSIGNAL);
 		if (error != SA_AIS_OK) {
 				goto error_exit;
@@ -1397,7 +1379,7 @@ saCkptCheckpointRead (
 		 * Receive checkpoint section data
 		 */
 		if (dataLength > 0) {
-			error = saRecvRetry (ckptCheckpointInstance->fd, ioVector[i].dataBuffer,
+			error = saRecvRetry (ckptCheckpointInstance->response_fd, ioVector[i].dataBuffer,
 				dataLength, MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_AIS_OK) {
 					goto error_exit;
@@ -1417,7 +1399,7 @@ saCkptCheckpointRead (
 	}
 
 error_exit:
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
 
@@ -1443,22 +1425,22 @@ saCkptCheckpointSynchronize (
 	req_lib_ckpt_checkpointsynchronize.header.size = sizeof (struct req_lib_ckpt_checkpointsynchronize); 
 	req_lib_ckpt_checkpointsynchronize.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE;
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
-	error = saSendRetry (ckptCheckpointInstance->fd, &req_lib_ckpt_checkpointsynchronize,
+	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_checkpointsynchronize,
 		sizeof (struct req_lib_ckpt_checkpointsynchronize), MSG_NOSIGNAL);
 
 	if (error != SA_AIS_OK) {
 		goto error_exit;
 	}
 
-	error = saRecvRetry (ckptCheckpointInstance->fd,
+	error = saRecvRetry (ckptCheckpointInstance->response_fd,
 		&res_lib_ckpt_checkpointsynchronize,
 		sizeof (struct res_lib_ckpt_checkpointsynchronize),
 		MSG_WAITALL | MSG_NOSIGNAL);
 
 error_exit:
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
 
@@ -1492,16 +1474,16 @@ saCkptCheckpointSynchronizeAsync (
 	req_lib_ckpt_checkpointsynchronizeasync.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC;
 	req_lib_ckpt_checkpointsynchronizeasync.invocation = invocation;
 
-	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_lock (&ckptCheckpointInstance->response_mutex);
 
-	pthread_mutex_lock (&ckptInstance->mutex);
+	pthread_mutex_lock (&ckptInstance->response_mutex);
 
-	error = saSendRetry (ckptInstance->fd, &req_lib_ckpt_checkpointsynchronizeasync,
+	error = saSendRetry (ckptInstance->response_fd, &req_lib_ckpt_checkpointsynchronizeasync,
 		sizeof (struct req_lib_ckpt_checkpointsynchronizeasync), MSG_NOSIGNAL);
 
-	pthread_mutex_unlock (&ckptInstance->mutex);
+	pthread_mutex_unlock (&ckptInstance->response_mutex);
 
-	pthread_mutex_unlock (&ckptCheckpointInstance->mutex);
+	pthread_mutex_unlock (&ckptCheckpointInstance->response_mutex);
 
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 

+ 62 - 78
lib/clm.c

@@ -53,16 +53,17 @@
 
 struct res_overlay {
 	struct res_header header;
-	char data[128000];
+	char data[512000];
 };
 
 struct clmInstance {
-	int fd;
-	struct queue inq;
+	int response_fd;
+	int dispatch_fd;
 	SaClmCallbacksT callbacks;
 	int finalize;
 	SaClmClusterNotificationBufferT notificationBuffer;
-	pthread_mutex_t mutex;
+	pthread_mutex_t response_mutex;
+	pthread_mutex_t dispatch_mutex;
 };
 
 static void clmHandleInstanceDestructor (void *);
@@ -91,9 +92,13 @@ void clmHandleInstanceDestructor (void *instance)
 {
 	struct clmInstance *clmInstance = (struct clmInstance *)instance;
 
-	if (clmInstance->fd != -1) {
-		shutdown (clmInstance->fd, 0);
-		close (clmInstance->fd);
+	if (clmInstance->response_fd != -1) {
+		shutdown (clmInstance->response_fd, 0);
+		close (clmInstance->response_fd);
+	}
+	if (clmInstance->dispatch_fd != -1) {
+		shutdown (clmInstance->dispatch_fd, 0);
+		close (clmInstance->dispatch_fd);
 	}
 }
 
@@ -124,25 +129,21 @@ saClmInitialize (
 		goto error_destroy;
 	}
 
-	clmInstance->fd = -1;
+	clmInstance->response_fd = -1;
 
-	/*
-	 * An inq is needed to store async messages while waiting for a
-	 * sync response
-	 */
-	error = saQueueInit (&clmInstance->inq, 512, sizeof (void *));
-	if (error != SA_OK) {
-		goto error_put_destroy;
-	}
+	clmInstance->dispatch_fd = -1;
 
-	error = saServiceConnect (&clmInstance->fd, MESSAGE_REQ_CLM_INIT);
+	error = saServiceConnectTwo (&clmInstance->response_fd,
+		&clmInstance->dispatch_fd, CLM_SERVICE);
 	if (error != SA_OK) {
 		goto error_put_destroy;
 	}
 
 	memcpy (&clmInstance->callbacks, clmCallbacks, sizeof (SaClmCallbacksT));
 
-	pthread_mutex_init (&clmInstance->mutex, NULL);
+	pthread_mutex_init (&clmInstance->response_mutex, NULL);
+
+	pthread_mutex_init (&clmInstance->dispatch_mutex, NULL);
 
 	clmInstance->notificationBuffer.notification = 0;
 
@@ -172,7 +173,7 @@ saClmSelectionObjectGet (
 		return (error);
 	}
 
-	*selectionObject = clmInstance->fd;
+	*selectionObject = clmInstance->dispatch_fd;
 
 	saHandleInstancePut (&clmHandleDatabase, clmHandle);
 	return (SA_OK);
@@ -188,15 +189,11 @@ saClmDispatch (
 	SaAisErrorT error;
 	int cont = 1; /* always continue do loop except when set to 0 */
 	int dispatch_avail;
-	int poll_fd;
 	struct clmInstance *clmInstance;
 	struct res_clm_trackcallback *res_clm_trackcallback;
 	struct res_clm_nodegetcallback *res_clm_nodegetcallback;
 	SaClmCallbacksT callbacks;
 	struct res_overlay dispatch_data;
-	int empty;
-	struct res_header **queue_msg;
-	struct res_header *msg;
 	SaClmClusterNotificationBufferT notificationBuffer;
 	int copy_items;
 
@@ -215,19 +212,17 @@ saClmDispatch (
 	}
 
 	do {
-		poll_fd = clmInstance->fd;
-
-		ufds.fd = poll_fd;
+		ufds.fd = clmInstance->dispatch_fd;
 		ufds.events = POLLIN;
 		ufds.revents = 0;
 
+		pthread_mutex_lock (&clmInstance->dispatch_mutex);
+
 		error = saPollRetry (&ufds, 1, timeout);
 		if (error != SA_OK) {
-			goto error_put;
+			goto error_unlock;
 		}
 
-		pthread_mutex_lock (&clmInstance->mutex);
-
 		/*
 		 * Handle has been finalized in another thread
 		 */
@@ -238,42 +233,33 @@ saClmDispatch (
 
 		dispatch_avail = ufds.revents & POLLIN;
 		if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
-			pthread_mutex_unlock (&clmInstance->mutex);
+			pthread_mutex_unlock (&clmInstance->dispatch_mutex);
 			break; /* exit do while cont is 1 loop */
 		} else
 		if (dispatch_avail == 0) {
-			pthread_mutex_unlock (&clmInstance->mutex);
+			pthread_mutex_unlock (&clmInstance->dispatch_mutex);
 			continue; /* next poll */
 		}
 
-		saQueueIsEmpty(&clmInstance->inq, &empty);
-		if (empty == 0) {
-			/*
-			 * Queue is not empty, read data from queue
-			 */
-			saQueueItemGet (&clmInstance->inq, (void *)&queue_msg);
-			msg = *queue_msg;
-			memcpy (&dispatch_data, msg, msg->size);
-			saQueueItemRemove (&clmInstance->inq);
-			free (msg);
-		} else {
-			/*
-			 * Queue empty, read response from socket
-			 */
-			error = saRecvRetry (clmInstance->fd, &dispatch_data.header,
+		if (ufds.revents & POLLIN) {
+			error = saRecvRetry (clmInstance->dispatch_fd, &dispatch_data.header,
 				sizeof (struct res_header), MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_OK) {
 				goto error_unlock;
 			}
 			if (dispatch_data.header.size > sizeof (struct res_header)) {
-				error = saRecvRetry (clmInstance->fd, &dispatch_data.data,
+				error = saRecvRetry (clmInstance->dispatch_fd, &dispatch_data.data,
 					dispatch_data.header.size - sizeof (struct res_header),
 					MSG_WAITALL | MSG_NOSIGNAL);
 				if (error != SA_OK) {
 					goto error_unlock;
 				}
 			}
+		} else {
+			pthread_mutex_unlock (&clmInstance->dispatch_mutex);
+			continue;
 		}
+			
 		/*
 		 * Make copy of callbacks, message data, unlock instance, and call callback
 		 * A risk of this dispatch method is that the callback routines may
@@ -283,7 +269,7 @@ saClmDispatch (
 		memcpy (&notificationBuffer, &clmInstance->notificationBuffer,
 			sizeof (SaClmClusterNotificationBufferT));
 
-		pthread_mutex_unlock (&clmInstance->mutex);
+		pthread_mutex_unlock (&clmInstance->dispatch_mutex);
 
 		/*
 		 * Dispatch incoming message
@@ -354,8 +340,11 @@ saClmDispatch (
 		}
 	} while (cont);
 
+	goto error_put;
+
 error_unlock:
-	pthread_mutex_unlock (&clmInstance->mutex);
+	pthread_mutex_unlock (&clmInstance->dispatch_mutex);
+
 error_put:
 	saHandleInstancePut (&clmHandleDatabase, clmHandle);
 	return (error);
@@ -374,22 +363,23 @@ saClmFinalize (
 		return (error);
 	}
 
-       pthread_mutex_lock (&clmInstance->mutex);
+       pthread_mutex_lock (&clmInstance->dispatch_mutex);
+       pthread_mutex_lock (&clmInstance->response_mutex);
 
 	/*
 	 * Another thread has already started finalizing
 	 */
 	if (clmInstance->finalize) {
-		pthread_mutex_unlock (&clmInstance->mutex);
+		pthread_mutex_unlock (&clmInstance->dispatch_mutex);
+		pthread_mutex_unlock (&clmInstance->response_mutex);
 		saHandleInstancePut (&clmHandleDatabase, clmHandle);
 		return (SA_ERR_BAD_HANDLE);
 	}
 
 	clmInstance->finalize = 1;
 
-	saActivatePoll (clmInstance->fd);
-
-	pthread_mutex_unlock (&clmInstance->mutex);
+	pthread_mutex_unlock (&clmInstance->dispatch_mutex);
+	pthread_mutex_unlock (&clmInstance->response_mutex);
 
 	saHandleDestroy (&clmHandleDatabase, clmHandle);
 
@@ -437,21 +427,23 @@ saClmClusterTrack (
 		return (error);
 	}
 
-	pthread_mutex_lock (&clmInstance->mutex);
+	pthread_mutex_lock (&clmInstance->response_mutex);
 
 	if (clmInstance->callbacks.saClmClusterTrackCallback == 0) {
 		error = SA_AIS_ERR_INIT;
 		goto error_exit;
 	}
 
-	error = saSendRetry (clmInstance->fd, &req_clustertrack,
+	error = saSendRetry (clmInstance->response_fd, &req_clustertrack,
 		sizeof (struct req_clm_clustertrack), MSG_NOSIGNAL);
 
 	memcpy (&clmInstance->notificationBuffer, notificationBuffer,
 		sizeof (SaClmClusterNotificationBufferT));
 
+// TODO get response packet with saRecvRetry, but need to implement that 
+// in executive service
 error_exit:
-	pthread_mutex_unlock (&clmInstance->mutex);
+	pthread_mutex_unlock (&clmInstance->response_mutex);
 
 	saHandleInstancePut (&clmHandleDatabase, clmHandle);
 
@@ -475,14 +467,16 @@ saClmClusterTrackStop (
 		return (error);
 	}
 
-	pthread_mutex_lock (&clmInstance->mutex);
+	pthread_mutex_lock (&clmInstance->response_mutex);
 
-	error = saSendRetry (clmInstance->fd, &req_trackstop,
+	error = saSendRetry (clmInstance->response_fd, &req_trackstop,
 		sizeof (struct req_clm_trackstop), MSG_NOSIGNAL);
 
 	clmInstance->notificationBuffer.notification = 0;
 
-	pthread_mutex_unlock (&clmInstance->mutex);
+	pthread_mutex_unlock (&clmInstance->response_mutex);
+	// TODO what about getting response from executive?  The
+	// executive should send a response
 
 	saHandleInstancePut (&clmHandleDatabase, clmHandle);
 
@@ -507,7 +501,7 @@ saClmClusterNodeGet (
 		return (error);
 	}
 
-	pthread_mutex_lock (&clmInstance->mutex);
+	pthread_mutex_lock (&clmInstance->response_mutex);
 
 	/*
 	 * Send request message
@@ -515,15 +509,9 @@ saClmClusterNodeGet (
 	req_clm_nodeget.header.size = sizeof (struct req_clm_nodeget);
 	req_clm_nodeget.header.id = MESSAGE_REQ_CLM_NODEGET;
 	req_clm_nodeget.nodeId = nodeId;
-	error = saSendRetry (clmInstance->fd, &req_clm_nodeget,
-		sizeof (struct req_clm_nodeget), MSG_NOSIGNAL);
-	if (error != SA_OK) {
-		goto error_exit;
-	}
 
-	error = saRecvRetry (clmInstance->fd, &res_clm_nodeget,
-		sizeof (struct res_clm_nodeget),
-		MSG_WAITALL | MSG_NOSIGNAL);
+	error = saSendReceiveReply (clmInstance->response_fd, &req_clm_nodeget,
+		sizeof (struct req_clm_nodeget), &res_clm_nodeget, sizeof (res_clm_nodeget));
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -534,7 +522,7 @@ saClmClusterNodeGet (
 		sizeof (SaClmClusterNodeT));
 
 error_exit:
-	pthread_mutex_unlock (&clmInstance->mutex);
+	pthread_mutex_unlock (&clmInstance->response_mutex);
 
 	saHandleInstancePut (&clmHandleDatabase, clmHandle);
 
@@ -564,15 +552,11 @@ saClmClusterNodeGetAsync (
 		return (error);
 	}
 
-	pthread_mutex_lock (&clmInstance->mutex);
-
-	error = saSendRetry (clmInstance->fd, &req_clm_nodegetasync,
-		sizeof (struct req_clm_nodegetasync), MSG_NOSIGNAL);
-
-	pthread_mutex_unlock (&clmInstance->mutex);
+	pthread_mutex_lock (&clmInstance->response_mutex);
 
-	error = saRecvQueue (clmInstance->fd, &res_clm_nodegetasync,
-		&clmInstance->inq, MESSAGE_RES_CLM_NODEGETASYNC);
+	error = saSendReceiveReply (clmInstance->response_fd, &req_clm_nodegetasync,
+		sizeof (struct req_clm_nodegetasync),
+		&res_clm_nodegetasync, sizeof (struct res_clm_nodegetasync));
 	if (error != SA_OK) {
 		goto error_exit;
 	}

+ 86 - 120
lib/evs.c

@@ -1,4 +1,6 @@
 /*
+ * vi: set autoindent tabstop=4 shiftwidth=4 :
+
  * Copyright (c) 2004 MontaVista Software, Inc.
  *
  * All rights reserved.
@@ -41,6 +43,7 @@
 #include <pthread.h>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <errno.h>
 #include "util.h"
 
 #include "../include/ais_msg.h"
@@ -48,12 +51,12 @@
 #include "../include/evs.h"
 
 struct evs_inst {
-	int fd;
+	int response_fd;
+	int dispatch_fd;
 	int finalize;
 	evs_callbacks_t callbacks;
-	struct queue inq;
-	char dispatch_buffer[512000];
-	pthread_mutex_t mutex;
+	pthread_mutex_t response_mutex;
+	pthread_mutex_t dispatch_mutex;
 };
 
 static void evs_instance_destructor (void *instance);
@@ -71,31 +74,17 @@ static struct saHandleDatabase evs_handle_t_db = {
 static void evs_instance_destructor (void *instance)
 {
     struct evs_inst *evs_inst = instance;
-    void **msg;
-    int empty;
-
-    /*
-     * Empty out the queue if there are any pending messages
-     */
-    while ((saQueueIsEmpty(&evs_inst->inq, &empty) == SA_OK) && !empty) {
-        saQueueItemGet(&evs_inst->inq, (void *)&msg);
-        saQueueItemRemove(&evs_inst->inq);
-        free(*msg);
-    }
-
-    /*
-     * clean up the queue itself
-     */
-    if (evs_inst->inq.items) {
-            free(evs_inst->inq.items);
-    }
 
     /*
      * Disconnect from the server
      */
-    if (evs_inst->fd != -1) {
-        shutdown(evs_inst->fd, 0);
-        close(evs_inst->fd);
+    if (evs_inst->response_fd != -1) {
+        shutdown(evs_inst->response_fd, 0);
+        close(evs_inst->response_fd);
+    }
+    if (evs_inst->dispatch_fd != -1) {
+        shutdown(evs_inst->dispatch_fd, 0);
+        close(evs_inst->dispatch_fd);
     }
 }
 
@@ -117,23 +106,18 @@ evs_error_t evs_initialize (
 		goto error_destroy;
 	}
 
-	/*
-	 * An inq is needed to store async messages while waiting for a
-	 * sync response
-	 */
-	error = saQueueInit (&evs_inst->inq, 128, sizeof (void *));
+	error = saServiceConnectTwo (&evs_inst->response_fd,
+		&evs_inst->dispatch_fd,
+		EVS_SERVICE);
 	if (error != SA_OK) {
 		goto error_put_destroy;
 	}
 
-	error = saServiceConnect (&evs_inst->fd, MESSAGE_REQ_EVS_INIT);
-	if (error != SA_OK) {
-		goto error_put_destroy;
-	}
-	
 	memcpy (&evs_inst->callbacks, callbacks, sizeof (evs_callbacks_t));
 
-	pthread_mutex_init (&evs_inst->mutex, NULL);
+	pthread_mutex_init (&evs_inst->response_mutex, NULL);
+
+	pthread_mutex_init (&evs_inst->dispatch_mutex, NULL);
 
 	saHandleInstancePut (&evs_handle_t_db, *handle);
 
@@ -157,20 +141,21 @@ evs_error_t evs_finalize (
 	if (error != SA_OK) {
 		return (error);
 	}
+//	  TODO is the locking right here
+	pthread_mutex_lock (&evs_inst->response_mutex);
+
 	/*
 	 * Another thread has already started finalizing
 	 */
 	if (evs_inst->finalize) {
-		pthread_mutex_unlock (&evs_inst->mutex);
+		pthread_mutex_unlock (&evs_inst->response_mutex);
 		saHandleInstancePut (&evs_handle_t_db, handle);
 		return (EVS_ERR_BAD_HANDLE);
 	}
 
 	evs_inst->finalize = 1;
 
-	saActivatePoll (evs_inst->fd);
-
-	pthread_mutex_unlock (&evs_inst->mutex);
+	pthread_mutex_unlock (&evs_inst->response_mutex);
 
 	saHandleInstancePut (&evs_handle_t_db, handle);
 
@@ -191,16 +176,16 @@ evs_error_t evs_fd_get (
 		return (error);
 	}
 
-	*fd = evs_inst->fd; 
+	*fd = evs_inst->dispatch_fd; 
 
 	saHandleInstancePut (&evs_handle_t_db, handle);
 
 	return (SA_OK);
 }
 
-struct message_overlay {
+struct res_overlay {
 	struct res_header header;
-	char data[4096];
+	char data[512000];
 };
 
 evs_error_t evs_dispatch (
@@ -212,15 +197,11 @@ evs_error_t evs_dispatch (
 	SaErrorT error;
 	int cont = 1; /* always continue do loop except when set to 0 */
 	int dispatch_avail;
-	int poll_fd;
 	struct evs_inst *evs_inst;
 	struct res_evs_confchg_callback *res_evs_confchg_callback;
 	struct res_evs_deliver_callback *res_evs_deliver_callback;
 	evs_callbacks_t callbacks;
-	struct message_overlay *dispatch_data;
-	int empty;
-	struct res_header **queue_msg;
-	struct res_header *msg = NULL;
+	struct res_overlay dispatch_data;
 	int ignore_dispatch = 0;
 
 	error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
@@ -237,76 +218,65 @@ evs_error_t evs_dispatch (
 	}
 
 	do {
-		poll_fd = evs_inst->fd;
-
-		ufds.fd = poll_fd;
+		ufds.fd = evs_inst->dispatch_fd;
 		ufds.events = POLLIN;
 		ufds.revents = 0;
 
-		pthread_mutex_lock (&evs_inst->mutex);
-		saQueueIsEmpty (&evs_inst->inq, &empty);
-		if (empty == 1) {
-			pthread_mutex_unlock (&evs_inst->mutex);
+		error = saPollRetry (&ufds, 1, timeout);
+		if (error != SA_OK) {
+			goto error_nounlock;
+		}
 
-			error = saPollRetry (&ufds, 1, timeout);
-			if (error != SA_OK) {
-				goto error_nounlock;
-			}
+		pthread_mutex_lock (&evs_inst->dispatch_mutex);
 
-			pthread_mutex_lock (&evs_inst->mutex);
+		/*
+		 * Regather poll data in case ufds has changed since taking lock
+		 */
+		error = saPollRetry (&ufds, 1, 0);
+		if (error != SA_OK) {
+			goto error_nounlock;
 		}
 
 		/*
 		 * Handle has been finalized in another thread
 		 */
 		if (evs_inst->finalize == 1) {
-			error = SA_OK;
-			pthread_mutex_unlock (&evs_inst->mutex);
+			error = EVS_OK;
+			pthread_mutex_unlock (&evs_inst->dispatch_mutex);
 			goto error_unlock;
 		}
 
-		dispatch_avail = (ufds.revents & POLLIN) | (empty == 0);
+		dispatch_avail = ufds.revents & POLLIN;
 		if (dispatch_avail == 0 && dispatch_types == EVS_DISPATCH_ALL) {
-			pthread_mutex_unlock (&evs_inst->mutex);
+			pthread_mutex_unlock (&evs_inst->dispatch_mutex);
 			break; /* exit do while cont is 1 loop */
-		} else
+		} else 
 		if (dispatch_avail == 0) {
-			pthread_mutex_unlock (&evs_inst->mutex);
+			pthread_mutex_unlock (&evs_inst->dispatch_mutex);
 			continue; /* next poll */
 		}
 
-		saQueueIsEmpty (&evs_inst->inq, &empty);
-		if (empty == 0) {
+		if (ufds.revents & POLLIN) {
 			/*
-			 * Queue is not empty, read data from queue
+			 * Queue empty, read response from socket
 			 */
-			saQueueItemGet (&evs_inst->inq, (void *)&queue_msg);
-			msg = *queue_msg;
-			dispatch_data = (struct message_overlay *)msg;
-			res_evs_deliver_callback = (struct res_evs_deliver_callback *)msg;
-			res_evs_confchg_callback = (struct res_evs_confchg_callback *)msg;
-
-			saQueueItemRemove (&evs_inst->inq);
-		} else {
-			dispatch_data = (struct message_overlay *)evs_inst->dispatch_buffer;
-			res_evs_deliver_callback = (struct res_evs_deliver_callback *)dispatch_data;
-			res_evs_confchg_callback = (struct res_evs_confchg_callback *)dispatch_data;
-			/*
-			* Queue empty, read response from socket
-			*/
-			error = saRecvRetry (evs_inst->fd, &dispatch_data->header,
+			error = saRecvRetry (evs_inst->dispatch_fd, &dispatch_data.header,
 				sizeof (struct res_header), MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_OK) {
 				goto error_unlock;
 			}
-			if (dispatch_data->header.size > sizeof (struct res_header)) {
-				error = saRecvRetry (evs_inst->fd, &dispatch_data->data,
-					dispatch_data->header.size - sizeof (struct res_header),
+			if (dispatch_data.header.size > sizeof (struct res_header)) {
+				error = saRecvRetry (evs_inst->dispatch_fd, &dispatch_data.data,
+					dispatch_data.header.size - sizeof (struct res_header),
 					MSG_WAITALL | MSG_NOSIGNAL);
+
 				if (error != SA_OK) {
 					goto error_unlock;
 				}
 			}
+		} else {
+			pthread_mutex_unlock (&evs_inst->dispatch_mutex);
+			continue;
 		}
 
 		/*
@@ -316,16 +286,13 @@ evs_error_t evs_dispatch (
 		*/
 		memcpy (&callbacks, &evs_inst->callbacks, sizeof (evs_callbacks_t));
 
-		pthread_mutex_unlock (&evs_inst->mutex);
+		pthread_mutex_unlock (&evs_inst->dispatch_mutex);
 		/*
 		 * Dispatch incoming message
 		 */
-		switch (dispatch_data->header.id) {
-		case MESSAGE_RES_LIB_ACTIVATEPOLL:
-			ignore_dispatch = 1;
-			break;
-
+		switch (dispatch_data.header.id) {
 		case MESSAGE_RES_EVS_DELIVER_CALLBACK:
+			res_evs_deliver_callback = (struct res_evs_deliver_callback *)&dispatch_data;
 			callbacks.evs_deliver_fn (
 				res_evs_deliver_callback->source_addr,
 				&res_evs_deliver_callback->msg,
@@ -333,6 +300,7 @@ evs_error_t evs_dispatch (
 			break;
 
 		case MESSAGE_RES_EVS_CONFCHG_CALLBACK:
+			res_evs_confchg_callback = (struct res_evs_confchg_callback *)&dispatch_data;
 			callbacks.evs_confchg_fn (
 				res_evs_confchg_callback->member_list,
 				res_evs_confchg_callback->member_list_entries,
@@ -347,9 +315,6 @@ evs_error_t evs_dispatch (
 			goto error_nounlock;
 			break;
 		}
-		if (empty == 0) {
-			free (msg);
-		}
 
 		/*
 		 * Determine if more messages should be processed
@@ -393,6 +358,7 @@ evs_error_t evs_join (
 	if (error != SA_OK) {
 		return (error);
 	}
+
 	req_lib_evs_join.header.size = sizeof (struct req_lib_evs_join) + 
 		(group_entries * sizeof (struct evs_group));
 	req_lib_evs_join.header.id = MESSAGE_REQ_EVS_JOIN;
@@ -403,13 +369,13 @@ evs_error_t evs_join (
 	iov[1].iov_base = groups;
 	iov[1].iov_len = (group_entries * sizeof (struct evs_group));
 	
-	error = saSendMsgRetry (evs_inst->fd, iov, 2);
-	if (error != SA_OK) {
-		goto error_exit;
-	}
+	pthread_mutex_lock (&evs_inst->response_mutex);
+
+	error = saSendMsgReceiveReply (evs_inst->response_fd, iov, 2,
+		&res_lib_evs_join, sizeof (struct res_lib_evs_join));
+
+	pthread_mutex_unlock (&evs_inst->response_mutex);
 
-	error = saRecvRetry (evs_inst->fd, &res_lib_evs_join,
-		sizeof (struct res_lib_evs_join), MSG_WAITALL | MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -437,6 +403,7 @@ evs_error_t evs_leave (
 	if (error != SA_OK) {
 		return (error);
 	}
+
 	req_lib_evs_leave.header.size = sizeof (struct req_lib_evs_leave) + 
 		(group_entries * sizeof (struct evs_group));
 	req_lib_evs_leave.header.id = MESSAGE_REQ_EVS_LEAVE;
@@ -447,13 +414,13 @@ evs_error_t evs_leave (
 	iov[1].iov_base = groups;
 	iov[1].iov_len = (group_entries * sizeof (struct evs_group));
 	
-	error = saSendMsgRetry (evs_inst->fd, iov, 2);
-	if (error != SA_OK) {
-		goto error_exit;
-	}
+	pthread_mutex_lock (&evs_inst->response_mutex);
+
+	error = saSendMsgReceiveReply (evs_inst->response_fd, iov, 2,
+		&res_lib_evs_leave, sizeof (struct res_lib_evs_leave));
+
+	pthread_mutex_unlock (&evs_inst->response_mutex);
 
-	error = saRecvRetry (evs_inst->fd, &res_lib_evs_leave,
-		sizeof (struct res_lib_evs_leave), MSG_WAITALL | MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -500,13 +467,13 @@ evs_error_t evs_mcast_joined (
 	iov[0].iov_len = sizeof (struct req_lib_evs_mcast_joined);
 	memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
 	
-	error = saSendMsgRetry (evs_inst->fd, iov, 1 + iov_len);
-	if (error != SA_OK) {
-		goto error_exit;
-	}
+	pthread_mutex_lock (&evs_inst->response_mutex);
+
+	error = saSendMsgReceiveReply (evs_inst->response_fd, iov, iov_len + 1,
+		&res_lib_evs_mcast_joined, sizeof (struct res_lib_evs_mcast_joined));
+
+	pthread_mutex_unlock (&evs_inst->response_mutex);
 
-	error = saRecvQueue (evs_inst->fd, &res_lib_evs_mcast_joined, &evs_inst->inq,
-		MESSAGE_RES_EVS_MCAST_JOINED);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -555,13 +522,12 @@ evs_error_t evs_mcast_groups (
 	iov[1].iov_len = (group_entries * sizeof (struct evs_group));
 	memcpy (&iov[2], iovec, iov_len * sizeof (struct iovec));
 	
-	error = saSendMsgRetry (evs_inst->fd, iov, 2 + iov_len);
-	if (error != SA_OK) {
-		goto error_exit;
-	}
+	pthread_mutex_lock (&evs_inst->response_mutex);
+
+	error = saSendMsgReceiveReply (evs_inst->response_fd, iov, iov_len + 2,
+		&res_lib_evs_mcast_groups, sizeof (struct res_lib_evs_mcast_groups));
 
-	error = saRecvQueue (evs_inst->fd, &res_lib_evs_mcast_groups, &evs_inst->inq,
-		MESSAGE_RES_EVS_MCAST_GROUPS);
+	pthread_mutex_unlock (&evs_inst->response_mutex);
 	if (error != SA_OK) {
 		goto error_exit;
 	}

+ 188 - 189
lib/evt.c

@@ -90,34 +90,35 @@ struct saHandleDatabase event_handle_db = {
 	.handleInstanceDestructor	= eventHandleInstanceDestructor
 };
 
-struct message_overlay {
+struct res_overlay {
 	struct res_header header;
-	char data[0];
+	char data[MESSAGE_SIZE_MAX];
 };
 
 /*
  * data required to support events for a given initialization
  *
- * ei_fd:			fd received from the evtInitialize call.
+ * ei_dispatch_fd:	fd used for getting callback data e.g. async event data
+ * ei_response_fd:	fd used for everything else (i.e. evt sync api commands).
  * ei_callback:		callback function.
  * ei_version:		version sent to the evtInitialize call.
  * ei_node_id:		our node id.
  * ei_node_name:	our node name.
  * ei_finalize:		instance in finalize flag
- * ei_queue:		queue for async messages while doing sync commands
- * ei_mutex:		instance mutex
+ * ei_dispatch_mutex:	mutex for dispatch fd
+ * ei_response_mutex:	mutex for response fd
  *
  */
 struct event_instance {
-	int 					ei_fd;
+	int 					ei_dispatch_fd;
+	int 					ei_response_fd;
 	SaEvtCallbacksT			ei_callback;
 	SaVersionT				ei_version;
 	SaClmNodeIdT			ei_node_id;
 	SaNameT					ei_node_name;
 	int						ei_finalize;
-	struct queue			ei_inq;
-	char 					ei_message[MESSAGE_SIZE_MAX];
-	pthread_mutex_t			ei_mutex;
+	pthread_mutex_t			ei_dispatch_mutex;
+	pthread_mutex_t			ei_response_mutex;
 };
 
 
@@ -186,31 +187,18 @@ struct event_data_instance {
 static void evtHandleInstanceDestructor(void *instance)
 {
 	struct event_instance *evti = instance;
-	void **msg;
-	int empty;
 
 	/*
-	 * Empty out the queue if there are any pending messages
+	 * Disconnect from the server
 	 */
-	while ((saQueueIsEmpty(&evti->ei_inq, &empty) == SA_AIS_OK) && !empty) {
-		saQueueItemGet(&evti->ei_inq, (void *)&msg);
-		saQueueItemRemove(&evti->ei_inq);
-		free(*msg);
+    if (evti->ei_response_fd != -1) {
+		shutdown(evti->ei_response_fd, 0);
+		close(evti->ei_response_fd);
 	}
 
-	/*
-	 * clean up the queue itself
-	 */
-	if (evti->ei_inq.items) {
-			free(evti->ei_inq.items);
-	}
-
-	/*
-	 * Disconnect from the server
-	 */
-	if (evti->ei_fd != -1) {
-		shutdown(evti->ei_fd, 0);
-		close(evti->ei_fd);
+	if (evti->ei_dispatch_fd != -1) {
+		shutdown(evti->ei_dispatch_fd, 0);
+		close(evti->ei_dispatch_fd);
 	}
 }
 
@@ -239,6 +227,34 @@ static void eventHandleInstanceDestructor(void *instance)
 	}
 }
 
+static SaErrorT evt_recv_event(int fd, struct lib_event_data **msg)
+{
+	SaErrorT error;
+	struct res_header hdr;
+	void *data;
+
+	error = saRecvRetry(fd, &hdr, sizeof(hdr), MSG_WAITALL | MSG_NOSIGNAL);
+	if (error != SA_AIS_OK) {
+		goto msg_out;
+	}
+	*msg = malloc(hdr.size);
+	if (!*msg) {
+		error = SA_AIS_ERR_LIBRARY;
+		goto msg_out;
+	}
+	data = (void *)((unsigned long)*msg) + sizeof(hdr);
+	memcpy(*msg, &hdr, sizeof(hdr));
+	if (hdr.size > sizeof(hdr)) {
+		error = saRecvRetry(fd, data, hdr.size - sizeof(hdr),
+				MSG_WAITALL | MSG_NOSIGNAL);
+		if (error != SA_AIS_OK) {
+			goto msg_out;
+		}
+	}
+msg_out:
+	return error;
+}
+
 /* 
  * The saEvtInitialize() function initializes the Event Service for the 
  * invoking process. A user of the Event Service must invoke this function 
@@ -286,19 +302,11 @@ saEvtInitialize(
 	 */
 	evti->ei_version = *version;
 
-	/*
-	 * An inq is needed to store async messages while waiting for a 
-	 * sync response
-	 */
-	error = saQueueInit(&evti->ei_inq, 1024, sizeof(void *));
-	if (error != SA_AIS_OK) {
-		goto error_handle_put;
-	}
-
 	/*
 	 * Set up communication with the event server
 	 */
-	error = saServiceConnect(&evti->ei_fd, MESSAGE_REQ_EVT_INIT);
+	error = saServiceConnectTwo(&evti->ei_response_fd,
+		&evti->ei_dispatch_fd, EVT_SERVICE);
 	if (error != SA_AIS_OK) {
 		goto error_handle_put;
 	}
@@ -312,7 +320,8 @@ saEvtInitialize(
 				sizeof(evti->ei_callback));
 	}
 
-	pthread_mutex_init(&evti->ei_mutex, NULL);
+ 	pthread_mutex_init(&evti->ei_dispatch_mutex, NULL);
+ 	pthread_mutex_init(&evti->ei_response_mutex, NULL);
 	saHandleInstancePut(&evt_instance_handle_db, *evtHandle);
 
 	return SA_AIS_OK;
@@ -349,7 +358,7 @@ saEvtSelectionObjectGet(
 		return error;
 	}
 
-	*selectionObject = evti->ei_fd;
+	*selectionObject = evti->ei_dispatch_fd;
 
 	saHandleInstancePut(&evt_instance_handle_db, evtHandle);
 
@@ -445,14 +454,11 @@ saEvtDispatch(
 	struct event_instance *evti;
 	SaEvtEventHandleT event_handle;
 	SaEvtCallbacksT callbacks;
-	struct res_header **queue_msg;
-	struct res_header *msg = 0;
-	int empty;
 	int ignore_dispatch = 0;
 	int cont = 1; /* always continue do loop except when set to 0 */
 	int poll_fd;
-	struct message_overlay *dispatch_data;
-	struct lib_event_data *evt;
+	struct res_overlay dispatch_data;
+	struct lib_event_data *evt = 0;
 	struct res_evt_event_data res;
 
 	error = saHandleInstanceGet(&evt_instance_handle_db, evtHandle,
@@ -469,25 +475,26 @@ saEvtDispatch(
 	}
 
 	do {
-		poll_fd = evti->ei_fd;
+		poll_fd = evti->ei_dispatch_fd;
 
 		ufds.fd = poll_fd;
 		ufds.events = POLLIN;
 		ufds.revents = 0;
 
-		pthread_mutex_lock(&evti->ei_mutex);
-		saQueueIsEmpty(&evti->ei_inq, &empty);
+		error = saPollRetry(&ufds, 1, timeout);
+		if (error != SA_AIS_OK) {
+			goto error_unlock;
+		}
+
+		pthread_mutex_lock(&evti->ei_dispatch_mutex);
+
 		/*
-		 * Read from the socket if there is nothing in
-		 * our queue.
+		 * Check the poll data in case the fd status has changed
+		 * since taking the lock
 		 */
-		if (empty == 1) {
-			pthread_mutex_unlock(&evti->ei_mutex);
-			error = saPollRetry(&ufds, 1, timeout);
-			if (error != SA_AIS_OK) {
-				goto error_nounlock;
-			}
-			pthread_mutex_lock(&evti->ei_mutex);
+		error = saPollRetry(&ufds, 1, 0);
+		if (error != SA_AIS_OK) {
+			goto error_unlock;
 		}
 
 		/*
@@ -495,49 +502,37 @@ saEvtDispatch(
 		 */
 		if (evti->ei_finalize == 1) {
 			error = SA_AIS_OK;
-			pthread_mutex_unlock(&evti->ei_mutex);
+ 			pthread_mutex_unlock(&evti->ei_dispatch_mutex);
 			goto error_unlock;
 		}
 
-		dispatch_avail = (ufds.revents & POLLIN) | (empty == 0);
+ 		dispatch_avail = ufds.revents & POLLIN;
 		if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
-			pthread_mutex_unlock(&evti->ei_mutex);
+			pthread_mutex_unlock(&evti->ei_dispatch_mutex);
 			break; /* exit do while cont is 1 loop */
-		} else
-		if (dispatch_avail == 0) {
-			pthread_mutex_unlock(&evti->ei_mutex);
+		} else if (dispatch_avail == 0) {
+			pthread_mutex_unlock(&evti->ei_dispatch_mutex);
 			continue; /* next poll */
 		}
 
-		saQueueIsEmpty(&evti->ei_inq, &empty);
-		if (empty == 0) {
-			/*
-			 * Queue is not empty, read data from queue
-			 */
-			saQueueItemGet(&evti->ei_inq, (void *)&queue_msg);
-			msg = *queue_msg;
-			dispatch_data = (struct message_overlay *)msg;
-			saQueueItemRemove(&evti->ei_inq);
-		} else {
-			/*
-			 * Queue empty, read response from socket
-			 */
-			dispatch_data = (struct message_overlay *)&evti->ei_message;
-			error = saRecvRetry(evti->ei_fd, &dispatch_data->header,
-				sizeof(struct res_header), MSG_WAITALL | MSG_NOSIGNAL);
+ 		if (ufds.revents & POLLIN) {
+ 			error = saRecvRetry (evti->ei_dispatch_fd, &dispatch_data.header,
+ 				sizeof (struct res_header), MSG_WAITALL | MSG_NOSIGNAL);
+
 			if (error != SA_AIS_OK) {
-				pthread_mutex_unlock(&evti->ei_mutex);
 				goto error_unlock;
 			}
-			if (dispatch_data->header.size > sizeof(struct res_header)) {
-				error = saRecvRetry(evti->ei_fd, dispatch_data->data,
-					dispatch_data->header.size - sizeof(struct res_header),
-					MSG_WAITALL | MSG_NOSIGNAL);
+ 			if (dispatch_data.header.size > sizeof (struct res_header)) {
+ 				error = saRecvRetry (evti->ei_dispatch_fd, &dispatch_data.data,
+ 					dispatch_data.header.size - sizeof (struct res_header),
+  					MSG_WAITALL | MSG_NOSIGNAL);
 				if (error != SA_AIS_OK) {
-					pthread_mutex_unlock(&evti->ei_mutex);
 					goto error_unlock;
 				}
-			}
+			} 
+		} else {
+ 			pthread_mutex_unlock(&evti->ei_dispatch_mutex);
+ 			continue;
 		}
 		/*
 		 * Make copy of callbacks, message data, unlock instance, 
@@ -546,50 +541,41 @@ saEvtDispatch(
 		 * EvtFinalize has been called in another thread.
 		 */
 		memcpy(&callbacks, &evti->ei_callback, sizeof(evti->ei_callback));
-		pthread_mutex_unlock(&evti->ei_mutex);
+		pthread_mutex_unlock(&evti->ei_dispatch_mutex);
 
 
 		/*
 		 * Dispatch incoming response
 		 */
-		switch (dispatch_data->header.id) {
-		case MESSAGE_RES_LIB_ACTIVATEPOLL:
-			/*
-			 * This is a do nothing message which the node 
-			 * executive sends to activate the file evtHandle 
-			 * in poll when the library has queued a message into 
-			 * evti->ei_inq. The dispatch is ignored for the 
-			 * following two cases:
-			 * 1) setting of timeout to zero for the 
-			 *    DISPATCH_ALL case
-			 * 2) expiration of the do loop for the 
-			 *    DISPATCH_ONE case
-			 */
-			ignore_dispatch = 1;
-			break;
+		switch (dispatch_data.header.id) {
 
 		case MESSAGE_RES_EVT_AVAILABLE:
 			/*
 			 * There are events available.  Send a request for one and then
 			 * dispatch it.
 			 */
-			evt = (struct lib_event_data *)&evti->ei_message;
 			res.evd_head.id = MESSAGE_REQ_EVT_EVENT_DATA;
 			res.evd_head.size = sizeof(res);
-			error = saSendRetry(evti->ei_fd, &res, sizeof(res), MSG_NOSIGNAL);
+ 
+ 			pthread_mutex_lock(&evti->ei_response_mutex);
+ 			error = saSendRetry(evti->ei_response_fd, &res, sizeof(res),
+ 				MSG_NOSIGNAL);
+ 
 			if (error != SA_AIS_OK) {
 				printf("MESSAGE_RES_EVT_AVAILABLE: send failed: %d\n", error);
 					break;
 			}
-			error = saRecvQueue(evti->ei_fd, evt, &evti->ei_inq, 
-											MESSAGE_RES_EVT_EVENT_DATA);
+ 			error = evt_recv_event(evti->ei_response_fd, &evt);
+ 			pthread_mutex_unlock(&evti->ei_response_mutex);
+
 			if (error != SA_AIS_OK) {
 				printf("MESSAGE_RES_EVT_AVAILABLE: receive failed: %d\n", 
 						error);
 				break;
 			}
 			/*
-			 * No data available.  This is OK.
+ 			 * No data available.  This is OK, another thread may have
+ 			 * grabbed it.
 			 */
 			if (evt->led_head.error == SA_AIS_ERR_NOT_EXIST) {
 				// printf("MESSAGE_RES_EVT_AVAILABLE: No event data\n");
@@ -621,7 +607,7 @@ saEvtDispatch(
 		case MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK:
 		{
 			struct res_evt_open_chan_async *resa = 
-				(struct res_evt_open_chan_async *)dispatch_data;
+				(struct res_evt_open_chan_async *)&dispatch_data;
 			struct event_channel_instance *eci;
 
 			/*
@@ -655,7 +641,7 @@ saEvtDispatch(
 
 		default:
 			printf("Dispatch: Bad message type 0x%x\n",
-					dispatch_data->header.id);
+					dispatch_data.header.id);
 			error = SA_AIS_ERR_LIBRARY;	
 			goto error_nounlock;
 			break;
@@ -666,8 +652,9 @@ saEvtDispatch(
 		 * message from the queue and we are responsible
 		 * for freeing it.
 		 */
-		if (empty == 0) {
-			free(msg);
+		if (evt) {
+			free(evt);
+			evt = 0;
 		}
 
 		/*
@@ -719,22 +706,20 @@ saEvtFinalize(SaEvtHandleT evtHandle)
 		return error;
 	}
 
-       pthread_mutex_lock(&evti->ei_mutex);
+       pthread_mutex_lock(&evti->ei_response_mutex);
 
 	/*
 	 * Another thread has already started finalizing
 	 */
 	if (evti->ei_finalize) {
-		pthread_mutex_unlock(&evti->ei_mutex);
+		pthread_mutex_unlock(&evti->ei_response_mutex);
 		saHandleInstancePut(&evt_instance_handle_db, evtHandle);
 		return SA_AIS_ERR_BAD_HANDLE;
 	}
 
 	evti->ei_finalize = 1;
 
-	saActivatePoll(evti->ei_fd);
-
-	pthread_mutex_unlock(&evti->ei_mutex);
+	pthread_mutex_unlock(&evti->ei_response_mutex);
 
 	saHandleDestroy(&evt_instance_handle_db, evtHandle);
 	saHandleInstancePut(&evt_instance_handle_db, evtHandle);
@@ -767,6 +752,7 @@ saEvtChannelOpen(
 	struct res_evt_channel_open res;
 	struct event_channel_instance *eci;
 	SaAisErrorT error;
+	struct iovec iov;
 
 	error = saHandleInstanceGet(&evt_instance_handle_db, evtHandle,
 			(void*)&evti);
@@ -791,7 +777,6 @@ saEvtChannelOpen(
 		goto chan_open_put;
 	}
 
-
 	/*
 	 * Send the request to the server and wait for a response
 	 */
@@ -803,19 +788,22 @@ saEvtChannelOpen(
 	req.ico_channel_name = *channelName;
 
 
-	pthread_mutex_lock(&evti->ei_mutex);
+	iov.iov_base = &req;
+	iov.iov_len = sizeof(req);
+
+	pthread_mutex_lock(&evti->ei_response_mutex);
+
+	error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1,
+		&res, sizeof(res));
+
+	pthread_mutex_unlock (&evti->ei_response_mutex);
 
-	error = saSendRetry(evti->ei_fd, &req, sizeof(req), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
-		pthread_mutex_unlock (&evti->ei_mutex);
 		goto chan_open_free;
 	}
-	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
-					MESSAGE_RES_EVT_OPEN_CHANNEL);
 
-	pthread_mutex_unlock (&evti->ei_mutex);
-
-	if (error != SA_AIS_OK) {
+	if (res.ico_head.id != MESSAGE_RES_EVT_OPEN_CHANNEL) {
+		error = SA_AIS_ERR_LIBRARY;
 		goto chan_open_free;
 	}
 
@@ -857,6 +845,7 @@ saEvtChannelClose(SaEvtChannelHandleT channelHandle)
 	struct event_channel_instance *eci;
 	struct req_evt_channel_close req;
 	struct res_evt_channel_close res;
+	struct iovec iov;
 
 	error = saHandleInstanceGet(&channel_handle_db, channelHandle,
 			(void*)&eci);
@@ -883,9 +872,6 @@ saEvtChannelClose(SaEvtChannelHandleT channelHandle)
 		return SA_AIS_ERR_BAD_HANDLE;
 	}
 	eci->eci_closing = 1;
-
-	saActivatePoll(evti->ei_fd);
-
 	pthread_mutex_unlock(&eci->eci_mutex);
 	
 
@@ -896,18 +882,22 @@ saEvtChannelClose(SaEvtChannelHandleT channelHandle)
 	req.icc_head.id = MESSAGE_REQ_EVT_CLOSE_CHANNEL;
 	req.icc_channel_handle = eci->eci_svr_channel_handle;
 
-	pthread_mutex_lock(&evti->ei_mutex);
+	iov.iov_base = &req;
+	iov.iov_len = sizeof (req);
+
+	pthread_mutex_lock(&evti->ei_response_mutex);
+
+	error = saSendMsgReceiveReply (evti->ei_response_fd, &iov, 1,
+		&res, sizeof (res));
+
+	pthread_mutex_unlock(&evti->ei_response_mutex);
 
-	error = saSendRetry(evti->ei_fd, &req, sizeof(req), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
-		pthread_mutex_unlock(&evti->ei_mutex);
 		eci->eci_closing = 0;
 		goto chan_close_put2;
 	}
-	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
-					MESSAGE_RES_EVT_CLOSE_CHANNEL);
-	pthread_mutex_unlock(&evti->ei_mutex);
-	if (error != SA_AIS_OK) {
+	if (res.icc_head.id != MESSAGE_RES_EVT_CLOSE_CHANNEL) {
+		error = SA_AIS_ERR_LIBRARY;
 		eci->eci_closing = 0;
 		goto chan_close_put2;
 	}
@@ -948,6 +938,7 @@ saEvtChannelOpenAsync(SaEvtHandleT evtHandle,
 	struct event_channel_instance *eci;
 	SaEvtChannelHandleT channel_handle;
 	SaAisErrorT error;
+	struct iovec iov;
 
 	error = saHandleInstanceGet(&evt_instance_handle_db, evtHandle,
 			(void*)&evti);
@@ -994,21 +985,23 @@ saEvtChannelOpenAsync(SaEvtHandleT evtHandle,
 	req.ico_invocation = invocation;
 	req.ico_open_flag = channelOpenFlags;
 	req.ico_channel_name = *channelName;
+	iov.iov_base = &req;
+	iov.iov_len = sizeof(req);
+
+
+	pthread_mutex_lock(&evti->ei_response_mutex);
 
+	error = saSendMsgReceiveReply (evti->ei_response_fd, &iov, 1,
+		&res, sizeof (res));
 
-	pthread_mutex_lock(&evti->ei_mutex);
+	pthread_mutex_unlock(&evti->ei_response_mutex);
 
-	error = saSendRetry(evti->ei_fd, &req, sizeof(req), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
-		pthread_mutex_unlock (&evti->ei_mutex);
 		goto chan_open_free;
 	}
-	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
-					MESSAGE_RES_EVT_OPEN_CHANNEL);
 
-	pthread_mutex_unlock (&evti->ei_mutex);
-
-	if (error != SA_AIS_OK) {
+	if (res.ico_head.id != MESSAGE_RES_EVT_OPEN_CHANNEL) {
+		error = SA_AIS_ERR_LIBRARY;
 		goto chan_open_free;
 	}
 
@@ -1062,6 +1055,7 @@ saEvtChannelUnlink(
 	struct event_instance *evti;
 	struct req_evt_channel_unlink req;
 	struct res_evt_channel_unlink res;
+	struct iovec iov;
 	SaAisErrorT error;
 
 	error = saHandleInstanceGet(&evt_instance_handle_db, evtHandle,
@@ -1077,21 +1071,23 @@ saEvtChannelUnlink(
 	req.iuc_head.size = sizeof(req);
 	req.iuc_head.id = MESSAGE_REQ_EVT_UNLINK_CHANNEL;
 	req.iuc_channel_name = *channelName;
+	iov.iov_base = &req;
+	iov.iov_len = sizeof(req);
+
 
+	pthread_mutex_lock(&evti->ei_response_mutex);
 
-	pthread_mutex_lock(&evti->ei_mutex);
+	error = saSendMsgReceiveReply (evti->ei_response_fd, &iov, 1,
+		&res, sizeof (res));
+
+	pthread_mutex_unlock(&evti->ei_response_mutex);
 
-	error = saSendRetry(evti->ei_fd, &req, sizeof(req), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
-		pthread_mutex_unlock (&evti->ei_mutex);
 		goto chan_unlink_put;
 	}
-	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
-					MESSAGE_RES_EVT_UNLINK_CHANNEL);
-
-	pthread_mutex_unlock (&evti->ei_mutex);
 
-	if (error != SA_AIS_OK) {
+	if (res.iuc_head.id != MESSAGE_RES_EVT_UNLINK_CHANNEL) {
+		error = SA_AIS_ERR_LIBRARY;
 		goto chan_unlink_put;
 	}
 
@@ -1564,6 +1560,7 @@ saEvtEventPublish(
 	size_t pattern_size;
 	struct event_pattern *patterns;
 	void   *data_start;
+	struct iovec iov;
 
 	if (eventDataSize > SA_EVT_DATA_MAX_LEN) {
 		error = SA_AIS_ERR_INVALID_PARAM;
@@ -1644,20 +1641,21 @@ saEvtEventPublish(
 	req->led_priority = edi->edi_priority;
 	req->led_publisher_name = edi->edi_pub_name;
 
-	pthread_mutex_lock(&evti->ei_mutex);
-	error = saSendRetry(evti->ei_fd, req, req->led_head.size, MSG_NOSIGNAL);
+	iov.iov_base = req;
+	iov.iov_len = req->led_head.size;
+
+	pthread_mutex_lock(&evti->ei_response_mutex);
+
+	error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1, &res,
+		sizeof(res));
+
+	pthread_mutex_unlock (&evti->ei_response_mutex);
 	free(req);
 	if (error != SA_AIS_OK) {
-		pthread_mutex_unlock (&evti->ei_mutex);
+		pthread_mutex_unlock (&evti->ei_response_mutex);
 		goto pub_put3;
 	}
 
-	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
-					MESSAGE_RES_EVT_PUBLISH);
-	pthread_mutex_unlock (&evti->ei_mutex);
-	if (error != SA_AIS_OK) {
-		goto pub_put3;
-	}
 	error = res.iep_head.error;
 	if (error == SA_AIS_OK) {
 		*eventId = res.iep_event_id;
@@ -1702,6 +1700,7 @@ saEvtEventSubscribe(
 	struct req_evt_event_subscribe *req;
 	struct res_evt_event_subscribe res;
 	int	sz;
+	struct iovec iov;
 
 	error = saHandleInstanceGet(&channel_handle_db, channelHandle,
 			(void*)&eci);
@@ -1757,20 +1756,16 @@ saEvtEventSubscribe(
 	req->ics_channel_handle = eci->eci_svr_channel_handle;
 	req->ics_sub_id = subscriptionId;
 	req->ics_filter_size = sz;
+	iov.iov_base = req;
+	iov.iov_len = req->ics_head.size;
 
-	pthread_mutex_lock(&evti->ei_mutex);
-	error = saSendRetry(evti->ei_fd, req, req->ics_head.size, MSG_NOSIGNAL);
+	pthread_mutex_lock(&evti->ei_response_mutex);
+	error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1,
+		&res, sizeof(res));
+	pthread_mutex_unlock (&evti->ei_response_mutex);
 	free(req);
-	if (error != SA_AIS_OK) {
-		pthread_mutex_unlock (&evti->ei_mutex);
-		goto subscribe_put2;
-	}
-	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
-					MESSAGE_RES_EVT_SUBSCRIBE);
 
-	pthread_mutex_unlock (&evti->ei_mutex);
-
-	if (error != SA_AIS_OK) {
+	if (res.ics_head.id != MESSAGE_RES_EVT_SUBSCRIBE) {
 		goto subscribe_put2;
 	}
 
@@ -1805,6 +1800,7 @@ saEvtEventUnsubscribe(
 	struct event_channel_instance *eci;
 	struct req_evt_event_unsubscribe req;
 	struct res_evt_event_unsubscribe res;
+	struct iovec iov;
 
 	error = saHandleInstanceGet(&channel_handle_db, channelHandle,
 			(void*)&eci);
@@ -1823,19 +1819,20 @@ saEvtEventUnsubscribe(
 
 	req.icu_channel_handle = eci->eci_svr_channel_handle;
 	req.icu_sub_id = subscriptionId;
+	iov.iov_base = &req;
+	iov.iov_len = sizeof(req);
+
+	pthread_mutex_lock(&evti->ei_response_mutex);
+ 	error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1,
+ 		&res, sizeof(res));
+ 	pthread_mutex_unlock (&evti->ei_response_mutex);
 
-	pthread_mutex_lock(&evti->ei_mutex);
-	error = saSendRetry(evti->ei_fd, &req, sizeof(req), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
-		pthread_mutex_unlock (&evti->ei_mutex);
 		goto unsubscribe_put2;
 	}
-	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
-					MESSAGE_RES_EVT_UNSUBSCRIBE);
 
-	pthread_mutex_unlock (&evti->ei_mutex);
-
-	if (error != SA_AIS_OK) {
+	if (res.icu_head.id != MESSAGE_RES_EVT_UNSUBSCRIBE) {
+		error = SA_AIS_ERR_LIBRARY;
 		goto unsubscribe_put2;
 	}
 
@@ -1869,6 +1866,7 @@ saEvtEventRetentionTimeClear(
 	struct event_channel_instance *eci;
 	struct req_evt_event_clear_retentiontime req;
 	struct res_evt_event_clear_retentiontime res;
+	struct iovec iov;
 
 	error = saHandleInstanceGet(&channel_handle_db, channelHandle,
 			(void*)&eci);
@@ -1888,20 +1886,21 @@ saEvtEventRetentionTimeClear(
 	req.iec_channel_handle = eci->eci_svr_channel_handle;
 	req.iec_event_id = eventId;
 
-	pthread_mutex_lock(&evti->ei_mutex);
-	error = saSendRetry(evti->ei_fd, &req, sizeof(req), MSG_NOSIGNAL);
-	if (error != SA_AIS_OK) {
-		pthread_mutex_unlock (&evti->ei_mutex);
-		goto ret_time_put2;
-	}
-	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
-					MESSAGE_RES_EVT_CLEAR_RETENTIONTIME);
+	iov.iov_base = &req;
+	iov.iov_len = sizeof(req);
 
-	pthread_mutex_unlock (&evti->ei_mutex);
+	pthread_mutex_lock(&evti->ei_response_mutex);
+	error = saSendMsgReceiveReply(evti->ei_response_fd, &iov, 1,
+		&res, sizeof(res));
+	pthread_mutex_unlock (&evti->ei_response_mutex);
 
 	if (error != SA_AIS_OK) {
 		goto ret_time_put2;
 	}
+	if (res.iec_head.id != MESSAGE_RES_EVT_CLEAR_RETENTIONTIME) {
+		error = SA_AIS_ERR_LIBRARY;
+		goto ret_time_put2;
+	}
 
 	error = res.iec_head.error;
 

+ 187 - 110
lib/util.c

@@ -1,4 +1,6 @@
 /*
+ * vi: set autoindent tabstop=4 shiftwidth=4 :
+ *
  * Copyright (c) 2002-2004 MontaVista Software, Inc.
  *
  * All rights reserved.
@@ -67,13 +69,13 @@ struct saHandle {
 SaErrorT
 saServiceConnect (
 	int *fdOut,
-	enum req_init_types initType)
+	enum service_types service)
 {
 	int fd;
 	int result;
 	struct sockaddr_un address;
-	struct req_lib_init req_lib_init;
-	struct res_lib_init res_lib_init;
+	struct req_lib_response_init req_lib_response_init;
+	struct res_lib_response_init res_lib_response_init;
 	SaErrorT error;
 	gid_t egid;
 
@@ -95,16 +97,17 @@ saServiceConnect (
 		return (SA_ERR_TRY_AGAIN);
 	}
 
-	req_lib_init.header.size = sizeof (req_lib_init);
-	req_lib_init.header.id = initType;
+	req_lib_response_init.resdis_header.size = sizeof (req_lib_response_init);
+	req_lib_response_init.resdis_header.id = MESSAGE_REQ_RESPONSE_INIT;
+	req_lib_response_init.resdis_header.service = service;
 
-	error = saSendRetry (fd, &req_lib_init, sizeof (struct req_lib_init),
-		MSG_NOSIGNAL);
+	error = saSendRetry (fd, &req_lib_response_init,
+		sizeof (struct req_lib_response_init), MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
-	error = saRecvRetry (fd, &res_lib_init,
-		sizeof (struct res_lib_init), MSG_WAITALL | MSG_NOSIGNAL);
+	error = saRecvRetry (fd, &res_lib_response_init,
+		sizeof (struct res_lib_response_init), MSG_WAITALL | MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_exit;
 	}
@@ -112,8 +115,8 @@ saServiceConnect (
 	/*
 	 * Check for security errors
 	 */
-	if (res_lib_init.header.error != SA_OK) {
-		error = res_lib_init.header.error;
+	if (res_lib_response_init.header.error != SA_OK) {
+		error = res_lib_response_init.header.error;
 		goto error_exit;
 	}
 
@@ -124,6 +127,115 @@ error_exit:
 	return (error);
 }
 
+SaErrorT
+saServiceConnectTwo (
+	int *responseOut,
+	int *callbackOut,
+	enum service_types service)
+{
+	int responseFD;
+	int callbackFD;
+	int result;
+	struct sockaddr_un address;
+	struct req_lib_response_init req_lib_response_init;
+	struct res_lib_response_init res_lib_response_init;
+	struct req_lib_dispatch_init req_lib_dispatch_init;
+	struct res_lib_dispatch_init res_lib_dispatch_init;
+	SaErrorT error;
+	gid_t egid;
+
+	/*
+	 * Allow set group id binaries to be authenticated
+	 */
+	egid = getegid();
+	setregid (egid, -1);
+
+	memset (&address, 0, sizeof (struct sockaddr_un));
+	address.sun_family = PF_UNIX;
+	strcpy (address.sun_path + 1, "libais.socket");
+	responseFD = socket (PF_UNIX, SOCK_STREAM, 0);
+	if (responseFD == -1) {
+		return (SA_ERR_SYSTEM);
+	}
+	result = connect (responseFD, (struct sockaddr *)&address, sizeof (address));
+	if (result == -1) {
+		return (SA_ERR_TRY_AGAIN);
+	}
+
+	req_lib_response_init.resdis_header.size = sizeof (req_lib_response_init);
+	req_lib_response_init.resdis_header.id = MESSAGE_REQ_RESPONSE_INIT;
+	req_lib_response_init.resdis_header.service = service;
+
+	error = saSendRetry (responseFD, &req_lib_response_init,
+		sizeof (struct req_lib_response_init),
+		MSG_NOSIGNAL);
+	if (error != SA_OK) {
+		goto error_exit;
+	}
+	error = saRecvRetry (responseFD, &res_lib_response_init,
+		sizeof (struct res_lib_response_init),
+		MSG_WAITALL | MSG_NOSIGNAL);
+	if (error != SA_OK) {
+		goto error_exit;
+	}
+
+	/*
+	 * Check for security errors
+	 */
+	if (res_lib_response_init.header.error != SA_OK) {
+		error = res_lib_response_init.header.error;
+		goto error_exit;
+	}
+
+	*responseOut = responseFD;
+
+/* if I comment out the 4 lines below the executive crashes */
+	callbackFD = socket (PF_UNIX, SOCK_STREAM, 0);
+	if (callbackFD == -1) {
+		return (SA_ERR_SYSTEM);
+	}
+	result = connect (callbackFD, (struct sockaddr *)&address, sizeof (address));
+	if (result == -1) {
+		return (SA_ERR_TRY_AGAIN);
+	}
+
+	req_lib_dispatch_init.resdis_header.size = sizeof (req_lib_dispatch_init);
+	req_lib_dispatch_init.resdis_header.id = MESSAGE_REQ_DISPATCH_INIT;
+	req_lib_dispatch_init.resdis_header.service = service;
+
+	req_lib_dispatch_init.conn_info = res_lib_response_init.conn_info;
+
+	error = saSendRetry (callbackFD, &req_lib_dispatch_init,
+		sizeof (struct req_lib_dispatch_init),
+		MSG_NOSIGNAL);
+	if (error != SA_OK) {
+		goto error_exit_two;
+	}
+	error = saRecvRetry (callbackFD, &res_lib_dispatch_init,
+		sizeof (struct res_lib_dispatch_init),
+		MSG_WAITALL | MSG_NOSIGNAL);
+	if (error != SA_OK) {
+		goto error_exit_two;
+	}
+
+	/*
+	 * Check for security errors
+	 */
+	if (res_lib_dispatch_init.header.error != SA_OK) {
+		error = res_lib_dispatch_init.header.error;
+		goto error_exit;
+	}
+
+	*callbackOut = callbackFD;
+	return (SA_OK);
+
+error_exit_two:
+	close (callbackFD);
+error_exit:
+	close (responseFD);
+	return (error);
+}
+
 SaErrorT
 saRecvRetry (
 	int s,
@@ -135,9 +247,8 @@ saRecvRetry (
 	int result;
 	struct msghdr msg_recv;
 	struct iovec iov_recv;
-
-	iov_recv.iov_base = (void *)msg;
-	iov_recv.iov_len = len;
+	char *rbuf = (char *)msg;
+	int processed = 0;
 
 	msg_recv.msg_iov = &iov_recv;
 	msg_recv.msg_iovlen = 1;
@@ -148,14 +259,23 @@ saRecvRetry (
 	msg_recv.msg_flags = 0;
 
 retry_recv:
+	iov_recv.iov_base = (void *)&rbuf[processed];
+	iov_recv.iov_len = len - processed;
+
 	result = recvmsg (s, &msg_recv, flags);
 	if (result == -1 && errno == EINTR) {
 		goto retry_recv;
 	}
-	if (result == -1 || result != len) {
-		error = SA_ERR_SYSTEM;
+	if (result == -1 || result == 0) {
+		error = SA_ERR_MESSAGE_ERROR;
+		goto error_exit;
+	}
+	processed += result;
+	if (processed != len) {
+		goto retry_recv;
 	}
-	assert (result == len);
+	assert (processed == len);
+error_exit:
 	return (error);
 }
 
@@ -164,99 +284,6 @@ struct res_overlay {
 	char payload[0];
 };
 
-SaErrorT
-saRecvQueue (
-	int s,
-	void *msg,
-	struct queue *queue,
-	int findMessageId)
-{
-	struct res_overlay *overlay;
-	void *inq_msg;
-	int match;
-	SaErrorT error;
-
-	do {
-		overlay = (struct res_overlay *)msg;
-		error = saRecvRetry (s, overlay, sizeof (struct res_header),
-			MSG_WAITALL | MSG_NOSIGNAL);
-		if (error != SA_OK) {
-			goto error_exit;
-		}
-		assert (overlay->header.size != 0);
-
-		match = (overlay->header.id == findMessageId);
-
-		/*
-		 * Item doesn't match, queue it
-		 */
-		if (match == 0 && queue) {
-			inq_msg = (void *)malloc (overlay->header.size);
-			if (inq_msg == 0) {
-				error = SA_ERR_NO_MEMORY;
-				goto error_exit;
-			}
-			memcpy (inq_msg, overlay, sizeof (struct res_header));
-			overlay = (struct res_overlay *)inq_msg;
-			if (overlay->header.size > sizeof (struct res_header)) {
-				error = saRecvRetry (s, &overlay->payload,
-					overlay->header.size - sizeof (struct res_header),
-					MSG_WAITALL | MSG_NOSIGNAL);
-				if (error != SA_OK) {
-					goto error_exit;
-				}
-			}
-
-			if (overlay->header.id != MESSAGE_RES_LIB_ACTIVATEPOLL) {
-				error = saQueueItemAdd (queue, &inq_msg);
-				if (error != SA_OK) {
-					free (inq_msg);
-					goto error_exit;
-				}
-
-				error = saActivatePoll (s);
-				if (error != SA_OK) {
-					goto error_exit;
-				}
-			}
-		} else {
-		/*
-		 *  its a match, so deliver it
-		 */
-			overlay = (struct res_overlay *)msg;
-			if (overlay->header.size > sizeof (struct res_header)) {
-				error = saRecvRetry (s, &overlay->payload,
-					overlay->header.size - sizeof (struct res_header),
-					MSG_WAITALL | MSG_NOSIGNAL);
-				if (error != SA_OK) {
-					goto error_exit;
-				}
-			}
-			break;
-		}
-	} while (match == 0);
-
-error_exit:
-	return (error);
-}
-
-SaErrorT
-saActivatePoll (int s) {
-	struct req_lib_activatepoll req_lib_activatepoll;
-	SaErrorT error;
-
-	/*
-	 * Send activate poll to tell nodeexec to activate poll
-	 * on this file descriptor
-	 */
-	req_lib_activatepoll.header.size = sizeof (req_lib_activatepoll);
-	req_lib_activatepoll.header.id = MESSAGE_REQ_LIB_ACTIVATEPOLL;
-
-	error = saSendRetry (s, &req_lib_activatepoll,
-		sizeof (struct req_lib_activatepoll), MSG_NOSIGNAL);
-	return (error);
-}
-
 SaErrorT
 saSendRetry (
 	int s,
@@ -321,6 +348,55 @@ retry_send:
 	return (error);
 }
 
+SaErrorT saSendMsgReceiveReply (
+        int s,
+        struct iovec *iov,
+        int iov_len,
+        void *responseMessage,
+        int responseLen)
+{
+	SaErrorT error = SA_OK;
+
+	error = saSendMsgRetry (s, iov, iov_len);
+	if (error != SA_OK) {
+		goto error_exit;
+	}
+	
+	error = saRecvRetry (s, responseMessage, responseLen,
+		MSG_WAITALL | MSG_NOSIGNAL);
+	if (error != SA_OK) {
+		goto error_exit;
+	}
+
+error_exit:
+	return (error);
+}
+
+SaErrorT saSendReceiveReply (
+        int s,
+        void *requestMessage,
+        int requestLen,
+        void *responseMessage,
+        int responseLen)
+{
+	SaErrorT error = SA_OK;
+
+	error = saSendRetry (s, requestMessage, requestLen,
+		MSG_NOSIGNAL);
+	if (error != SA_OK) {
+		goto error_exit;
+	}
+	
+	error = saRecvRetry (s, responseMessage, responseLen,
+		MSG_WAITALL | MSG_NOSIGNAL);
+	if (error != SA_OK) {
+		goto error_exit;
+	}
+
+error_exit:
+	return (error);
+}
+
 SaErrorT
 saSelectRetry (
 	int s,
@@ -560,6 +636,7 @@ saQueueItemAdd (
 	queueItem += queuePosition * queue->bytesPerItem;
 	memcpy (queueItem, item, queue->bytesPerItem);
 
+	assert (queue->tail != queue->head);
 	if (queue->tail == queue->head) {
 		return (SA_ERR_LIBRARY);
 	}

+ 21 - 4
lib/util.h

@@ -66,7 +66,13 @@ struct queue {
 SaErrorT
 saServiceConnect (
 	int *fdOut,
-	enum req_init_types init_type);
+	enum service_types service);
+
+SaErrorT
+saServiceConnectTwo (
+        int *responseOut,
+        int *callbackOut,
+        enum service_types service);
 
 SaErrorT
 saRecvRetry (
@@ -82,9 +88,6 @@ saRecvQueue (
 	struct queue *queue,
 	int findMessageId);
 
-SaErrorT
-saActivatePoll (int s);
-
 SaErrorT
 saSendRetry (
 	int s,
@@ -97,6 +100,20 @@ SaErrorT saSendMsgRetry (
 	struct iovec *iov,
 	int iov_len);
 
+SaErrorT saSendMsgReceiveReply (
+	int s,
+	struct iovec *iov,
+	int iov_len,
+	void *responseMessage,
+	int responseLen);
+
+SaErrorT saSendReceiveReply (
+	int s,
+	void *requestMessage,
+	int requestLen,
+	void *responseMessage,
+	int responseLen);
+
 SaErrorT
 saSelectRetry (
 	int s,