cpg_test_agent.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784
  1. /*
  2. * Copyright (c) 2010 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Angus Salkeld (asalkeld@redhat.com)
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <errno.h>
  35. #include <unistd.h>
  36. #include <stdio.h>
  37. #include <stdlib.h>
  38. #include <assert.h>
  39. #include <string.h>
  40. #include <sys/types.h>
  41. #include <sys/socket.h>
  42. #include <netinet/in.h>
  43. #include <arpa/inet.h>
  44. #include <netdb.h>
  45. #include <poll.h>
  46. #include <unistd.h>
  47. #include <fcntl.h>
  48. #include <corosync/list.h>
  49. #include <qb/qbdefs.h>
  50. #include <qb/qbutil.h>
  51. #include <qb/qbloop.h>
  52. #include <qb/qblog.h>
  53. #include <corosync/cpg.h>
  54. #include <corosync/cfg.h>
  55. #include "../../exec/crypto.h"
  56. #include "common_test_agent.h"
  57. typedef enum {
  58. MSG_OK,
  59. MSG_NODEID_ERR,
  60. MSG_PID_ERR,
  61. MSG_SEQ_ERR,
  62. MSG_SIZE_ERR,
  63. MSG_SHA1_ERR,
  64. } msg_status_t;
  65. typedef struct {
  66. uint32_t nodeid;
  67. pid_t pid;
  68. unsigned char sha1[20];
  69. uint32_t seq;
  70. size_t size;
  71. unsigned char payload[0];
  72. } msg_t;
  73. #define LOG_STR_SIZE 256
  74. typedef struct {
  75. char log[LOG_STR_SIZE];
  76. struct list_head list;
  77. } log_entry_t;
  78. static char big_and_buf[HOW_BIG_AND_BUF];
  79. static int32_t record_config_events_g = 0;
  80. static int32_t record_messages_g = 0;
  81. static cpg_handle_t cpg_handle = 0;
  82. static corosync_cfg_handle_t cfg_handle = 0;
  83. static int32_t cpg_fd = -1;
  84. static int32_t cfg_fd = -1;
  85. static struct list_head config_chg_log_head;
  86. static struct list_head msg_log_head;
  87. static pid_t my_pid;
  88. static uint32_t my_nodeid;
  89. static int32_t my_seq;
  90. static int32_t use_zcb = QB_FALSE;
  91. static int32_t my_msgs_to_send;
  92. static int32_t my_msgs_sent;
  93. static int32_t total_stored_msgs = 0;
  94. static int32_t total_msgs_revd = 0;
  95. static int32_t in_cnchg = 0;
  96. static int32_t pcmk_test = 0;
  97. static void send_some_more_messages (void * unused);
  98. static char* err_status_string (char * buf, size_t buf_len, msg_status_t status)
  99. {
  100. switch (status) {
  101. case MSG_OK:
  102. strncpy (buf, "OK", buf_len);
  103. break;
  104. case MSG_NODEID_ERR:
  105. strncpy (buf, "NODEID_ERR", buf_len);
  106. break;
  107. case MSG_PID_ERR:
  108. strncpy (buf, "PID_ERR", buf_len);
  109. break;
  110. case MSG_SEQ_ERR:
  111. strncpy (buf, "SEQ_ERR", buf_len);
  112. break;
  113. case MSG_SIZE_ERR:
  114. strncpy (buf, "SIZE_ERR", buf_len);
  115. break;
  116. case MSG_SHA1_ERR:
  117. strncpy (buf, "SHA1_ERR", buf_len);
  118. break;
  119. default:
  120. strncpy (buf, "UNKNOWN_ERR", buf_len);
  121. break;
  122. }
  123. if (buf_len > 0) {
  124. buf[buf_len - 1] = '\0';
  125. }
  126. return buf;
  127. }
  128. static void delivery_callback (
  129. cpg_handle_t handle,
  130. const struct cpg_name *groupName,
  131. uint32_t nodeid,
  132. uint32_t pid,
  133. void *msg,
  134. size_t msg_len)
  135. {
  136. log_entry_t *log_pt;
  137. msg_t *msg_pt = (msg_t*)msg;
  138. msg_status_t status = MSG_OK;
  139. char status_buf[20];
  140. unsigned char sha1_compare[20];
  141. hash_state sha1_hash;
  142. if (record_messages_g == 0) {
  143. return;
  144. }
  145. msg_pt->seq = my_seq;
  146. my_seq++;
  147. if (nodeid != msg_pt->nodeid) {
  148. status = MSG_NODEID_ERR;
  149. }
  150. if (pid != msg_pt->pid) {
  151. status = MSG_PID_ERR;
  152. }
  153. if (msg_len != msg_pt->size) {
  154. status = MSG_SIZE_ERR;
  155. }
  156. sha1_init (&sha1_hash);
  157. sha1_process (&sha1_hash, msg_pt->payload, (msg_pt->size - sizeof (msg_t)));
  158. sha1_done (&sha1_hash, sha1_compare);
  159. if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) {
  160. qb_log (LOG_ERR, "msg seq:%d; incorrect hash",
  161. msg_pt->seq);
  162. status = MSG_SHA1_ERR;
  163. }
  164. log_pt = malloc (sizeof(log_entry_t));
  165. list_init (&log_pt->list);
  166. snprintf (log_pt->log, LOG_STR_SIZE, "%d:%d:%d:%s;",
  167. msg_pt->nodeid, msg_pt->pid, msg_pt->seq,
  168. err_status_string (status_buf, 20, status));
  169. list_add_tail (&log_pt->list, &msg_log_head);
  170. total_stored_msgs++;
  171. total_msgs_revd++;
  172. if ((total_msgs_revd % 100) == 0) {
  173. qb_log (LOG_INFO, "%d",total_msgs_revd);
  174. }
  175. }
  176. static void config_change_callback (
  177. cpg_handle_t handle,
  178. const struct cpg_name *groupName,
  179. const struct cpg_address *member_list, size_t member_list_entries,
  180. const struct cpg_address *left_list, size_t left_list_entries,
  181. const struct cpg_address *joined_list, size_t joined_list_entries)
  182. {
  183. int i;
  184. log_entry_t *log_pt;
  185. /* group_name,ip,pid,join|leave */
  186. for (i = 0; i < left_list_entries; i++) {
  187. qb_log(LOG_INFO, "Member left: %s", left_list[i].nodeid);
  188. if (record_config_events_g > 0) {
  189. log_pt = malloc (sizeof(log_entry_t));
  190. list_init (&log_pt->list);
  191. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,left",
  192. groupName->value, left_list[i].nodeid,left_list[i].pid);
  193. list_add_tail(&log_pt->list, &config_chg_log_head);
  194. }
  195. }
  196. for (i = 0; i < joined_list_entries; i++) {
  197. qb_log(LOG_INFO, "Member joined: %s", joined_list[i].nodeid);
  198. if (record_config_events_g > 0) {
  199. log_pt = malloc (sizeof(log_entry_t));
  200. list_init (&log_pt->list);
  201. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,join",
  202. groupName->value, joined_list[i].nodeid,joined_list[i].pid);
  203. list_add_tail (&log_pt->list, &config_chg_log_head);
  204. }
  205. }
  206. if (pcmk_test == 1) {
  207. in_cnchg = 1;
  208. send_some_more_messages (NULL);
  209. in_cnchg = 0;
  210. }
  211. }
  212. static void my_shutdown_callback (corosync_cfg_handle_t handle,
  213. corosync_cfg_shutdown_flags_t flags)
  214. {
  215. qb_log (LOG_CRIT, "flags:%d", flags);
  216. if (flags & COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) {
  217. corosync_cfg_replyto_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES);
  218. }
  219. }
  220. static corosync_cfg_callbacks_t cfg_callbacks = {
  221. .corosync_cfg_shutdown_callback = my_shutdown_callback,
  222. .corosync_cfg_state_track_callback = NULL,
  223. };
  224. static cpg_callbacks_t callbacks = {
  225. .cpg_deliver_fn = delivery_callback,
  226. .cpg_confchg_fn = config_change_callback,
  227. };
  228. static void record_messages (void)
  229. {
  230. record_messages_g = 1;
  231. qb_log (LOG_DEBUG,"record:%d", record_messages_g);
  232. }
  233. static void record_config_events (int sock)
  234. {
  235. char response[100];
  236. record_config_events_g = 1;
  237. qb_log (LOG_INFO, "record:%d", record_config_events_g);
  238. snprintf (response, 100, "%s", OK_STR);
  239. send (sock, response, strlen (response), 0);
  240. }
  241. static void read_config_event (int sock)
  242. {
  243. const char *empty = "None";
  244. struct list_head * list = config_chg_log_head.next;
  245. log_entry_t *entry;
  246. if (list != &config_chg_log_head) {
  247. entry = list_entry (list, log_entry_t, list);
  248. send (sock, entry->log, strlen (entry->log), 0);
  249. list_del (&entry->list);
  250. free (entry);
  251. } else {
  252. qb_log (LOG_DEBUG, "no events in list");
  253. send (sock, empty, strlen (empty), 0);
  254. }
  255. }
  256. static void read_messages (int sock, char* atmost_str)
  257. {
  258. struct list_head * list;
  259. log_entry_t *entry;
  260. int atmost = atoi (atmost_str);
  261. int packed = 0;
  262. if (atmost == 0)
  263. atmost = 1;
  264. if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
  265. atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
  266. qb_log(LOG_DEBUG, "atmost %d; total_stored_msgs:%d",
  267. atmost, total_stored_msgs);
  268. big_and_buf[0] = '\0';
  269. for (list = msg_log_head.next;
  270. (!list_empty (&msg_log_head) && packed < atmost); ) {
  271. entry = list_entry (list, log_entry_t, list);
  272. strcat (big_and_buf, entry->log);
  273. packed++;
  274. list = list->next;
  275. list_del (&entry->list);
  276. free (entry);
  277. total_stored_msgs--;
  278. }
  279. if (packed == 0) {
  280. strcpy (big_and_buf, "None");
  281. } else {
  282. qb_log(LOG_INFO, "sending %d; total_stored_msgs:%d; len:%d",
  283. packed, total_stored_msgs, (int)strlen (big_and_buf));
  284. }
  285. send (sock, big_and_buf, strlen (big_and_buf), 0);
  286. }
  287. static qb_loop_timer_handle more_messages_timer_handle;
  288. static void send_some_more_messages_later (void)
  289. {
  290. cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  291. qb_loop_timer_add (
  292. ta_poll_handle_get(),
  293. QB_LOOP_MED,
  294. 300*QB_TIME_NS_IN_MSEC, NULL,
  295. send_some_more_messages,
  296. &more_messages_timer_handle);
  297. }
  298. static void send_some_more_messages_zcb (void)
  299. {
  300. msg_t *my_msg;
  301. int i;
  302. int send_now;
  303. size_t payload_size;
  304. size_t total_size;
  305. hash_state sha1_hash;
  306. cs_error_t res;
  307. cpg_flow_control_state_t fc_state;
  308. void *zcb_buffer;
  309. if (cpg_fd < 0)
  310. return;
  311. send_now = my_msgs_to_send;
  312. payload_size = (rand() % 100000);
  313. total_size = payload_size + sizeof (msg_t);
  314. cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer);
  315. my_msg = (msg_t*)zcb_buffer;
  316. qb_log(LOG_DEBUG, "send_now:%d", send_now);
  317. my_msg->pid = my_pid;
  318. my_msg->nodeid = my_nodeid;
  319. my_msg->size = sizeof (msg_t) + payload_size;
  320. my_msg->seq = 0;
  321. for (i = 0; i < payload_size; i++) {
  322. my_msg->payload[i] = i;
  323. }
  324. sha1_init (&sha1_hash);
  325. sha1_process (&sha1_hash, my_msg->payload, payload_size);
  326. sha1_done (&sha1_hash, my_msg->sha1);
  327. for (i = 0; i < send_now; i++) {
  328. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  329. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  330. /* lets do this later */
  331. send_some_more_messages_later ();
  332. qb_log (LOG_INFO, "flow control enabled.");
  333. goto free_buffer;
  334. }
  335. res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size);
  336. if (res == CS_ERR_TRY_AGAIN) {
  337. /* lets do this later */
  338. send_some_more_messages_later ();
  339. // if (i > 0) {
  340. // qb_log (LOG_INFO, "TRY_AGAIN %d to send.",
  341. // my_msgs_to_send);
  342. // }
  343. goto free_buffer;
  344. } else if (res != CS_OK) {
  345. qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.",
  346. res);
  347. exit (-2);
  348. }
  349. my_msgs_sent++;
  350. my_msgs_to_send--;
  351. }
  352. free_buffer:
  353. cpg_zcb_free (cpg_handle, zcb_buffer);
  354. }
  355. #define cs_repeat(counter, max, code) do { \
  356. code; \
  357. if(res == CS_ERR_TRY_AGAIN) { \
  358. counter++; \
  359. sleep(counter); \
  360. } \
  361. } while(res == CS_ERR_TRY_AGAIN && counter < max)
  362. static unsigned char buffer[200000];
  363. static void send_some_more_messages_normal (void)
  364. {
  365. msg_t my_msg;
  366. struct iovec iov[2];
  367. int i;
  368. int send_now;
  369. size_t payload_size;
  370. hash_state sha1_hash;
  371. cs_error_t res;
  372. cpg_flow_control_state_t fc_state;
  373. int retries = 0;
  374. time_t before;
  375. if (cpg_fd < 0)
  376. return;
  377. send_now = my_msgs_to_send;
  378. qb_log (LOG_DEBUG,"send_now:%d", send_now);
  379. my_msg.pid = my_pid;
  380. my_msg.nodeid = my_nodeid;
  381. payload_size = (rand() % 10000);
  382. my_msg.size = sizeof (msg_t) + payload_size;
  383. my_msg.seq = 0;
  384. for (i = 0; i < payload_size; i++) {
  385. buffer[i] = i;
  386. }
  387. sha1_init (&sha1_hash);
  388. sha1_process (&sha1_hash, buffer, payload_size);
  389. sha1_done (&sha1_hash, my_msg.sha1);
  390. iov[0].iov_len = sizeof (msg_t);
  391. iov[0].iov_base = &my_msg;
  392. iov[1].iov_len = payload_size;
  393. iov[1].iov_base = buffer;
  394. for (i = 0; i < send_now; i++) {
  395. if (in_cnchg && pcmk_test) {
  396. retries = 0;
  397. before = time(NULL);
  398. cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 2));
  399. if (retries > 20) {
  400. qb_log (LOG_ERR, "cs_repeat: blocked for :%lu secs.",
  401. (unsigned long)(time(NULL) - before));
  402. }
  403. if (res != CS_OK) {
  404. qb_log (LOG_ERR, "cpg_mcast_joined error:%d.",
  405. res);
  406. return;
  407. }
  408. } else {
  409. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  410. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  411. /* lets do this later */
  412. send_some_more_messages_later ();
  413. qb_log (LOG_INFO, "flow control enabled.");
  414. return;
  415. }
  416. res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
  417. if (res == CS_ERR_TRY_AGAIN) {
  418. /* lets do this later */
  419. send_some_more_messages_later ();
  420. if (i > 0) {
  421. qb_log (LOG_INFO, "TRY_AGAIN %d to send.",
  422. my_msgs_to_send);
  423. }
  424. return;
  425. } else if (res != CS_OK) {
  426. qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.",
  427. res);
  428. exit (-2);
  429. }
  430. }
  431. my_msgs_sent++;
  432. my_msgs_to_send--;
  433. }
  434. qb_log (LOG_INFO, "sent %d; to send %d.",
  435. my_msgs_sent, my_msgs_to_send);
  436. }
  437. static void send_some_more_messages (void * unused)
  438. {
  439. if (use_zcb) {
  440. send_some_more_messages_zcb ();
  441. } else {
  442. send_some_more_messages_normal ();
  443. }
  444. }
  445. static void msg_blaster (int sock, char* num_to_send_str)
  446. {
  447. my_msgs_to_send = atoi (num_to_send_str);
  448. my_msgs_sent = 0;
  449. my_seq = 1;
  450. my_pid = getpid();
  451. use_zcb = QB_FALSE;
  452. total_stored_msgs = 0;
  453. cpg_local_get (cpg_handle, &my_nodeid);
  454. /* control the limits */
  455. if (my_msgs_to_send <= 0)
  456. my_msgs_to_send = 1;
  457. if (my_msgs_to_send > 10000)
  458. my_msgs_to_send = 10000;
  459. send_some_more_messages_normal ();
  460. }
  461. static void context_test (int sock)
  462. {
  463. char response[100];
  464. char *cmp;
  465. cpg_context_set (cpg_handle, response);
  466. cpg_context_get (cpg_handle, (void**)&cmp);
  467. if (response != cmp) {
  468. snprintf (response, 100, "%s", FAIL_STR);
  469. }
  470. else {
  471. snprintf (response, 100, "%s", OK_STR);
  472. }
  473. send (sock, response, strlen (response), 0);
  474. }
  475. static void msg_blaster_zcb (int sock, char* num_to_send_str)
  476. {
  477. my_msgs_to_send = atoi (num_to_send_str);
  478. my_seq = 1;
  479. my_pid = getpid();
  480. use_zcb = QB_TRUE;
  481. total_stored_msgs = 0;
  482. cpg_local_get (cpg_handle, &my_nodeid);
  483. /* control the limits */
  484. if (my_msgs_to_send <= 0)
  485. my_msgs_to_send = 1;
  486. if (my_msgs_to_send > 10000)
  487. my_msgs_to_send = 10000;
  488. send_some_more_messages_zcb ();
  489. }
  490. static corosync_cfg_state_notification_t notification_buffer;
  491. static int cfg_dispatch_wrapper_fn (
  492. int fd,
  493. int revents,
  494. void *data)
  495. {
  496. cs_error_t error;
  497. if (revents & POLLHUP || revents & POLLERR) {
  498. qb_log (LOG_ERR, "got POLLHUP disconnecting from CFG");
  499. corosync_cfg_finalize(cfg_handle);
  500. cfg_handle = 0;
  501. qb_loop_poll_del (ta_poll_handle_get(), cfg_fd);
  502. close (cfg_fd);
  503. cfg_fd = -1;
  504. return -1;
  505. }
  506. error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL);
  507. if (error == CS_ERR_LIBRARY) {
  508. qb_log (LOG_ERR, "got LIB error disconnecting from CFG.");
  509. corosync_cfg_finalize(cfg_handle);
  510. cfg_handle = 0;
  511. qb_loop_poll_del (ta_poll_handle_get(), cfg_fd);
  512. close (cfg_fd);
  513. cfg_fd = -1;
  514. return -1;
  515. }
  516. return 0;
  517. }
  518. static int cpg_dispatch_wrapper_fn (
  519. int fd,
  520. int revents,
  521. void *data)
  522. {
  523. cs_error_t error;
  524. if (revents & POLLHUP || revents & POLLERR) {
  525. qb_log (LOG_ERR, "got POLLHUP disconnecting from CPG");
  526. cpg_finalize(cpg_handle);
  527. cpg_handle = 0;
  528. qb_loop_poll_del (ta_poll_handle_get(), cpg_fd);
  529. close (cpg_fd);
  530. cpg_fd = -1;
  531. return -1;
  532. }
  533. error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  534. if (error == CS_ERR_LIBRARY) {
  535. qb_log (LOG_ERR, "got LIB error disconnecting from CPG");
  536. cpg_finalize(cpg_handle);
  537. cpg_handle = 0;
  538. qb_loop_poll_del (ta_poll_handle_get(), cpg_fd);
  539. close (cpg_fd);
  540. cpg_fd = -1;
  541. return -1;
  542. }
  543. return 0;
  544. }
  545. static void do_command (int sock, char* func, char*args[], int num_args)
  546. {
  547. int result;
  548. char response[100];
  549. struct cpg_name group_name;
  550. if (parse_debug)
  551. qb_log (LOG_DEBUG,"RPC:%s() called.", func);
  552. if (strcmp ("cpg_mcast_joined",func) == 0) {
  553. struct iovec iov[5];
  554. int a;
  555. for (a = 0; a < num_args; a++) {
  556. iov[a].iov_base = args[a];
  557. iov[a].iov_len = strlen(args[a])+1;
  558. }
  559. cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);
  560. } else if (strcmp ("cpg_join",func) == 0) {
  561. strcpy (group_name.value, args[0]);
  562. group_name.length = strlen(args[0]);
  563. result = cpg_join (cpg_handle, &group_name);
  564. if (result != CS_OK) {
  565. qb_log (LOG_ERR,
  566. "Could not join process group, error %d", result);
  567. exit (1);
  568. }
  569. qb_log (LOG_INFO, "called cpg_join()!");
  570. } else if (strcmp ("cpg_leave",func) == 0) {
  571. strcpy (group_name.value, args[0]);
  572. group_name.length = strlen(args[0]);
  573. result = cpg_leave (cpg_handle, &group_name);
  574. if (result != CS_OK) {
  575. qb_log (LOG_ERR,
  576. "Could not leave process group, error %d", result);
  577. exit (1);
  578. }
  579. qb_log (LOG_INFO, "called cpg_leave()!");
  580. } else if (strcmp ("cpg_initialize",func) == 0) {
  581. int retry_count = 0;
  582. result = cpg_initialize (&cpg_handle, &callbacks);
  583. while (result != CS_OK) {
  584. qb_log (LOG_ERR,
  585. "cpg_initialize error %d (attempt %d)",
  586. result, retry_count);
  587. if (retry_count >= 3) {
  588. exit (1);
  589. }
  590. sleep(1);
  591. retry_count++;
  592. result = cpg_initialize (&cpg_handle, &callbacks);
  593. }
  594. cpg_fd_get (cpg_handle, &cpg_fd);
  595. qb_loop_poll_add (ta_poll_handle_get(),
  596. QB_LOOP_MED,
  597. cpg_fd,
  598. POLLIN|POLLNVAL,
  599. NULL,
  600. cpg_dispatch_wrapper_fn);
  601. } else if (strcmp ("cpg_local_get", func) == 0) {
  602. unsigned int local_nodeid;
  603. cpg_local_get (cpg_handle, &local_nodeid);
  604. snprintf (response, 100, "%u",local_nodeid);
  605. send (sock, response, strlen (response), 0);
  606. } else if (strcmp ("cpg_finalize", func) == 0) {
  607. if (cpg_handle > 0) {
  608. cpg_finalize (cpg_handle);
  609. cpg_handle = 0;
  610. }
  611. if (cpg_fd > 0) {
  612. qb_loop_poll_del (ta_poll_handle_get(), cpg_fd);
  613. cpg_fd = -1;
  614. }
  615. } else if (strcmp ("record_config_events", func) == 0) {
  616. record_config_events (sock);
  617. } else if (strcmp ("record_messages", func) == 0) {
  618. record_messages ();
  619. } else if (strcmp ("read_config_event", func) == 0) {
  620. read_config_event (sock);
  621. } else if (strcmp ("read_messages", func) == 0) {
  622. read_messages (sock, args[0]);
  623. } else if (strcmp ("msg_blaster_zcb", func) == 0) {
  624. msg_blaster_zcb (sock, args[0]);
  625. } else if (strcmp ("pcmk_test", func) == 0) {
  626. pcmk_test = 1;
  627. } else if (strcmp ("msg_blaster", func) == 0) {
  628. msg_blaster (sock, args[0]);
  629. } else if (strcmp ("context_test", func) == 0) {
  630. context_test (sock);
  631. } else if (strcmp ("are_you_ok_dude", func) == 0) {
  632. snprintf (response, 100, "%s", OK_STR);
  633. send (sock, response, strlen (response), 0);
  634. } else if (strcmp ("cfg_shutdown", func) == 0) {
  635. qb_log (LOG_INFO, "calling %s() called!", func);
  636. result = corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST);
  637. qb_log (LOG_INFO,"%s() returned %d!", func, result);
  638. } else if (strcmp ("cfg_initialize",func) == 0) {
  639. int retry_count = 0;
  640. qb_log (LOG_INFO,"%s() called!", func);
  641. result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
  642. while (result != CS_OK) {
  643. qb_log (LOG_ERR,
  644. "cfg_initialize error %d (attempt %d)",
  645. result, retry_count);
  646. if (retry_count >= 3) {
  647. exit (1);
  648. }
  649. sleep(1);
  650. retry_count++;
  651. result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
  652. }
  653. qb_log (LOG_INFO,"corosync_cfg_initialize() == %d", result);
  654. result = corosync_cfg_fd_get (cfg_handle, &cfg_fd);
  655. qb_log (LOG_INFO,"corosync_cfg_fd_get() == %d", result);
  656. result = corosync_cfg_state_track (cfg_handle, 0, &notification_buffer);
  657. qb_log (LOG_INFO,"corosync_cfg_state_track() == %d", result);
  658. qb_loop_poll_add (ta_poll_handle_get(),
  659. QB_LOOP_MED,
  660. cfg_fd,
  661. POLLIN|POLLNVAL,
  662. NULL,
  663. cfg_dispatch_wrapper_fn);
  664. } else {
  665. qb_log(LOG_ERR, "RPC:%s not supported!", func);
  666. }
  667. }
  668. static void my_pre_exit(void)
  669. {
  670. qb_log (LOG_INFO, "%s PRE EXIT", __FILE__);
  671. if (cpg_handle > 0) {
  672. cpg_finalize (cpg_handle);
  673. cpg_handle = 0;
  674. }
  675. }
  676. int
  677. main(int argc, char *argv[])
  678. {
  679. list_init (&msg_log_head);
  680. list_init (&config_chg_log_head);
  681. return test_agent_run ("cpg_test_agent", 9034, do_command, my_pre_exit);
  682. }