cpg_test_agent.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805
  1. /*
  2. * Copyright (c) 2010 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Angus Salkeld (asalkeld@redhat.com)
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <errno.h>
  35. #include <unistd.h>
  36. #include <stdio.h>
  37. #include <stdlib.h>
  38. #include <assert.h>
  39. #include <string.h>
  40. #include <sys/types.h>
  41. #include <sys/socket.h>
  42. #include <sys/uio.h>
  43. #include <netinet/in.h>
  44. #include <arpa/inet.h>
  45. #include <netdb.h>
  46. #include <poll.h>
  47. #include <unistd.h>
  48. #include <fcntl.h>
  49. #include <qb/qblist.h>
  50. #include <qb/qbdefs.h>
  51. #include <qb/qbutil.h>
  52. #include <qb/qbloop.h>
  53. #include <qb/qblog.h>
  54. #include <corosync/cpg.h>
  55. #include <corosync/cfg.h>
  56. #include "common_test_agent.h"
  57. #include <nss.h>
  58. #include <pk11pub.h>
  59. typedef enum {
  60. MSG_OK,
  61. MSG_NODEID_ERR,
  62. MSG_PID_ERR,
  63. MSG_SEQ_ERR,
  64. MSG_SIZE_ERR,
  65. MSG_SHA1_ERR,
  66. } msg_status_t;
  67. typedef struct {
  68. uint32_t nodeid;
  69. pid_t pid;
  70. unsigned char sha1[20];
  71. uint32_t seq;
  72. size_t size;
  73. unsigned char payload[0];
  74. } msg_t;
  75. #define LOG_STR_SIZE 80
  76. typedef struct {
  77. char log[LOG_STR_SIZE];
  78. struct qb_list_head list;
  79. } log_entry_t;
  80. static char big_and_buf[HOW_BIG_AND_BUF];
  81. static int32_t record_config_events_g = 0;
  82. static int32_t record_messages_g = 0;
  83. static cpg_handle_t cpg_handle = 0;
  84. static corosync_cfg_handle_t cfg_handle = 0;
  85. static int32_t cpg_fd = -1;
  86. static int32_t cfg_fd = -1;
  87. static struct qb_list_head config_chg_log_head;
  88. static struct qb_list_head msg_log_head;
  89. static pid_t my_pid;
  90. static uint32_t my_nodeid;
  91. static int32_t my_seq;
  92. static int32_t use_zcb = QB_FALSE;
  93. static int32_t my_msgs_to_send;
  94. static int32_t my_msgs_sent;
  95. static int32_t total_stored_msgs = 0;
  96. static int32_t total_msgs_revd = 0;
  97. static int32_t in_cnchg = 0;
  98. static int32_t pcmk_test = 0;
  99. PK11Context* sha1_context;
  100. static void send_some_more_messages (void * unused);
  101. static char*
  102. err_status_string (char * buf, size_t buf_len, msg_status_t status)
  103. {
  104. switch (status) {
  105. case MSG_OK:
  106. strncpy (buf, "OK", buf_len);
  107. break;
  108. case MSG_NODEID_ERR:
  109. strncpy (buf, "NODEID_ERR", buf_len);
  110. break;
  111. case MSG_PID_ERR:
  112. strncpy (buf, "PID_ERR", buf_len);
  113. break;
  114. case MSG_SEQ_ERR:
  115. strncpy (buf, "SEQ_ERR", buf_len);
  116. break;
  117. case MSG_SIZE_ERR:
  118. strncpy (buf, "SIZE_ERR", buf_len);
  119. break;
  120. case MSG_SHA1_ERR:
  121. strncpy (buf, "SHA1_ERR", buf_len);
  122. break;
  123. default:
  124. strncpy (buf, "UNKNOWN_ERR", buf_len);
  125. break;
  126. }
  127. if (buf_len > 0) {
  128. buf[buf_len - 1] = '\0';
  129. }
  130. return buf;
  131. }
  132. static void delivery_callback (
  133. cpg_handle_t handle,
  134. const struct cpg_name *groupName,
  135. uint32_t nodeid,
  136. uint32_t pid,
  137. void *msg,
  138. size_t msg_len)
  139. {
  140. log_entry_t *log_pt;
  141. msg_t *msg_pt = (msg_t*)msg;
  142. msg_status_t status = MSG_OK;
  143. char status_buf[20];
  144. unsigned char sha1_compare[20];
  145. unsigned int sha1_len;
  146. if (record_messages_g == 0) {
  147. return;
  148. }
  149. if (nodeid != msg_pt->nodeid) {
  150. status = MSG_NODEID_ERR;
  151. }
  152. if (pid != msg_pt->pid) {
  153. status = MSG_PID_ERR;
  154. }
  155. if (msg_len != msg_pt->size) {
  156. status = MSG_SIZE_ERR;
  157. }
  158. PK11_DigestBegin(sha1_context);
  159. PK11_DigestOp(sha1_context, msg_pt->payload, (msg_pt->size - sizeof (msg_t)));
  160. PK11_DigestFinal(sha1_context, sha1_compare, &sha1_len, sizeof(sha1_compare));
  161. if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) {
  162. qb_log (LOG_ERR, "msg seq:%d; incorrect hash",
  163. msg_pt->seq);
  164. status = MSG_SHA1_ERR;
  165. }
  166. log_pt = malloc (sizeof(log_entry_t));
  167. qb_list_init (&log_pt->list);
  168. snprintf (log_pt->log, LOG_STR_SIZE, "%u:%d:%d:%s;",
  169. msg_pt->nodeid, msg_pt->seq, my_seq,
  170. err_status_string (status_buf, 20, status));
  171. qb_list_add_tail (&log_pt->list, &msg_log_head);
  172. total_stored_msgs++;
  173. total_msgs_revd++;
  174. my_seq++;
  175. if ((total_msgs_revd % 1000) == 0) {
  176. qb_log (LOG_INFO, "rx %d", total_msgs_revd);
  177. }
  178. }
  179. static void config_change_callback (
  180. cpg_handle_t handle,
  181. const struct cpg_name *groupName,
  182. const struct cpg_address *member_list, size_t member_list_entries,
  183. const struct cpg_address *left_list, size_t left_list_entries,
  184. const struct cpg_address *joined_list, size_t joined_list_entries)
  185. {
  186. int i;
  187. log_entry_t *log_pt;
  188. /* group_name,ip,pid,join|leave */
  189. if (record_config_events_g > 0) {
  190. qb_log (LOG_INFO, "got cpg event[recording] for group %s", groupName->value);
  191. } else {
  192. qb_log (LOG_INFO, "got cpg event[ignoring] for group %s", groupName->value);
  193. }
  194. for (i = 0; i < left_list_entries; i++) {
  195. if (record_config_events_g > 0) {
  196. log_pt = malloc (sizeof(log_entry_t));
  197. qb_list_init (&log_pt->list);
  198. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,left",
  199. groupName->value, left_list[i].nodeid,left_list[i].pid);
  200. qb_list_add_tail(&log_pt->list, &config_chg_log_head);
  201. qb_log (LOG_INFO, "cpg event %s", log_pt->log);
  202. }
  203. }
  204. for (i = 0; i < joined_list_entries; i++) {
  205. if (record_config_events_g > 0) {
  206. log_pt = malloc (sizeof(log_entry_t));
  207. qb_list_init (&log_pt->list);
  208. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%u,%u,join",
  209. groupName->value, joined_list[i].nodeid,joined_list[i].pid);
  210. qb_list_add_tail (&log_pt->list, &config_chg_log_head);
  211. qb_log (LOG_INFO, "cpg event %s", log_pt->log);
  212. }
  213. }
  214. if (pcmk_test == 1) {
  215. in_cnchg = 1;
  216. send_some_more_messages (NULL);
  217. in_cnchg = 0;
  218. }
  219. }
  220. static void my_shutdown_callback (corosync_cfg_handle_t handle,
  221. corosync_cfg_shutdown_flags_t flags)
  222. {
  223. qb_log (LOG_CRIT, "flags:%d", flags);
  224. if (flags == COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) {
  225. corosync_cfg_replyto_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES);
  226. }
  227. }
  228. static corosync_cfg_callbacks_t cfg_callbacks = {
  229. .corosync_cfg_shutdown_callback = my_shutdown_callback,
  230. };
  231. static cpg_callbacks_t callbacks = {
  232. .cpg_deliver_fn = delivery_callback,
  233. .cpg_confchg_fn = config_change_callback,
  234. };
  235. static void record_messages (void)
  236. {
  237. record_messages_g = 1;
  238. qb_log (LOG_INFO, "record:%d", record_messages_g);
  239. }
  240. static void record_config_events (int sock)
  241. {
  242. char response[100];
  243. ssize_t rc;
  244. size_t send_len;
  245. record_config_events_g = 1;
  246. qb_log (LOG_INFO, "record:%d", record_config_events_g);
  247. snprintf (response, 100, "%s", OK_STR);
  248. send_len = strlen (response);
  249. rc = send (sock, response, send_len, 0);
  250. assert(rc == send_len);
  251. }
  252. static void read_config_event (int sock)
  253. {
  254. const char *empty = "None";
  255. log_entry_t *entry;
  256. ssize_t rc;
  257. size_t send_len;
  258. if (qb_list_empty(&config_chg_log_head) == 0) {
  259. entry = qb_list_first_entry (&config_chg_log_head, log_entry_t, list);
  260. send_len = strlen (entry->log);
  261. rc = send (sock, entry->log, send_len, 0);
  262. qb_list_del (&entry->list);
  263. free (entry);
  264. } else {
  265. qb_log (LOG_DEBUG, "no events in list");
  266. send_len = strlen (empty);
  267. rc = send (sock, empty, send_len, 0);
  268. }
  269. assert(rc == send_len);
  270. }
  271. static void read_messages (int sock, char* atmost_str)
  272. {
  273. struct qb_list_head *iter, *tmp_iter;
  274. log_entry_t *entry;
  275. int atmost = atoi (atmost_str);
  276. int packed = 0;
  277. ssize_t rc;
  278. if (atmost == 0)
  279. atmost = 1;
  280. if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
  281. atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
  282. big_and_buf[0] = '\0';
  283. qb_list_for_each_safe(iter, tmp_iter, &msg_log_head) {
  284. if (packed >= atmost)
  285. break;
  286. entry = qb_list_entry (iter, log_entry_t, list);
  287. strcat (big_and_buf, entry->log);
  288. packed++;
  289. qb_list_del (&entry->list);
  290. free (entry);
  291. total_stored_msgs--;
  292. }
  293. if (packed == 0) {
  294. strcpy (big_and_buf, "None");
  295. } else {
  296. if ((total_stored_msgs % 1000) == 0) {
  297. qb_log(LOG_INFO, "sending %d; total_stored_msgs:%d; len:%d",
  298. packed, total_stored_msgs, (int)strlen (big_and_buf));
  299. }
  300. }
  301. rc = send (sock, big_and_buf, strlen (big_and_buf), 0);
  302. assert(rc == strlen (big_and_buf));
  303. }
  304. static qb_loop_timer_handle more_messages_timer_handle;
  305. static void send_some_more_messages_later (void)
  306. {
  307. cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  308. qb_loop_timer_add (
  309. ta_poll_handle_get(),
  310. QB_LOOP_MED,
  311. 300*QB_TIME_NS_IN_MSEC, NULL,
  312. send_some_more_messages,
  313. &more_messages_timer_handle);
  314. }
  315. static void send_some_more_messages_zcb (void)
  316. {
  317. msg_t *my_msg;
  318. int i;
  319. int send_now;
  320. size_t payload_size;
  321. size_t total_size;
  322. unsigned int sha1_len;
  323. cs_error_t res;
  324. cpg_flow_control_state_t fc_state;
  325. void *zcb_buffer;
  326. if (cpg_fd < 0)
  327. return;
  328. send_now = my_msgs_to_send;
  329. payload_size = (rand() % 100000);
  330. total_size = payload_size + sizeof (msg_t);
  331. cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer);
  332. my_msg = (msg_t*)zcb_buffer;
  333. qb_log(LOG_DEBUG, "send_now:%d", send_now);
  334. my_msg->pid = my_pid;
  335. my_msg->nodeid = my_nodeid;
  336. my_msg->size = sizeof (msg_t) + payload_size;
  337. my_msg->seq = my_msgs_sent;
  338. for (i = 0; i < payload_size; i++) {
  339. my_msg->payload[i] = i;
  340. }
  341. PK11_DigestBegin(sha1_context);
  342. PK11_DigestOp(sha1_context, my_msg->payload, payload_size);
  343. PK11_DigestFinal(sha1_context, my_msg->sha1, &sha1_len, sizeof(my_msg->sha1));
  344. for (i = 0; i < send_now; i++) {
  345. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  346. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  347. /* lets do this later */
  348. send_some_more_messages_later ();
  349. qb_log (LOG_INFO, "flow control enabled.");
  350. goto free_buffer;
  351. }
  352. res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size);
  353. if (res == CS_ERR_TRY_AGAIN) {
  354. /* lets do this later */
  355. send_some_more_messages_later ();
  356. goto free_buffer;
  357. } else if (res != CS_OK) {
  358. qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.",
  359. res);
  360. exit (-2);
  361. }
  362. my_msgs_sent++;
  363. my_msgs_to_send--;
  364. }
  365. free_buffer:
  366. cpg_zcb_free (cpg_handle, zcb_buffer);
  367. }
  368. #define cs_repeat(counter, max, code) do { \
  369. code; \
  370. if (res == CS_ERR_TRY_AGAIN) { \
  371. counter++; \
  372. sleep(counter); \
  373. } \
  374. } while (res == CS_ERR_TRY_AGAIN && counter < max)
  375. static unsigned char buffer[200000];
  376. static void send_some_more_messages_normal (void)
  377. {
  378. msg_t my_msg;
  379. struct iovec iov[2];
  380. int i;
  381. int send_now;
  382. size_t payload_size;
  383. cs_error_t res;
  384. cpg_flow_control_state_t fc_state;
  385. int retries = 0;
  386. time_t before;
  387. unsigned int sha1_len;
  388. if (cpg_fd < 0)
  389. return;
  390. send_now = my_msgs_to_send;
  391. qb_log (LOG_TRACE, "send_now:%d", send_now);
  392. my_msg.pid = my_pid;
  393. my_msg.nodeid = my_nodeid;
  394. payload_size = (rand() % 10000);
  395. my_msg.size = sizeof (msg_t) + payload_size;
  396. my_msg.seq = my_msgs_sent;
  397. for (i = 0; i < payload_size; i++) {
  398. buffer[i] = i;
  399. }
  400. PK11_DigestBegin(sha1_context);
  401. PK11_DigestOp(sha1_context, buffer, payload_size);
  402. PK11_DigestFinal(sha1_context, my_msg.sha1, &sha1_len, sizeof(my_msg.sha1));
  403. iov[0].iov_len = sizeof (msg_t);
  404. iov[0].iov_base = &my_msg;
  405. iov[1].iov_len = payload_size;
  406. iov[1].iov_base = buffer;
  407. for (i = 0; i < send_now; i++) {
  408. if (in_cnchg && pcmk_test) {
  409. retries = 0;
  410. before = time(NULL);
  411. cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 2));
  412. if (retries > 20) {
  413. qb_log (LOG_ERR, "cs_repeat: blocked for :%lu secs.",
  414. (unsigned long)(time(NULL) - before));
  415. }
  416. if (res != CS_OK) {
  417. qb_log (LOG_ERR, "cpg_mcast_joined error:%d.",
  418. res);
  419. return;
  420. }
  421. } else {
  422. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  423. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  424. /* lets do this later */
  425. send_some_more_messages_later ();
  426. qb_log (LOG_INFO, "flow control enabled.");
  427. return;
  428. }
  429. res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
  430. if (res == CS_ERR_TRY_AGAIN) {
  431. /* lets do this later */
  432. send_some_more_messages_later ();
  433. if (i > 0) {
  434. qb_log (LOG_INFO, "TRY_AGAIN %d to send.",
  435. my_msgs_to_send);
  436. }
  437. return;
  438. } else if (res != CS_OK) {
  439. qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.",
  440. res);
  441. exit (-2);
  442. }
  443. }
  444. my_msgs_sent++;
  445. my_msg.seq = my_msgs_sent;
  446. my_msgs_to_send--;
  447. }
  448. qb_log (LOG_TRACE, "sent %d; to send %d.",
  449. my_msgs_sent, my_msgs_to_send);
  450. }
  451. static void send_some_more_messages (void * unused)
  452. {
  453. if (use_zcb) {
  454. send_some_more_messages_zcb ();
  455. } else {
  456. send_some_more_messages_normal ();
  457. }
  458. }
  459. static void msg_blaster (int sock, char* num_to_send_str)
  460. {
  461. my_msgs_to_send = atoi (num_to_send_str);
  462. my_msgs_sent = 0;
  463. my_seq = 1;
  464. my_pid = getpid();
  465. use_zcb = QB_FALSE;
  466. total_stored_msgs = 0;
  467. cpg_local_get (cpg_handle, &my_nodeid);
  468. /* control the limits */
  469. if (my_msgs_to_send <= 0)
  470. my_msgs_to_send = 1;
  471. if (my_msgs_to_send > 10000)
  472. my_msgs_to_send = 10000;
  473. send_some_more_messages_normal ();
  474. }
  475. static void context_test (int sock)
  476. {
  477. char response[100];
  478. char *cmp;
  479. ssize_t rc;
  480. size_t send_len;
  481. cpg_context_set (cpg_handle, response);
  482. cpg_context_get (cpg_handle, (void**)&cmp);
  483. if (response != cmp) {
  484. snprintf (response, 100, "%s", FAIL_STR);
  485. }
  486. else {
  487. snprintf (response, 100, "%s", OK_STR);
  488. }
  489. send_len = strlen (response);
  490. rc = send (sock, response, send_len, 0);
  491. assert(rc == send_len);
  492. }
  493. static void msg_blaster_zcb (int sock, char* num_to_send_str)
  494. {
  495. my_msgs_to_send = atoi (num_to_send_str);
  496. my_seq = 1;
  497. my_pid = getpid();
  498. use_zcb = QB_TRUE;
  499. total_stored_msgs = 0;
  500. cpg_local_get (cpg_handle, &my_nodeid);
  501. /* control the limits */
  502. if (my_msgs_to_send <= 0)
  503. my_msgs_to_send = 1;
  504. if (my_msgs_to_send > 10000)
  505. my_msgs_to_send = 10000;
  506. send_some_more_messages_zcb ();
  507. }
  508. static int cfg_dispatch_wrapper_fn (
  509. int fd,
  510. int revents,
  511. void *data)
  512. {
  513. cs_error_t error;
  514. if (revents & POLLHUP || revents & POLLERR) {
  515. qb_log (LOG_ERR, "got POLLHUP disconnecting from CFG");
  516. corosync_cfg_finalize(cfg_handle);
  517. cfg_handle = 0;
  518. return -1;
  519. }
  520. error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL);
  521. if (error == CS_ERR_LIBRARY) {
  522. qb_log (LOG_ERR, "got LIB error disconnecting from CFG.");
  523. corosync_cfg_finalize(cfg_handle);
  524. cfg_handle = 0;
  525. return -1;
  526. }
  527. return 0;
  528. }
  529. static int cpg_dispatch_wrapper_fn (
  530. int fd,
  531. int revents,
  532. void *data)
  533. {
  534. cs_error_t error;
  535. if (revents & POLLHUP || revents & POLLERR) {
  536. qb_log (LOG_ERR, "got POLLHUP disconnecting from CPG");
  537. cpg_finalize(cpg_handle);
  538. cpg_handle = 0;
  539. return -1;
  540. }
  541. error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  542. if (error == CS_ERR_LIBRARY) {
  543. qb_log (LOG_ERR, "got LIB error disconnecting from CPG");
  544. cpg_finalize(cpg_handle);
  545. cpg_handle = 0;
  546. return -1;
  547. }
  548. return 0;
  549. }
  550. static void do_command (int sock, char* func, char*args[], int num_args)
  551. {
  552. int result;
  553. char response[100];
  554. struct cpg_name group_name;
  555. ssize_t rc;
  556. size_t send_len;
  557. qb_log (LOG_TRACE, "RPC:%s() called.", func);
  558. if (strcmp ("cpg_mcast_joined",func) == 0) {
  559. struct iovec iov[5];
  560. int a;
  561. for (a = 0; a < num_args; a++) {
  562. iov[a].iov_base = args[a];
  563. iov[a].iov_len = strlen(args[a])+1;
  564. }
  565. cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);
  566. } else if (strcmp ("cpg_join",func) == 0) {
  567. if (strlen(args[0]) >= CPG_MAX_NAME_LENGTH) {
  568. qb_log (LOG_ERR, "Invalid group name");
  569. exit (1);
  570. }
  571. strcpy (group_name.value, args[0]);
  572. group_name.length = strlen(args[0]);
  573. result = cpg_join (cpg_handle, &group_name);
  574. if (result != CS_OK) {
  575. qb_log (LOG_ERR,
  576. "Could not join process group, error %d", result);
  577. exit (1);
  578. }
  579. qb_log (LOG_INFO, "called cpg_join(%s)!", group_name.value);
  580. } else if (strcmp ("cpg_leave",func) == 0) {
  581. strcpy (group_name.value, args[0]);
  582. group_name.length = strlen(args[0]);
  583. result = cpg_leave (cpg_handle, &group_name);
  584. if (result != CS_OK) {
  585. qb_log (LOG_ERR,
  586. "Could not leave process group, error %d", result);
  587. exit (1);
  588. }
  589. qb_log (LOG_INFO, "called cpg_leave(%s)!", group_name.value);
  590. } else if (strcmp ("cpg_initialize",func) == 0) {
  591. int retry_count = 0;
  592. result = cpg_initialize (&cpg_handle, &callbacks);
  593. while (result != CS_OK) {
  594. qb_log (LOG_ERR,
  595. "cpg_initialize error %d (attempt %d)",
  596. result, retry_count);
  597. if (retry_count >= 3) {
  598. exit (1);
  599. }
  600. sleep(1);
  601. retry_count++;
  602. result = cpg_initialize (&cpg_handle, &callbacks);
  603. }
  604. cpg_fd_get (cpg_handle, &cpg_fd);
  605. qb_loop_poll_add (ta_poll_handle_get(),
  606. QB_LOOP_MED,
  607. cpg_fd,
  608. POLLIN|POLLNVAL,
  609. NULL,
  610. cpg_dispatch_wrapper_fn);
  611. } else if (strcmp ("cpg_local_get", func) == 0) {
  612. unsigned int local_nodeid;
  613. cpg_local_get (cpg_handle, &local_nodeid);
  614. snprintf (response, 100, "%u",local_nodeid);
  615. send_len = strlen (response);
  616. rc = send (sock, response, send_len, 0);
  617. assert(rc == send_len);
  618. } else if (strcmp ("cpg_finalize", func) == 0) {
  619. if (cpg_handle > 0) {
  620. cpg_finalize (cpg_handle);
  621. cpg_handle = 0;
  622. }
  623. } else if (strcmp ("record_config_events", func) == 0) {
  624. record_config_events (sock);
  625. } else if (strcmp ("record_messages", func) == 0) {
  626. record_messages ();
  627. } else if (strcmp ("read_config_event", func) == 0) {
  628. read_config_event (sock);
  629. } else if (strcmp ("read_messages", func) == 0) {
  630. read_messages (sock, args[0]);
  631. } else if (strcmp ("msg_blaster_zcb", func) == 0) {
  632. msg_blaster_zcb (sock, args[0]);
  633. } else if (strcmp ("pcmk_test", func) == 0) {
  634. pcmk_test = 1;
  635. } else if (strcmp ("msg_blaster", func) == 0) {
  636. msg_blaster (sock, args[0]);
  637. } else if (strcmp ("context_test", func) == 0) {
  638. context_test (sock);
  639. } else if (strcmp ("are_you_ok_dude", func) == 0) {
  640. snprintf (response, 100, "%s", OK_STR);
  641. send_len = strlen (response);
  642. rc = send (sock, response, strlen (response), 0);
  643. assert(rc == send_len);
  644. } else if (strcmp ("cfg_shutdown", func) == 0) {
  645. qb_log (LOG_INFO, "calling %s() called!", func);
  646. result = corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST);
  647. qb_log (LOG_INFO,"%s() returned %d!", func, result);
  648. } else if (strcmp ("cfg_initialize",func) == 0) {
  649. int retry_count = 0;
  650. qb_log (LOG_INFO,"%s() called!", func);
  651. result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
  652. while (result != CS_OK) {
  653. qb_log (LOG_ERR,
  654. "cfg_initialize error %d (attempt %d)",
  655. result, retry_count);
  656. if (retry_count >= 3) {
  657. exit (1);
  658. }
  659. sleep(1);
  660. retry_count++;
  661. result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
  662. }
  663. qb_log (LOG_INFO,"corosync_cfg_initialize() == %d", result);
  664. result = corosync_cfg_fd_get (cfg_handle, &cfg_fd);
  665. qb_log (LOG_INFO,"corosync_cfg_fd_get() == %d", result);
  666. qb_loop_poll_add (ta_poll_handle_get(),
  667. QB_LOOP_MED,
  668. cfg_fd,
  669. POLLIN|POLLNVAL,
  670. NULL,
  671. cfg_dispatch_wrapper_fn);
  672. } else {
  673. qb_log(LOG_ERR, "RPC:%s not supported!", func);
  674. }
  675. }
  676. static void my_pre_exit(void)
  677. {
  678. qb_log (LOG_INFO, "%s PRE EXIT", __FILE__);
  679. if (cpg_handle > 0) {
  680. cpg_finalize (cpg_handle);
  681. cpg_handle = 0;
  682. }
  683. if (cfg_handle > 0) {
  684. corosync_cfg_finalize (cfg_handle);
  685. cfg_handle = 0;
  686. }
  687. PK11_DestroyContext(sha1_context, PR_TRUE);
  688. }
  689. int
  690. main(int argc, char *argv[])
  691. {
  692. qb_list_init (&msg_log_head);
  693. qb_list_init (&config_chg_log_head);
  694. if (NSS_NoDB_Init(".") != SECSuccess) {
  695. qb_log(LOG_ERR, "Couldn't initialize nss");
  696. exit (0);
  697. }
  698. if ((sha1_context = PK11_CreateDigestContext(SEC_OID_SHA1)) == NULL) {
  699. qb_log(LOG_ERR, "Couldn't initialize nss");
  700. exit (0);
  701. }
  702. return test_agent_run ("cpg_test_agent", 9034, do_command, my_pre_exit);
  703. }