4
0

cpghum.c 25 KB


  1. /*
  2. * Copyright (c) 2015-2017 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Christine Caulfield <ccaulfie@redhat.com>
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <stdio.h>
  35. #include <stdlib.h>
  36. #include <string.h>
  37. #include <signal.h>
  38. #include <unistd.h>
  39. #include <assert.h>
  40. #include <errno.h>
  41. #include <time.h>
  42. #include <limits.h>
  43. #include <ctype.h>
  44. #include <syslog.h>
  45. #include <stdarg.h>
  46. #include <inttypes.h>
  47. #include <sys/time.h>
  48. #include <sys/types.h>
  49. #include <sys/socket.h>
  50. #include <sys/select.h>
  51. #include <sys/uio.h>
  52. #include <sys/un.h>
  53. #include <netinet/in.h>
  54. #include <arpa/inet.h>
  55. #include <pthread.h>
  56. #include <zlib.h>
  57. #include <libgen.h>
  58. #include <getopt.h>
  59. #include <corosync/corotypes.h>
  60. #include <corosync/cpg.h>
  61. static cpg_handle_t handle;
  62. static pthread_t thread;
  /*
   * Fallback for C libraries that do not supply timersub(): stores (a - b)
   * in *result, borrowing one second whenever the microsecond difference
   * would otherwise be negative.
   */
  #ifndef timersub
  #define timersub(a, b, result)                               \
      do {                                                     \
          (result)->tv_sec = (a)->tv_sec - (b)->tv_sec;        \
          (result)->tv_usec = (a)->tv_usec - (b)->tv_usec;     \
          if ((result)->tv_usec < 0) {                         \
              --(result)->tv_sec;                              \
              (result)->tv_usec += 1000000;                    \
          }                                                    \
      } while (0)
  #endif /* timersub */
  /*
   * Set to 1 by the SIGALRM handler and polled by the send loops.  It is
   * written from a signal handler, so it must be volatile sig_atomic_t for
   * the main-line code to be guaranteed to observe the update.
   */
  static volatile sig_atomic_t alarm_notice;

  #define MAX_NODEID 65536
  #define ONE_MEG 1048576
  #define DATASIZE (ONE_MEG*20)

  /* Send buffer: a struct cpghum_header followed by the payload. */
  static char data[DATASIZE];

  /* Sequence number placed in the header of the next packet to send. */
  static int send_counter = 0;

  /* Command-line controlled behaviour. */
  static int do_syslog = 0;
  static int quiet = 0;
  static int report_rtt = 0;
  static int abort_on_error = 0;
  static int machine_readable = 0;
  static char delimiter = ',';
  static int to_stderr = 0;

  /* Our own node id as returned by cpg_local_get(). */
  static unsigned int g_our_nodeid;

  /* Set to 1 by the SIGINT handler to request a clean shutdown. */
  static volatile int stopped;

  /* Flood-test parameters (see the --flood-* options). */
  static unsigned int flood_start = 64;
  static unsigned int flood_multiplier = 5;
  static unsigned long flood_max = (ONE_MEG - 100);

  // stats
  static unsigned int length_errors=0;
  static unsigned int crc_errors=0;
  static unsigned int sequence_errors=0;
  static unsigned int packets_sent=0;
  static unsigned int packets_recvd=0;
  static unsigned int packets_recvd1=0; /* For flood intermediates */
  static unsigned int send_retries=0;
  static unsigned int send_fails=0;

  /* Round-trip times in microseconds; LONG_MAX in a minimum means "no sample yet". */
  static unsigned long avg_rtt=0;
  static unsigned long max_rtt=0;
  static unsigned long min_rtt=LONG_MAX;
  static unsigned long interim_avg_rtt=0;
  static unsigned long interim_max_rtt=0;
  static unsigned long interim_min_rtt=LONG_MAX;
  /*
   * Header written at the start of every cpghum message by set_packet();
   * the payload follows immediately after it.
   */
  struct cpghum_header {
      unsigned int counter;       /* sender's sequence number */
      unsigned int crc;           /* crc32 of the payload after the header */
      unsigned int size;          /* total message size, header included */
      struct timeval timestamp;   /* gettimeofday() value taken when the packet was built */
  };
  113. static void cpg_bm_confchg_fn (
  114. cpg_handle_t handle_in,
  115. const struct cpg_name *group_name,
  116. const struct cpg_address *member_list, size_t member_list_entries,
  117. const struct cpg_address *left_list, size_t left_list_entries,
  118. const struct cpg_address *joined_list, size_t joined_list_entries)
  119. {
  120. }
  121. static unsigned int g_recv_count;
  122. static unsigned int g_recv_length;
  123. static int g_recv_start[MAX_NODEID+1];
  124. static int g_recv_counter[MAX_NODEID+1];
  125. static int g_recv_size[MAX_NODEID+1];
  126. static int g_log_mask = 0xFFFF;
  /* Message categories; each one is a distinct bit so it can be tested against g_log_mask. */
  typedef enum
  {
      CPGH_LOG_INFO  = 1 << 0,
      CPGH_LOG_PERF  = 1 << 1,
      CPGH_LOG_RTT   = 1 << 2,
      CPGH_LOG_STATS = 1 << 3,
      CPGH_LOG_ERR   = 1 << 4
  } log_type_t;
  135. static void cpgh_print_message(int syslog_level, const char *facility_name, const char *format, va_list ap)
  136. __attribute__((format(printf, 3, 0)));
  137. static void cpgh_log_printf(log_type_t type, const char *format, ...)
  138. __attribute__((format(printf, 2, 3)));
  139. static void cpgh_print_message(int syslog_level, const char *facility_name, const char *format, va_list ap)
  140. {
  141. char msg[1024];
  142. int start = 0;
  143. if (machine_readable) {
  144. snprintf(msg, sizeof(msg), "%s%c", facility_name, delimiter);
  145. start = strlen(msg);
  146. }
  147. assert(vsnprintf(msg+start, sizeof(msg)-start, format, ap) < sizeof(msg)-start);
  148. if (to_stderr || (syslog_level <= LOG_ERR)) {
  149. fprintf(stderr, "%s", msg);
  150. }
  151. else {
  152. printf("%s", msg);
  153. }
  154. if (do_syslog) {
  155. syslog(syslog_level, "%s", msg);
  156. }
  157. }
  158. static void cpgh_log_printf(log_type_t type, const char *format, ...)
  159. {
  160. va_list ap;
  161. if (!(type & g_log_mask)) {
  162. return;
  163. }
  164. va_start(ap, format);
  165. switch (type) {
  166. case CPGH_LOG_INFO:
  167. cpgh_print_message(LOG_INFO, "[Info]", format, ap);
  168. break;
  169. case CPGH_LOG_PERF:
  170. cpgh_print_message(LOG_INFO, "[Perf]", format, ap);
  171. break;
  172. case CPGH_LOG_RTT:
  173. cpgh_print_message(LOG_INFO, "[RTT]", format, ap);
  174. break;
  175. case CPGH_LOG_STATS:
  176. cpgh_print_message(LOG_INFO, "[Stats]", format, ap);
  177. break;
  178. case CPGH_LOG_ERR:
  179. cpgh_print_message(LOG_ERR, "[Err]", format, ap);
  180. break;
  181. default:
  182. break;
  183. }
  184. va_end(ap);
  185. }
  /*
   * Fold one round-trip sample into a min/avg/max triple.
   *
   * header_timestamp: send time stored in the packet header.
   * packet_count:     number of samples already folded into *rtt_avg.
   * rtt_min/avg/max:  running statistics, updated in place (microseconds).
   *
   * Returns the round-trip time of this packet in microseconds.
   *
   * gettimeofday() is wall-clock time and may step backwards; a negative
   * elapsed time is clamped to 0 instead of wrapping to a huge unsigned
   * value that would corrupt the maximum and the average.
   */
  static unsigned long update_rtt(struct timeval *header_timestamp, int packet_count,
      unsigned long *rtt_min, unsigned long *rtt_avg, unsigned long *rtt_max)
  {
      struct timeval now;
      long long elapsed_usecs;
      unsigned long rtt_usecs;

      gettimeofday (&now, NULL);
      elapsed_usecs = ((long long)now.tv_sec - (long long)header_timestamp->tv_sec) * 1000000LL
          + ((long long)now.tv_usec - (long long)header_timestamp->tv_usec);
      rtt_usecs = (elapsed_usecs > 0) ? (unsigned long)elapsed_usecs : 0UL;

      if (rtt_usecs > *rtt_max) {
          *rtt_max = rtt_usecs;
      }
      if (rtt_usecs < *rtt_min) {
          *rtt_min = rtt_usecs;
      }

      /*
       * Don't start the average with 0.  A negative count cannot be
       * averaged (and -1 would divide by zero), so restart from this sample.
       */
      if (*rtt_avg == 0 || packet_count < 0) {
          *rtt_avg = rtt_usecs;
      }
      else {
          *rtt_avg = ((*rtt_avg * (unsigned long)packet_count) + rtt_usecs)
              / ((unsigned long)packet_count + 1UL);
      }

      return rtt_usecs;
  }
  210. static void cpg_bm_deliver_fn (
  211. cpg_handle_t handle_in,
  212. const struct cpg_name *group_name,
  213. uint32_t nodeid,
  214. uint32_t pid,
  215. void *msg,
  216. size_t msg_len)
  217. {
  218. uLong crc=0;
  219. struct cpghum_header *header = (struct cpghum_header *)msg;
  220. uLong recv_crc = header->crc & 0xFFFFFFFF;
  221. unsigned int *dataint = (unsigned int *)((char*)msg + sizeof(struct cpghum_header));
  222. unsigned int datalen;
  223. if (nodeid > MAX_NODEID) {
  224. cpgh_log_printf(CPGH_LOG_ERR, "Got message from invalid nodeid " CS_PRI_NODE_ID " (too high for us). Quitting\n", nodeid);
  225. exit(1);
  226. }
  227. packets_recvd++;
  228. packets_recvd1++;
  229. g_recv_length = msg_len;
  230. datalen = header->size - sizeof(struct cpghum_header);
  231. // Report RTT first in case abort_on_error is set
  232. if (nodeid == g_our_nodeid) {
  233. unsigned long rtt_usecs;
  234. // For flood
  235. update_rtt(&header->timestamp, packets_recvd1, &interim_min_rtt, &interim_avg_rtt, &interim_max_rtt);
  236. rtt_usecs = update_rtt(&header->timestamp, g_recv_counter[nodeid], &min_rtt, &avg_rtt, &max_rtt);
  237. if (report_rtt) {
  238. if (machine_readable) {
  239. cpgh_log_printf(CPGH_LOG_RTT, "%ld%c%ld%c%ld%c%ld\n", rtt_usecs, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
  240. }
  241. else {
  242. cpgh_log_printf(CPGH_LOG_RTT, "%s: RTT %ld uS (min/avg/max): %ld/%ld/%ld\n", group_name->value, rtt_usecs, min_rtt, avg_rtt, max_rtt);
  243. }
  244. }
  245. }
  246. // Basic check, packets should all be the right size
  247. if (msg_len != header->size) {
  248. length_errors++;
  249. cpgh_log_printf(CPGH_LOG_ERR, "%s: message sizes don't match. got %zu, expected %u from node " CS_PRI_NODE_ID "\n", group_name->value, msg_len, header->size, nodeid);
  250. if (abort_on_error) {
  251. exit(2);
  252. }
  253. }
  254. g_recv_size[nodeid] = msg_len;
  255. // Sequence counters are incrementing in step?
  256. if (header->counter != g_recv_counter[nodeid]) {
  257. /* Don't report the first mismatch or a newly restarted sender, we're just catching up */
  258. if (g_recv_counter[nodeid] && header->counter) {
  259. sequence_errors++;
  260. cpgh_log_printf(CPGH_LOG_ERR, "%s: counters don't match. got %d, expected %d from node " CS_PRI_NODE_ID "\n", group_name->value, header->counter, g_recv_counter[nodeid], nodeid);
  261. if (abort_on_error) {
  262. exit(2);
  263. }
  264. }
  265. else {
  266. g_recv_start[nodeid] = header->counter;
  267. }
  268. /* Catch up or we'll be printing errors for ever */
  269. g_recv_counter[nodeid] = header->counter+1;
  270. }
  271. else {
  272. g_recv_counter[nodeid]++;
  273. }
  274. /* Check crc */
  275. crc = crc32(0, NULL, 0);
  276. crc = crc32(crc, (Bytef *)dataint, datalen) & 0xFFFFFFFF;
  277. if (crc != recv_crc) {
  278. crc_errors++;
  279. cpgh_log_printf(CPGH_LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx from nodeid " CS_PRI_NODE_ID "\n", group_name->value, recv_crc, crc, nodeid);
  280. if (abort_on_error) {
  281. exit(2);
  282. }
  283. }
  284. g_recv_count++;
  285. }
  286. static cpg_model_v1_data_t model1_data = {
  287. .cpg_deliver_fn = cpg_bm_deliver_fn,
  288. .cpg_confchg_fn = cpg_bm_confchg_fn,
  289. };
  290. static cpg_callbacks_t callbacks = {
  291. .cpg_deliver_fn = cpg_bm_deliver_fn,
  292. .cpg_confchg_fn = cpg_bm_confchg_fn
  293. };
  294. static struct cpg_name group_name = {
  295. .value = "cpghum",
  296. .length = 7
  297. };
  298. static void set_packet(int write_size, int counter)
  299. {
  300. struct cpghum_header *header = (struct cpghum_header *)data;
  301. int i;
  302. unsigned int *dataint = (unsigned int *)(data + sizeof(struct cpghum_header));
  303. unsigned int datalen = write_size - sizeof(struct cpghum_header);
  304. struct timeval tv1;
  305. uLong crc;
  306. header->counter = counter;
  307. for (i=0; i<(datalen/4); i++) {
  308. // coverity[DC.WEAK_CRYPTO:SUPPRESS] rand is not used in a security context
  309. dataint[i] = rand();
  310. }
  311. crc = crc32(0, NULL, 0);
  312. header->crc = crc32(crc, (Bytef*)&dataint[0], datalen);
  313. header->size = write_size;
  314. gettimeofday (&tv1, NULL);
  315. memcpy(&header->timestamp, &tv1, sizeof(struct timeval));
  316. }
  317. /* Basically this is cpgbench.c */
  318. static void cpg_flood (
  319. cpg_handle_t handle_in,
  320. int write_size)
  321. {
  322. struct timeval tv1, tv2, tv_elapsed;
  323. struct iovec iov;
  324. unsigned int res = CS_OK;
  325. alarm_notice = 0;
  326. iov.iov_base = data;
  327. iov.iov_len = write_size;
  328. alarm (10);
  329. packets_recvd1 = 0;
  330. interim_avg_rtt = 0;
  331. interim_max_rtt = 0;
  332. interim_min_rtt = LONG_MAX;
  333. gettimeofday (&tv1, NULL);
  334. do {
  335. if (res == CS_OK) {
  336. set_packet(write_size, send_counter);
  337. }
  338. res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
  339. if (res == CS_OK) {
  340. /* Only increment the packet counter if it was sucessfully sent */
  341. packets_sent++;
  342. send_counter++;
  343. }
  344. else {
  345. if (res == CS_ERR_TRY_AGAIN) {
  346. send_retries++;
  347. }
  348. else {
  349. send_fails++;
  350. }
  351. }
  352. } while (!stopped && alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN));
  353. gettimeofday (&tv2, NULL);
  354. timersub (&tv2, &tv1, &tv_elapsed);
  355. if (!quiet) {
  356. if (machine_readable) {
  357. cpgh_log_printf (CPGH_LOG_PERF, "%d%c%d%c%f%c%f%c%f%c%ld%c%ld%c%ld\n", packets_recvd1, delimiter, write_size, delimiter,
  358. (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
  359. ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)), delimiter,
  360. ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0), delimiter,
  361. interim_min_rtt, delimiter, interim_avg_rtt, delimiter, interim_max_rtt);
  362. }
  363. else {
  364. cpgh_log_printf (CPGH_LOG_PERF, "%5d messages received ", packets_recvd1);
  365. cpgh_log_printf (CPGH_LOG_PERF, "%5d bytes per write ", write_size);
  366. cpgh_log_printf (CPGH_LOG_PERF, "%7.3f Seconds runtime ",
  367. (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
  368. cpgh_log_printf (CPGH_LOG_PERF, "%9.3f TP/s ",
  369. ((float)packets_recvd1) / (tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)));
  370. cpgh_log_printf (CPGH_LOG_PERF, "%7.3f MB/s ",
  371. ((float)packets_recvd1) * ((float)write_size) / ((tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0)) * 1000000.0));
  372. cpgh_log_printf (CPGH_LOG_PERF, "RTT for this size (min/avg/max) %ld/%ld/%ld\n",
  373. interim_min_rtt, interim_avg_rtt, interim_max_rtt);
  374. }
  375. }
  376. }
  377. static int cpg_test (
  378. cpg_handle_t handle_in,
  379. int write_size,
  380. int delay_time,
  381. int print_time)
  382. {
  383. struct iovec iov;
  384. unsigned int res;
  385. alarm_notice = 0;
  386. iov.iov_base = data;
  387. iov.iov_len = write_size;
  388. g_recv_count = 0;
  389. alarm (print_time);
  390. do {
  391. send_counter++;
  392. resend:
  393. set_packet(write_size, send_counter);
  394. res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
  395. if (res == CS_ERR_TRY_AGAIN) {
  396. usleep(10000);
  397. send_retries++;
  398. goto resend;
  399. }
  400. if (res == CS_ERR_LIBRARY) {
  401. send_counter--;
  402. return -1;
  403. }
  404. if (res != CS_OK) {
  405. cpgh_log_printf(CPGH_LOG_ERR, "send failed: %d\n", res);
  406. send_fails++;
  407. }
  408. else {
  409. packets_sent++;
  410. }
  411. usleep(delay_time*1000);
  412. } while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
  413. if (!quiet) {
  414. if (machine_readable) {
  415. cpgh_log_printf(CPGH_LOG_RTT, "%d%c%ld%c%ld%c%ld\n", 0, delimiter, min_rtt, delimiter, avg_rtt, delimiter, max_rtt);
  416. }
  417. else {
  418. cpgh_log_printf(CPGH_LOG_PERF, "%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
  419. cpgh_log_printf(CPGH_LOG_PERF, "%5d bytes per write. ", write_size);
  420. cpgh_log_printf(CPGH_LOG_RTT, "RTT min/avg/max: %ld/%ld/%ld\n", min_rtt, avg_rtt, max_rtt);
  421. }
  422. }
  423. return 0;
  424. }
  425. static void sigalrm_handler (int num)
  426. {
  427. alarm_notice = 1;
  428. }
  429. static void sigint_handler (int num)
  430. {
  431. stopped = 1;
  432. }
  433. static void* dispatch_thread (void *arg)
  434. {
  435. cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
  436. return NULL;
  437. }
  /*
   * Print the command-line help to stderr.
   *
   * cmd: program name shown in the text.
   *
   * The long names shown for -w and -W match the long_options table in
   * main(): --size-kb is -w (Kbytes) and --size-bytes is -W (bytes).
   */
  static void usage(char *cmd)
  {
      fprintf(stderr, "%s [OPTIONS]\n", cmd);
      fprintf(stderr, "\n");
      fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
      fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n");
      fprintf(stderr, "corrupted messages will be detected and reported.\n");
      fprintf(stderr, "\n");
      fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
      fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n");
      fprintf(stderr, "\n");
      fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n");
      fprintf(stderr, "different nodes by using the -n option.\n");
      fprintf(stderr, "\n");
      fprintf(stderr, "%s can handle more than 1 sender in the same CPG provided they are on\n", cmd);
      fprintf(stderr, "different nodes.\n");
      fprintf(stderr, "\n");
      fprintf(stderr, " -w<num>, --size-kb Write size in Kbytes, default 4\n");
      fprintf(stderr, " -W<num>, --size-bytes Write size in bytes, default 4096\n");
      fprintf(stderr, " -n<name>, --name CPG name to use, default 'cpghum'\n");
      fprintf(stderr, " -M Write machine-readable results\n");
      fprintf(stderr, " -D<char> Delimiter for machine-readable results (default ',')\n");
      fprintf(stderr, " -E Send normal output to stderr instead of stdout\n");
      fprintf(stderr, " -d<num>, --delay Delay between sending packets (mS), default 1000\n");
      fprintf(stderr, " -r<num> Number of repetitions, default 100\n");
      fprintf(stderr, " -p<num> Delay between printing output (seconds), default 10s\n");
      fprintf(stderr, " -l, --listen Listen and check CRCs only, don't send (^C to quit)\n");
      fprintf(stderr, " -t, --rtt Report Round Trip Times for each packet.\n");
      fprintf(stderr, " -m<num> cpg_initialise() model. Default 1.\n");
      fprintf(stderr, " -s Also send errors to syslog.\n");
      fprintf(stderr, " -f, --flood Flood test CPG (cpgbench). see --flood-* long options\n");
      fprintf(stderr, " -a Abort on crc/length/sequence error\n");
      fprintf(stderr, " -q, --quiet Quiet. Don't print messages every 10s (see also -p)\n");
      fprintf(stderr, " -qq Very quiet. Don't print stats at the end\n");
      fprintf(stderr, " --flood-start=bytes Start value for --flood\n");
      fprintf(stderr, " --flood-mult=value Packet size multiplier value for --flood\n");
      fprintf(stderr, " --flood-max=bytes Maximum packet size for --flood\n");
      fprintf(stderr, "\n");
      fprintf(stderr, " values for --flood* and -W can have K or M suffixes to indicate\n");
      fprintf(stderr, " Kilobytes or Megabytes\n");
      fprintf(stderr, "\n");
      fprintf(stderr, "%s exit code is 0 if no error happened, 1 on generic error and 2 on\n", cmd);
      fprintf(stderr, "send/crc/length/sequence error");
      fprintf(stderr, "\n");
  }
  /*
   * Parse a size, optionally ending in 'K' or 'M' (either case).
   *
   * valstring: text such as "4096", "64K" or "2m".
   *
   * Returns the size in bytes, or 0 when the string does not start with a
   * number, carries an unsupported suffix (a message is printed to stderr),
   * or the result does not fit in a long.
   */
  static long parse_bytes(const char *valstring)
  {
      unsigned int value = 0;
      long multiplier = 1;
      char suffix = '\0';

      /*
       * Suffix is optional.  sscanf() returns EOF (negative) for an empty
       * string, so anything below 1 means no number was read.
       */
      if (sscanf(valstring, "%u%c", &value, &suffix) < 1) {
          return 0;
      }

      /* <ctype.h> functions need an unsigned char value */
      switch (toupper((unsigned char)suffix)) {
      case '\0':
          break;
      case 'K':
          multiplier = 1024L;
          break;
      case 'M':
          multiplier = 1024L * 1024L;
          break;
      default:
          fprintf(stderr, "Invalid suffix '%c', only K or M supported\n", suffix);
          return 0;
      }

      /* Multiply in long, refusing values that would overflow */
      if ((unsigned long)value > (unsigned long)LONG_MAX / (unsigned long)multiplier) {
          return 0;
      }
      return (long)value * multiplier;
  }
  508. static int connect_and_join(int model, int verbose)
  509. {
  510. int res;
  511. switch (model) {
  512. case 0:
  513. res = cpg_initialize (&handle, &callbacks);
  514. break;
  515. case 1:
  516. res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
  517. break;
  518. default:
  519. res=999; // can't get here but it keeps the compiler happy
  520. break;
  521. }
  522. if (res != CS_OK) {
  523. if (verbose) {
  524. cpgh_log_printf(CPGH_LOG_ERR, "cpg_initialize failed with result %d\n", res);
  525. }
  526. return -1;
  527. }
  528. res = cpg_join (handle, &group_name);
  529. if (res != CS_OK) {
  530. if (verbose) {
  531. cpgh_log_printf(CPGH_LOG_ERR, "cpg_join failed with result %d\n", res);
  532. }
  533. cpg_finalize(handle);
  534. return -1;
  535. }
  536. pthread_create (&thread, NULL, dispatch_thread, NULL);
  537. return CS_OK;
  538. }
  539. int main (int argc, char *argv[]) {
  540. int i;
  541. unsigned int res;
  542. uint32_t maxsize;
  543. int opt;
  544. int bs;
  545. int write_size = 4096;
  546. int delay_time = 1000;
  547. int repetitions = 100;
  548. int print_time = 10;
  549. int have_size = 0;
  550. int listen_only = 0;
  551. int flood = 0;
  552. int model = 1;
  553. int option_index = 0;
  554. struct option long_options[] = {
  555. {"flood-start", required_argument, 0, 0 },
  556. {"flood-mult", required_argument, 0, 0 },
  557. {"flood-max", required_argument, 0, 0 },
  558. {"size-kb", required_argument, 0, 'w' },
  559. {"size-bytes", required_argument, 0, 'W' },
  560. {"name", required_argument, 0, 'n' },
  561. {"rtt", no_argument, 0, 't' },
  562. {"flood", no_argument, 0, 'f' },
  563. {"quiet", no_argument, 0, 'q' },
  564. {"listen", no_argument, 0, 'l' },
  565. {"help", no_argument, 0, '?' },
  566. {0, 0, 0, 0 }
  567. };
  568. while ( (opt = getopt_long(argc, argv, "qlstafMEn:d:r:p:m:w:W:D:",
  569. long_options, &option_index)) != -1 ) {
  570. switch (opt) {
  571. case 0: // Long-only options
  572. if (strcmp(long_options[option_index].name, "flood-start") == 0) {
  573. flood_start = parse_bytes(optarg);
  574. if (flood_start == 0) {
  575. fprintf(stderr, "flood-start value invalid\n");
  576. exit(1);
  577. }
  578. }
  579. if (strcmp(long_options[option_index].name, "flood-mult") == 0) {
  580. flood_multiplier = parse_bytes(optarg);
  581. if (flood_multiplier == 0) {
  582. fprintf(stderr, "flood-mult value invalid\n");
  583. exit(1);
  584. }
  585. }
  586. if (strcmp(long_options[option_index].name, "flood-max") == 0) {
  587. flood_max = parse_bytes(optarg);
  588. if (flood_max == 0) {
  589. fprintf(stderr, "flood-max value invalid\n");
  590. exit(1);
  591. }
  592. }
  593. break;
  594. case 'w': // Write size in K
  595. bs = atoi(optarg);
  596. if (bs > 0) {
  597. write_size = bs*1024;
  598. have_size = 1;
  599. }
  600. break;
  601. case 'W': // Write size in bytes (or with a suffix)
  602. bs = parse_bytes(optarg);
  603. if (bs > 0) {
  604. write_size = bs;
  605. have_size = 1;
  606. }
  607. break;
  608. case 'n':
  609. if (strlen(optarg) >= CPG_MAX_NAME_LENGTH) {
  610. fprintf(stderr, "CPG name too long\n");
  611. exit(1);
  612. }
  613. strcpy(group_name.value, optarg);
  614. group_name.length = strlen(group_name.value);
  615. break;
  616. case 't':
  617. report_rtt = 1;
  618. break;
  619. case 'E':
  620. to_stderr = 1;
  621. break;
  622. case 'M':
  623. machine_readable = 1;
  624. break;
  625. case 'f':
  626. flood = 1;
  627. break;
  628. case 'a':
  629. abort_on_error = 1;
  630. break;
  631. case 'd':
  632. delay_time = atoi(optarg);
  633. break;
  634. case 'D':
  635. delimiter = optarg[0];
  636. break;
  637. case 'r':
  638. repetitions = atoi(optarg);
  639. break;
  640. case 'p':
  641. print_time = atoi(optarg);
  642. break;
  643. case 'l':
  644. listen_only = 1;
  645. break;
  646. case 's':
  647. do_syslog = 1;
  648. break;
  649. case 'q':
  650. quiet++;
  651. break;
  652. case 'm':
  653. model = atoi(optarg);
  654. if (model < 0 || model > 1) {
  655. fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
  656. exit(1);
  657. }
  658. break;
  659. case '?':
  660. usage(basename(argv[0]));
  661. exit(1);
  662. }
  663. }
  664. if (!have_size && flood) {
  665. write_size = flood_start;
  666. }
  667. signal (SIGALRM, sigalrm_handler);
  668. signal (SIGINT, sigint_handler);
  669. if (connect_and_join(model, 1) != CS_OK) {
  670. exit(1);
  671. }
  672. res = cpg_local_get(handle, &g_our_nodeid);
  673. if (res != CS_OK) {
  674. cpgh_log_printf(CPGH_LOG_ERR, "cpg_local_get failed with result %d\n", res);
  675. exit (1);
  676. }
  677. if (listen_only) {
  678. int secs = 0;
  679. while (!stopped) {
  680. sleep(1);
  681. if (++secs > print_time && !quiet) {
  682. int nodes_printed = 0;
  683. if (!machine_readable) {
  684. for (i=1; i<MAX_NODEID; i++) {
  685. if (g_recv_counter[i]) {
  686. cpgh_log_printf(CPGH_LOG_INFO, "%s: %5d message%s of %d bytes received from node " CS_PRI_NODE_ID "\n",
  687. group_name.value, g_recv_counter[i] - g_recv_start[i],
  688. g_recv_counter[i]==1?"":"s",
  689. g_recv_size[i], i);
  690. nodes_printed++;
  691. }
  692. }
  693. }
  694. /* Separate list of nodes if more than one */
  695. if (nodes_printed > 1) {
  696. cpgh_log_printf(CPGH_LOG_INFO, "\n");
  697. }
  698. secs = 0;
  699. }
  700. }
  701. }
  702. else {
  703. cpg_max_atomic_msgsize_get (handle, &maxsize);
  704. if (write_size > maxsize) {
  705. fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
  706. write_size, maxsize);
  707. }
  708. /* The main job starts here */
  709. if (flood) {
  710. for (i = 0; i < 10; i++) { /* number of repetitions - up to 50k */
  711. cpg_flood (handle, write_size);
  712. signal (SIGALRM, sigalrm_handler);
  713. write_size *= flood_multiplier;
  714. if (write_size > flood_max) {
  715. break;
  716. }
  717. }
  718. }
  719. else {
  720. send_counter = -1; /* So we start from zero to allow listeners to sync */
  721. for (i = 0; i < repetitions && !stopped; i++) {
  722. if (cpg_test (handle, write_size, delay_time, print_time) == -1) {
  723. /* Try to reconnect when corosync stops */
  724. res = -1;
  725. cpg_finalize(handle);
  726. pthread_cancel(thread);
  727. signal (SIGINT, SIG_DFL);
  728. printf("Reconnecting...");
  729. fflush(stdout);
  730. while (res != CS_OK) {
  731. sleep(1);
  732. printf(".");
  733. fflush(stdout);
  734. res = connect_and_join(model, 0);
  735. }
  736. printf("done\n");
  737. signal (SIGINT, sigint_handler);
  738. }
  739. signal (SIGALRM, sigalrm_handler);
  740. }
  741. }
  742. }
  743. res = cpg_finalize (handle);
  744. if (res != CS_OK) {
  745. cpgh_log_printf(CPGH_LOG_ERR, "cpg_finalize failed with result %d\n", res);
  746. exit (1);
  747. }
  748. if (quiet < 2) {
  749. /* Don't print LONG_MAX for min_rtt if we don't have a value */
  750. if (min_rtt == LONG_MAX) {
  751. min_rtt = 0L;
  752. }
  753. if (machine_readable) {
  754. cpgh_log_printf(CPGH_LOG_STATS, "%d%c%d%c%d%c%d%c%d%c%d%c%d%c%ld%c%ld%c%ld\n",
  755. packets_sent, delimiter,
  756. send_fails, delimiter,
  757. send_retries, delimiter,
  758. length_errors, delimiter,
  759. packets_recvd, delimiter,
  760. sequence_errors, delimiter,
  761. crc_errors, delimiter,
  762. min_rtt, delimiter,
  763. avg_rtt, delimiter,
  764. max_rtt);
  765. }
  766. else {
  767. cpgh_log_printf(CPGH_LOG_STATS, "\n");
  768. cpgh_log_printf(CPGH_LOG_STATS, "Stats:\n");
  769. if (!listen_only) {
  770. cpgh_log_printf(CPGH_LOG_STATS, " packets sent: %d\n", packets_sent);
  771. cpgh_log_printf(CPGH_LOG_STATS, " send failures: %d\n", send_fails);
  772. cpgh_log_printf(CPGH_LOG_STATS, " send retries: %d\n", send_retries);
  773. }
  774. cpgh_log_printf(CPGH_LOG_STATS, " length errors: %d\n", length_errors);
  775. cpgh_log_printf(CPGH_LOG_STATS, " packets recvd: %d\n", packets_recvd);
  776. cpgh_log_printf(CPGH_LOG_STATS, " sequence errors: %d\n", sequence_errors);
  777. cpgh_log_printf(CPGH_LOG_STATS, " crc errors: %d\n", crc_errors);
  778. if (!listen_only) {
  779. cpgh_log_printf(CPGH_LOG_STATS, " min RTT: %ld\n", min_rtt);
  780. cpgh_log_printf(CPGH_LOG_STATS, " max RTT: %ld\n", max_rtt);
  781. cpgh_log_printf(CPGH_LOG_STATS, " avg RTT: %ld\n", avg_rtt);
  782. }
  783. cpgh_log_printf(CPGH_LOG_STATS, "\n");
  784. }
  785. }
  786. res = 0;
  787. if (send_fails > 0 || (have_size && length_errors > 0) || sequence_errors > 0 || crc_errors > 0) {
  788. res = 2;
  789. }
  790. return (res);
  791. }