Просмотр исходного кода

Use new APIs for handle reference counting

(Logical change 1.21)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@56 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 21 лет назад
Родитель
Сommit
b13fc41331
1 измененных файлов с 130 добавлено и 120 удалено
  1. 130 120
      lib/amf.c

+ 130 - 120
lib/amf.c

@@ -33,25 +33,6 @@
  * THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-/*
- * thread locking model is as follows
- *
- *	APIs that use handles:
- *
- *	Every handle database has a lock.
- *	Each interface started with SaAmfInitialize has a lock.
- *	Handle database lock is taken.
- *	amfInstance lock is taken.
- *	Handle database lock is released early.
- *	amfInstance lock is released after amfInstance is out of use.
- *
- *	Finalize API:
- *	Handle database lock is taken
- *	amf instance lock is taken
- *	handle is removed
- *	amf instance lock is released
- *	handle database lock is released
- */
 #include <stdio.h>
 #include <string.h>
 #include <stdlib.h>
@@ -83,10 +64,11 @@ struct amfInstance {
 	struct queue inq;
 	SaNameT compName;
 	int compRegistered;
-	struct message_overlay message;
+	int finalize;
 	pthread_mutex_t mutex;
 };
-#define AMFINSTANCE_MUTEX_OFFSET offset_of(struct amfInstance, mutex)
+
+static void amfHandleInstanceDestructor (void *);
 
 /*
  * All instances in one database
@@ -94,8 +76,8 @@ struct amfInstance {
 static struct saHandleDatabase amfHandleDatabase = {
 	handleCount: 0,
 	handles: 0,
-	generation: 0,
-	mutex: PTHREAD_MUTEX_INITIALIZER
+	mutex: PTHREAD_MUTEX_INITIALIZER,
+	handleInstanceDestructor: amfHandleInstanceDestructor
 };
 
 /*
@@ -114,6 +96,21 @@ static struct saVersionDatabase amfVersionDatabase = {
 /*
  * Implementation
  */
+
+void amfHandleInstanceDestructor (void *instance)
+{
+	struct amfInstance *amfInstance = (struct amfInstance *)instance;
+
+	if (amfInstance->fd != -1) {
+		shutdown (amfInstance->fd, 0);
+		close (amfInstance->fd);
+	}
+
+	if (amfInstance->inq.items) {
+		free (amfInstance->inq.items);
+	}
+}
+
 SaErrorT
 saAmfInitialize (
 	SaAmfHandleT *amfHandle,
@@ -125,40 +122,48 @@ saAmfInitialize (
 
 	error = saVersionVerify (&amfVersionDatabase, version);
 	if (error != SA_OK) {
-		goto error_nofree;
+		goto error_no_destroy;
 	}
 	
-	error = saHandleCreate (&amfHandleDatabase, (void *)&amfInstance,
-		sizeof (struct amfInstance), amfHandle);
+	error = saHandleCreate (&amfHandleDatabase, sizeof (struct amfInstance), amfHandle);
 	if (error != SA_OK) {
-		goto error_nofree;
+		goto error_no_destroy;
 	}
 
+	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
+	if (error != SA_OK) {
+		goto error_destroy;
+	}
+
+	amfInstance->fd = -1;
+	
 	/*
 	 * An inq is needed to store async messages while waiting for a 
 	 * sync response
 	 */
 	error = saQueueInit (&amfInstance->inq, 512, sizeof (void *));
 	if (error != SA_OK) {
-		goto error_free;
+		goto error_put_destroy;
 	}
 
 	error = saServiceConnect (&amfInstance->fd, MESSAGE_REQ_AMF_INIT);
 	if (error != SA_OK) {
-		goto error_free2;
+		goto error_put_destroy;
 	}
 
 	memcpy (&amfInstance->callbacks, amfCallbacks, sizeof (SaAmfCallbacksT));
 
 	pthread_mutex_init (&amfInstance->mutex, NULL);
 
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
+
 	return (SA_OK);
 
-error_free2:
-	free (amfInstance->inq.items);
-error_free:
-	saHandleRemove (&amfHandleDatabase, *amfHandle);
-error_nofree:
+error_put_destroy:
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
+error_destroy:
+	saHandleDestroy (&amfHandleDatabase, *amfHandle);
+error_no_destroy:
 	return (error);
 }
 
@@ -170,14 +175,14 @@ saAmfSelectionObjectGet (
 	struct amfInstance *amfInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
 	*selectionObject = amfInstance->fd;
 
-	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (SA_OK);
 }
 
@@ -202,12 +207,15 @@ saAmfDispatch (
 	int empty;
 	int ignore_dispatch = 0;
 	int cont = 1; /* always continue do loop except when set to 0 */
-	int handle_verified = 0;
 	int poll_fd;
-	unsigned int gen_first;
-	unsigned int gen_second;
 	struct message_overlay dispatch_data;
 
+	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle,
+		(void *)&amfInstance);
+	if (error != SA_OK) {
+		return (error);
+	}
+
 	/*
 	 * Timeout instantly for SA_DISPATCH_ALL
 	 */
@@ -216,25 +224,8 @@ saAmfDispatch (
 	}
 
 	do {
-		/*
-		 * If flags are SA_DISPATCH_BLOCKING and handle has been
-		 * verified, return SA_OK because a Finalize has been
-		 * called.  Else return error from saHandleConvert
-		 */
-		error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, &gen_first);
-		if (error != SA_OK) {
-			return (handle_verified ? SA_OK : error);
-		}
-		handle_verified = 1;
-
 		poll_fd = amfInstance->fd;
 
-		/*
-		 * Unlock mutex for potentially long wait in select.  If fd
-		* is closed by amfFinalize in select, select will return
-		 */
-		pthread_mutex_unlock (&amfInstance->mutex);
-		
 		/*
 		 * Read data directly from socket
 		 */
@@ -247,28 +238,27 @@ saAmfDispatch (
 			goto error_nounlock;
 		}
 
+		pthread_mutex_lock (&amfInstance->mutex);
+
+		/*
+		 * Handle has been finalized in another thread
+		 */
+		if (amfInstance->finalize == 1) {
+			error = SA_OK;
+			pthread_mutex_unlock (&amfInstance->mutex);
+			goto error_unlock;
+		}
+
 		dispatch_avail = ufds.revents & POLLIN;
 		if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
+			pthread_mutex_unlock (&amfInstance->mutex);
 			break; /* exit do while cont is 1 loop */
 		} else
 		if (dispatch_avail == 0) {
-			continue; /* next select */
+			pthread_mutex_unlock (&amfInstance->mutex);
+			continue; /* next poll */
 		}
 
-		/*
-		 * Re-verify amfHandle
-		 */
-		error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, &gen_second);
-		if (error != SA_OK) {
-			return (handle_verified ? SA_OK : error);
-		}
-
-		/*
-		 * Handle has been removed and then reallocated
-		 */
-		if (gen_first != gen_second) {
-			return (SA_OK);
-		}
 		saQueueIsEmpty(&amfInstance->inq, &empty);
 		if (empty == 0) {
 			/*
@@ -276,19 +266,21 @@ saAmfDispatch (
 			 */
 			saQueueItemGet (&amfInstance->inq, (void *)&queue_msg);
 			msg = *queue_msg;
-			memcpy (&amfInstance->message, msg, msg->size);
+			memcpy (&dispatch_data, msg, msg->size);
 			saQueueItemRemove (&amfInstance->inq);
 		} else {
 			/*
 			 * Queue empty, read response from socket
 			 */
-			error = saRecvRetry (amfInstance->fd, &amfInstance->message.header, sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+			error = saRecvRetry (amfInstance->fd, &dispatch_data.header,
+				sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_OK) {
 				goto error_unlock;
 			}
-			if (amfInstance->message.header.size > sizeof (struct message_header)) {
-				error = saRecvRetry (amfInstance->fd, &amfInstance->message.data,
-					amfInstance->message.header.size - sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+			if (dispatch_data.header.size > sizeof (struct message_header)) {
+				error = saRecvRetry (amfInstance->fd, &dispatch_data.data,
+					dispatch_data.header.size - sizeof (struct message_header),
+					MSG_WAITALL | MSG_NOSIGNAL);
 				if (error != SA_OK) {
 					goto error_unlock;
 				}
@@ -300,18 +292,17 @@ saAmfDispatch (
 		 * operate at the same time that amfFinalize has been called in another thread.
 		 */
 		memcpy (&callbacks, &amfInstance->callbacks, sizeof (SaAmfCallbacksT));
-		memcpy (&dispatch_data, &amfInstance->message, sizeof (struct message_overlay));
-
 		pthread_mutex_unlock (&amfInstance->mutex);
 
+
 		/*
 		 * Dispatch incoming response
 		 */
-		switch (amfInstance->message.header.id) {
-		case MESSAGE_RES_AMF_ACTIVATEPOLL:
+		switch (dispatch_data.header.id) {
+		case MESSAGE_RES_LIB_ACTIVATEPOLL:
 			/*
 			 * This is a do nothing message which the node executive sends
-			 * to activate the file handle in poll when the library has
+			 * to activate the file amfHandle in poll when the library has
 			 * queued a message into amfHandle->inq
 			 * The dispatch is ignored for the following two cases:
 			 * 1) setting of timeout to zero for the DISPATCH_ALL case
@@ -398,10 +389,8 @@ saAmfDispatch (
 		}
 	} while (cont);
 
-	return (error);
-
 error_unlock:
-	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 error_nounlock:
 	return (error);
 }
@@ -413,20 +402,31 @@ saAmfFinalize (
 	struct amfInstance *amfInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET | HANDLECONVERT_DONTUNLOCKDB, 0);
+	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
-	shutdown (amfInstance->fd, 0);
-	close (amfInstance->fd);
-	free (amfInstance->inq.items);
+	pthread_mutex_lock (&amfInstance->mutex);
 
-	error = saHandleRemove (&amfHandleDatabase, *amfHandle);
+	/*
+	 * Another thread has already started finalizing
+	 */
+	if (amfInstance->finalize) {
+		pthread_mutex_unlock (&amfInstance->mutex);
+		saHandleInstancePut (&amfHandleDatabase, *amfHandle);
+		return (SA_ERR_BAD_HANDLE);
+	}
+
+	amfInstance->finalize = 1;
+
+	saActivatePoll (amfInstance->fd);
 
 	pthread_mutex_unlock (&amfInstance->mutex);
 
-	saHandleUnlockDatabase (&amfHandleDatabase);
+	saHandleDestroy (&amfHandleDatabase, *amfHandle);
+
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 
 	return (error);
 }
@@ -441,6 +441,7 @@ saAmfComponentRegister (
 	SaErrorT error;
 	struct req_lib_amf_componentregister req_lib_amf_componentregister;
 	struct res_lib_amf_componentregister *res_lib_amf_componentregister;
+	struct message_overlay message;
 
 	req_lib_amf_componentregister.header.magic = MESSAGE_MAGIC;
 	req_lib_amf_componentregister.header.size = sizeof (struct req_lib_amf_componentregister);
@@ -452,11 +453,13 @@ saAmfComponentRegister (
 		memset (&req_lib_amf_componentregister.proxyCompName, 0, sizeof (SaNameT));
 	}
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&amfInstance->mutex);
+
 	error = saSendRetry (amfInstance->fd, &req_lib_amf_componentregister, sizeof (struct req_lib_amf_componentregister), MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_unlock;
@@ -464,17 +467,17 @@ saAmfComponentRegister (
 
 	/*
 	 * Search for COMPONENTREGISTER responses and queue any
-	 * messages that dont match in this handle's inq.
+	 * messages that dont match in this amfHandle's inq.
 	 * This must be done to avoid dropping async messages
 	 * during this sync message retrieval
 	 */
-	error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+	error = saRecvQueue (amfInstance->fd, &message,
 		&amfInstance->inq, MESSAGE_RES_AMF_COMPONENTREGISTER);
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
 
-	res_lib_amf_componentregister = (struct res_lib_amf_componentregister *)&amfInstance->message;
+	res_lib_amf_componentregister = (struct res_lib_amf_componentregister *)&message;
 	if (res_lib_amf_componentregister->error == SA_OK) {
 		amfInstance->compRegistered = 1;
 		memcpy (&amfInstance->compName, compName, sizeof (SaNameT));
@@ -484,6 +487,7 @@ saAmfComponentRegister (
 
 error_unlock:
 	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
 
@@ -496,6 +500,7 @@ saAmfComponentUnregister (
 	struct req_lib_amf_componentunregister req_lib_amf_componentunregister;
 	struct res_lib_amf_componentunregister *res_lib_amf_componentunregister;
 	struct amfInstance *amfInstance;
+	struct message_overlay message;
 	SaErrorT error;
 
 	req_lib_amf_componentunregister.header.magic = MESSAGE_MAGIC;
@@ -508,11 +513,13 @@ saAmfComponentUnregister (
 		memset (&req_lib_amf_componentunregister.proxyCompName, 0, sizeof (SaNameT));
 	}	
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&amfInstance->mutex);
+
 	error = saSendRetry (amfInstance->fd, &req_lib_amf_componentunregister,
 		sizeof (struct req_lib_amf_componentunregister), MSG_NOSIGNAL);
 	if (error != SA_OK) {
@@ -521,16 +528,17 @@ saAmfComponentUnregister (
 
 	/*
 	 * Search for COMPONENTUNREGISTER responses and queue any
-	 * messages that dont match in this handle's inq.
+	 * messages that dont match in this amfHandle's inq.
 	 * This must be done to avoid dropping async messages
 	 * during this sync message retrieval
 	 */
-	error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+	error = saRecvQueue (amfInstance->fd, &message,
 		&amfInstance->inq, MESSAGE_RES_AMF_COMPONENTUNREGISTER);
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
-	res_lib_amf_componentunregister = (struct res_lib_amf_componentunregister *)&amfInstance->message;
+
+	res_lib_amf_componentunregister = (struct res_lib_amf_componentunregister *)&message;
 	if (res_lib_amf_componentunregister->error == SA_OK) {
 		amfInstance->compRegistered = 0;
 	}
@@ -538,6 +546,7 @@ saAmfComponentUnregister (
 
 error_unlock:
 	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
 
@@ -549,17 +558,23 @@ saAmfCompNameGet (
 	struct amfInstance *amfInstance;
 	SaErrorT error;
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&amfInstance->mutex);
+
 	if (amfInstance->compRegistered == 0) {
+		pthread_mutex_unlock (&amfInstance->mutex);
 		return (SA_ERR_NOT_EXIST);
 	}
+
 	memcpy (compName, &amfInstance->compName, sizeof (SaNameT));
 
 	pthread_mutex_unlock (&amfInstance->mutex);
+
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (SA_OK);
 }
 
@@ -686,6 +701,7 @@ saAmfProtectionGroupTrackStart (
 	struct amfInstance *amfInstance;
 	struct req_amf_protectiongrouptrackstart req_amf_protectiongrouptrackstart;
 	struct res_amf_protectiongrouptrackstart *res_amf_protectiongrouptrackstart;
+	struct message_overlay message;
 	SaErrorT error;
 
 	req_amf_protectiongrouptrackstart.header.magic = MESSAGE_MAGIC;
@@ -696,33 +712,29 @@ saAmfProtectionGroupTrackStart (
 	req_amf_protectiongrouptrackstart.notificationBufferAddress = (SaAmfProtectionGroupNotificationT *)notificationBuffer;
 	req_amf_protectiongrouptrackstart.numberOfItems = numberOfItems;
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&amfInstance->mutex);
+
 	error = saSendRetry (amfInstance->fd, &req_amf_protectiongrouptrackstart,
 		sizeof (struct req_amf_protectiongrouptrackstart), MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
 
-	error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+	error = saRecvQueue (amfInstance->fd, &message,
 		&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTART);
 
-	pthread_mutex_unlock (&amfInstance->mutex);
+	res_amf_protectiongrouptrackstart = (struct res_amf_protectiongrouptrackstart *)&message;
 
-	res_amf_protectiongrouptrackstart = (struct res_amf_protectiongrouptrackstart *)&amfInstance->message;
-
-	if (error == SA_OK) {
-		return (res_amf_protectiongrouptrackstart->error);
-	}
-
-	return (error);
+	error = res_amf_protectiongrouptrackstart->error;
 
 error_unlock:
 	pthread_mutex_unlock (&amfInstance->mutex);
-
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }
 
@@ -734,6 +746,7 @@ saAmfProtectionGroupTrackStop (
 	struct amfInstance *amfInstance;
 	struct req_amf_protectiongrouptrackstop req_amf_protectiongrouptrackstop;
 	struct res_amf_protectiongrouptrackstop *res_amf_protectiongrouptrackstop;
+	struct message_overlay message;
 	SaErrorT error;
 
 	req_amf_protectiongrouptrackstop.header.magic = MESSAGE_MAGIC;
@@ -741,32 +754,29 @@ saAmfProtectionGroupTrackStop (
 	req_amf_protectiongrouptrackstop.header.id = MESSAGE_REQ_AMF_PROTECTIONGROUPTRACKSTOP;
 	memcpy (&req_amf_protectiongrouptrackstop.csiName, csiName, sizeof (SaNameT));
 
-	error = saHandleConvert (&amfHandleDatabase, *amfHandle, (void *)&amfInstance, AMFINSTANCE_MUTEX_OFFSET, 0);
+	error = saHandleInstanceGet (&amfHandleDatabase, *amfHandle, (void *)&amfInstance);
 	if (error != SA_OK) {
 		return (error);
 	}
 
+	pthread_mutex_lock (&amfInstance->mutex);
+
 	error = saSendRetry (amfInstance->fd, &req_amf_protectiongrouptrackstop,
 		sizeof (struct req_amf_protectiongrouptrackstop), MSG_NOSIGNAL);
 	if (error != SA_OK) {
 		goto error_unlock;
 	}
 
-	error = saRecvQueue (amfInstance->fd, &amfInstance->message,
+	error = saRecvQueue (amfInstance->fd, &message,
 		&amfInstance->inq, MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTOP);
 
-	pthread_mutex_unlock (&amfInstance->mutex);
-
-	res_amf_protectiongrouptrackstop = (struct res_amf_protectiongrouptrackstop *)&amfInstance->message;
+	res_amf_protectiongrouptrackstop = (struct res_amf_protectiongrouptrackstop *)&message;
 
-	if (error == SA_OK) {
-		return (res_amf_protectiongrouptrackstop->error);
-	}
-
-	return (error);
+	error = res_amf_protectiongrouptrackstop->error;
 
 error_unlock:
 	pthread_mutex_unlock (&amfInstance->mutex);
+	saHandleInstancePut (&amfHandleDatabase, *amfHandle);
 	return (error);
 }