Просмотр исходного кода

test: Fold cpgbench into cpghum (#205)

* test: Fold cpgbench into cpghum

cpgbench and cpghum share a lot of code & concepts so it makes
sense to merge them into a single test program that can both
benchmark and sanity check CPG.

Signed-off-by: Christine Caulfield <ccaulfie@redhat.com>
Reviewed-by: Jan Friesse <jfriesse@redhat.com>
Chrissie Caulfield 8 лет назад
Родитель
Сommit
0be72f03f3
1 измененных файлов с 133 добавлено и 52 удалено
  1. 133 52
      test/cpghum.c

+ 133 - 52
test/cpghum.c

@@ -92,6 +92,7 @@ static unsigned int crc_errors=0;
 static unsigned int sequence_errors=0;
 static unsigned int packets_sent=0;
 static unsigned int packets_recvd=0;
+static unsigned int packets_recvd1=0; /* For flood intermediates */
 static unsigned int send_retries=0;
 static unsigned int send_fails=0;
 static unsigned long avg_rtt=0;
@@ -101,6 +102,7 @@ static unsigned long min_rtt=LONG_MAX;
 struct cpghum_header {
 	unsigned int counter;
 	unsigned int crc;
+	unsigned int size;
 	struct timeval timestamp;
 };
 
@@ -115,8 +117,8 @@ static void cpg_bm_confchg_fn (
 
 static unsigned int g_recv_count;
 static unsigned int g_recv_length;
-static unsigned int g_write_size;
-static int g_recv_counter = 0;
+static int g_recv_counter = 1;
+static int g_check_counter = 1;
 
 static void cpg_bm_deliver_fn (
 	cpg_handle_t handle_in,
@@ -130,10 +132,12 @@ static void cpg_bm_deliver_fn (
 	struct cpghum_header *header = (struct cpghum_header *)msg;
 	uLong recv_crc = header->crc & 0xFFFFFFFF;
 	unsigned int *dataint = (unsigned int *)((char*)msg + sizeof(struct cpghum_header));
-	unsigned int datalen = g_write_size - sizeof(struct cpghum_header);
+	unsigned int datalen;
 
 	packets_recvd++;
+	packets_recvd1++;
 	g_recv_length = msg_len;
+	datalen = header->size - sizeof(struct cpghum_header);
 
 	// Report RTT first in case abort_on_error is set
 	if (nodeid == g_our_nodeid) {
@@ -162,11 +166,11 @@ static void cpg_bm_deliver_fn (
 	}
 
 	// Basic check, packets should all be the right size
-	if (g_write_size && (msg_len != g_write_size)) {
+	if (msg_len != header->size) {
 		length_errors++;
-		fprintf(stderr, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+		fprintf(stderr, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, header->size);
 		if (do_syslog) {
-			syslog(LOG_ERR, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, g_write_size);
+			syslog(LOG_ERR, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, header->size);
 		}
 
 		if (abort_on_error) {
@@ -175,7 +179,7 @@ static void cpg_bm_deliver_fn (
 	}
 
 	// Sequence counters are incrementing in step?
-	if (header->counter != g_recv_counter) {
+	if (g_check_counter && (header->counter != g_recv_counter)) {
 		sequence_errors++;
 		fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, header->counter, g_recv_counter);
 		if (do_syslog) {
@@ -227,6 +231,79 @@ static struct cpg_name group_name = {
 	.length = 7
 };
 
+static void set_packet(int write_size, int counter)
+{
+	struct cpghum_header *header = (struct cpghum_header *)data;
+	int i;
+	unsigned int *dataint = (unsigned int *)(data + sizeof(struct cpghum_header));
+	unsigned int datalen = write_size - sizeof(struct cpghum_header);
+	struct timeval tv1;
+	uLong crc;
+
+	header->counter = counter;
+	for (i=0; i<(datalen/4); i++) {
+		dataint[i] = rand();
+	}
+	crc = crc32(0, NULL, 0);
+	header->crc = crc32(crc, (Bytef*)&dataint[0], datalen);
+	header->size = write_size;
+
+	gettimeofday (&tv1, NULL);
+	memcpy(&header->timestamp, &tv1, sizeof(struct timeval));
+}
+
+
+static void cpg_flood (
+	cpg_handle_t handle_in,
+	int write_size)
+{
+	struct timeval tv1, tv2, tv_elapsed;
+	struct iovec iov;
+	unsigned int res = CS_OK;
+
+	alarm_notice = 0;
+	iov.iov_base = data;
+	iov.iov_len = write_size;
+
+	alarm (10);
+	packets_recvd1 = 0;
+
+	gettimeofday (&tv1, NULL);
+	do {
+		/* Only increment the packet counter if it sucessfully sent */
+		if (res != CS_ERR_TRY_AGAIN) {
+			set_packet(write_size, send_counter++);
+		}
+
+		res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
+		if (res == CS_OK) {
+			packets_sent++;
+		}
+		else {
+			if (res == CS_ERR_TRY_AGAIN) {
+				send_retries++;
+			}
+			else {
+				send_fails++;
+			}
+		}
+	} while (!stopped && alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN));
+	gettimeofday (&tv2, NULL);
+	timersub (&tv2, &tv1, &tv_elapsed);
+
+	if (!quiet) {
+		printf ("%5d messages received ", packets_recvd1);
+		printf ("%5d bytes per write ", write_size);
+		printf ("%7.3f Seconds runtime ",
+			(tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
+		printf ("%9.3f TP/s ",
+			((float)packets_recvd1) /  (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
+		printf ("%7.3f MB/s.\n",
+			((float)packets_recvd1) * ((float)write_size) /  ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
+	}
+
+}
+
 static void cpg_test (
 	cpg_handle_t handle_in,
 	int write_size,
@@ -236,11 +313,6 @@ static void cpg_test (
 	struct timeval tv1, tv2, tv_elapsed;
 	struct iovec iov;
 	unsigned int res;
-	int i;
-	unsigned int *dataint = (unsigned int *)(data + sizeof(struct cpghum_header));
-	unsigned int datalen = write_size - sizeof(struct cpghum_header);
-	uLong crc;
-	struct cpghum_header *header = (struct cpghum_header *)data;
 
 	alarm_notice = 0;
 	iov.iov_base = data;
@@ -250,15 +322,9 @@ static void cpg_test (
 	alarm (print_time);
 
 	do {
-		header->counter = send_counter++;
-		for (i=0; i<(datalen/4); i++) {
-			dataint[i] = rand();
-		}
-		crc = crc32(0, NULL, 0);
-		header->crc = crc32(crc, (Bytef*)&dataint[0], datalen);
+		send_counter++;
 	resend:
-		gettimeofday (&tv1, NULL);
-		memcpy(&header->timestamp, &tv1, sizeof(struct timeval));
+		set_packet(write_size, send_counter);
 
 		res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
 		if (res == CS_ERR_TRY_AGAIN) {
@@ -313,9 +379,6 @@ static void usage(char *cmd)
 	fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
 	fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n");
 	fprintf(stderr, "\n");
-	fprintf(stderr, "When -l is present, packet size is only checked if specified by -w or -W\n");
-	fprintf(stderr, "and it, obviously, must match that of the sender.\n");
-	fprintf(stderr, "\n");
 	fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n");
 	fprintf(stderr, "different nodes by using the -n option.\n");
 	fprintf(stderr, "\n");
@@ -332,8 +395,10 @@ static void usage(char *cmd)
 	fprintf(stderr, "	-t    Report Round Trip Times for each packet.\n");
 	fprintf(stderr, "	-m    cpg_initialise() model. Default 1.\n");
 	fprintf(stderr, "	-s    Also send errors to syslog (for daemon log correlation).\n");
+	fprintf(stderr, "	-f    Flood test CPG (like cpgbench). -W defaults to 64 in this case.\n");
 	fprintf(stderr, "	-a    Abort on crc/length/sequence error\n");
 	fprintf(stderr, "	-q    Quiet. Don't print messages every 10 seconds (see also -p)\n");
+	fprintf(stderr, "	-qq   Very quiet. Don't print stats at the end)\n");
 	fprintf(stderr, "\n");
 	fprintf(stderr, "%s exit code is 0 if no error happened, 1 on generic error and 2 on\n", cmd);
 	fprintf(stderr, "send/crc/length/sequence error");
@@ -352,9 +417,10 @@ int main (int argc, char *argv[]) {
 	int print_time = 10;
 	int have_size = 0;
 	int listen_only = 0;
+	int flood = 0;
 	int model = 1;
 
-	while ( (opt = getopt(argc, argv, "qlstan:d:r:p:m:w:W:")) != -1 ) {
+	while ( (opt = getopt(argc, argv, "qlstafn:d:r:p:m:w:W:")) != -1 ) {
 		switch (opt) {
 		case 'w': // Write size in K
 			bs = atoi(optarg);
@@ -382,6 +448,10 @@ int main (int argc, char *argv[]) {
 		case 't':
 			report_rtt = 1;
 			break;
+		case 'f':
+			flood = 1;
+			g_check_counter = 0; /* packets can get 'lost' */
+			break;
 		case 'a':
 			abort_on_error = 1;
 			break;
@@ -401,7 +471,7 @@ int main (int argc, char *argv[]) {
 			do_syslog = 1;
 			break;
 		case 'q':
-			quiet = 1;
+			quiet++;
 			break;
 		case 'm':
 			model = atoi(optarg);
@@ -416,14 +486,16 @@ int main (int argc, char *argv[]) {
 		}
 	}
 
+	if (!have_size && flood) {
+		write_size = 64;
+	}
+
 	qb_log_init("cpghum", LOG_USER, LOG_EMERG);
 	qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
 	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
 			  QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
 	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
 
-	g_write_size = write_size;
-
 	signal (SIGALRM, sigalrm_handler);
 	signal (SIGINT, sigint_handler);
 	switch (model) {
@@ -459,11 +531,6 @@ int main (int argc, char *argv[]) {
 			printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
 		}
 
-		/* Only check packet size if specified on the command-line */
-		if (!have_size) {
-			g_write_size = 0;
-		}
-
 		while (!stopped) {
 			sleep(1);
 			if (++secs > print_time && !quiet) {
@@ -477,13 +544,25 @@ int main (int argc, char *argv[]) {
 	}
 	else {
 		cpg_max_atomic_msgsize_get (handle, &maxsize);
-		if ( write_size > maxsize) {
+		if (write_size > maxsize) {
 			fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
 				write_size, maxsize);
 		}
-		for (i = 0; i < repetitions && !stopped; i++) {
-			cpg_test (handle, write_size, delay_time, print_time);
-			signal (SIGALRM, sigalrm_handler);
+		if (flood) {
+			for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */
+				cpg_flood (handle, write_size);
+				signal (SIGALRM, sigalrm_handler);
+				write_size *= 5;
+				if (write_size >= (ONE_MEG - 100)) {
+					break;
+				}
+			}
+		}
+		else {
+			for (i = 0; i < repetitions && !stopped; i++) {
+				cpg_test (handle, write_size, delay_time, print_time);
+				signal (SIGALRM, sigalrm_handler);
+			}
 		}
 	}
 
@@ -493,26 +572,28 @@ int main (int argc, char *argv[]) {
 		exit (1);
 	}
 
-	printf("\n");
-	printf("Stats:\n");
-	if (!listen_only) {
-		printf("   packets sent:    %d\n", packets_sent);
-		printf("   send failures:   %d\n", send_fails);
-		printf("   send retries:    %d\n", send_retries);
-	}
-	if (have_size) {
+	if (quiet < 2) {
+		printf("\n");
+		printf("Stats:\n");
+		if (!listen_only) {
+			printf("   packets sent:    %d\n", packets_sent);
+			printf("   send failures:   %d\n", send_fails);
+			printf("   send retries:    %d\n", send_retries);
+		}
 		printf("   length errors:   %d\n", length_errors);
+		printf("   packets recvd:   %d\n", packets_recvd);
+		printf("   sequence errors: %d\n", sequence_errors);
+		printf("   crc errors:	    %d\n", crc_errors);
+		if (!listen_only) {
+			printf("   min RTT:         %ld\n", min_rtt);
+			printf("   max RTT:         %ld\n", max_rtt);
+			printf("   avg RTT:         %ld\n", avg_rtt);
+		}
+		printf("\n");
 	}
-	printf("   packets recvd:   %d\n", packets_recvd);
-	printf("   sequence errors: %d\n", sequence_errors);
-	printf("   crc errors:	    %d\n", crc_errors);
-	if (!listen_only) {
-		printf("   max RTT:         %ld\n", max_rtt);
-		printf("   avg RTT:         %ld\n", avg_rtt);
-	}
-	printf("\n");
 
 	res = 0;
+
 	if (send_fails > 0 || (have_size && length_errors > 0) || sequence_errors > 0 || crc_errors > 0) {
 		res = 2;
 	}