cpg_test_agent.c 19 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738
  1. /*
  2. * Copyright (c) 2010 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Angus Salkeld (asalkeld@redhat.com)
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <errno.h>
  35. #include <unistd.h>
  36. #include <stdio.h>
  37. #include <stdlib.h>
  38. #include <assert.h>
  39. #include <string.h>
  40. #include <sys/types.h>
  41. #include <sys/socket.h>
  42. #include <netinet/in.h>
  43. #include <arpa/inet.h>
  44. #include <netdb.h>
  45. #include <syslog.h>
  46. #include <poll.h>
  47. #include <unistd.h>
  48. #include <fcntl.h>
  49. #include <corosync/totem/coropoll.h>
  50. #include <corosync/list.h>
  51. #include <corosync/cpg.h>
  52. #include <corosync/cfg.h>
  53. #include "../../exec/crypto.h"
  54. #include "common_test_agent.h"
  /*
   * Verdict reached by delivery_callback() for one delivered message.
   * err_status_string() turns each value into the text stored in the log.
   */
  typedef enum {
      MSG_OK         = 0,  /* no check failed */
      MSG_NODEID_ERR = 1,  /* sender nodeid differs from the one in the header */
      MSG_PID_ERR    = 2,  /* sender pid differs from the one in the header */
      MSG_SEQ_ERR    = 3,  /* sequence error (not raised by the code in this file) */
      MSG_SIZE_ERR   = 4,  /* delivered length differs from the header's size */
      MSG_SHA1_ERR   = 5,  /* payload hash differs from the header's sha1 */
  } msg_status_t;
  /*
   * Header placed in front of every test message, followed directly by the
   * payload bytes.
   *
   * The trailing member is a C99 flexible array member rather than the GNU
   * zero-length array extension; sizeof (msg_t) still covers the header only.
   */
  typedef struct {
      uint32_t nodeid;         /* sender's node id */
      pid_t pid;               /* sender's process id */
      unsigned char sha1[20];  /* SHA1 digest of the payload */
      uint32_t seq;            /* sent as 0; overwritten by the receiver */
      size_t size;             /* header size plus payload length, in bytes */
      unsigned char payload[]; /* payload bytes, size - sizeof (msg_t) long */
  } msg_t;
  71. #define LOG_STR_SIZE 256
  72. typedef struct {
  73. char log[LOG_STR_SIZE];
  74. struct list_head list;
  75. } log_entry_t;
  76. static char big_and_buf[HOW_BIG_AND_BUF];
  77. static int32_t record_config_events_g = 0;
  78. static int32_t record_messages_g = 0;
  79. static cpg_handle_t cpg_handle = 0;
  80. static corosync_cfg_handle_t cfg_handle = 0;
  81. static int32_t cpg_fd = -1;
  82. static int32_t cfg_fd = -1;
  83. static struct list_head config_chg_log_head;
  84. static struct list_head msg_log_head;
  85. static pid_t my_pid;
  86. static uint32_t my_nodeid;
  87. static int32_t my_seq;
  88. static int32_t use_zcb = 0;
  89. static int32_t my_msgs_to_send;
  90. static int32_t total_stored_msgs = 0;
  91. static int32_t in_cnchg = 0;
  92. static int32_t pcmk_test = 0;
  93. static void send_some_more_messages (void * unused);
  94. static char* err_status_string (char * buf, size_t buf_len, msg_status_t status)
  95. {
  96. switch (status) {
  97. case MSG_OK:
  98. strncpy (buf, "OK", buf_len);
  99. break;
  100. case MSG_NODEID_ERR:
  101. strncpy (buf, "NODEID_ERR", buf_len);
  102. break;
  103. case MSG_PID_ERR:
  104. strncpy (buf, "PID_ERR", buf_len);
  105. break;
  106. case MSG_SEQ_ERR:
  107. strncpy (buf, "SEQ_ERR", buf_len);
  108. break;
  109. case MSG_SIZE_ERR:
  110. strncpy (buf, "SIZE_ERR", buf_len);
  111. break;
  112. case MSG_SHA1_ERR:
  113. strncpy (buf, "SHA1_ERR", buf_len);
  114. break;
  115. default:
  116. strncpy (buf, "UNKNOWN_ERR", buf_len);
  117. break;
  118. }
  119. return buf;
  120. }
  121. static void delivery_callback (
  122. cpg_handle_t handle,
  123. const struct cpg_name *groupName,
  124. uint32_t nodeid,
  125. uint32_t pid,
  126. void *msg,
  127. size_t msg_len)
  128. {
  129. log_entry_t *log_pt;
  130. msg_t *msg_pt = (msg_t*)msg;
  131. msg_status_t status = MSG_OK;
  132. char status_buf[20];
  133. unsigned char sha1_compare[20];
  134. hash_state sha1_hash;
  135. if (record_messages_g == 0) {
  136. return;
  137. }
  138. msg_pt->seq = my_seq;
  139. my_seq++;
  140. if (nodeid != msg_pt->nodeid) {
  141. status = MSG_NODEID_ERR;
  142. }
  143. if (pid != msg_pt->pid) {
  144. status = MSG_PID_ERR;
  145. }
  146. if (msg_len != msg_pt->size) {
  147. status = MSG_SIZE_ERR;
  148. }
  149. sha1_init (&sha1_hash);
  150. sha1_process (&sha1_hash, msg_pt->payload, (msg_pt->size - sizeof (msg_t)));
  151. sha1_done (&sha1_hash, sha1_compare);
  152. if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) {
  153. syslog (LOG_ERR, "%s(); msg seq:%d; incorrect hash",
  154. __func__, msg_pt->seq);
  155. status = MSG_SHA1_ERR;
  156. }
  157. log_pt = malloc (sizeof(log_entry_t));
  158. list_init (&log_pt->list);
  159. snprintf (log_pt->log, LOG_STR_SIZE, "%d:%d:%d:%s;",
  160. msg_pt->nodeid, msg_pt->pid, msg_pt->seq,
  161. err_status_string (status_buf, 20, status));
  162. list_add_tail (&log_pt->list, &msg_log_head);
  163. total_stored_msgs++;
  164. // if ((total_stored_msgs % 100) == 0) {
  165. // syslog (LOG_INFO, "%s(); %d", __func__, total_stored_msgs);
  166. // }
  167. }
  168. static void config_change_callback (
  169. cpg_handle_t handle,
  170. const struct cpg_name *groupName,
  171. const struct cpg_address *member_list, size_t member_list_entries,
  172. const struct cpg_address *left_list, size_t left_list_entries,
  173. const struct cpg_address *joined_list, size_t joined_list_entries)
  174. {
  175. int i;
  176. log_entry_t *log_pt;
  177. /* group_name,ip,pid,join|leave */
  178. for (i = 0; i < left_list_entries; i++) {
  179. syslog (LOG_INFO, "%s(%d) %d:%s left", __func__, record_config_events_g,
  180. left_list[i].nodeid, groupName->value);
  181. if (record_config_events_g > 0) {
  182. log_pt = malloc (sizeof(log_entry_t));
  183. list_init (&log_pt->list);
  184. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,left",
  185. groupName->value, left_list[i].nodeid,left_list[i].pid);
  186. list_add_tail(&log_pt->list, &config_chg_log_head);
  187. }
  188. }
  189. for (i = 0; i < joined_list_entries; i++) {
  190. syslog (LOG_INFO, "%s(%d) %d:%s joined", __func__, record_config_events_g,
  191. left_list[i].nodeid, groupName->value);
  192. if (record_config_events_g > 0) {
  193. log_pt = malloc (sizeof(log_entry_t));
  194. list_init (&log_pt->list);
  195. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,join",
  196. groupName->value, joined_list[i].nodeid,joined_list[i].pid);
  197. list_add_tail (&log_pt->list, &config_chg_log_head);
  198. }
  199. }
  200. if (pcmk_test == 1) {
  201. in_cnchg = 1;
  202. send_some_more_messages (NULL);
  203. in_cnchg = 0;
  204. }
  205. }
  206. static void my_shutdown_callback (corosync_cfg_handle_t handle,
  207. corosync_cfg_shutdown_flags_t flags)
  208. {
  209. syslog (LOG_CRIT, "%s flags:%d", __func__, flags);
  210. if (flags & COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) {
  211. corosync_cfg_replyto_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES);
  212. }
  213. }
  214. static corosync_cfg_callbacks_t cfg_callbacks = {
  215. .corosync_cfg_shutdown_callback = my_shutdown_callback,
  216. .corosync_cfg_state_track_callback = NULL,
  217. };
  218. static cpg_callbacks_t callbacks = {
  219. .cpg_deliver_fn = delivery_callback,
  220. .cpg_confchg_fn = config_change_callback,
  221. };
  222. static void record_messages (void)
  223. {
  224. record_messages_g = 1;
  225. syslog (LOG_DEBUG,"%s() record:%d", __func__, record_messages_g);
  226. }
  227. static void record_config_events (int sock)
  228. {
  229. char response[100];
  230. record_config_events_g = 1;
  231. syslog (LOG_INFO, "%s() record:%d", __func__, record_config_events_g);
  232. snprintf (response, 100, "%s", OK_STR);
  233. send (sock, response, strlen (response), 0);
  234. }
  235. static void read_config_event (int sock)
  236. {
  237. const char *empty = "None";
  238. struct list_head * list = config_chg_log_head.next;
  239. log_entry_t *entry;
  240. if (list != &config_chg_log_head) {
  241. entry = list_entry (list, log_entry_t, list);
  242. send (sock, entry->log, strlen (entry->log), 0);
  243. list_del (&entry->list);
  244. free (entry);
  245. } else {
  246. syslog (LOG_DEBUG,"%s() no events in list", __func__);
  247. send (sock, empty, strlen (empty), 0);
  248. }
  249. }
  250. static void read_messages (int sock, char* atmost_str)
  251. {
  252. struct list_head * list;
  253. log_entry_t *entry;
  254. int atmost = atoi (atmost_str);
  255. int packed = 0;
  256. if (atmost == 0)
  257. atmost = 1;
  258. if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
  259. atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
  260. syslog (LOG_DEBUG, "%s() atmost %d; total_stored_msgs:%d",
  261. __func__, atmost, total_stored_msgs);
  262. big_and_buf[0] = '\0';
  263. for (list = msg_log_head.next;
  264. (!list_empty (&msg_log_head) && packed < atmost); ) {
  265. entry = list_entry (list, log_entry_t, list);
  266. strcat (big_and_buf, entry->log);
  267. packed++;
  268. list = list->next;
  269. list_del (&entry->list);
  270. free (entry);
  271. total_stored_msgs--;
  272. }
  273. syslog (LOG_DEBUG, "%s() sending %d; total_stored_msgs:%d; len:%d",
  274. __func__, packed, total_stored_msgs, (int)strlen (big_and_buf));
  275. if (packed == 0) {
  276. strcpy (big_and_buf, "None");
  277. }
  278. send (sock, big_and_buf, strlen (big_and_buf), 0);
  279. }
  280. static void send_some_more_messages_later (void)
  281. {
  282. poll_timer_handle timer_handle;
  283. cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  284. poll_timer_add (
  285. ta_poll_handle_get(),
  286. 300, NULL,
  287. send_some_more_messages,
  288. &timer_handle);
  289. }
  290. static void send_some_more_messages_zcb (void)
  291. {
  292. msg_t *my_msg;
  293. int i;
  294. int send_now;
  295. size_t payload_size;
  296. size_t total_size;
  297. hash_state sha1_hash;
  298. cs_error_t res;
  299. cpg_flow_control_state_t fc_state;
  300. void *zcb_buffer;
  301. if (cpg_fd < 0)
  302. return;
  303. send_now = my_msgs_to_send;
  304. payload_size = (rand() % 100000);
  305. total_size = payload_size + sizeof (msg_t);
  306. cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer);
  307. my_msg = (msg_t*)zcb_buffer;
  308. //syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
  309. my_msg->pid = my_pid;
  310. my_msg->nodeid = my_nodeid;
  311. my_msg->size = sizeof (msg_t) + payload_size;
  312. my_msg->seq = 0;
  313. for (i = 0; i < payload_size; i++) {
  314. my_msg->payload[i] = i;
  315. }
  316. sha1_init (&sha1_hash);
  317. sha1_process (&sha1_hash, my_msg->payload, payload_size);
  318. sha1_done (&sha1_hash, my_msg->sha1);
  319. for (i = 0; i < send_now; i++) {
  320. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  321. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  322. /* lets do this later */
  323. send_some_more_messages_later ();
  324. syslog (LOG_INFO, "%s() flow control enabled.", __func__);
  325. goto free_buffer;
  326. }
  327. res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size);
  328. if (res == CS_ERR_TRY_AGAIN) {
  329. /* lets do this later */
  330. send_some_more_messages_later ();
  331. // if (i > 0) {
  332. // syslog (LOG_INFO, "%s() TRY_AGAIN %d to send.",
  333. // __func__, my_msgs_to_send);
  334. // }
  335. goto free_buffer;
  336. } else if (res != CS_OK) {
  337. syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
  338. __func__, res);
  339. exit (-2);
  340. }
  341. my_msgs_to_send--;
  342. }
  343. free_buffer:
  344. cpg_zcb_free (cpg_handle, zcb_buffer);
  345. }
  /*
   * Run `code` and repeat it while it leaves CS_ERR_TRY_AGAIN in `res`.
   *
   * counter - integer lvalue; incremented after every failed attempt and then
   *           used as the number of seconds to sleep before the next one
   * max     - the loop stops once counter reaches this value
   * code    - statement that assigns the outcome to a variable named `res`,
   *           which must be in scope at the point of use
   *
   * `counter` and `max` are parenthesised so that compound expressions can be
   * passed safely.
   */
  #define cs_repeat(counter, max, code) do { \
      code; \
      if (res == CS_ERR_TRY_AGAIN) { \
          (counter)++; \
          sleep (counter); \
      } \
  } while (res == CS_ERR_TRY_AGAIN && (counter) < (max))
  353. static unsigned char buffer[200000];
  354. static void send_some_more_messages_normal (void)
  355. {
  356. msg_t my_msg;
  357. struct iovec iov[2];
  358. int i;
  359. int send_now;
  360. size_t payload_size;
  361. hash_state sha1_hash;
  362. cs_error_t res;
  363. cpg_flow_control_state_t fc_state;
  364. int retries = 0;
  365. time_t before;
  366. if (cpg_fd < 0)
  367. return;
  368. send_now = my_msgs_to_send;
  369. //syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
  370. my_msg.pid = my_pid;
  371. my_msg.nodeid = my_nodeid;
  372. payload_size = (rand() % 100000);
  373. my_msg.size = sizeof (msg_t) + payload_size;
  374. my_msg.seq = 0;
  375. for (i = 0; i < payload_size; i++) {
  376. buffer[i] = i;
  377. }
  378. sha1_init (&sha1_hash);
  379. sha1_process (&sha1_hash, buffer, payload_size);
  380. sha1_done (&sha1_hash, my_msg.sha1);
  381. iov[0].iov_len = sizeof (msg_t);
  382. iov[0].iov_base = &my_msg;
  383. iov[1].iov_len = payload_size;
  384. iov[1].iov_base = buffer;
  385. for (i = 0; i < send_now; i++) {
  386. if (in_cnchg && pcmk_test) {
  387. retries = 0;
  388. before = time(NULL);
  389. cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1));
  390. if (retries > 20) {
  391. syslog (LOG_ERR, "%s() -> cs_repeat: blocked for :%lu secs.",
  392. __func__, (unsigned long)(time(NULL) - before));
  393. }
  394. if (res != CS_OK) {
  395. syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d.",
  396. __func__, res);
  397. return;
  398. }
  399. } else {
  400. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  401. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  402. /* lets do this later */
  403. send_some_more_messages_later ();
  404. syslog (LOG_INFO, "%s() flow control enabled.", __func__);
  405. return;
  406. }
  407. res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
  408. if (res == CS_ERR_TRY_AGAIN) {
  409. /* lets do this later */
  410. send_some_more_messages_later ();
  411. if (i > 0) {
  412. syslog (LOG_INFO, "%s() TRY_AGAIN %d to send.",
  413. __func__, my_msgs_to_send);
  414. }
  415. return;
  416. } else if (res != CS_OK) {
  417. syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
  418. __func__, res);
  419. exit (-2);
  420. }
  421. }
  422. my_msgs_to_send--;
  423. }
  424. }
  425. static void send_some_more_messages (void * unused)
  426. {
  427. if (use_zcb) {
  428. send_some_more_messages_zcb ();
  429. } else {
  430. send_some_more_messages_normal ();
  431. }
  432. }
  433. static void msg_blaster (int sock, char* num_to_send_str)
  434. {
  435. my_msgs_to_send = atoi (num_to_send_str);
  436. my_seq = 1;
  437. my_pid = getpid();
  438. use_zcb = 0;
  439. total_stored_msgs = 0;
  440. cpg_local_get (cpg_handle, &my_nodeid);
  441. /* control the limits */
  442. if (my_msgs_to_send <= 0)
  443. my_msgs_to_send = 1;
  444. if (my_msgs_to_send > 10000)
  445. my_msgs_to_send = 10000;
  446. send_some_more_messages_normal ();
  447. }
  448. static void context_test (int sock)
  449. {
  450. char response[100];
  451. char *cmp;
  452. cpg_context_set (cpg_handle, response);
  453. cpg_context_get (cpg_handle, (void**)&cmp);
  454. if (response != cmp) {
  455. snprintf (response, 100, "%s", FAIL_STR);
  456. }
  457. else {
  458. snprintf (response, 100, "%s", OK_STR);
  459. }
  460. send (sock, response, strlen (response), 0);
  461. }
  462. static void msg_blaster_zcb (int sock, char* num_to_send_str)
  463. {
  464. my_msgs_to_send = atoi (num_to_send_str);
  465. my_seq = 1;
  466. my_pid = getpid();
  467. use_zcb = 1;
  468. total_stored_msgs = 0;
  469. cpg_local_get (cpg_handle, &my_nodeid);
  470. /* control the limits */
  471. if (my_msgs_to_send <= 0)
  472. my_msgs_to_send = 1;
  473. if (my_msgs_to_send > 10000)
  474. my_msgs_to_send = 10000;
  475. send_some_more_messages_zcb ();
  476. }
  477. static corosync_cfg_state_notification_t notification_buffer;
  478. static int cfg_dispatch_wrapper_fn (hdb_handle_t handle,
  479. int fd,
  480. int revents,
  481. void *data)
  482. {
  483. cs_error_t error;
  484. if (revents & POLLHUP || revents & POLLERR) {
  485. syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CFG", __func__);
  486. poll_dispatch_delete (ta_poll_handle_get(), cfg_fd);
  487. close (cfg_fd);
  488. cfg_fd = -1;
  489. return -1;
  490. }
  491. error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL);
  492. if (error == CS_ERR_LIBRARY) {
  493. syslog (LOG_ERR, "%s() got LIB error disconnecting from CFG.", __func__);
  494. poll_dispatch_delete (ta_poll_handle_get(), cfg_fd);
  495. close (cfg_fd);
  496. cfg_fd = -1;
  497. return -1;
  498. }
  499. return 0;
  500. }
  501. static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
  502. int fd,
  503. int revents,
  504. void *data)
  505. {
  506. cs_error_t error;
  507. if (revents & POLLHUP || revents & POLLERR) {
  508. syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CPG", __func__);
  509. poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
  510. close (cpg_fd);
  511. cpg_fd = -1;
  512. return -1;
  513. }
  514. error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  515. if (error == CS_ERR_LIBRARY) {
  516. syslog (LOG_ERR, "%s() got LIB error disconnecting from CPG", __func__);
  517. poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
  518. close (cpg_fd);
  519. cpg_fd = -1;
  520. return -1;
  521. }
  522. return 0;
  523. }
  524. static void do_command (int sock, char* func, char*args[], int num_args)
  525. {
  526. int result;
  527. char response[100];
  528. struct cpg_name group_name;
  529. if (parse_debug)
  530. syslog (LOG_DEBUG,"RPC:%s() called.", func);
  531. if (strcmp ("cpg_mcast_joined",func) == 0) {
  532. struct iovec iov[5];
  533. int a;
  534. for (a = 0; a < num_args; a++) {
  535. iov[a].iov_base = args[a];
  536. iov[a].iov_len = strlen(args[a])+1;
  537. }
  538. cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);
  539. } else if (strcmp ("cpg_join",func) == 0) {
  540. strcpy (group_name.value, args[0]);
  541. group_name.length = strlen(args[0]);
  542. result = cpg_join (cpg_handle, &group_name);
  543. if (result != CS_OK) {
  544. syslog (LOG_ERR,
  545. "Could not join process group, error %d\n", result);
  546. exit (1);
  547. }
  548. syslog (LOG_INFO, "called cpg_join()!");
  549. } else if (strcmp ("cpg_leave",func) == 0) {
  550. strcpy (group_name.value, args[0]);
  551. group_name.length = strlen(args[0]);
  552. result = cpg_leave (cpg_handle, &group_name);
  553. if (result != CS_OK) {
  554. syslog (LOG_ERR,
  555. "Could not leave process group, error %d\n", result);
  556. exit (1);
  557. }
  558. syslog (LOG_INFO, "called cpg_leave()!");
  559. } else if (strcmp ("cpg_initialize",func) == 0) {
  560. int retry_count = 0;
  561. result = cpg_initialize (&cpg_handle, &callbacks);
  562. while (result != CS_OK) {
  563. syslog (LOG_ERR,
  564. "cpg_initialize error %d (attempt %d)\n",
  565. result, retry_count);
  566. if (retry_count >= 3) {
  567. exit (1);
  568. }
  569. sleep(1);
  570. retry_count++;
  571. result = cpg_initialize (&cpg_handle, &callbacks);
  572. }
  573. cpg_fd_get (cpg_handle, &cpg_fd);
  574. poll_dispatch_add (ta_poll_handle_get(), cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn);
  575. } else if (strcmp ("cpg_local_get", func) == 0) {
  576. unsigned int local_nodeid;
  577. cpg_local_get (cpg_handle, &local_nodeid);
  578. snprintf (response, 100, "%u",local_nodeid);
  579. send (sock, response, strlen (response), 0);
  580. } else if (strcmp ("cpg_finalize", func) == 0) {
  581. cpg_finalize (cpg_handle);
  582. poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
  583. cpg_fd = -1;
  584. } else if (strcmp ("record_config_events", func) == 0) {
  585. record_config_events (sock);
  586. } else if (strcmp ("record_messages", func) == 0) {
  587. record_messages ();
  588. } else if (strcmp ("read_config_event", func) == 0) {
  589. read_config_event (sock);
  590. } else if (strcmp ("read_messages", func) == 0) {
  591. read_messages (sock, args[0]);
  592. } else if (strcmp ("msg_blaster_zcb", func) == 0) {
  593. msg_blaster_zcb (sock, args[0]);
  594. } else if (strcmp ("pcmk_test", func) == 0) {
  595. pcmk_test = 1;
  596. } else if (strcmp ("msg_blaster", func) == 0) {
  597. msg_blaster (sock, args[0]);
  598. } else if (strcmp ("context_test", func) == 0) {
  599. context_test (sock);
  600. } else if (strcmp ("are_you_ok_dude", func) == 0) {
  601. snprintf (response, 100, "%s", OK_STR);
  602. send (sock, response, strlen (response), 0);
  603. } else if (strcmp ("cfg_shutdown", func) == 0) {
  604. corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST);
  605. } else if (strcmp ("cfg_initialize",func) == 0) {
  606. int retry_count = 0;
  607. syslog (LOG_INFO,"%s %s() called!", __func__, func);
  608. result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
  609. while (result != CS_OK) {
  610. syslog (LOG_ERR,
  611. "cfg_initialize error %d (attempt %d)\n",
  612. result, retry_count);
  613. if (retry_count >= 3) {
  614. exit (1);
  615. }
  616. sleep(1);
  617. retry_count++;
  618. result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
  619. }
  620. corosync_cfg_fd_get (cfg_handle, &cfg_fd);
  621. corosync_cfg_state_track (cfg_handle, 0, &notification_buffer);
  622. poll_dispatch_add (ta_poll_handle_get(), cfg_fd, POLLIN|POLLNVAL, NULL, cfg_dispatch_wrapper_fn);
  623. } else {
  624. syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func);
  625. }
  626. }
  627. int main (int argc, char *argv[])
  628. {
  629. openlog (NULL, LOG_CONS|LOG_PID, LOG_DAEMON);
  630. list_init (&msg_log_head);
  631. list_init (&config_chg_log_head);
  632. return test_agent_run (9034, do_command);
  633. }