Răsfoiți Sursa

Rewrite of the checkpoint benchmark using threads tool

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1098 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 19 ani în urmă
părinte
comite
c3354368da
1 a modificat fișierele cu 80 adăugiri și 54 ștergeri
  1. 80 54
      test/ckptbenchth.c

+ 80 - 54
test/ckptbenchth.c

@@ -46,10 +46,13 @@
 #include <sys/un.h>
 #include <sys/un.h>
 #include <pthread.h>
 #include <pthread.h>
 #include <assert.h>
 #include <assert.h>
+#include <signal.h>
 
 
 #include "saAis.h"
 #include "saAis.h"
 #include "saCkpt.h"
 #include "saCkpt.h"
 
 
+int alarm_notice = 0;
+
 void printSaNameT (SaNameT *name)
 void printSaNameT (SaNameT *name)
 {
 {
 	int i;
 	int i;
@@ -153,142 +156,165 @@ SaCkptIOVectorElementT WriteVectorElements[] = {
 int runs = 0;
 int runs = 0;
 
 
 struct threaddata {
 struct threaddata {
-	SaCkptHandleT ckptHandle;
-	SaCkptCheckpointHandleT checkpointHandle;
-	int write_count;
+	SaCkptHandleT ckpt_handle;
+	SaCkptCheckpointHandleT checkpoint_handle;
 	int write_size;
 	int write_size;
 	int thread;
 	int thread;
+	pthread_attr_t thread_attr;
+	pthread_t thread_id;
+	int written;
 };
 };
 
 
+
 void *benchmark_thread (void *arg) 
 void *benchmark_thread (void *arg) 
 {
 {
 	
 	
-	SaCkptCheckpointHandleT checkpointHandle;
-	SaCkptHandleT ckptHandle;
-	int write_count;
+	SaCkptCheckpointHandleT checkpoint_handle;
+	SaCkptHandleT ckpt_handle;
 	int write_size;
 	int write_size;
 	SaAisErrorT error;
 	SaAisErrorT error;
 	SaUint32T erroroneousVectorIndex = 0;
 	SaUint32T erroroneousVectorIndex = 0;
 	struct threaddata *td = (struct threaddata *)arg;
 	struct threaddata *td = (struct threaddata *)arg;
-	int ckptinv;
 
 
-	checkpointHandle = td->checkpointHandle;
-	ckptHandle = td->ckptHandle;
-	write_count = td->write_count;
+	checkpoint_handle = td->checkpoint_handle;
+	ckpt_handle = td->ckpt_handle;
 	write_size = td->write_size;
 	write_size = td->write_size;
 
 
 	WriteVectorElements[0].dataSize = write_size;
 	WriteVectorElements[0].dataSize = write_size;
 
 
-	for (ckptinv = 0; ckptinv < write_count; ckptinv++) {
+	do {
 		/*
 		/*
 		 * Test checkpoint write
 		 * Test checkpoint write
 		 */
 		 */
 		do {
 		do {
-		error = saCkptCheckpointWrite (checkpointHandle,
+		error = saCkptCheckpointWrite (checkpoint_handle,
 			WriteVectorElements,
 			WriteVectorElements,
 			1,
 			1,
 			&erroroneousVectorIndex);
 			&erroroneousVectorIndex);
 		} while (error == SA_AIS_ERR_TRY_AGAIN);
 		} while (error == SA_AIS_ERR_TRY_AGAIN);
 		if (error != SA_AIS_OK) {
 		if (error != SA_AIS_OK) {
-			printf ("saCkptCheckpointWrite result %d (should be 1)\n", error);
+			printf ("saCkptCheckpointWrite result %d handle (should be 1)\n", error);
 			exit (1);
 			exit (1);
 		}
 		}
-	}
+		td->written += 1;
+	} while (alarm_notice == 0);
 	pthread_exit (0);
 	pthread_exit (0);
 }
 }
 
 
 
 
-void threaded_bench (SaCkptHandleT *ckptHandles, SaCkptCheckpointHandleT *checkpointHandles,
-	int threads, int write_count, int write_size)
+void threaded_bench (
+	SaCkptHandleT *ckpt_handles,
+	SaCkptCheckpointHandleT *checkpoint_handles,
+	int threads,
+	int write_size)
 {
 {
 	struct timeval tv1, tv2, tv_elapsed;
 	struct timeval tv1, tv2, tv_elapsed;
 	struct threaddata td[100];
 	struct threaddata td[100];
 	int i;
 	int i;
-	pthread_t threadt[100];
 	int res;
 	int res;
+	int written = 0;
 
 
 	runs = threads;
 	runs = threads;
 	gettimeofday (&tv1, NULL);
 	gettimeofday (&tv1, NULL);
 
 
 	for (i = 0; i < threads; i++) {
 	for (i = 0; i < threads; i++) {
-		td[i].ckptHandle = ckptHandles[i];
-		td[i].checkpointHandle = checkpointHandles[i];
-		td[i].write_count = write_count;
+		td[i].ckpt_handle = ckpt_handles[i];
+		td[i].checkpoint_handle = checkpoint_handles[i];
 		td[i].write_size = write_size;
 		td[i].write_size = write_size;
 		td[i].thread = i;
 		td[i].thread = i;
+		td[i].written = 0;
+		pthread_attr_init (&td[i].thread_attr);
+		pthread_attr_setstacksize (&td[i].thread_attr, 16384);
+		pthread_attr_setdetachstate (&td[i].thread_attr, PTHREAD_CREATE_JOINABLE);
 
 
-		res = pthread_create (&threadt[i], NULL, benchmark_thread, (void *)&td[i]);
+		res = pthread_create (&td[i].thread_id, &td[i].thread_attr,
+			benchmark_thread, (void *)&td[i]);
 	}
 	}
 
 
 	for (i = 0; i < threads; i++) {
 	for (i = 0; i < threads; i++) {
-		pthread_join (threadt[i], NULL);
+		pthread_join (td[i].thread_id, NULL);
+		written += td[i].written;
 	}
 	}
+	alarm_notice = 0;
 
 
 	gettimeofday (&tv2, NULL);
 	gettimeofday (&tv2, NULL);
 	timersub (&tv2, &tv1, &tv_elapsed);
 	timersub (&tv2, &tv1, &tv_elapsed);
 
 
-	printf ("%5d Writes ", write_count * threads);
+	printf ("%5d Writes ", written);
 	printf ("%5d bytes per write ", write_size);
 	printf ("%5d bytes per write ", write_size);
 	printf ("%7.3f Seconds runtime ", 
 	printf ("%7.3f Seconds runtime ", 
 		(tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
 		(tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
 	printf ("%9.3f TP/s ",
 	printf ("%9.3f TP/s ",
-		((float)write_count * (float)threads) /  (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
+		((float)written) /  (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
 	printf ("%7.3f MB/s.\n", 
 	printf ("%7.3f MB/s.\n", 
-		((float)write_count * (float)threads) * ((float)write_size) /  ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
+		((float)written) * ((float)write_size) /  ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
+}
+
+SaNameT checkpointName;
+
+#define CHECKPOINT_THREADS_START 25
+#define CHECKPOINT_THREADS_MAX 500
+ 
+void sigalrm_handler (int num)
+{
+	alarm_notice = 1;
 }
 }
 
 
-SaNameT checkpointName = { 12, "abra\0" };
 
 
-#define CHECKPOINT_THREADS 50
 int main (void) {
 int main (void) {
-	SaCkptHandleT ckptHandles[500];
-	SaCkptCheckpointHandleT checkpointHandles[500];
+	SaCkptHandleT ckpt_handles[CHECKPOINT_THREADS_MAX];
+	SaCkptCheckpointHandleT checkpoint_handles[CHECKPOINT_THREADS_MAX];
 	SaAisErrorT error;
 	SaAisErrorT error;
 	int size;
 	int size;
-	int count;
 	int i, j;
 	int i, j;
 	
 	
+	signal (SIGALRM, sigalrm_handler); 
+
+	printf ("Creating (%d) checkpoints.\n", CHECKPOINT_THREADS_MAX);
 	/*
 	/*
-	 * Create CHECPOINT_THREADS checkpoints
+	 * Create CHECPOINT_THREADS_MAX checkpoints
 	 */
 	 */
-	for (i  = 0; i < CHECKPOINT_THREADS; i++) {
-		sprintf ((char *)checkpointName.value, "checkpoint%d \n", i);
-		error = saCkptInitialize (&ckptHandles[i], &callbacks, &version);
+	for (i  = 0; i < CHECKPOINT_THREADS_MAX; i++) {
+		sprintf ((char *)checkpointName.value, "checkpoint (%d)", i);
+		checkpointName.length = strlen (checkpointName.value);
+		do {
+			error = saCkptInitialize (&ckpt_handles[i], &callbacks, &version);
+		} while (error == SA_AIS_ERR_TRY_AGAIN);
 		assert (error == SA_AIS_OK);
 		assert (error == SA_AIS_OK);
 
 
-		error = saCkptCheckpointOpen (ckptHandles[i],
+		do {
+		error = saCkptCheckpointOpen (ckpt_handles[i],
 			&checkpointName,
 			&checkpointName,
 			&checkpointCreationAttributes,
 			&checkpointCreationAttributes,
 			SA_CKPT_CHECKPOINT_CREATE|SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
 			SA_CKPT_CHECKPOINT_CREATE|SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE,
 			SA_TIME_END,
 			SA_TIME_END,
-			&checkpointHandles[i]);
+			&checkpoint_handles[i]);
+		} while (error == SA_AIS_ERR_TRY_AGAIN);
 		assert (error == SA_AIS_OK);
 		assert (error == SA_AIS_OK);
 
 
-		error = saCkptSectionCreate (checkpointHandles[i],
-			&sectionCreationAttributes1,
-			"Initial Data #0",
-			strlen ("Initial Data #0") + 1);
+		do {
+			error = saCkptSectionCreate (checkpoint_handles[i],
+				&sectionCreationAttributes1,
+				"Initial Data #0",
+				strlen ("Initial Data #0") + 1);
+		} while (error == SA_AIS_ERR_TRY_AGAIN);
 		assert (error == SA_AIS_OK);
 		assert (error == SA_AIS_OK);
 
 
-		error = saCkptSectionCreate (checkpointHandles[i],
-			&sectionCreationAttributes2,
-			"Initial Data #0",
-			strlen ("Initial Data #0") + 1);
+		do {
+			error = saCkptSectionCreate (checkpoint_handles[i],
+				&sectionCreationAttributes2,
+				"Initial Data #0",
+				strlen ("Initial Data #0") + 1);
+		} while (error == SA_AIS_ERR_TRY_AGAIN);
 		assert (error == SA_AIS_OK);
 		assert (error == SA_AIS_OK);
 	}
 	}
 
 
-	for (i = 25; i < CHECKPOINT_THREADS; i++) {	/* i threads */
-		count = 3000; /* initial write count */
+	for (i = CHECKPOINT_THREADS_START; i < CHECKPOINT_THREADS_MAX; i++) {	/* i threads */
+		printf ("Starting benchmark with (%d) threads.\n", i);
 		size = 10000; /* initial size */
 		size = 10000; /* initial size */
-		printf ("THREADS %d\n", i);
 		for (j = 0; j < 5; j++) { /* number of runs with i threads */
 		for (j = 0; j < 5; j++) { /* number of runs with i threads */
-			threaded_bench (ckptHandles, checkpointHandles, i, count, size);
-			/*
-			 * Adjust count to 95% of previous count
-			 * adjust size upwards by 1500
-			 * This keeps the run times similiar
-			 */
-			count = (((float)count) * 0.95);
+			alarm (10);
+			threaded_bench (ckpt_handles, checkpoint_handles, i, size);
 			size += 1000;
 			size += 1000;
 		}
 		}
 	}
 	}