cpghum.c 20 KB
  1. /*
  2. * Copyright (c) 2015-2017 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Christine Caulfield <ccaulfie@redhat.com>
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <stdio.h>
  35. #include <stdlib.h>
  36. #include <string.h>
  37. #include <signal.h>
  38. #include <unistd.h>
  39. #include <errno.h>
  40. #include <time.h>
  41. #include <limits.h>
  42. #include <syslog.h>
  43. #include <stdarg.h>
  44. #include <sys/time.h>
  45. #include <sys/types.h>
  46. #include <sys/socket.h>
  47. #include <sys/select.h>
  48. #include <sys/uio.h>
  49. #include <sys/un.h>
  50. #include <netinet/in.h>
  51. #include <arpa/inet.h>
  52. #include <pthread.h>
  53. #include <zlib.h>
  54. #include <libgen.h>
  55. #include <corosync/corotypes.h>
  56. #include <corosync/cpg.h>
  57. static cpg_handle_t handle;
  58. static pthread_t thread;
  #ifndef timersub
  /*
   * Fallback for systems whose headers do not provide timersub():
   * stores (*a - *b) in *result, borrowing one second whenever the
   * microsecond difference comes out negative.
   */
  #define timersub(a, b, result) \
      do { \
          (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
          (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
          if ((result)->tv_usec < 0) { \
              (result)->tv_usec += 1000000; \
              --(result)->tv_sec; \
          } \
      } while (0)
  #endif /* timersub */
  /*
   * Set to 1 by the SIGALRM handler and polled by the send loops.  Because it
   * is written from a signal handler it has to be a volatile sig_atomic_t.
   */
  static volatile sig_atomic_t alarm_notice;

  #define MAX_NODEID 65536
  #define ONE_MEG 1048576
  #define DATASIZE (ONE_MEG*20)

  /* Outgoing message buffer: a struct cpghum_header followed by the payload. */
  static char data[DATASIZE];
  /* Sequence number placed in the next outgoing packet. */
  static int send_counter = 0;

  /* Command-line controlled behaviour flags (set in main()). */
  static int do_syslog = 0;
  static int quiet = 0;
  static int report_rtt = 0;
  static int abort_on_error = 0;
  static int machine_readable = 0;
  static char delimiter = ',';
  static int to_stderr = 0;

  /* Our own node id as returned by cpg_local_get(). */
  static unsigned int g_our_nodeid;
  /* Set to 1 by the SIGINT handler to request shutdown. */
  static volatile sig_atomic_t stopped;

  // stats
  static unsigned int length_errors=0;
  static unsigned int crc_errors=0;
  static unsigned int sequence_errors=0;
  static unsigned int packets_sent=0;
  static unsigned int packets_recvd=0;
  static unsigned int packets_recvd1=0; /* For flood intermediates */
  static unsigned int send_retries=0;
  static unsigned int send_fails=0;
  /* Round-trip times in microseconds; min_rtt starts at LONG_MAX as "no sample yet". */
  static unsigned long avg_rtt=0;
  static unsigned long max_rtt=0;
  static unsigned long min_rtt=LONG_MAX;
  /*
   * Header placed at the start of every message by set_packet() and checked
   * by the delivery callback.  Field order and types define the on-wire
   * layout and must not change.
   */
  struct cpghum_header {
      unsigned int   counter;    /* sender's sequence number */
      unsigned int   crc;        /* crc32 of the payload that follows the header */
      unsigned int   size;       /* total message length in bytes, header included */
      struct timeval timestamp;  /* gettimeofday() value taken when the packet was built */
  };
  103. static void cpg_bm_confchg_fn (
  104. cpg_handle_t handle_in,
  105. const struct cpg_name *group_name,
  106. const struct cpg_address *member_list, size_t member_list_entries,
  107. const struct cpg_address *left_list, size_t left_list_entries,
  108. const struct cpg_address *joined_list, size_t joined_list_entries)
  109. {
  110. }
  111. static unsigned int g_recv_count;
  112. static unsigned int g_recv_length;
  113. static int g_recv_start[MAX_NODEID+1];
  114. static int g_recv_counter[MAX_NODEID+1];
  115. static int g_recv_size[MAX_NODEID+1];
  116. static int g_log_mask = 0xFFFF;
  /* Log categories; each value is a distinct bit so they can be tested against a mask. */
  typedef enum
  {
      CPGH_LOG_INFO  = 1 << 0,
      CPGH_LOG_PERF  = 1 << 1,
      CPGH_LOG_RTT   = 1 << 2,
      CPGH_LOG_STATS = 1 << 3,
      CPGH_LOG_ERR   = 1 << 4
  } log_type_t;
  125. static void cpgh_print_message(int syslog_level, const char *facility_name, const char *format, va_list ap)
  126. {
  127. char msg[1024];
  128. int start = 0;
  129. if (machine_readable) {
  130. snprintf(msg, sizeof(msg), "%s%c ", facility_name, delimiter);
  131. start = strlen(msg);
  132. }
  133. vsnprintf(msg+start, sizeof(msg)-start, format, ap);
  134. if (to_stderr || (syslog_level <= LOG_ERR)) {
  135. fprintf(stderr, "%s", msg);
  136. }
  137. else {
  138. printf("%s", msg);
  139. }
  140. if (do_syslog) {
  141. syslog(syslog_level, "%s", msg);
  142. }
  143. }
  144. static void cpgh_log_printf(log_type_t type, const char *format, ...)
  145. {
  146. va_list ap;
  147. if (!(type & g_log_mask)) {
  148. return;
  149. }
  150. va_start(ap, format);
  151. switch (type) {
  152. case CPGH_LOG_INFO:
  153. cpgh_print_message(LOG_INFO, "[Info]", format, ap);
  154. break;
  155. case CPGH_LOG_PERF:
  156. cpgh_print_message(LOG_INFO, "[Perf]", format, ap);
  157. break;
  158. case CPGH_LOG_RTT:
  159. cpgh_print_message(LOG_INFO, "[RTT]", format, ap);
  160. break;
  161. case CPGH_LOG_STATS:
  162. cpgh_print_message(LOG_INFO, "[Stats]", format, ap);
  163. break;
  164. case CPGH_LOG_ERR:
  165. cpgh_print_message(LOG_ERR, "[Err]", format, ap);
  166. break;
  167. default:
  168. break;
  169. }
  170. va_end(ap);
  171. }
  172. static void cpg_bm_deliver_fn (
  173. cpg_handle_t handle_in,
  174. const struct cpg_name *group_name,
  175. uint32_t nodeid,
  176. uint32_t pid,
  177. void *msg,
  178. size_t msg_len)
  179. {
  180. uLong crc=0;
  181. struct cpghum_header *header = (struct cpghum_header *)msg;
  182. uLong recv_crc = header->crc & 0xFFFFFFFF;
  183. unsigned int *dataint = (unsigned int *)((char*)msg + sizeof(struct cpghum_header));
  184. unsigned int datalen;
  185. if (nodeid > MAX_NODEID) {
  186. cpgh_log_printf(CPGH_LOG_ERR, "Got message from invalid nodeid %d (too high for us). Quitting\n", nodeid);
  187. exit(1);
  188. }
  189. packets_recvd++;
  190. packets_recvd1++;
  191. g_recv_length = msg_len;
  192. datalen = header->size - sizeof(struct cpghum_header);
  193. // Report RTT first in case abort_on_error is set
  194. if (nodeid == g_our_nodeid) {
  195. struct timeval tv1;
  196. struct timeval rtt;
  197. unsigned long rtt_usecs;
  198. gettimeofday (&tv1, NULL);
  199. timersub(&tv1, &header->timestamp, &rtt);
  200. rtt_usecs = rtt.tv_usec + rtt.tv_sec*1000000;
  201. if (rtt_usecs > max_rtt) {
  202. max_rtt = rtt_usecs;
  203. }
  204. if (rtt_usecs < min_rtt) {
  205. min_rtt = rtt_usecs;
  206. }
  207. /* Don't start the average with 0 */
  208. if (avg_rtt == 0) {
  209. avg_rtt = rtt_usecs;
  210. }
  211. else {
  212. avg_rtt = ((avg_rtt * g_recv_counter[nodeid]) + rtt_usecs) / (g_recv_counter[nodeid]+1);
  213. }
  214. if (report_rtt) {
  215. if (machine_readable) {
  216. cpgh_log_printf(CPGH_LOG_RTT, "%ld%c %ld%c %ld%c %ld\n", rtt_usecs, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
  217. }
  218. else {
  219. cpgh_log_printf(CPGH_LOG_RTT, "%s: RTT %ld uS (min/avg/max): %ld/%ld/%ld\n", group_name->value, rtt_usecs, min_rtt, avg_rtt, max_rtt);
  220. }
  221. }
  222. }
  223. // Basic check, packets should all be the right size
  224. if (msg_len != header->size) {
  225. length_errors++;
  226. cpgh_log_printf(CPGH_LOG_ERR, "%s: message sizes don't match. got %zu, expected %u from node %d\n", group_name->value, msg_len, header->size, nodeid);
  227. if (abort_on_error) {
  228. exit(2);
  229. }
  230. }
  231. g_recv_size[nodeid] = msg_len;
  232. // Sequence counters are incrementing in step?
  233. if (header->counter != g_recv_counter[nodeid]) {
  234. /* Don't report the first mismatch or a newly restarted sender, we're just catching up */
  235. if (g_recv_counter[nodeid] && header->counter) {
  236. sequence_errors++;
  237. cpgh_log_printf(CPGH_LOG_ERR, "%s: counters don't match. got %d, expected %d from node %d\n", group_name->value, header->counter, g_recv_counter[nodeid], nodeid);
  238. if (abort_on_error) {
  239. exit(2);
  240. }
  241. }
  242. else {
  243. g_recv_start[nodeid] = header->counter;
  244. }
  245. /* Catch up or we'll be printing errors for ever */
  246. g_recv_counter[nodeid] = header->counter+1;
  247. }
  248. else {
  249. g_recv_counter[nodeid]++;
  250. }
  251. /* Check crc */
  252. crc = crc32(0, NULL, 0);
  253. crc = crc32(crc, (Bytef *)dataint, datalen) & 0xFFFFFFFF;
  254. if (crc != recv_crc) {
  255. crc_errors++;
  256. cpgh_log_printf(CPGH_LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx from nodeid %d\n", group_name->value, recv_crc, crc, nodeid);
  257. if (abort_on_error) {
  258. exit(2);
  259. }
  260. }
  261. g_recv_count++;
  262. }
  263. static cpg_model_v1_data_t model1_data = {
  264. .cpg_deliver_fn = cpg_bm_deliver_fn,
  265. .cpg_confchg_fn = cpg_bm_confchg_fn,
  266. };
  267. static cpg_callbacks_t callbacks = {
  268. .cpg_deliver_fn = cpg_bm_deliver_fn,
  269. .cpg_confchg_fn = cpg_bm_confchg_fn
  270. };
  271. static struct cpg_name group_name = {
  272. .value = "cpghum",
  273. .length = 7
  274. };
  275. static void set_packet(int write_size, int counter)
  276. {
  277. struct cpghum_header *header = (struct cpghum_header *)data;
  278. int i;
  279. unsigned int *dataint = (unsigned int *)(data + sizeof(struct cpghum_header));
  280. unsigned int datalen = write_size - sizeof(struct cpghum_header);
  281. struct timeval tv1;
  282. uLong crc;
  283. header->counter = counter;
  284. for (i=0; i<(datalen/4); i++) {
  285. dataint[i] = rand();
  286. }
  287. crc = crc32(0, NULL, 0);
  288. header->crc = crc32(crc, (Bytef*)&dataint[0], datalen);
  289. header->size = write_size;
  290. gettimeofday (&tv1, NULL);
  291. memcpy(&header->timestamp, &tv1, sizeof(struct timeval));
  292. }
  293. /* Basically this is cpgbench.c */
  294. static void cpg_flood (
  295. cpg_handle_t handle_in,
  296. int write_size)
  297. {
  298. struct timeval tv1, tv2, tv_elapsed;
  299. struct iovec iov;
  300. unsigned int res = CS_OK;
  301. alarm_notice = 0;
  302. iov.iov_base = data;
  303. iov.iov_len = write_size;
  304. alarm (10);
  305. packets_recvd1 = 0;
  306. gettimeofday (&tv1, NULL);
  307. do {
  308. if (res == CS_OK) {
  309. set_packet(write_size, send_counter);
  310. }
  311. res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
  312. if (res == CS_OK) {
  313. /* Only increment the packet counter if it was sucessfully sent */
  314. packets_sent++;
  315. send_counter++;
  316. }
  317. else {
  318. if (res == CS_ERR_TRY_AGAIN) {
  319. send_retries++;
  320. }
  321. else {
  322. send_fails++;
  323. }
  324. }
  325. } while (!stopped && alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN));
  326. gettimeofday (&tv2, NULL);
  327. timersub (&tv2, &tv1, &tv_elapsed);
  328. if (!quiet) {
  329. if (machine_readable) {
  330. cpgh_log_printf (CPGH_LOG_PERF, "%d%c %d%c %f%c %f%c %f\n", packets_recvd1, delimiter, write_size, delimiter,
  331. (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
  332. ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
  333. ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
  334. }
  335. else {
  336. cpgh_log_printf (CPGH_LOG_PERF, "%5d messages received ", packets_recvd1);
  337. cpgh_log_printf (CPGH_LOG_PERF, "%5d bytes per write ", write_size);
  338. cpgh_log_printf (CPGH_LOG_PERF, "%7.3f Seconds runtime ",
  339. (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
  340. cpgh_log_printf (CPGH_LOG_PERF, "%9.3f TP/s ",
  341. ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
  342. cpgh_log_printf (CPGH_LOG_PERF, "%7.3f MB/s.\n",
  343. ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
  344. }
  345. }
  346. }
  347. static void cpg_test (
  348. cpg_handle_t handle_in,
  349. int write_size,
  350. int delay_time,
  351. int print_time)
  352. {
  353. struct timeval tv1, tv2, tv_elapsed;
  354. struct iovec iov;
  355. unsigned int res;
  356. alarm_notice = 0;
  357. iov.iov_base = data;
  358. iov.iov_len = write_size;
  359. g_recv_count = 0;
  360. alarm (print_time);
  361. do {
  362. send_counter++;
  363. resend:
  364. set_packet(write_size, send_counter);
  365. res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
  366. if (res == CS_ERR_TRY_AGAIN) {
  367. usleep(10000);
  368. send_retries++;
  369. goto resend;
  370. }
  371. if (res != CS_OK) {
  372. cpgh_log_printf(CPGH_LOG_ERR, "send failed: %d\n", res);
  373. send_fails++;
  374. }
  375. else {
  376. packets_sent++;
  377. }
  378. usleep(delay_time*1000);
  379. } while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
  380. gettimeofday (&tv2, NULL);
  381. timersub (&tv2, &tv1, &tv_elapsed);
  382. if (!quiet) {
  383. if (machine_readable) {
  384. cpgh_log_printf(CPGH_LOG_RTT, "%d%c %ld%c %ld%c %ld\n", 0, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
  385. }
  386. else {
  387. cpgh_log_printf(CPGH_LOG_PERF, "%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
  388. cpgh_log_printf(CPGH_LOG_PERF, "%5d bytes per write. ", write_size);
  389. cpgh_log_printf(CPGH_LOG_RTT, "RTT min/avg/max: %ld/%ld/%ld\n", min_rtt, avg_rtt, max_rtt);
  390. }
  391. }
  392. }
  393. static void sigalrm_handler (int num)
  394. {
  395. alarm_notice = 1;
  396. }
  397. static void sigint_handler (int num)
  398. {
  399. stopped = 1;
  400. }
  401. static void* dispatch_thread (void *arg)
  402. {
  403. cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
  404. return NULL;
  405. }
  /*
   * Print the help text to stderr.
   *
   * cmd - program name substituted into the text.
   */
  static void usage(char *cmd)
  {
      fprintf(stderr,
          "%s [OPTIONS]\n"
          "\n", cmd);

      fprintf(stderr,
          "%s sends CPG messages to all registered users of the CPG.\n"
          "The messages have a sequence number and a CRC so that missing or\n"
          "corrupted messages will be detected and reported.\n"
          "\n", cmd);

      fprintf(stderr,
          "%s can also be asked to simply listen for (and check) packets\n"
          "so that there is another node in the cluster connected to the CPG.\n"
          "\n", cmd);

      fputs(
          "Multiple copies, in different CPGs, can also be run on the same or\n"
          "different nodes by using the -n option.\n"
          "\n", stderr);

      fprintf(stderr,
          "%s can handle more than 1 sender in the same CPG provided they are on\n"
          "different nodes.\n"
          "\n", cmd);

      /* Option summary: literal text, no substitutions */
      fputs(
          " -w<num> Write size in Kbytes, default 4\n"
          " -W<num> Write size in bytes, default 4096\n"
          " -n<name> CPG name to use, default 'cpghum'\n"
          " -M Write machine-readable results\n"
          " -D<char> Delimiter for machine-readable results (default ',')\n"
          " -E Send normal output to stderr instead of stdout\n"
          " -d<num> Delay between sending packets (mS), default 1000\n"
          " -r<num> Number of repetitions, default 100\n"
          " -p<num> Delay between printing output (seconds), default 10s\n"
          " -l Listen and check CRCs only, don't send (^C to quit)\n"
          " -t Report Round Trip Times for each packet.\n"
          " -m<num> cpg_initialise() model. Default 1.\n"
          " -s Also send errors to syslog (for daemon log correlation).\n"
          " -f Flood test CPG (cpgbench). -W starts at 64 in this case.\n"
          " -a Abort on crc/length/sequence error\n"
          " -q Quiet. Don't print messages every 10 seconds (see also -p)\n"
          " -qq Very quiet. Don't print stats at the end\n"
          "\n", stderr);

      fprintf(stderr,
          "%s exit code is 0 if no error happened, 1 on generic error and 2 on\n"
          "send/crc/length/sequence error"
          "\n", cmd);
  }
  445. int main (int argc, char *argv[]) {
  446. int i;
  447. unsigned int res;
  448. uint32_t maxsize;
  449. int opt;
  450. int bs;
  451. int write_size = 4096;
  452. int delay_time = 1000;
  453. int repetitions = 100;
  454. int print_time = 10;
  455. int have_size = 0;
  456. int listen_only = 0;
  457. int flood = 0;
  458. int model = 1;
  459. while ( (opt = getopt(argc, argv, "qlstafMEn:d:r:p:m:w:W:D:")) != -1 ) {
  460. switch (opt) {
  461. case 'w': // Write size in K
  462. bs = atoi(optarg);
  463. if (bs > 0) {
  464. write_size = bs*1024;
  465. have_size = 1;
  466. }
  467. break;
  468. case 'W': // Write size in bytes
  469. bs = atoi(optarg);
  470. if (bs > 0) {
  471. write_size = bs;
  472. have_size = 1;
  473. }
  474. break;
  475. case 'n':
  476. if (strlen(optarg) >= CPG_MAX_NAME_LENGTH) {
  477. fprintf(stderr, "CPG name too long\n");
  478. exit(1);
  479. }
  480. strcpy(group_name.value, optarg);
  481. group_name.length = strlen(group_name.value);
  482. break;
  483. case 't':
  484. report_rtt = 1;
  485. break;
  486. case 'E':
  487. to_stderr = 1;
  488. break;
  489. case 'M':
  490. machine_readable = 1;
  491. break;
  492. case 'f':
  493. flood = 1;
  494. break;
  495. case 'a':
  496. abort_on_error = 1;
  497. break;
  498. case 'd':
  499. delay_time = atoi(optarg);
  500. break;
  501. case 'D':
  502. delimiter = optarg[0];
  503. break;
  504. case 'r':
  505. repetitions = atoi(optarg);
  506. break;
  507. case 'p':
  508. print_time = atoi(optarg);
  509. break;
  510. case 'l':
  511. listen_only = 1;
  512. break;
  513. case 's':
  514. do_syslog = 1;
  515. break;
  516. case 'q':
  517. quiet++;
  518. break;
  519. case 'm':
  520. model = atoi(optarg);
  521. if (model < 0 || model > 1) {
  522. fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
  523. exit(1);
  524. }
  525. break;
  526. case '?':
  527. usage(basename(argv[0]));
  528. exit(1);
  529. }
  530. }
  531. if (!have_size && flood) {
  532. write_size = 64;
  533. }
  534. signal (SIGALRM, sigalrm_handler);
  535. signal (SIGINT, sigint_handler);
  536. switch (model) {
  537. case 0:
  538. res = cpg_initialize (&handle, &callbacks);
  539. break;
  540. case 1:
  541. res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
  542. break;
  543. default:
  544. res=999; // can't get here but it keeps the compiler happy
  545. break;
  546. }
  547. if (res != CS_OK) {
  548. cpgh_log_printf(CPGH_LOG_ERR, "cpg_initialize failed with result %d\n", res);
  549. exit (1);
  550. }
  551. cpg_local_get(handle, &g_our_nodeid);
  552. pthread_create (&thread, NULL, dispatch_thread, NULL);
  553. res = cpg_join (handle, &group_name);
  554. if (res != CS_OK) {
  555. cpgh_log_printf(CPGH_LOG_ERR, "cpg_join failed with result %d\n", res);
  556. exit (1);
  557. }
  558. if (listen_only) {
  559. int secs = 0;
  560. while (!stopped) {
  561. sleep(1);
  562. if (++secs > print_time && !quiet) {
  563. int nodes_printed = 0;
  564. if (!machine_readable) {
  565. for (i=1; i<MAX_NODEID; i++) {
  566. if (g_recv_counter[i]) {
  567. cpgh_log_printf(CPGH_LOG_INFO, "%s: %5d message%s of %d bytes received from node %d\n",
  568. group_name.value, g_recv_counter[i] - g_recv_start[i],
  569. g_recv_counter[i]==1?"":"s",
  570. g_recv_size[i], i);
  571. nodes_printed++;
  572. }
  573. }
  574. }
  575. /* Separate list of nodes if more than one */
  576. if (nodes_printed > 1) {
  577. cpgh_log_printf(CPGH_LOG_INFO, "\n");
  578. }
  579. secs = 0;
  580. }
  581. }
  582. }
  583. else {
  584. cpg_max_atomic_msgsize_get (handle, &maxsize);
  585. if (write_size > maxsize) {
  586. fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
  587. write_size, maxsize);
  588. }
  589. /* The main job starts here */
  590. if (flood) {
  591. for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */
  592. cpg_flood (handle, write_size);
  593. signal (SIGALRM, sigalrm_handler);
  594. write_size *= 5;
  595. if (write_size >= (ONE_MEG - 100)) {
  596. break;
  597. }
  598. }
  599. }
  600. else {
  601. send_counter = -1; /* So we start from zero to allow listeners to sync */
  602. for (i = 0; i < repetitions && !stopped; i++) {
  603. cpg_test (handle, write_size, delay_time, print_time);
  604. signal (SIGALRM, sigalrm_handler);
  605. }
  606. }
  607. }
  608. res = cpg_finalize (handle);
  609. if (res != CS_OK) {
  610. cpgh_log_printf(CPGH_LOG_ERR, "cpg_finalize failed with result %d\n", res);
  611. exit (1);
  612. }
  613. if (quiet < 2) {
  614. /* Don't print LONG_MAX for min_rtt if we don't have a value */
  615. if (min_rtt == LONG_MAX) {
  616. min_rtt = 0L;
  617. }
  618. if (machine_readable) {
  619. cpgh_log_printf(CPGH_LOG_STATS, "%d%c %d%c %d%c %d%c %d%c %d%c %d%c %ld%c %ld%c %ld\n",
  620. packets_sent, delimiter,
  621. send_fails, delimiter,
  622. send_retries, delimiter,
  623. length_errors, delimiter,
  624. packets_recvd, delimiter,
  625. sequence_errors, delimiter,
  626. crc_errors, delimiter,
  627. min_rtt, delimiter,
  628. max_rtt, delimiter,
  629. avg_rtt);
  630. }
  631. else {
  632. cpgh_log_printf(CPGH_LOG_STATS, "\n");
  633. cpgh_log_printf(CPGH_LOG_STATS, "Stats:\n");
  634. if (!listen_only) {
  635. cpgh_log_printf(CPGH_LOG_STATS, " packets sent: %d\n", packets_sent);
  636. cpgh_log_printf(CPGH_LOG_STATS, " send failures: %d\n", send_fails);
  637. cpgh_log_printf(CPGH_LOG_STATS, " send retries: %d\n", send_retries);
  638. }
  639. cpgh_log_printf(CPGH_LOG_STATS, " length errors: %d\n", length_errors);
  640. cpgh_log_printf(CPGH_LOG_STATS, " packets recvd: %d\n", packets_recvd);
  641. cpgh_log_printf(CPGH_LOG_STATS, " sequence errors: %d\n", sequence_errors);
  642. cpgh_log_printf(CPGH_LOG_STATS, " crc errors: %d\n", crc_errors);
  643. if (!listen_only) {
  644. cpgh_log_printf(CPGH_LOG_STATS, " min RTT: %ld\n", min_rtt);
  645. cpgh_log_printf(CPGH_LOG_STATS, " max RTT: %ld\n", max_rtt);
  646. cpgh_log_printf(CPGH_LOG_STATS, " avg RTT: %ld\n", avg_rtt);
  647. }
  648. cpgh_log_printf(CPGH_LOG_STATS, "\n");
  649. }
  650. }
  651. res = 0;
  652. if (send_fails > 0 || (have_size && length_errors > 0) || sequence_errors > 0 || crc_errors > 0) {
  653. res = 2;
  654. }
  655. return (res);
  656. }