Forráskód Böngészése

Add flow control to checkpointing.

(Logical change 1.55)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@174 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 21 éve
szülő
commit
ca88544979
1 módosított fájl, 26 hozzáadás és 38 törlés
  1. 26 38
      lib/ckpt.c

+ 26 - 38
lib/ckpt.c

@@ -51,7 +51,7 @@
 #include "util.h"
 
 struct message_overlay {
-	struct message_header header;
+	struct req_header header;
 	char data[4096];
 };
 
@@ -255,8 +255,8 @@ saCkptDispatch (
 	int dispatch_avail;
 	struct timeval *timeout = 0;
 	struct ckptInstance *ckptInstance;
-	struct message_header **queue_msg;
-	struct message_header *msg;
+	struct req_header **queue_msg;
+	struct req_header *msg;
 	int empty;
 	int ignore_dispatch = 0;
 	int cont = 1; /* always continue do loop except when set to 0 */
@@ -307,13 +307,13 @@ saCkptDispatch (
 			/*
 			 * Queue empty, read response from socket
 			 */
-			error = saRecvRetry (ckptInstance->fd, &ckptInstance->message.header, sizeof (struct message_header), MSG_WAITALL | MSG_NOSIGNAL);
+			error = saRecvRetry (ckptInstance->fd, &ckptInstance->message.header, sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_OK) {
 				goto error_exit;
 			}
-			if (ckptInstance->message.header.size > sizeof (struct message_header)) {
+			if (ckptInstance->message.header.size > sizeof (struct req_header)) {
 				error = saRecvRetry (ckptInstance->fd, &ckptInstance->message.data,
-					ckptInstance->message.header.size - sizeof (struct message_header),
+					ckptInstance->message.header.size - sizeof (struct req_header),
 					MSG_WAITALL | MSG_NOSIGNAL);
 				if (error != SA_OK) {
 					goto error_exit;
@@ -491,7 +491,6 @@ saCkptCheckpointOpen (
 		goto error_put_destroy;
 	}
 
-	req_lib_ckpt_checkpointopen.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointopen.header.size = sizeof (struct req_lib_ckpt_checkpointopen);
 	req_lib_ckpt_checkpointopen.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN;
 	memcpy (&req_lib_ckpt_checkpointopen.checkpointName, checkpointName, sizeof (SaNameT));
@@ -513,8 +512,8 @@ saCkptCheckpointOpen (
 		goto error_put_destroy;
 	}
 	
-	if (res_lib_ckpt_checkpointopen.error != SA_OK) {
-		error = res_lib_ckpt_checkpointopen.error;
+	if (res_lib_ckpt_checkpointopen.header.error != SA_OK) {
+		error = res_lib_ckpt_checkpointopen.header.error;
 		goto error_put_destroy;
 	}
 
@@ -549,7 +548,6 @@ saCkptCheckpointOpenAsync (
 		return (error);
 	}
 
-	req_lib_ckpt_checkpointopenasync.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointopenasync.header.size = sizeof (struct req_lib_ckpt_checkpointopenasync);
 	req_lib_ckpt_checkpointopenasync.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC;
 	req_lib_ckpt_checkpointopenasync.invocation = invocation;
@@ -607,7 +605,6 @@ saCkptCheckpointUnlink (
 		goto exit_noclose;
 	}
 
-	req_lib_ckpt_checkpointunlink.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointunlink.header.size = sizeof (struct req_lib_ckpt_checkpointunlink);
 	req_lib_ckpt_checkpointunlink.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTUNLINK;
 	memcpy (&req_lib_ckpt_checkpointunlink.checkpointName, checkpointName, sizeof (SaNameT));
@@ -623,7 +620,7 @@ saCkptCheckpointUnlink (
 
 exit_close:
 	close (fd);
-	return (error == SA_OK ? res_lib_ckpt_checkpointunlink.error : error);
+	return (error == SA_OK ? res_lib_ckpt_checkpointunlink.header.error : error);
 exit_noclose:
 	return (error);
 }
@@ -643,7 +640,6 @@ saCkptCheckpointRetentionDurationSet (
 		goto error_exit;
 	}
 
-	req_lib_ckpt_checkpointretentiondurationset.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointretentiondurationset.header.size = sizeof (struct req_lib_ckpt_checkpointretentiondurationset);
 	req_lib_ckpt_checkpointretentiondurationset.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET;
 
@@ -674,7 +670,6 @@ saCkptActiveCheckpointSet (
 		goto error_exit;
 	}
 
-	req_lib_ckpt_activecheckpointset.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_activecheckpointset.header.size = sizeof (struct req_lib_ckpt_activecheckpointset);
 	req_lib_ckpt_activecheckpointset.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_ACTIVECHECKPOINTSET;
 
@@ -696,7 +691,7 @@ saCkptActiveCheckpointSet (
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 
 error_exit:
-	return (error == SA_OK ? res_lib_ckpt_activecheckpointset.error : error);
+	return (error == SA_OK ? res_lib_ckpt_activecheckpointset.header.error : error);
 }
 
 SaErrorT
@@ -715,7 +710,6 @@ saCkptCheckpointStatusGet (
 		return (error);
 	}
 
-	req_lib_ckpt_checkpointstatusget.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointstatusget.header.size = sizeof (struct req_lib_ckpt_checkpointstatusget);
 	req_lib_ckpt_checkpointstatusget.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET;
 
@@ -764,7 +758,6 @@ saCkptSectionCreate (
 		return (error);
 	}
 
-	req_lib_ckpt_sectioncreate.header.magic = MESSAGE_MAGIC;
 
 	req_lib_ckpt_sectioncreate.header.size =
 		sizeof (struct req_lib_ckpt_sectioncreate) +
@@ -810,7 +803,7 @@ saCkptSectionCreate (
 error_exit:
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 
-	return (error == SA_OK ? res_lib_ckpt_sectioncreate.error : error);
+	return (error == SA_OK ? res_lib_ckpt_sectioncreate.header.error : error);
 }
 
 
@@ -832,7 +825,6 @@ saCkptSectionDelete (
 
 	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
 
-	req_lib_ckpt_sectiondelete.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectiondelete.header.size = sizeof (struct req_lib_ckpt_sectiondelete) + sectionId->idLen; 
 	req_lib_ckpt_sectiondelete.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONDELETE;
 	req_lib_ckpt_sectiondelete.idLen = sectionId->idLen;
@@ -860,7 +852,7 @@ saCkptSectionDelete (
 
 error_exit:
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
-	return (error == SA_OK ? res_lib_ckpt_sectiondelete.error : error);
+	return (error == SA_OK ? res_lib_ckpt_sectiondelete.header.error : error);
 }
 
 SaErrorT
@@ -880,7 +872,6 @@ saCkptSectionExpirationTimeSet (
 		goto error_exit;
 	}
 
-	req_lib_ckpt_sectionexpirationtimeset.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectionexpirationtimeset.header.size = sizeof (struct req_lib_ckpt_sectionexpirationtimeset) + sectionId->idLen; 
 	req_lib_ckpt_sectionexpirationtimeset.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET;
 	req_lib_ckpt_sectionexpirationtimeset.idLen = sectionId->idLen;
@@ -914,7 +905,7 @@ saCkptSectionExpirationTimeSet (
 
 error_exit:
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
-	return (error == SA_OK ? res_lib_ckpt_sectionexpirationtimeset.error : error);
+	return (error == SA_OK ? res_lib_ckpt_sectionexpirationtimeset.header.error : error);
 }
 
 SaErrorT
@@ -967,7 +958,6 @@ saCkptSectionIteratorInitialize (
 		goto error_put_destroy;
 	}
 
-	req_lib_ckpt_sectioniteratorinitialize.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectioniteratorinitialize.header.size = sizeof (struct req_lib_ckpt_sectioniteratorinitialize); 
 	req_lib_ckpt_sectioniteratorinitialize.header.id = MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE;
 	req_lib_ckpt_sectioniteratorinitialize.sectionsChosen = sectionsChosen;
@@ -997,7 +987,7 @@ saCkptSectionIteratorInitialize (
 
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 
-	return (error == SA_OK ? res_lib_ckpt_sectioniteratorinitialize.error : error);
+	return (error == SA_OK ? res_lib_ckpt_sectioniteratorinitialize.header.error : error);
 
 error_put_destroy:
 	saHandleInstancePut (&ckptSectionIteratorHandleDatabase, *sectionIterator);
@@ -1040,7 +1030,6 @@ saCkptSectionIteratorNext (
 		goto error_put_nounlock;
 	}
 
-	req_lib_ckpt_sectioniteratornext.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectioniteratornext.header.size = sizeof (struct req_lib_ckpt_sectioniteratornext); 
 	req_lib_ckpt_sectioniteratornext.header.id = MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT;
 
@@ -1077,7 +1066,7 @@ saCkptSectionIteratorNext (
 	/*
 	 * Add to persistent memory list for this sectioniterator
 	 */
-	if (error == SA_OK && res_lib_ckpt_sectioniteratornext.error == SA_OK) {
+	if (error == SA_OK && res_lib_ckpt_sectioniteratornext.header.error == SA_OK) {
 		list_init (&iteratorSectionIdListEntry->list);
 		list_add (&iteratorSectionIdListEntry->list, &ckptSectionIteratorInstance->sectionIdListHead);
 	}
@@ -1087,7 +1076,7 @@ error_put:
 error_put_nounlock:
 	saHandleInstancePut (&ckptSectionIteratorHandleDatabase, *sectionIterator);
 error_exit:
-	return (error == SA_OK ? res_lib_ckpt_sectioniteratornext.error : error);
+	return (error == SA_OK ? res_lib_ckpt_sectioniteratornext.header.error : error);
 }
 	
 SaErrorT
@@ -1151,7 +1140,6 @@ saCkptCheckpointWrite (
 		return (error);
 	}
 
-	req_lib_ckpt_sectionwrite.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectionwrite.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONWRITE;
 
 	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
@@ -1189,10 +1177,14 @@ saCkptCheckpointWrite (
 			goto error_exit;
 		}
 
+		if (res_lib_ckpt_sectionwrite.header.error == SA_ERR_TRY_AGAIN) {
+			error = SA_ERR_TRY_AGAIN;
+			goto error_exit;
+		}
 		/*
 		 * If error, report back erroneous index
 		 */
-		if (res_lib_ckpt_sectionwrite.error != SA_OK) {
+		if (res_lib_ckpt_sectionwrite.header.error != SA_OK) {
 			if (erroneousVectorIndex) {
 				*erroneousVectorIndex = i;
 			}
@@ -1206,7 +1198,7 @@ error_exit:
 
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 
-	return (error == SA_OK ? res_lib_ckpt_sectionwrite.error : error);
+	return (error == SA_OK ? res_lib_ckpt_sectionwrite.header.error : error);
 }
 
 SaErrorT
@@ -1227,7 +1219,6 @@ saCkptSectionOverwrite (
 		return (error);
 	}
 
-	req_lib_ckpt_sectionoverwrite.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectionoverwrite.header.size = sizeof (struct req_lib_ckpt_sectionoverwrite) + sectionId->idLen + dataSize; 
 	req_lib_ckpt_sectionoverwrite.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONOVERWRITE;
 	req_lib_ckpt_sectionoverwrite.idLen = sectionId->idLen;
@@ -1263,7 +1254,7 @@ error_exit:
 
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 
-	return (error == SA_OK ? res_lib_ckpt_sectionoverwrite.error : error);
+	return (error == SA_OK ? res_lib_ckpt_sectionoverwrite.header.error : error);
 }
 
 SaErrorT
@@ -1287,7 +1278,6 @@ saCkptCheckpointRead (
 		return (error);
 	}
 
-	req_lib_ckpt_sectionread.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_sectionread.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONREAD;
 
 	pthread_mutex_lock (&ckptCheckpointInstance->mutex);
@@ -1330,7 +1320,7 @@ saCkptCheckpointRead (
 					goto error_exit;
 			}
 		}
-		if (res_lib_ckpt_sectionread.error != SA_OK) {
+		if (res_lib_ckpt_sectionread.header.error != SA_OK) {
 			if (erroneousVectorIndex) {
 				*erroneousVectorIndex = i;
 			}
@@ -1348,7 +1338,7 @@ error_exit:
 
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 
-	return (error == SA_OK ? res_lib_ckpt_sectionread.error : error);
+	return (error == SA_OK ? res_lib_ckpt_sectionread.header.error : error);
 }
 
 SaErrorT
@@ -1367,7 +1357,6 @@ saCkptCheckpointSynchronize (
 		return (error);
 	}
 
-	req_lib_ckpt_checkpointsynchronize.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointsynchronize.header.size = sizeof (struct req_lib_ckpt_checkpointsynchronize); 
 	req_lib_ckpt_checkpointsynchronize.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE;
 
@@ -1390,7 +1379,7 @@ error_exit:
 
 	saHandleInstancePut (&checkpointHandleDatabase, *checkpointHandle);
 
-	return (error == SA_OK ? res_lib_ckpt_checkpointsynchronize.error : error);
+	return (error == SA_OK ? res_lib_ckpt_checkpointsynchronize.header.error : error);
 }
 
 SaErrorT
@@ -1416,7 +1405,6 @@ saCkptCheckpointSynchronizeAsync (
 		return (error);
 	}
 
-	req_lib_ckpt_checkpointsynchronizeasync.header.magic = MESSAGE_MAGIC;
 	req_lib_ckpt_checkpointsynchronizeasync.header.size = sizeof (struct req_lib_ckpt_checkpointsynchronizeasync);
 	req_lib_ckpt_checkpointsynchronizeasync.header.id = MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC;
 	req_lib_ckpt_checkpointsynchronizeasync.invocation = invocation;