| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851 |
- /*
- * Copyright (c) 2015-2017 Red Hat, Inc.
- *
- * All rights reserved.
- *
- * Author: Christine Caulfield <ccaulfie@redhat.com>
- *
- * This software licensed under BSD license, the text of which follows:
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- * - Neither the name of the MontaVista Software, Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- * THE POSSIBILITY OF SUCH DAMAGE.
- */
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <signal.h>
- #include <unistd.h>
- #include <assert.h>
- #include <errno.h>
- #include <time.h>
- #include <limits.h>
- #include <ctype.h>
- #include <syslog.h>
- #include <stdarg.h>
- #include <inttypes.h>
- #include <sys/time.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <sys/select.h>
- #include <sys/uio.h>
- #include <sys/un.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <pthread.h>
- #include <zlib.h>
- #include <libgen.h>
- #include <getopt.h>
- #include <corosync/corotypes.h>
- #include <corosync/cpg.h>
- static cpg_handle_t handle;
- static pthread_t thread;
- #ifndef timersub
- #define timersub(a, b, result) \
- do { \
- (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
- (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
- if ((result)->tv_usec < 0) { \
- --(result)->tv_sec; \
- (result)->tv_usec += 1000000; \
- } \
- } while (0)
- #endif /* timersub */
- static int alarm_notice;
- #define MAX_NODEID 65536
- #define ONE_MEG 1048576
- #define DATASIZE (ONE_MEG*20)
- static char data[DATASIZE];
- static int send_counter = 0;
- static int do_syslog = 0;
- static int quiet = 0;
- static int report_rtt = 0;
- static int abort_on_error = 0;
- static int machine_readable = 0;
- static char delimiter = ',';
- static int to_stderr = 0;
- static unsigned int g_our_nodeid;
- static volatile int stopped;
- static unsigned int flood_start = 64;
- static unsigned int flood_multiplier = 5;
- static unsigned long flood_max = (ONE_MEG - 100);
- // stats
- static unsigned int length_errors=0;
- static unsigned int crc_errors=0;
- static unsigned int sequence_errors=0;
- static unsigned int packets_sent=0;
- static unsigned int packets_recvd=0;
- static unsigned int packets_recvd1=0; /* For flood intermediates */
- static unsigned int send_retries=0;
- static unsigned int send_fails=0;
- static unsigned long avg_rtt=0;
- static unsigned long max_rtt=0;
- static unsigned long min_rtt=LONG_MAX;
- static unsigned long interim_avg_rtt=0;
- static unsigned long interim_max_rtt=0;
- static unsigned long interim_min_rtt=LONG_MAX;
- struct cpghum_header {
- unsigned int counter;
- unsigned int crc;
- unsigned int size;
- struct timeval timestamp;
- };
- static void cpg_bm_confchg_fn (
- cpg_handle_t handle_in,
- const struct cpg_name *group_name,
- const struct cpg_address *member_list, size_t member_list_entries,
- const struct cpg_address *left_list, size_t left_list_entries,
- const struct cpg_address *joined_list, size_t joined_list_entries)
- {
- }
- static unsigned int g_recv_count;
- static unsigned int g_recv_length;
- static int g_recv_start[MAX_NODEID+1];
- static int g_recv_counter[MAX_NODEID+1];
- static int g_recv_size[MAX_NODEID+1];
- static int g_log_mask = 0xFFFF;
- typedef enum
- {
- CPGH_LOG_INFO = 1,
- CPGH_LOG_PERF = 2,
- CPGH_LOG_RTT = 4,
- CPGH_LOG_STATS = 8,
- CPGH_LOG_ERR = 16
- } log_type_t;
- static void cpgh_print_message(int syslog_level, const char *facility_name, const char *format, va_list ap)
- __attribute__((format(printf, 3, 0)));
- static void cpgh_log_printf(log_type_t type, const char *format, ...)
- __attribute__((format(printf, 2, 3)));
- static void cpgh_print_message(int syslog_level, const char *facility_name, const char *format, va_list ap)
- {
- char msg[1024];
- int start = 0;
- if (machine_readable) {
- snprintf(msg, sizeof(msg), "%s%c", facility_name, delimiter);
- start = strlen(msg);
- }
- assert(vsnprintf(msg+start, sizeof(msg)-start, format, ap) < sizeof(msg)-start);
- if (to_stderr || (syslog_level <= LOG_ERR)) {
- fprintf(stderr, "%s", msg);
- }
- else {
- printf("%s", msg);
- }
- if (do_syslog) {
- syslog(syslog_level, "%s", msg);
- }
- }
- static void cpgh_log_printf(log_type_t type, const char *format, ...)
- {
- va_list ap;
- if (!(type & g_log_mask)) {
- return;
- }
- va_start(ap, format);
- switch (type) {
- case CPGH_LOG_INFO:
- cpgh_print_message(LOG_INFO, "[Info]", format, ap);
- break;
- case CPGH_LOG_PERF:
- cpgh_print_message(LOG_INFO, "[Perf]", format, ap);
- break;
- case CPGH_LOG_RTT:
- cpgh_print_message(LOG_INFO, "[RTT]", format, ap);
- break;
- case CPGH_LOG_STATS:
- cpgh_print_message(LOG_INFO, "[Stats]", format, ap);
- break;
- case CPGH_LOG_ERR:
- cpgh_print_message(LOG_ERR, "[Err]", format, ap);
- break;
- default:
- break;
- }
- va_end(ap);
- }
- static unsigned long update_rtt(struct timeval *header_timestamp, int packet_count,
- unsigned long *rtt_min, unsigned long *rtt_avg, unsigned long *rtt_max)
- {
- struct timeval tv1;
- struct timeval rtt;
- unsigned long rtt_usecs;
- gettimeofday (&tv1, NULL);
- timersub(&tv1, header_timestamp, &rtt);
- rtt_usecs = rtt.tv_usec + rtt.tv_sec*1000000;
- if (rtt_usecs > *rtt_max) {
- *rtt_max = rtt_usecs;
- }
- if (rtt_usecs < *rtt_min) {
- *rtt_min = rtt_usecs;
- }
- /* Don't start the average with 0 */
- if (*rtt_avg == 0) {
- *rtt_avg = rtt_usecs;
- }
- else {
- *rtt_avg = ((*rtt_avg * packet_count) + rtt_usecs) / (packet_count+1);
- }
- return rtt_usecs;
- }
- static void cpg_bm_deliver_fn (
- cpg_handle_t handle_in,
- const struct cpg_name *group_name,
- uint32_t nodeid,
- uint32_t pid,
- void *msg,
- size_t msg_len)
- {
- uLong crc=0;
- struct cpghum_header *header = (struct cpghum_header *)msg;
- uLong recv_crc = header->crc & 0xFFFFFFFF;
- unsigned int *dataint = (unsigned int *)((char*)msg + sizeof(struct cpghum_header));
- unsigned int datalen;
- if (nodeid > MAX_NODEID) {
- cpgh_log_printf(CPGH_LOG_ERR, "Got message from invalid nodeid " CS_PRI_NODE_ID " (too high for us). Quitting\n", nodeid);
- exit(1);
- }
- packets_recvd++;
- packets_recvd1++;
- g_recv_length = msg_len;
- datalen = header->size - sizeof(struct cpghum_header);
- // Report RTT first in case abort_on_error is set
- if (nodeid == g_our_nodeid) {
- unsigned long rtt_usecs;
- // For flood
- update_rtt(&header->timestamp, packets_recvd1, &interim_min_rtt, &interim_avg_rtt, &interim_max_rtt);
- rtt_usecs = update_rtt(&header->timestamp, g_recv_counter[nodeid], &min_rtt, &avg_rtt, &max_rtt);
- if (report_rtt) {
- if (machine_readable) {
- cpgh_log_printf(CPGH_LOG_RTT, "%ld%c%ld%c%ld%c%ld\n", rtt_usecs, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
- }
- else {
- cpgh_log_printf(CPGH_LOG_RTT, "%s: RTT %ld uS (min/avg/max): %ld/%ld/%ld\n", group_name->value, rtt_usecs, min_rtt, avg_rtt, max_rtt);
- }
- }
- }
- // Basic check, packets should all be the right size
- if (msg_len != header->size) {
- length_errors++;
- cpgh_log_printf(CPGH_LOG_ERR, "%s: message sizes don't match. got %zu, expected %u from node " CS_PRI_NODE_ID "\n", group_name->value, msg_len, header->size, nodeid);
- if (abort_on_error) {
- exit(2);
- }
- }
- g_recv_size[nodeid] = msg_len;
- // Sequence counters are incrementing in step?
- if (header->counter != g_recv_counter[nodeid]) {
- /* Don't report the first mismatch or a newly restarted sender, we're just catching up */
- if (g_recv_counter[nodeid] && header->counter) {
- sequence_errors++;
- cpgh_log_printf(CPGH_LOG_ERR, "%s: counters don't match. got %d, expected %d from node " CS_PRI_NODE_ID "\n", group_name->value, header->counter, g_recv_counter[nodeid], nodeid);
- if (abort_on_error) {
- exit(2);
- }
- }
- else {
- g_recv_start[nodeid] = header->counter;
- }
- /* Catch up or we'll be printing errors for ever */
- g_recv_counter[nodeid] = header->counter+1;
- }
- else {
- g_recv_counter[nodeid]++;
- }
- /* Check crc */
- crc = crc32(0, NULL, 0);
- crc = crc32(crc, (Bytef *)dataint, datalen) & 0xFFFFFFFF;
- if (crc != recv_crc) {
- crc_errors++;
- cpgh_log_printf(CPGH_LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx from nodeid " CS_PRI_NODE_ID "\n", group_name->value, recv_crc, crc, nodeid);
- if (abort_on_error) {
- exit(2);
- }
- }
- g_recv_count++;
- }
- static cpg_model_v1_data_t model1_data = {
- .cpg_deliver_fn = cpg_bm_deliver_fn,
- .cpg_confchg_fn = cpg_bm_confchg_fn,
- };
- static cpg_callbacks_t callbacks = {
- .cpg_deliver_fn = cpg_bm_deliver_fn,
- .cpg_confchg_fn = cpg_bm_confchg_fn
- };
- static struct cpg_name group_name = {
- .value = "cpghum",
- .length = 7
- };
- static void set_packet(int write_size, int counter)
- {
- struct cpghum_header *header = (struct cpghum_header *)data;
- int i;
- unsigned int *dataint = (unsigned int *)(data + sizeof(struct cpghum_header));
- unsigned int datalen = write_size - sizeof(struct cpghum_header);
- struct timeval tv1;
- uLong crc;
- header->counter = counter;
- for (i=0; i<(datalen/4); i++) {
- dataint[i] = rand();
- }
- crc = crc32(0, NULL, 0);
- header->crc = crc32(crc, (Bytef*)&dataint[0], datalen);
- header->size = write_size;
- gettimeofday (&tv1, NULL);
- memcpy(&header->timestamp, &tv1, sizeof(struct timeval));
- }
- /* Basically this is cpgbench.c */
- static void cpg_flood (
- cpg_handle_t handle_in,
- int write_size)
- {
- struct timeval tv1, tv2, tv_elapsed;
- struct iovec iov;
- unsigned int res = CS_OK;
- alarm_notice = 0;
- iov.iov_base = data;
- iov.iov_len = write_size;
- alarm (10);
- packets_recvd1 = 0;
- interim_avg_rtt = 0;
- interim_max_rtt = 0;
- interim_min_rtt = LONG_MAX;
- gettimeofday (&tv1, NULL);
- do {
- if (res == CS_OK) {
- set_packet(write_size, send_counter);
- }
- res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
- if (res == CS_OK) {
- /* Only increment the packet counter if it was sucessfully sent */
- packets_sent++;
- send_counter++;
- }
- else {
- if (res == CS_ERR_TRY_AGAIN) {
- send_retries++;
- }
- else {
- send_fails++;
- }
- }
- } while (!stopped && alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN));
- gettimeofday (&tv2, NULL);
- timersub (&tv2, &tv1, &tv_elapsed);
- if (!quiet) {
- if (machine_readable) {
- cpgh_log_printf (CPGH_LOG_PERF, "%d%c%d%c%f%c%f%c%f%c%ld%c%ld%c%ld\n", packets_recvd1, delimiter, write_size, delimiter,
- (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
- ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
- ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0), delimiter,
- interim_min_rtt, delimiter, interim_avg_rtt, delimiter, interim_max_rtt);
- }
- else {
- cpgh_log_printf (CPGH_LOG_PERF, "%5d messages received ", packets_recvd1);
- cpgh_log_printf (CPGH_LOG_PERF, "%5d bytes per write ", write_size);
- cpgh_log_printf (CPGH_LOG_PERF, "%7.3f Seconds runtime ",
- (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
- cpgh_log_printf (CPGH_LOG_PERF, "%9.3f TP/s ",
- ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
- cpgh_log_printf (CPGH_LOG_PERF, "%7.3f MB/s ",
- ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
- cpgh_log_printf (CPGH_LOG_PERF, "RTT for this size (min/avg/max) %ld/%ld/%ld\n",
- interim_min_rtt, interim_avg_rtt, interim_max_rtt);
- }
- }
- }
- static void cpg_test (
- cpg_handle_t handle_in,
- int write_size,
- int delay_time,
- int print_time)
- {
- struct timeval tv1, tv2, tv_elapsed;
- struct iovec iov;
- unsigned int res;
- alarm_notice = 0;
- iov.iov_base = data;
- iov.iov_len = write_size;
- g_recv_count = 0;
- alarm (print_time);
- do {
- send_counter++;
- resend:
- set_packet(write_size, send_counter);
- res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
- if (res == CS_ERR_TRY_AGAIN) {
- usleep(10000);
- send_retries++;
- goto resend;
- }
- if (res != CS_OK) {
- cpgh_log_printf(CPGH_LOG_ERR, "send failed: %d\n", res);
- send_fails++;
- }
- else {
- packets_sent++;
- }
- usleep(delay_time*1000);
- } while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
- gettimeofday (&tv2, NULL);
- timersub (&tv2, &tv1, &tv_elapsed);
- if (!quiet) {
- if (machine_readable) {
- cpgh_log_printf(CPGH_LOG_RTT, "%d%c%ld%c%ld%c%ld\n", 0, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
- }
- else {
- cpgh_log_printf(CPGH_LOG_PERF, "%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
- cpgh_log_printf(CPGH_LOG_PERF, "%5d bytes per write. ", write_size);
- cpgh_log_printf(CPGH_LOG_RTT, "RTT min/avg/max: %ld/%ld/%ld\n", min_rtt, avg_rtt, max_rtt);
- }
- }
- }
- static void sigalrm_handler (int num)
- {
- alarm_notice = 1;
- }
- static void sigint_handler (int num)
- {
- stopped = 1;
- }
- static void* dispatch_thread (void *arg)
- {
- cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
- return NULL;
- }
- static void usage(char *cmd)
- {
- fprintf(stderr, "%s [OPTIONS]\n", cmd);
- fprintf(stderr, "\n");
- fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
- fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n");
- fprintf(stderr, "corrupted messages will be detected and reported.\n");
- fprintf(stderr, "\n");
- fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
- fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n");
- fprintf(stderr, "\n");
- fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n");
- fprintf(stderr, "different nodes by using the -n option.\n");
- fprintf(stderr, "\n");
- fprintf(stderr, "%s can handle more than 1 sender in the same CPG provided they are on\n", cmd);
- fprintf(stderr, "different nodes.\n");
- fprintf(stderr, "\n");
- fprintf(stderr, " -w<num>, --size-bytes Write size in Kbytes, default 4\n");
- fprintf(stderr, " -W<num>, --size-kb Write size in bytes, default 4096\n");
- fprintf(stderr, " -n<name>, --name CPG name to use, default 'cpghum'\n");
- fprintf(stderr, " -M Write machine-readable results\n");
- fprintf(stderr, " -D<char> Delimiter for machine-readable results (default ',')\n");
- fprintf(stderr, " -E Send normal output to stderr instead of stdout\n");
- fprintf(stderr, " -d<num>, --delay Delay between sending packets (mS), default 1000\n");
- fprintf(stderr, " -r<num> Number of repetitions, default 100\n");
- fprintf(stderr, " -p<num> Delay between printing output (seconds), default 10s\n");
- fprintf(stderr, " -l, --listen Listen and check CRCs only, don't send (^C to quit)\n");
- fprintf(stderr, " -t, --rtt Report Round Trip Times for each packet.\n");
- fprintf(stderr, " -m<num> cpg_initialise() model. Default 1.\n");
- fprintf(stderr, " -s Also send errors to syslog.\n");
- fprintf(stderr, " -f, --flood Flood test CPG (cpgbench). see --flood-* long options\n");
- fprintf(stderr, " -a Abort on crc/length/sequence error\n");
- fprintf(stderr, " -q, --quiet Quiet. Don't print messages every 10s (see also -p)\n");
- fprintf(stderr, " -qq Very quiet. Don't print stats at the end\n");
- fprintf(stderr, " --flood-start=bytes Start value for --flood\n");
- fprintf(stderr, " --flood-mult=value Packet size multiplier value for --flood\n");
- fprintf(stderr, " --flood-max=bytes Maximum packet size for --flood\n");
- fprintf(stderr, "\n");
- fprintf(stderr, " values for --flood* and -W can have K or M suffixes to indicate\n");
- fprintf(stderr, " Kilobytes or Megabytes\n");
- fprintf(stderr, "\n");
- fprintf(stderr, "%s exit code is 0 if no error happened, 1 on generic error and 2 on\n", cmd);
- fprintf(stderr, "send/crc/length/sequence error");
- fprintf(stderr, "\n");
- }
- /* Parse a size, optionally ending in 'K', 'M' */
- static long parse_bytes(const char *valstring)
- {
- unsigned int value;
- int multiplier = 1;
- char suffix = '\0';
- int have_suffix = 0;
- /* Suffix is optional */
- if (sscanf(valstring, "%u%c", &value, &suffix) == 0) {
- return 0;
- }
- if (toupper(suffix) == 'M') {
- multiplier = 1024*1024;
- have_suffix = 1;
- }
- if (toupper(suffix) == 'K') {
- multiplier = 1024;
- have_suffix = 1;
- }
- if (!have_suffix && suffix != '\0') {
- fprintf(stderr, "Invalid suffix '%c', only K or M supported\n", suffix);
- return 0;
- }
- return value * multiplier;
- }
- int main (int argc, char *argv[]) {
- int i;
- unsigned int res;
- uint32_t maxsize;
- int opt;
- int bs;
- int write_size = 4096;
- int delay_time = 1000;
- int repetitions = 100;
- int print_time = 10;
- int have_size = 0;
- int listen_only = 0;
- int flood = 0;
- int model = 1;
- int option_index = 0;
- struct option long_options[] = {
- {"flood-start", required_argument, 0, 0 },
- {"flood-mult", required_argument, 0, 0 },
- {"flood-max", required_argument, 0, 0 },
- {"size-kb", required_argument, 0, 'w' },
- {"size-bytes", required_argument, 0, 'W' },
- {"name", required_argument, 0, 'n' },
- {"rtt", no_argument, 0, 't' },
- {"flood", no_argument, 0, 'f' },
- {"quiet", no_argument, 0, 'q' },
- {"listen", no_argument, 0, 'l' },
- {"help", no_argument, 0, '?' },
- {0, 0, 0, 0 }
- };
- while ( (opt = getopt_long(argc, argv, "qlstafMEn:d:r:p:m:w:W:D:",
- long_options, &option_index)) != -1 ) {
- switch (opt) {
- case 0: // Long-only options
- if (strcmp(long_options[option_index].name, "flood-start") == 0) {
- flood_start = parse_bytes(optarg);
- if (flood_start == 0) {
- fprintf(stderr, "flood-start value invalid\n");
- exit(1);
- }
- }
- if (strcmp(long_options[option_index].name, "flood-mult") == 0) {
- flood_multiplier = parse_bytes(optarg);
- if (flood_multiplier == 0) {
- fprintf(stderr, "flood-mult value invalid\n");
- exit(1);
- }
- }
- if (strcmp(long_options[option_index].name, "flood-max") == 0) {
- flood_max = parse_bytes(optarg);
- if (flood_max == 0) {
- fprintf(stderr, "flood-max value invalid\n");
- exit(1);
- }
- }
- break;
- case 'w': // Write size in K
- bs = atoi(optarg);
- if (bs > 0) {
- write_size = bs*1024;
- have_size = 1;
- }
- break;
- case 'W': // Write size in bytes (or with a suffix)
- bs = parse_bytes(optarg);
- if (bs > 0) {
- write_size = bs;
- have_size = 1;
- }
- break;
- case 'n':
- if (strlen(optarg) >= CPG_MAX_NAME_LENGTH) {
- fprintf(stderr, "CPG name too long\n");
- exit(1);
- }
- strcpy(group_name.value, optarg);
- group_name.length = strlen(group_name.value);
- break;
- case 't':
- report_rtt = 1;
- break;
- case 'E':
- to_stderr = 1;
- break;
- case 'M':
- machine_readable = 1;
- break;
- case 'f':
- flood = 1;
- break;
- case 'a':
- abort_on_error = 1;
- break;
- case 'd':
- delay_time = atoi(optarg);
- break;
- case 'D':
- delimiter = optarg[0];
- break;
- case 'r':
- repetitions = atoi(optarg);
- break;
- case 'p':
- print_time = atoi(optarg);
- break;
- case 'l':
- listen_only = 1;
- break;
- case 's':
- do_syslog = 1;
- break;
- case 'q':
- quiet++;
- break;
- case 'm':
- model = atoi(optarg);
- if (model < 0 || model > 1) {
- fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
- exit(1);
- }
- break;
- case '?':
- usage(basename(argv[0]));
- exit(1);
- }
- }
- if (!have_size && flood) {
- write_size = flood_start;
- }
- signal (SIGALRM, sigalrm_handler);
- signal (SIGINT, sigint_handler);
- switch (model) {
- case 0:
- res = cpg_initialize (&handle, &callbacks);
- break;
- case 1:
- res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
- break;
- default:
- res=999; // can't get here but it keeps the compiler happy
- break;
- }
- if (res != CS_OK) {
- cpgh_log_printf(CPGH_LOG_ERR, "cpg_initialize failed with result %d\n", res);
- exit (1);
- }
- res = cpg_local_get(handle, &g_our_nodeid);
- if (res != CS_OK) {
- cpgh_log_printf(CPGH_LOG_ERR, "cpg_local_get failed with result %d\n", res);
- exit (1);
- }
- pthread_create (&thread, NULL, dispatch_thread, NULL);
- res = cpg_join (handle, &group_name);
- if (res != CS_OK) {
- cpgh_log_printf(CPGH_LOG_ERR, "cpg_join failed with result %d\n", res);
- exit (1);
- }
- if (listen_only) {
- int secs = 0;
- while (!stopped) {
- sleep(1);
- if (++secs > print_time && !quiet) {
- int nodes_printed = 0;
- if (!machine_readable) {
- for (i=1; i<MAX_NODEID; i++) {
- if (g_recv_counter[i]) {
- cpgh_log_printf(CPGH_LOG_INFO, "%s: %5d message%s of %d bytes received from node " CS_PRI_NODE_ID "\n",
- group_name.value, g_recv_counter[i] - g_recv_start[i],
- g_recv_counter[i]==1?"":"s",
- g_recv_size[i], i);
- nodes_printed++;
- }
- }
- }
- /* Separate list of nodes if more than one */
- if (nodes_printed > 1) {
- cpgh_log_printf(CPGH_LOG_INFO, "\n");
- }
- secs = 0;
- }
- }
- }
- else {
- cpg_max_atomic_msgsize_get (handle, &maxsize);
- if (write_size > maxsize) {
- fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
- write_size, maxsize);
- }
- /* The main job starts here */
- if (flood) {
- for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */
- cpg_flood (handle, write_size);
- signal (SIGALRM, sigalrm_handler);
- write_size *= flood_multiplier;
- if (write_size > flood_max) {
- break;
- }
- }
- }
- else {
- send_counter = -1; /* So we start from zero to allow listeners to sync */
- for (i = 0; i < repetitions && !stopped; i++) {
- cpg_test (handle, write_size, delay_time, print_time);
- signal (SIGALRM, sigalrm_handler);
- }
- }
- }
- res = cpg_finalize (handle);
- if (res != CS_OK) {
- cpgh_log_printf(CPGH_LOG_ERR, "cpg_finalize failed with result %d\n", res);
- exit (1);
- }
- if (quiet < 2) {
- /* Don't print LONG_MAX for min_rtt if we don't have a value */
- if (min_rtt == LONG_MAX) {
- min_rtt = 0L;
- }
- if (machine_readable) {
- cpgh_log_printf(CPGH_LOG_STATS, "%d%c%d%c%d%c%d%c%d%c%d%c%d%c%ld%c%ld%c%ld\n",
- packets_sent, delimiter,
- send_fails, delimiter,
- send_retries, delimiter,
- length_errors, delimiter,
- packets_recvd, delimiter,
- sequence_errors, delimiter,
- crc_errors, delimiter,
- min_rtt, delimiter,
- avg_rtt, delimiter,
- max_rtt);
- }
- else {
- cpgh_log_printf(CPGH_LOG_STATS, "\n");
- cpgh_log_printf(CPGH_LOG_STATS, "Stats:\n");
- if (!listen_only) {
- cpgh_log_printf(CPGH_LOG_STATS, " packets sent: %d\n", packets_sent);
- cpgh_log_printf(CPGH_LOG_STATS, " send failures: %d\n", send_fails);
- cpgh_log_printf(CPGH_LOG_STATS, " send retries: %d\n", send_retries);
- }
- cpgh_log_printf(CPGH_LOG_STATS, " length errors: %d\n", length_errors);
- cpgh_log_printf(CPGH_LOG_STATS, " packets recvd: %d\n", packets_recvd);
- cpgh_log_printf(CPGH_LOG_STATS, " sequence errors: %d\n", sequence_errors);
- cpgh_log_printf(CPGH_LOG_STATS, " crc errors: %d\n", crc_errors);
- if (!listen_only) {
- cpgh_log_printf(CPGH_LOG_STATS, " min RTT: %ld\n", min_rtt);
- cpgh_log_printf(CPGH_LOG_STATS, " max RTT: %ld\n", max_rtt);
- cpgh_log_printf(CPGH_LOG_STATS, " avg RTT: %ld\n", avg_rtt);
- }
- cpgh_log_printf(CPGH_LOG_STATS, "\n");
- }
- }
- res = 0;
- if (send_fails > 0 || (have_size && length_errors > 0) || sequence_errors > 0 || crc_errors > 0) {
- res = 2;
- }
- return (res);
- }
|