Explorar o código

defect 595
There are various bugs with saCkptCheckpointOpenAsync that result in it
crashing the executive, library, or just failing the saftest conformance
suite.

(Logical change 1.200)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@650 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake %!s(int64=20) %!d(string=hai) anos
pai
achega
8a8983126d
Modificáronse 5 ficheiros con 158 adicións e 89 borrados
  1. 29 13
      exec/ckpt.c
  2. 105 61
      lib/ckpt.c
  3. 3 0
      lib/util.c
  4. 1 1
      test/ckptbench.c
  5. 20 14
      test/testckpt.c

+ 29 - 13
exec/ckpt.c

@@ -1025,6 +1025,10 @@ static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct i
 	 * If checkpoint doesn't exist, create one
 	 */
 	if (ckptCheckpoint == 0) {
+		if ((req_lib_ckpt_checkpointopen->checkpointOpenFlags & SA_CKPT_CHECKPOINT_CREATE) == 0) {
+			error = SA_AIS_ERR_NOT_EXIST;
+			goto error_exit;
+		}
 		ckptCheckpoint = malloc (sizeof (struct saCkptCheckpoint));
 		if (ckptCheckpoint == 0) {
 			error = SA_AIS_ERR_NO_MEMORY;
@@ -1072,8 +1076,18 @@ static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct i
 		assert(ckptCheckpointSection->sectionData);
 		memcpy(ckptCheckpointSection->sectionData, "Factory installed data\0", strlen("Factory installed data\0")+1);
 		ckptCheckpointSection->expiration_timer = 0;
+	} else {
+		if (req_lib_ckpt_checkpointopen->checkpointCreationAttributesSet &&
+			memcmp (&ckptCheckpoint->checkpointCreationAttributes,
+				&req_lib_ckpt_checkpointopen->checkpointCreationAttributesSet,
+				sizeof (SaCkptCheckpointCreationAttributesT)) == 0) {
+
+			error = SA_AIS_ERR_EXIST;
+			goto error_exit;
+		}
 	}
 
+
 	/*
 	 * If the checkpoint has been unlinked, it is an invalid name
 	 */
@@ -1104,8 +1118,8 @@ static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct i
 	 }
 	 else {
 	 	log_printf (LOG_LEVEL_ERROR, 
-	 				"CKPT: MAX LIMIT OF PROCESSORS reached. Cannot store new proc %p info.\n", 
-	 				ckptCheckpoint);
+	 		"CKPT: MAX LIMIT OF PROCESSORS reached. Cannot store new proc %p info.\n", 
+	 		ckptCheckpoint);
 	 }
 	/*
 	 * Reset retention duration since this checkpoint was just opened
@@ -1121,16 +1135,23 @@ error_exit:
 	 * If this node was the source of the message, respond to this node
 	 */
 	if (message_source_is_local(&req_exec_ckpt_checkpointopen->source)) {
-		if (req_exec_ckpt_checkpointopen->invocation) {
+		/*
+		 * If its an async call respond with the invocation and handle
+		 */
+		if (req_exec_ckpt_checkpointopen->async_call) {
 			res_lib_ckpt_checkpointopenasync.header.size = sizeof (struct res_lib_ckpt_checkpointopenasync);
 			res_lib_ckpt_checkpointopenasync.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC;
 			res_lib_ckpt_checkpointopenasync.header.error = error;
 			res_lib_ckpt_checkpointopenasync.checkpointHandle = req_exec_ckpt_checkpointopen->checkpointHandle;
 			res_lib_ckpt_checkpointopenasync.invocation = req_exec_ckpt_checkpointopen->invocation;
 
-            libais_send_response (req_exec_ckpt_checkpointopen->source.conn_info->conn_info_partner, &res_lib_ckpt_checkpointopenasync,
-                sizeof (struct res_lib_ckpt_checkpointopenasync));
+			libais_send_response (req_exec_ckpt_checkpointopen->source.conn_info->conn_info_partner,
+				&res_lib_ckpt_checkpointopenasync,
+				sizeof (struct res_lib_ckpt_checkpointopenasync));
 		} else {
+			/*
+			 * otherwise respond with the normal checkpointopen response
+			 */
 			res_lib_ckpt_checkpointopen.header.size = sizeof (struct res_lib_ckpt_checkpointopen);
 			res_lib_ckpt_checkpointopen.header.id = MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN;
 			res_lib_ckpt_checkpointopen.header.error = error;
@@ -1151,7 +1172,6 @@ error_exit:
 		}
 	}
 
-/*	return (error == SA_AIS_OK ? 0 : -1); */
 	return (0);
 }
 
@@ -2194,6 +2214,7 @@ static int message_handler_req_lib_ckpt_checkpointopen (struct conn_info *conn_i
 		req_lib_ckpt_checkpointopen,
 		sizeof (struct req_lib_ckpt_checkpointopen));
 	
+	req_exec_ckpt_checkpointopen.async_call = 0;
 	req_exec_ckpt_checkpointopen.invocation = 0;
 	req_exec_ckpt_checkpointopen.checkpointHandle = 0;
 
@@ -2222,6 +2243,7 @@ static int message_handler_req_lib_ckpt_checkpointopenasync (struct conn_info *c
 		req_lib_ckpt_checkpointopenasync,
 		sizeof (struct req_lib_ckpt_checkpointopen));
 	
+	req_exec_ckpt_checkpointopen.async_call = 1;
 	req_exec_ckpt_checkpointopen.invocation = req_lib_ckpt_checkpointopenasync->invocation;
 	req_exec_ckpt_checkpointopen.checkpointHandle = req_lib_ckpt_checkpointopenasync->checkpointHandle;
 
@@ -2237,14 +2259,8 @@ static int message_handler_req_lib_ckpt_checkpointopenasync (struct conn_info *c
 static int message_handler_req_lib_ckpt_checkpointclose (struct conn_info *conn_info, void *message) {
 	struct req_lib_ckpt_checkpointclose *req_lib_ckpt_checkpointclose = (struct req_lib_ckpt_checkpointclose *)message;
 	struct req_exec_ckpt_checkpointclose req_exec_ckpt_checkpointclose;
-	struct saCkptCheckpoint *checkpoint;
 	struct iovec iovecs[2];
 
-	checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_checkpointclose->checkpointName);
-	if (checkpoint->expired == 1) {
-		return (0);
-	}	
-	
 	req_exec_ckpt_checkpointclose.header.size =
 		sizeof (struct req_exec_ckpt_checkpointclose);
 	req_exec_ckpt_checkpointclose.header.id = MESSAGE_REQ_EXEC_CKPT_CHECKPOINTCLOSE;
@@ -2252,7 +2268,7 @@ static int message_handler_req_lib_ckpt_checkpointclose (struct conn_info *conn_
 	message_source_set (&req_exec_ckpt_checkpointclose.source, conn_info);
 
 	memcpy (&req_exec_ckpt_checkpointclose.checkpointName,
-		&checkpoint->name, sizeof (SaNameT));
+		&req_lib_ckpt_checkpointclose->checkpointName, sizeof (SaNameT));
 
 	iovecs[0].iov_base = (char *)&req_exec_ckpt_checkpointclose;
 	iovecs[0].iov_len = sizeof (req_exec_ckpt_checkpointclose);

+ 105 - 61
lib/ckpt.c

@@ -76,7 +76,6 @@ struct ckptCheckpointInstance {
 	SaCkptCheckpointHandleT checkpointHandle;
 	SaCkptCheckpointOpenFlagsT checkpointOpenFlags;
 	SaNameT checkpointName;
-	SaUint32T maxSectionIdSize;
 	pthread_mutex_t response_mutex;
 	struct list_head list;
 };
@@ -85,7 +84,6 @@ struct ckptSectionIterationInstance {
 	int response_fd;
 	SaCkptCheckpointHandleT checkpointHandle;
 	struct list_head sectionIdListHead;
-	SaUint32T maxSectionIdSize;
 	pthread_mutex_t response_mutex;
 };
 
@@ -256,9 +254,10 @@ saCkptDispatch (
 	struct ckptCheckpointInstance *ckptCheckpointInstance;
 
 
-	error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle, (void *)&ckptInstance);
+	error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle,
+		(void *)&ckptInstance);
 	if (error != SA_AIS_OK) {
-		goto error_put;
+		goto error_exit;
 	}
 
 	/*
@@ -284,9 +283,6 @@ saCkptDispatch (
 		pthread_mutex_lock(&ckptInstance->dispatch_mutex);
 
 		if (ckptInstance->finalize == 1) {
-#ifdef DBGLOGS
-			fprintf(stderr,"CKPT: This Ckpt has been finalised in a separate call so exit the dispatch process\n");
-#endif
 			error = SA_AIS_OK;
 			goto error_unlock;
 		}
@@ -343,16 +339,34 @@ saCkptDispatch (
 				res_lib_ckpt_checkpointopenasync->checkpointHandle,
 				(void *)&ckptCheckpointInstance);
 			assert (error == SA_AIS_OK); /* should only be valid handles here */
-			list_add (&ckptCheckpointInstance->list,
-				&ckptInstance->checkpoint_list);
-
-			saHandleInstancePut (&checkpointHandleDatabase,
-				ckptCheckpointInstance->checkpointHandle);
-
-			callbacks.saCkptCheckpointOpenCallback(res_lib_ckpt_checkpointopenasync->invocation,
-				res_lib_ckpt_checkpointopenasync->checkpointHandle,
-				res_lib_ckpt_checkpointopenasync->header.error);
-			
+			if (res_lib_ckpt_checkpointopenasync->header.error == SA_AIS_OK) {
+				/*
+				 * open succeeded without error
+				 */
+				list_init (&ckptCheckpointInstance->list);
+				list_add (&ckptCheckpointInstance->list,
+					&ckptInstance->checkpoint_list);
+
+				callbacks.saCkptCheckpointOpenCallback(
+					res_lib_ckpt_checkpointopenasync->invocation,
+					res_lib_ckpt_checkpointopenasync->checkpointHandle,
+					res_lib_ckpt_checkpointopenasync->header.error);
+				saHandleInstancePut (&checkpointHandleDatabase,
+					res_lib_ckpt_checkpointopenasync->checkpointHandle);
+			} else {
+				/*
+				 * open failed with error
+				 */
+				saHandleInstancePut (&checkpointHandleDatabase,
+					res_lib_ckpt_checkpointopenasync->checkpointHandle);
+				saHandleDestroy (&checkpointHandleDatabase,
+					res_lib_ckpt_checkpointopenasync->checkpointHandle);
+
+				callbacks.saCkptCheckpointOpenCallback(
+					res_lib_ckpt_checkpointopenasync->invocation,
+					-1,
+					res_lib_ckpt_checkpointopenasync->header.error);
+			}
 			break;
 		default:
 			/* TODO */
@@ -375,6 +389,7 @@ error_unlock:
 	pthread_mutex_unlock(&ckptInstance->dispatch_mutex);
 error_put:
 	saHandleInstancePut(&ckptHandleDatabase, ckptHandle);
+error_exit:
 	return (error);
 }
 
@@ -408,7 +423,6 @@ saCkptFinalize (
 
 	pthread_mutex_unlock (&ckptInstance->response_mutex);
 
-
 	for (list = ckptInstance->checkpoint_list.next;
 		list != &ckptInstance->checkpoint_list;
 		list = list->next) {
@@ -442,10 +456,28 @@ saCkptCheckpointOpen (
 	struct req_lib_ckpt_checkpointopen req_lib_ckpt_checkpointopen;
 	struct res_lib_ckpt_checkpointopen res_lib_ckpt_checkpointopen;
 
+	if ((checkpointOpenFlags & SA_CKPT_CHECKPOINT_CREATE) && 
+		checkpointCreationAttributes == NULL) {
+
+		return (SA_AIS_ERR_INVALID_PARAM);
+	}
+
+	if (((checkpointOpenFlags & SA_CKPT_CHECKPOINT_CREATE) == 0) && 
+		checkpointCreationAttributes != NULL) {
+
+		return (SA_AIS_ERR_INVALID_PARAM);
+	}
+
+	error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle,
+		(void *)&ckptInstance);
+	if (error != SA_AIS_OK) {
+		goto error_exit;
+	}
+
 	error = saHandleCreate (&checkpointHandleDatabase,
 		sizeof (struct ckptCheckpointInstance), checkpointHandle);
 	if (error != SA_AIS_OK) {
-		goto error_no_destroy;
+		goto error_put_ckpt;
 	}
 
 	error = saHandleInstanceGet (&checkpointHandleDatabase,
@@ -454,17 +486,8 @@ saCkptCheckpointOpen (
 		goto error_destroy;
 	}
 
-	error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle,
-		(void *)&ckptInstance);
-	if (error != SA_AIS_OK) {
-		goto error_put_destroy;
-	}
-
 	ckptCheckpointInstance->response_fd = ckptInstance->response_fd;
 
-	ckptCheckpointInstance->maxSectionIdSize =
-		checkpointCreationAttributes->maxSectionIdSize;
-
 	ckptCheckpointInstance->ckptHandle = ckptHandle;
 	ckptCheckpointInstance->checkpointHandle = *checkpointHandle;
 	ckptCheckpointInstance->checkpointOpenFlags = checkpointOpenFlags;
@@ -473,44 +496,50 @@ saCkptCheckpointOpen (
 	req_lib_ckpt_checkpointopen.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN;
 	memcpy (&req_lib_ckpt_checkpointopen.checkpointName, checkpointName, sizeof (SaNameT));
 	memcpy (&ckptCheckpointInstance->checkpointName, checkpointName, sizeof (SaNameT));
-	memcpy (&req_lib_ckpt_checkpointopen.checkpointCreationAttributes,
-		checkpointCreationAttributes,
-		sizeof (SaCkptCheckpointCreationAttributesT));
+	req_lib_ckpt_checkpointopen.checkpointCreationAttributesSet = 0;
+	if (checkpointCreationAttributes) {
+		memcpy (&req_lib_ckpt_checkpointopen.checkpointCreationAttributes,
+			checkpointCreationAttributes,
+			sizeof (SaCkptCheckpointCreationAttributesT));
+		req_lib_ckpt_checkpointopen.checkpointCreationAttributesSet = 1;
+	}
 	req_lib_ckpt_checkpointopen.checkpointOpenFlags = checkpointOpenFlags;
 
 	error = saSendRetry (ckptCheckpointInstance->response_fd, &req_lib_ckpt_checkpointopen,
 		sizeof (struct req_lib_ckpt_checkpointopen), MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
-		goto error_put_ckpt_destroy;
+		goto error_put_destroy;
 	}
 
 	error = saRecvRetry (ckptCheckpointInstance->response_fd, &res_lib_ckpt_checkpointopen,
 		sizeof (struct res_lib_ckpt_checkpointopen), MSG_WAITALL | MSG_NOSIGNAL);
 	if (error != SA_AIS_OK) {
-		goto error_put_ckpt_destroy;
+		goto error_put_destroy;
 	}
 	
 	if (res_lib_ckpt_checkpointopen.header.error != SA_AIS_OK) {
 		error = res_lib_ckpt_checkpointopen.header.error;
-		goto error_put_ckpt_destroy;
+		goto error_put_destroy;
 	}
 
 	pthread_mutex_init (&ckptCheckpointInstance->response_mutex, NULL);
 
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 
+	saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
+
+	list_init (&ckptCheckpointInstance->list);
+
 	list_add (&ckptCheckpointInstance->list, &ckptInstance->checkpoint_list);
 	return (error);
 
-error_put_ckpt_destroy:
-	saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
-
 error_put_destroy:
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
-
 error_destroy:
 	saHandleDestroy (&checkpointHandleDatabase, *checkpointHandle);
-error_no_destroy:
+error_put_ckpt:
+	saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
+error_exit:
 	return (error);
 }
 
@@ -528,10 +557,27 @@ saCkptCheckpointOpenAsync (
 	SaAisErrorT error;
 	struct req_lib_ckpt_checkpointopenasync req_lib_ckpt_checkpointopenasync;
 
+	if ((checkpointOpenFlags & SA_CKPT_CHECKPOINT_CREATE) && 
+		checkpointCreationAttributes == NULL) {
+
+		return (SA_AIS_ERR_INVALID_PARAM);
+	}
+
+	if (((checkpointOpenFlags & SA_CKPT_CHECKPOINT_CREATE) == 0) && 
+		checkpointCreationAttributes != NULL) {
+
+		return (SA_AIS_ERR_INVALID_PARAM);
+	}
+	error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle,
+		(void *)&ckptInstance);
+	if (error != SA_AIS_OK) {
+		goto error_exit;
+	}
+
 	error = saHandleCreate (&checkpointHandleDatabase,
 		sizeof (struct ckptCheckpointInstance), &checkpointHandle);
 	if (error != SA_AIS_OK) {
-		goto error_no_destroy;
+		goto error_put_ckpt;
 	}
 
 	error = saHandleInstanceGet (&checkpointHandleDatabase, checkpointHandle,
@@ -540,15 +586,9 @@ saCkptCheckpointOpenAsync (
 		goto error_destroy;
 	}
 
-	error = saHandleInstanceGet (&ckptHandleDatabase, ckptHandle, (void *)&ckptInstance);
-	if (error != SA_AIS_OK) {
-		goto error_put_destroy;
-	}
-
 	ckptCheckpointInstance->response_fd = ckptInstance->response_fd;
-	ckptCheckpointInstance->maxSectionIdSize =
-		checkpointCreationAttributes->maxSectionIdSize;
 	ckptCheckpointInstance->ckptHandle = ckptHandle;
+	ckptCheckpointInstance->checkpointHandle = checkpointHandle;
 	ckptCheckpointInstance->checkpointOpenFlags = checkpointOpenFlags;
 	memcpy (&ckptCheckpointInstance->checkpointName, checkpointName, sizeof (SaNameT));
 
@@ -556,9 +596,13 @@ saCkptCheckpointOpenAsync (
 	req_lib_ckpt_checkpointopenasync.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC;
 	req_lib_ckpt_checkpointopenasync.invocation = invocation;
 	memcpy (&req_lib_ckpt_checkpointopenasync.checkpointName, checkpointName, sizeof (SaNameT));
-	memcpy (&req_lib_ckpt_checkpointopenasync.checkpointCreationAttributes,
-		checkpointCreationAttributes,
-		sizeof (SaCkptCheckpointCreationAttributesT));
+	req_lib_ckpt_checkpointopenasync.checkpointCreationAttributesSet = 0;
+	if (checkpointCreationAttributes) {
+		memcpy (&req_lib_ckpt_checkpointopenasync.checkpointCreationAttributes,
+			checkpointCreationAttributes,
+			sizeof (SaCkptCheckpointCreationAttributesT));
+		req_lib_ckpt_checkpointopenasync.checkpointCreationAttributesSet = 1;
+	}
 	
 	req_lib_ckpt_checkpointopenasync.checkpointOpenFlags = checkpointOpenFlags;
 	req_lib_ckpt_checkpointopenasync.checkpointHandle = checkpointHandle;
@@ -567,21 +611,24 @@ saCkptCheckpointOpenAsync (
 		sizeof (struct req_lib_ckpt_checkpointopenasync), MSG_NOSIGNAL);
 	
 	if (error != SA_AIS_OK) {
-		goto error_put_ckpt_destroy;
+		goto error_put_destroy;
 	}
 	
 	pthread_mutex_init (&ckptCheckpointInstance->response_mutex, NULL);
 
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
 
-	return (error);
-error_put_ckpt_destroy:
 	saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
+
+	return (error);
+
 error_put_destroy:
 	saHandleInstancePut (&checkpointHandleDatabase, checkpointHandle);
 error_destroy:
 	saHandleDestroy (&checkpointHandleDatabase, checkpointHandle);
-error_no_destroy:
+error_put_ckpt:
+	saHandleInstancePut (&ckptHandleDatabase, ckptHandle);
+error_exit:
 	return (error);
 }
 
@@ -617,8 +664,6 @@ saCkptCheckpointClose (
 
 	list_del (&ckptCheckpointInstance->list);
 
-	saHandleInstancePut (&ckptHandleDatabase, ckptCheckpointInstance->ckptHandle);
-
 	saHandleDestroy (&checkpointHandleDatabase, checkpointHandle);
 
 exit_put:
@@ -1027,9 +1072,6 @@ saCkptSectionIterationInitialize (
 	 */
 	list_init (&ckptSectionIterationInstance->sectionIdListHead);
 
-	ckptSectionIterationInstance->maxSectionIdSize =
-		ckptCheckpointInstance->maxSectionIdSize;
-
 	req_lib_ckpt_sectioniteratorinitialize.header.size = sizeof (struct req_lib_ckpt_sectioniteratorinitialize); 
 	req_lib_ckpt_sectioniteratorinitialize.header.id = MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE;
 	req_lib_ckpt_sectioniteratorinitialize.sectionsChosen = sectionsChosen;
@@ -1093,8 +1135,10 @@ saCkptSectionIterationNext (
 	/*
 	 * Allocate section id storage area
 	 */
-	iteratorSectionIdListEntry = malloc (sizeof (struct list_head) +
-		ckptSectionIterationInstance->maxSectionIdSize);
+	/*
+	 *  TODO max section id size is 500
+	 */
+	iteratorSectionIdListEntry = malloc (sizeof (struct list_head) + 500);
 	if (iteratorSectionIdListEntry == 0) {
 		error = SA_AIS_ERR_NO_MEMORY;
 		goto error_put_nounlock;

+ 3 - 0
lib/util.c

@@ -499,8 +499,11 @@ saHandleDestroy (
 	SaUint64T handle)
 {
 	pthread_mutex_lock (&handleDatabase->mutex);
+
 	handleDatabase->handles[handle].state = SA_HANDLE_STATE_PENDINGREMOVAL;
+
 	pthread_mutex_unlock (&handleDatabase->mutex);
+
 	saHandleInstancePut (handleDatabase, handle);
 
 	return (SA_AIS_OK);

+ 1 - 1
test/ckptbench.c

@@ -219,7 +219,7 @@ int main (void) {
 	error = saCkptCheckpointOpen (ckptHandle,
 		&checkpointName,
 		&checkpointCreationAttributes,
-		SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
+		SA_CKPT_CHECKPOINT_CREATE|SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
 		0,
 		&checkpointHandle);
 	error = saCkptSectionCreate (checkpointHandle,

+ 20 - 14
test/testckpt.c

@@ -180,7 +180,7 @@ void OpenCallBack (
  	printf ("%s: This is a call back for open for invocation = %d\n",
 		get_test_output (error, SA_AIS_OK), (int)invocation);	
 
-	memcpy(&checkpointHandle, &chckpointHandle, sizeof(SaCkptCheckpointHandleT));
+	checkpointHandle = chckpointHandle;
 
 }
 
@@ -202,31 +202,39 @@ int main (void) {
 	struct timeval tv_end;
 	struct timeval tv_elapsed;
 	SaSelectionObjectT sel_fd;
+	fd_set read_set;
 	
-    error = saCkptInitialize (&ckptHandle, &callbacks, &version);
+	error = saCkptInitialize (&ckptHandle, &callbacks, &version);
 
 	error = saCkptCheckpointOpenAsync (ckptHandle,
 		open_invocation,
-        &checkpointName,
-        &checkpointCreationAttributes,
-        SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE);
+		&checkpointName,
+		&checkpointCreationAttributes,
+		SA_CKPT_CHECKPOINT_CREATE|SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE);
     printf ("%s: initial asynchronous open of checkpoint\n",
         get_test_output (error, SA_AIS_OK));
 
+	error = saCkptSelectionObjectGet (ckptHandle, &sel_fd);
+
+	printf ("%s: Retrieve selection object %lld\n",
+	get_test_output (error, SA_AIS_OK), sel_fd);
 
-	error = saCkptDispatch (ckptHandle, SA_DISPATCH_ONE);
+	FD_SET (sel_fd, &read_set);
+	select (sel_fd + 1, &read_set, 0, 0, 0);
+
+	error = saCkptDispatch (ckptHandle, SA_DISPATCH_ALL);
 	
 	printf ("%s: Dispatch response for open async of checkpoint\n",
 		get_test_output (error, SA_AIS_OK));
 
 	error = saCkptCheckpointClose (checkpointHandle);
-    printf ("%s: Closing checkpoint\n",
-        get_test_output (error, SA_AIS_OK));
+
+	printf ("%s: Closing checkpoint\n", get_test_output (error, SA_AIS_OK));
 	
 	error = saCkptCheckpointOpen (ckptHandle,
 		&checkpointName,
 		&checkpointCreationAttributes,
-		SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
+		SA_CKPT_CHECKPOINT_CREATE|SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
 		0,
 		&checkpointHandle);
 	printf ("%s: initial open of checkpoint\n",
@@ -304,7 +312,7 @@ printf ("Please wait, testing expiry of checkpoint sections.\n");
 	error = saCkptCheckpointOpen (ckptHandle,
 		&checkpointName,
 		&checkpointCreationAttributes,
-		SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
+		SA_CKPT_CHECKPOINT_CREATE|SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
 		0,
 		&checkpointHandle2);
 	printf ("%s: Opening unlinked checkpoint\n", 
@@ -317,7 +325,7 @@ printf ("Please wait, testing expiry of checkpoint sections.\n");
 	error = saCkptCheckpointOpen (ckptHandle,
 		&checkpointName,
 		&checkpointCreationAttributes,
-		SA_CKPT_CHECKPOINT_READ,
+		SA_CKPT_CHECKPOINT_CREATE|SA_CKPT_CHECKPOINT_READ,
 		0,
 		&checkpointHandleRead);
 
@@ -328,7 +336,7 @@ printf ("Please wait, testing expiry of checkpoint sections.\n");
 	error = saCkptCheckpointOpen (ckptHandle,
 		&checkpointName,
 		&checkpointCreationAttributes,
-		SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
+		SA_CKPT_CHECKPOINT_CREATE|SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
 		0,
 		&checkpointHandle);
 	printf ("%s: open after unlink/close\n",
@@ -482,8 +490,6 @@ printf ("Please wait, testing expiry of checkpoint sections.\n");
 		get_test_output (error, SA_AIS_OK));
 
 	error = saCkptSelectionObjectGet (ckptHandle, &sel_fd);
-	printf ("%s: Retrieve selection object %lld\n",
-		get_test_output (error, SA_AIS_OK), sel_fd);
 
 	error = saCkptFinalize (ckptHandle);
 	printf ("%s: Finalize checkpoint\n",