|
|
@@ -1,5 +1,5 @@
|
|
|
/*
|
|
|
- * Copyright (c) 2015-2016 Red Hat, Inc.
|
|
|
+ * Copyright (c) 2015-2017 Red Hat, Inc.
|
|
|
*
|
|
|
* All rights reserved.
|
|
|
*
|
|
|
@@ -40,6 +40,8 @@
|
|
|
#include <errno.h>
|
|
|
#include <time.h>
|
|
|
#include <limits.h>
|
|
|
+#include <syslog.h>
|
|
|
+#include <stdarg.h>
|
|
|
#include <sys/time.h>
|
|
|
#include <sys/types.h>
|
|
|
#include <sys/socket.h>
|
|
|
@@ -52,9 +54,6 @@
|
|
|
#include <zlib.h>
|
|
|
#include <libgen.h>
|
|
|
|
|
|
-#include <qb/qblog.h>
|
|
|
-#include <qb/qbutil.h>
|
|
|
-
|
|
|
#include <corosync/corotypes.h>
|
|
|
#include <corosync/cpg.h>
|
|
|
|
|
|
@@ -75,6 +74,7 @@ static pthread_t thread;
|
|
|
#endif /* timersub */
|
|
|
|
|
|
static int alarm_notice;
|
|
|
+#define MAX_NODEID 65536
|
|
|
#define ONE_MEG 1048576
|
|
|
#define DATASIZE (ONE_MEG*20)
|
|
|
static char data[DATASIZE];
|
|
|
@@ -83,6 +83,9 @@ static int do_syslog = 0;
|
|
|
static int quiet = 0;
|
|
|
static int report_rtt = 0;
|
|
|
static int abort_on_error = 0;
|
|
|
+static int machine_readable = 0;
|
|
|
+static char delimiter = ',';
|
|
|
+static int to_stderr = 0;
|
|
|
static unsigned int g_our_nodeid;
|
|
|
static volatile int stopped;
|
|
|
|
|
|
@@ -117,8 +120,73 @@ static void cpg_bm_confchg_fn (
|
|
|
|
|
|
static unsigned int g_recv_count;
|
|
|
static unsigned int g_recv_length;
|
|
|
-static int g_recv_counter = 1;
|
|
|
-static int g_check_counter = 1;
|
|
|
+static int g_recv_start[MAX_NODEID+1];
|
|
|
+static int g_recv_counter[MAX_NODEID+1];
|
|
|
+static int g_recv_size[MAX_NODEID+1];
|
|
|
+static int g_log_mask = 0xFFFF;
|
|
|
+typedef enum
|
|
|
+{
|
|
|
+ CPGH_LOG_INFO = 1,
|
|
|
+ CPGH_LOG_PERF = 2,
|
|
|
+ CPGH_LOG_RTT = 4,
|
|
|
+ CPGH_LOG_STATS = 8,
|
|
|
+ CPGH_LOG_ERR = 16
|
|
|
+} log_type_t;
|
|
|
+
|
|
|
+static void cpgh_print_message(int syslog_level, const char *facility_name, const char *format, va_list ap)
|
|
|
+{
|
|
|
+ char msg[1024];
|
|
|
+ int start = 0;
|
|
|
+
|
|
|
+ if (machine_readable) {
|
|
|
+ snprintf(msg, sizeof(msg), "%s%c ", facility_name, delimiter);
|
|
|
+ start = strlen(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ vsnprintf(msg+start, sizeof(msg)-start, format, ap);
|
|
|
+ if (to_stderr || (syslog_level <= LOG_ERR)) {
|
|
|
+ fprintf(stderr, "%s", msg);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ printf("%s", msg);
|
|
|
+ }
|
|
|
+ if (do_syslog) {
|
|
|
+ syslog(syslog_level, "%s", msg);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+static void cpgh_log_printf(log_type_t type, const char *format, ...)
|
|
|
+{
|
|
|
+ va_list ap;
|
|
|
+
|
|
|
+ if (!(type & g_log_mask)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ va_start(ap, format);
|
|
|
+
|
|
|
+ switch (type) {
|
|
|
+ case CPGH_LOG_INFO:
|
|
|
+ cpgh_print_message(LOG_INFO, "[Info]", format, ap);
|
|
|
+ break;
|
|
|
+ case CPGH_LOG_PERF:
|
|
|
+ cpgh_print_message(LOG_INFO, "[Perf]", format, ap);
|
|
|
+ break;
|
|
|
+ case CPGH_LOG_RTT:
|
|
|
+ cpgh_print_message(LOG_INFO, "[RTT]", format, ap);
|
|
|
+ break;
|
|
|
+ case CPGH_LOG_STATS:
|
|
|
+ cpgh_print_message(LOG_INFO, "[Stats]", format, ap);
|
|
|
+ break;
|
|
|
+ case CPGH_LOG_ERR:
|
|
|
+ cpgh_print_message(LOG_ERR, "[Err]", format, ap);
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ va_end(ap);
|
|
|
+}
|
|
|
|
|
|
static void cpg_bm_deliver_fn (
|
|
|
cpg_handle_t handle_in,
|
|
|
@@ -134,6 +202,11 @@ static void cpg_bm_deliver_fn (
|
|
|
unsigned int *dataint = (unsigned int *)((char*)msg + sizeof(struct cpghum_header));
|
|
|
unsigned int datalen;
|
|
|
|
|
|
+ if (nodeid > MAX_NODEID) {
|
|
|
+ cpgh_log_printf(CPGH_LOG_ERR, "Got message from invalid nodeid %d (too high for us). Quitting\n", nodeid);
|
|
|
+ exit(1);
|
|
|
+ }
|
|
|
+
|
|
|
packets_recvd++;
|
|
|
packets_recvd1++;
|
|
|
g_recv_length = msg_len;
|
|
|
@@ -143,7 +216,7 @@ static void cpg_bm_deliver_fn (
|
|
|
if (nodeid == g_our_nodeid) {
|
|
|
struct timeval tv1;
|
|
|
struct timeval rtt;
|
|
|
- unsigned long rtt_usecs = rtt.tv_usec + rtt.tv_sec*1000000;
|
|
|
+ unsigned long rtt_usecs;
|
|
|
|
|
|
gettimeofday (&tv1, NULL);
|
|
|
timersub(&tv1, &header->timestamp, &rtt);
|
|
|
@@ -155,12 +228,21 @@ static void cpg_bm_deliver_fn (
|
|
|
if (rtt_usecs < min_rtt) {
|
|
|
min_rtt = rtt_usecs;
|
|
|
}
|
|
|
- avg_rtt = ((avg_rtt * g_recv_counter) + rtt_usecs) / (g_recv_counter+1);
|
|
|
+
|
|
|
+ /* Don't start the average with 0 */
|
|
|
+ if (avg_rtt == 0) {
|
|
|
+ avg_rtt = rtt_usecs;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ avg_rtt = ((avg_rtt * g_recv_counter[nodeid]) + rtt_usecs) / (g_recv_counter[nodeid]+1);
|
|
|
+ }
|
|
|
|
|
|
if (report_rtt) {
|
|
|
- fprintf(stderr, "%s: RTT %ld uS (min/avg/max): %ld/%ld/%ld\n", group_name->value, rtt_usecs, min_rtt, avg_rtt, max_rtt);
|
|
|
- if (do_syslog) {
|
|
|
- syslog(LOG_ERR, "%s: RTT %ld uS (min/avg/max): %ld/%ld/%ld\n", group_name->value, rtt_usecs, min_rtt, avg_rtt, max_rtt);
|
|
|
+ if (machine_readable) {
|
|
|
+ cpgh_log_printf(CPGH_LOG_RTT, "%ld%c %ld%c %ld%c %ld\n", rtt_usecs, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ cpgh_log_printf(CPGH_LOG_RTT, "%s: RTT %ld uS (min/avg/max): %ld/%ld/%ld\n", group_name->value, rtt_usecs, min_rtt, avg_rtt, max_rtt);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -168,44 +250,44 @@ static void cpg_bm_deliver_fn (
|
|
|
// Basic check, packets should all be the right size
|
|
|
if (msg_len != header->size) {
|
|
|
length_errors++;
|
|
|
- fprintf(stderr, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, header->size);
|
|
|
- if (do_syslog) {
|
|
|
- syslog(LOG_ERR, "%s: message sizes don't match. got %zu, expected %u\n", group_name->value, msg_len, header->size);
|
|
|
- }
|
|
|
+ cpgh_log_printf(CPGH_LOG_ERR, "%s: message sizes don't match. got %zu, expected %u from node %d\n", group_name->value, msg_len, header->size, nodeid);
|
|
|
|
|
|
if (abort_on_error) {
|
|
|
exit(2);
|
|
|
}
|
|
|
}
|
|
|
+ g_recv_size[nodeid] = msg_len;
|
|
|
|
|
|
// Sequence counters are incrementing in step?
|
|
|
- if (g_check_counter && (header->counter != g_recv_counter)) {
|
|
|
- sequence_errors++;
|
|
|
- fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, header->counter, g_recv_counter);
|
|
|
- if (do_syslog) {
|
|
|
- syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, header->counter, g_recv_counter);
|
|
|
- }
|
|
|
+ if (header->counter != g_recv_counter[nodeid]) {
|
|
|
|
|
|
- if (abort_on_error) {
|
|
|
- exit(2);
|
|
|
+ /* Don't report the first mismatch or a newly restarted sender, we're just catching up */
|
|
|
+ if (g_recv_counter[nodeid] && header->counter) {
|
|
|
+ sequence_errors++;
|
|
|
+ cpgh_log_printf(CPGH_LOG_ERR, "%s: counters don't match. got %d, expected %d from node %d\n", group_name->value, header->counter, g_recv_counter[nodeid], nodeid);
|
|
|
+
|
|
|
+ if (abort_on_error) {
|
|
|
+ exit(2);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ g_recv_start[nodeid] = header->counter;
|
|
|
}
|
|
|
|
|
|
- // Catch up or we'll be printing errors for ever
|
|
|
- g_recv_counter = header->counter +1;
|
|
|
+ /* Catch up or we'll be printing errors for ever */
|
|
|
+ g_recv_counter[nodeid] = header->counter+1;
|
|
|
}
|
|
|
else {
|
|
|
- g_recv_counter++;
|
|
|
+ g_recv_counter[nodeid]++;
|
|
|
}
|
|
|
|
|
|
- // Check crc
|
|
|
+ /* Check crc */
|
|
|
crc = crc32(0, NULL, 0);
|
|
|
crc = crc32(crc, (Bytef *)dataint, datalen) & 0xFFFFFFFF;
|
|
|
if (crc != recv_crc) {
|
|
|
crc_errors++;
|
|
|
- fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
|
|
|
- if (do_syslog) {
|
|
|
- syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
|
|
|
- }
|
|
|
+ cpgh_log_printf(CPGH_LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx from nodeid %d\n", group_name->value, recv_crc, crc, nodeid);
|
|
|
+
|
|
|
if (abort_on_error) {
|
|
|
exit(2);
|
|
|
}
|
|
|
@@ -252,7 +334,7 @@ static void set_packet(int write_size, int counter)
|
|
|
memcpy(&header->timestamp, &tv1, sizeof(struct timeval));
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+/* Basically this is cpgbench.c */
|
|
|
static void cpg_flood (
|
|
|
cpg_handle_t handle_in,
|
|
|
int write_size)
|
|
|
@@ -270,14 +352,15 @@ static void cpg_flood (
|
|
|
|
|
|
gettimeofday (&tv1, NULL);
|
|
|
do {
|
|
|
- /* Only increment the packet counter if it sucessfully sent */
|
|
|
- if (res != CS_ERR_TRY_AGAIN) {
|
|
|
- set_packet(write_size, send_counter++);
|
|
|
+ if (res == CS_OK) {
|
|
|
+ set_packet(write_size, send_counter);
|
|
|
}
|
|
|
|
|
|
res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
|
|
|
if (res == CS_OK) {
|
|
|
+ /* Only increment the packet counter if it was sucessfully sent */
|
|
|
packets_sent++;
|
|
|
+ send_counter++;
|
|
|
}
|
|
|
else {
|
|
|
if (res == CS_ERR_TRY_AGAIN) {
|
|
|
@@ -292,16 +375,23 @@ static void cpg_flood (
|
|
|
timersub (&tv2, &tv1, &tv_elapsed);
|
|
|
|
|
|
if (!quiet) {
|
|
|
- printf ("%5d messages received ", packets_recvd1);
|
|
|
- printf ("%5d bytes per write ", write_size);
|
|
|
- printf ("%7.3f Seconds runtime ",
|
|
|
- (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
|
|
|
- printf ("%9.3f TP/s ",
|
|
|
- ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
|
|
|
- printf ("%7.3f MB/s.\n",
|
|
|
- ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
|
|
|
+ if (machine_readable) {
|
|
|
+ cpgh_log_printf (CPGH_LOG_PERF, "%d%c %d%c %f%c %f%c %f\n", packets_recvd1, delimiter, write_size, delimiter,
|
|
|
+ (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
|
|
|
+ ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
|
|
|
+ ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ cpgh_log_printf (CPGH_LOG_PERF, "%5d messages received ", packets_recvd1);
|
|
|
+ cpgh_log_printf (CPGH_LOG_PERF, "%5d bytes per write ", write_size);
|
|
|
+ cpgh_log_printf (CPGH_LOG_PERF, "%7.3f Seconds runtime ",
|
|
|
+ (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
|
|
|
+ cpgh_log_printf (CPGH_LOG_PERF, "%9.3f TP/s ",
|
|
|
+ ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
|
|
|
+ cpgh_log_printf (CPGH_LOG_PERF, "%7.3f MB/s.\n",
|
|
|
+ ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
static void cpg_test (
|
|
|
@@ -333,7 +423,7 @@ static void cpg_test (
|
|
|
goto resend;
|
|
|
}
|
|
|
if (res != CS_OK) {
|
|
|
- fprintf(stderr, "send failed: %d\n", res);
|
|
|
+ cpgh_log_printf(CPGH_LOG_ERR, "send failed: %d\n", res);
|
|
|
send_fails++;
|
|
|
}
|
|
|
else {
|
|
|
@@ -345,9 +435,14 @@ static void cpg_test (
|
|
|
timersub (&tv2, &tv1, &tv_elapsed);
|
|
|
|
|
|
if (!quiet) {
|
|
|
- printf ("%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
|
|
|
- printf ("%5d bytes per write. ", write_size);
|
|
|
- printf ("RTT min/avg/max: %ld/%ld/%ld\n", min_rtt, avg_rtt, max_rtt);
|
|
|
+ if (machine_readable) {
|
|
|
+ cpgh_log_printf(CPGH_LOG_RTT, "%d%c %ld%c %ld%c %ld\n", 0, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ cpgh_log_printf(CPGH_LOG_PERF, "%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
|
|
|
+ cpgh_log_printf(CPGH_LOG_PERF, "%5d bytes per write. ", write_size);
|
|
|
+ cpgh_log_printf(CPGH_LOG_RTT, "RTT min/avg/max: %ld/%ld/%ld\n", min_rtt, avg_rtt, max_rtt);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
}
|
|
|
@@ -382,23 +477,26 @@ static void usage(char *cmd)
|
|
|
fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n");
|
|
|
fprintf(stderr, "different nodes by using the -n option.\n");
|
|
|
fprintf(stderr, "\n");
|
|
|
- fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd);
|
|
|
- fprintf(stderr, "sequence numbers.\n");
|
|
|
+ fprintf(stderr, "%s can handle more than 1 sender in the same CPG provided they are on\n", cmd);
|
|
|
+ fprintf(stderr, "different nodes.\n");
|
|
|
fprintf(stderr, "\n");
|
|
|
- fprintf(stderr, " -w Write size in Kbytes, default 4\n");
|
|
|
- fprintf(stderr, " -W Write size in bytes, default 4096\n");
|
|
|
- fprintf(stderr, " -n CPG name to use, default 'cpghum'\n");
|
|
|
- fprintf(stderr, " -d Delay between sending packets (mS), default 1000\n");
|
|
|
- fprintf(stderr, " -r Number of repetitions, default 100\n");
|
|
|
- fprintf(stderr, " -p Delay between printing output(S), default 10s\n");
|
|
|
- fprintf(stderr, " -l Listen and check CRCs only, don't send (^C to quit)\n");
|
|
|
- fprintf(stderr, " -t Report Round Trip Times for each packet.\n");
|
|
|
- fprintf(stderr, " -m cpg_initialise() model. Default 1.\n");
|
|
|
- fprintf(stderr, " -s Also send errors to syslog (for daemon log correlation).\n");
|
|
|
- fprintf(stderr, " -f Flood test CPG (like cpgbench). -W defaults to 64 in this case.\n");
|
|
|
- fprintf(stderr, " -a Abort on crc/length/sequence error\n");
|
|
|
- fprintf(stderr, " -q Quiet. Don't print messages every 10 seconds (see also -p)\n");
|
|
|
- fprintf(stderr, " -qq Very quiet. Don't print stats at the end)\n");
|
|
|
+ fprintf(stderr, " -w<num> Write size in Kbytes, default 4\n");
|
|
|
+ fprintf(stderr, " -W<num> Write size in bytes, default 4096\n");
|
|
|
+ fprintf(stderr, " -n<name> CPG name to use, default 'cpghum'\n");
|
|
|
+ fprintf(stderr, " -M Write machine-readable results\n");
|
|
|
+ fprintf(stderr, " -D<char> Delimiter for machine-readable results (default ',')\n");
|
|
|
+ fprintf(stderr, " -E Send normal output to stderr instead of stdout\n");
|
|
|
+ fprintf(stderr, " -d<num> Delay between sending packets (mS), default 1000\n");
|
|
|
+ fprintf(stderr, " -r<num> Number of repetitions, default 100\n");
|
|
|
+ fprintf(stderr, " -p<num> Delay between printing output (seconds), default 10s\n");
|
|
|
+ fprintf(stderr, " -l Listen and check CRCs only, don't send (^C to quit)\n");
|
|
|
+ fprintf(stderr, " -t Report Round Trip Times for each packet.\n");
|
|
|
+ fprintf(stderr, " -m<num> cpg_initialise() model. Default 1.\n");
|
|
|
+ fprintf(stderr, " -s Also send errors to syslog (for daemon log correlation).\n");
|
|
|
+ fprintf(stderr, " -f Flood test CPG (cpgbench). -W starts at 64 in this case.\n");
|
|
|
+ fprintf(stderr, " -a Abort on crc/length/sequence error\n");
|
|
|
+ fprintf(stderr, " -q Quiet. Don't print messages every 10 seconds (see also -p)\n");
|
|
|
+ fprintf(stderr, " -qq Very quiet. Don't print stats at the end\n");
|
|
|
fprintf(stderr, "\n");
|
|
|
fprintf(stderr, "%s exit code is 0 if no error happened, 1 on generic error and 2 on\n", cmd);
|
|
|
fprintf(stderr, "send/crc/length/sequence error");
|
|
|
@@ -420,7 +518,7 @@ int main (int argc, char *argv[]) {
|
|
|
int flood = 0;
|
|
|
int model = 1;
|
|
|
|
|
|
- while ( (opt = getopt(argc, argv, "qlstafn:d:r:p:m:w:W:")) != -1 ) {
|
|
|
+ while ( (opt = getopt(argc, argv, "qlstafMEn:d:r:p:m:w:W:D:")) != -1 ) {
|
|
|
switch (opt) {
|
|
|
case 'w': // Write size in K
|
|
|
bs = atoi(optarg);
|
|
|
@@ -448,9 +546,14 @@ int main (int argc, char *argv[]) {
|
|
|
case 't':
|
|
|
report_rtt = 1;
|
|
|
break;
|
|
|
+ case 'E':
|
|
|
+ to_stderr = 1;
|
|
|
+ break;
|
|
|
+ case 'M':
|
|
|
+ machine_readable = 1;
|
|
|
+ break;
|
|
|
case 'f':
|
|
|
flood = 1;
|
|
|
- g_check_counter = 0; /* packets can get 'lost' */
|
|
|
break;
|
|
|
case 'a':
|
|
|
abort_on_error = 1;
|
|
|
@@ -458,6 +561,9 @@ int main (int argc, char *argv[]) {
|
|
|
case 'd':
|
|
|
delay_time = atoi(optarg);
|
|
|
break;
|
|
|
+ case 'D':
|
|
|
+ delimiter = optarg[0];
|
|
|
+ break;
|
|
|
case 'r':
|
|
|
repetitions = atoi(optarg);
|
|
|
break;
|
|
|
@@ -490,12 +596,6 @@ int main (int argc, char *argv[]) {
|
|
|
write_size = 64;
|
|
|
}
|
|
|
|
|
|
- qb_log_init("cpghum", LOG_USER, LOG_EMERG);
|
|
|
- qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
|
|
|
- qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
|
|
|
- QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
|
|
|
- qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
|
|
|
-
|
|
|
signal (SIGALRM, sigalrm_handler);
|
|
|
signal (SIGINT, sigint_handler);
|
|
|
switch (model) {
|
|
|
@@ -511,7 +611,7 @@ int main (int argc, char *argv[]) {
|
|
|
}
|
|
|
|
|
|
if (res != CS_OK) {
|
|
|
- printf ("cpg_initialize failed with result %d\n", res);
|
|
|
+ cpgh_log_printf(CPGH_LOG_ERR, "cpg_initialize failed with result %d\n", res);
|
|
|
exit (1);
|
|
|
}
|
|
|
cpg_local_get(handle, &g_our_nodeid);
|
|
|
@@ -520,25 +620,35 @@ int main (int argc, char *argv[]) {
|
|
|
|
|
|
res = cpg_join (handle, &group_name);
|
|
|
if (res != CS_OK) {
|
|
|
- printf ("cpg_join failed with result %d\n", res);
|
|
|
+ cpgh_log_printf(CPGH_LOG_ERR, "cpg_join failed with result %d\n", res);
|
|
|
exit (1);
|
|
|
}
|
|
|
|
|
|
if (listen_only) {
|
|
|
int secs = 0;
|
|
|
- if (!quiet) {
|
|
|
- printf("-- Listening on CPG %s\n", group_name.value);
|
|
|
- printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
|
|
|
- }
|
|
|
|
|
|
while (!stopped) {
|
|
|
sleep(1);
|
|
|
if (++secs > print_time && !quiet) {
|
|
|
- printf ("%s: %5d message%s received. %d bytes. RTT min/avg/max: %ld/%ld/%ld\n",
|
|
|
- group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length,
|
|
|
- min_rtt, avg_rtt, max_rtt);
|
|
|
+ int nodes_printed = 0;
|
|
|
+
|
|
|
+ if (!machine_readable) {
|
|
|
+ for (i=1; i<MAX_NODEID; i++) {
|
|
|
+ if (g_recv_counter[i]) {
|
|
|
+ cpgh_log_printf(CPGH_LOG_INFO, "%s: %5d message%s of %d bytes received from node %d\n",
|
|
|
+ group_name.value, g_recv_counter[i] - g_recv_start[i],
|
|
|
+ g_recv_counter[i]==1?"":"s",
|
|
|
+ g_recv_size[i], i);
|
|
|
+ nodes_printed++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /* Separate list of nodes if more than one */
|
|
|
+ if (nodes_printed > 1) {
|
|
|
+ cpgh_log_printf(CPGH_LOG_INFO, "\n");
|
|
|
+ }
|
|
|
secs = 0;
|
|
|
- g_recv_count = 0;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -548,6 +658,8 @@ int main (int argc, char *argv[]) {
|
|
|
fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
|
|
|
write_size, maxsize);
|
|
|
}
|
|
|
+
|
|
|
+ /* The main job starts here */
|
|
|
if (flood) {
|
|
|
for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */
|
|
|
cpg_flood (handle, write_size);
|
|
|
@@ -559,6 +671,7 @@ int main (int argc, char *argv[]) {
|
|
|
}
|
|
|
}
|
|
|
else {
|
|
|
+ send_counter = -1; /* So we start from zero to allow listeners to sync */
|
|
|
for (i = 0; i < repetitions && !stopped; i++) {
|
|
|
cpg_test (handle, write_size, delay_time, print_time);
|
|
|
signal (SIGALRM, sigalrm_handler);
|
|
|
@@ -568,28 +681,48 @@ int main (int argc, char *argv[]) {
|
|
|
|
|
|
res = cpg_finalize (handle);
|
|
|
if (res != CS_OK) {
|
|
|
- printf ("cpg_finalize failed with result %d\n", res);
|
|
|
+ cpgh_log_printf(CPGH_LOG_ERR, "cpg_finalize failed with result %d\n", res);
|
|
|
exit (1);
|
|
|
}
|
|
|
|
|
|
if (quiet < 2) {
|
|
|
- printf("\n");
|
|
|
- printf("Stats:\n");
|
|
|
- if (!listen_only) {
|
|
|
- printf(" packets sent: %d\n", packets_sent);
|
|
|
- printf(" send failures: %d\n", send_fails);
|
|
|
- printf(" send retries: %d\n", send_retries);
|
|
|
+ /* Don't print LONG_MAX for min_rtt if we don't have a value */
|
|
|
+ if (min_rtt == LONG_MAX) {
|
|
|
+ min_rtt = 0L;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (machine_readable) {
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, "%d%c %d%c %d%c %d%c %d%c %d%c %d%c %ld%c %ld%c %ld\n",
|
|
|
+ packets_sent, delimiter,
|
|
|
+ send_fails, delimiter,
|
|
|
+ send_retries, delimiter,
|
|
|
+ length_errors, delimiter,
|
|
|
+ packets_recvd, delimiter,
|
|
|
+ sequence_errors, delimiter,
|
|
|
+ crc_errors, delimiter,
|
|
|
+ min_rtt, delimiter,
|
|
|
+ max_rtt, delimiter,
|
|
|
+ avg_rtt);
|
|
|
}
|
|
|
- printf(" length errors: %d\n", length_errors);
|
|
|
- printf(" packets recvd: %d\n", packets_recvd);
|
|
|
- printf(" sequence errors: %d\n", sequence_errors);
|
|
|
- printf(" crc errors: %d\n", crc_errors);
|
|
|
- if (!listen_only) {
|
|
|
- printf(" min RTT: %ld\n", min_rtt);
|
|
|
- printf(" max RTT: %ld\n", max_rtt);
|
|
|
- printf(" avg RTT: %ld\n", avg_rtt);
|
|
|
+ else {
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, "\n");
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, "Stats:\n");
|
|
|
+ if (!listen_only) {
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " packets sent: %d\n", packets_sent);
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " send failures: %d\n", send_fails);
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " send retries: %d\n", send_retries);
|
|
|
+ }
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " length errors: %d\n", length_errors);
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " packets recvd: %d\n", packets_recvd);
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " sequence errors: %d\n", sequence_errors);
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " crc errors: %d\n", crc_errors);
|
|
|
+ if (!listen_only) {
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " min RTT: %ld\n", min_rtt);
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " max RTT: %ld\n", max_rtt);
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, " avg RTT: %ld\n", avg_rtt);
|
|
|
+ }
|
|
|
+ cpgh_log_printf(CPGH_LOG_STATS, "\n");
|
|
|
}
|
|
|
- printf("\n");
|
|
|
}
|
|
|
|
|
|
res = 0;
|