cpg_test_agent.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. /*
  2. * Copyright (c) 2010 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Angus Salkeld (asalkeld@redhat.com)
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <errno.h>
  35. #include <unistd.h>
  36. #include <stdio.h>
  37. #include <stdlib.h>
  38. #include <assert.h>
  39. #include <string.h>
  40. #include <sys/types.h>
  41. #include <sys/socket.h>
  42. #include <netinet/in.h>
  43. #include <arpa/inet.h>
  44. #include <netdb.h>
  45. #include <syslog.h>
  46. #include <poll.h>
  47. #include <unistd.h>
  48. #include <fcntl.h>
  49. #include <corosync/totem/coropoll.h>
  50. #include <corosync/list.h>
  51. #include <corosync/cpg.h>
  /* TCP service (port number as a string) passed to getaddrinfo() for the listener. */
  #define SERVER_PORT "9034"

  /*
   * Outcome of checking one delivered message.  The numeric value is what
   * delivery_callback() formats into the message log entry.
   */
  typedef enum {
      MSG_OK         = 0,   /* no mismatch found */
      MSG_NODEID_ERR = 1,   /* sender nodeid differs from the one in the message */
      MSG_PID_ERR    = 2,   /* sender pid differs from the one in the message */
      MSG_SEQ_ERR    = 3,   /* declared, but not assigned anywhere in the code shown */
      MSG_SIZE_ERR   = 4,   /* delivered length differs from the size in the message */
      MSG_HASH_ERR   = 5,   /* declared; hash checking is still a TODO */
  } msg_status_t;
  /*
   * Test message multicast by send_some_more_messages() and validated by
   * delivery_callback().  The whole struct (sizeof (msg_t) bytes) is what
   * goes on the wire, so the layout must not change.
   */
  typedef struct {
      uint32_t nodeid;      /* node id of the sender */
      pid_t    pid;         /* process id of the sender */
      uint32_t hash;        /* sent as 0; not verified yet */
      uint32_t seq;         /* sent as 0; the receiver overwrites it with its own counter */
      size_t   size;        /* number of bytes the sender claims to have sent */
      char     payload[1];  /* placeholder for trailing data */
  } msg_t;
  69. #define LOG_STR_SIZE 128
  70. typedef struct {
  71. char log[LOG_STR_SIZE];
  72. struct list_head list;
  73. } log_entry_t;
  74. #define HOW_BIG_AND_BUF 4096
  75. static char big_and_buf[HOW_BIG_AND_BUF];
  76. static char big_and_buf_rx[HOW_BIG_AND_BUF];
  77. static int32_t parse_debug = 0;
  78. static int32_t record_config_events_g = 0;
  79. static int32_t record_messages_g = 0;
  80. static cpg_handle_t cpg_handle = 0;
  81. static int32_t cpg_fd = -1;
  82. static struct list_head config_chg_log_head;
  83. static struct list_head msg_log_head;
  84. static pid_t my_pid;
  85. static uint32_t my_nodeid;
  86. static int32_t my_seq;
  87. static int32_t my_msgs_to_send;
  88. static int32_t total_stored_msgs = 0;
  89. static hdb_handle_t poll_handle;
  90. static void delivery_callback (
  91. cpg_handle_t handle,
  92. const struct cpg_name *groupName,
  93. uint32_t nodeid,
  94. uint32_t pid,
  95. void *msg,
  96. size_t msg_len)
  97. {
  98. log_entry_t *log_pt;
  99. msg_t *msg_pt = (msg_t*)msg;
  100. msg_status_t status = MSG_OK;
  101. if (record_messages_g == 0) {
  102. return;
  103. }
  104. msg_pt->seq = my_seq;
  105. my_seq++;
  106. if (nodeid != msg_pt->nodeid) {
  107. status = MSG_NODEID_ERR;
  108. }
  109. if (pid != msg_pt->pid) {
  110. status = MSG_PID_ERR;
  111. }
  112. if (msg_len != msg_pt->size) {
  113. status = MSG_SIZE_ERR;
  114. }
  115. /* TODO: check hash here.
  116. */
  117. log_pt = malloc (sizeof(log_entry_t));
  118. list_init (&log_pt->list);
  119. snprintf (log_pt->log, 128, "%d:%d:%d:%d;",
  120. msg_pt->nodeid, msg_pt->pid, msg_pt->seq, status);
  121. list_add_tail (&log_pt->list, &msg_log_head);
  122. total_stored_msgs++;
  123. }
  124. static void config_change_callback (
  125. cpg_handle_t handle,
  126. const struct cpg_name *groupName,
  127. const struct cpg_address *member_list, size_t member_list_entries,
  128. const struct cpg_address *left_list, size_t left_list_entries,
  129. const struct cpg_address *joined_list, size_t joined_list_entries)
  130. {
  131. int i;
  132. log_entry_t *log_pt;
  133. /* group_name,ip,pid,join|leave */
  134. if (record_config_events_g == 0) {
  135. return;
  136. }
  137. for (i = 0; i < left_list_entries; i++) {
  138. syslog (LOG_DEBUG, "%s() inserting leave event into list", __func__);
  139. log_pt = malloc (sizeof(log_entry_t));
  140. list_init (&log_pt->list);
  141. snprintf (log_pt->log, 256, "%s,%d,%d,left",
  142. groupName->value, left_list[i].nodeid,left_list[i].pid);
  143. list_add_tail(&log_pt->list, &config_chg_log_head);
  144. }
  145. for (i = 0; i < joined_list_entries; i++) {
  146. syslog (LOG_DEBUG, "%s() inserting join event into list", __func__);
  147. log_pt = malloc (sizeof(log_entry_t));
  148. list_init (&log_pt->list);
  149. snprintf (log_pt->log, 256, "%s,%d,%d,join",
  150. groupName->value, joined_list[i].nodeid,joined_list[i].pid);
  151. list_add_tail (&log_pt->list, &config_chg_log_head);
  152. }
  153. }
  154. static cpg_callbacks_t callbacks = {
  155. .cpg_deliver_fn = delivery_callback,
  156. .cpg_confchg_fn = config_change_callback,
  157. };
  158. static void record_messages (void)
  159. {
  160. record_messages_g = 1;
  161. syslog (LOG_DEBUG,"%s() record:%d", __func__, record_messages_g);
  162. }
  163. static void record_config_events (void)
  164. {
  165. record_config_events_g = 1;
  166. syslog (LOG_DEBUG,"%s() record:%d", __func__, record_config_events_g);
  167. }
  168. static void read_config_event (int sock)
  169. {
  170. const char *empty = "None";
  171. struct list_head * list = config_chg_log_head.next;
  172. log_entry_t *entry;
  173. if (list != &config_chg_log_head) {
  174. entry = list_entry (list, log_entry_t, list);
  175. send (sock, entry->log, strlen (entry->log) + 1, 0);
  176. list_del (&entry->list);
  177. free (entry);
  178. } else {
  179. syslog (LOG_DEBUG,"%s() no events in list", __func__);
  180. send (sock, empty, strlen (empty) + 1, 0);
  181. }
  182. }
  183. static void read_messages (int sock, char* atmost_str)
  184. {
  185. struct list_head * list;
  186. log_entry_t *entry;
  187. int atmost = atoi (atmost_str);
  188. int packed = 0;
  189. if (atmost == 0)
  190. atmost = 1;
  191. if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
  192. atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
  193. syslog (LOG_DEBUG, "%s() atmost %d; total_stored_msgs:%d",
  194. __func__, atmost, total_stored_msgs);
  195. big_and_buf[0] = '\0';
  196. for (list = msg_log_head.next;
  197. (!list_empty (&msg_log_head) && packed < atmost); ) {
  198. entry = list_entry (list, log_entry_t, list);
  199. strcat (big_and_buf, entry->log);
  200. packed++;
  201. list = list->next;
  202. list_del (&entry->list);
  203. free (entry);
  204. total_stored_msgs--;
  205. }
  206. syslog (LOG_DEBUG, "%s() sending %d; total_stored_msgs:%d; len:%d",
  207. __func__, packed, total_stored_msgs, (int)strlen (big_and_buf));
  208. if (packed == 0) {
  209. strcpy (big_and_buf, "None");
  210. }
  211. send (sock, big_and_buf, strlen (big_and_buf), 0);
  212. }
  213. static void send_some_more_messages (void)
  214. {
  215. msg_t my_msg;
  216. struct iovec iov[1];
  217. int i;
  218. int send_now;
  219. cs_error_t res;
  220. cpg_flow_control_state_t fc_state;
  221. if (cpg_fd < 0)
  222. return;
  223. send_now = my_msgs_to_send;
  224. syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
  225. my_msg.pid = my_pid;
  226. my_msg.nodeid = my_nodeid;
  227. my_msg.hash = 0;
  228. my_msg.size = sizeof (msg_t);
  229. my_msg.seq = 0;
  230. iov[0].iov_len = my_msg.size;
  231. iov[0].iov_base = &my_msg;
  232. for (i = 0; i < send_now; i++) {
  233. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  234. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  235. /* lets do this later */
  236. syslog (LOG_DEBUG, "%s() flow control enabled.", __func__);
  237. return;
  238. }
  239. res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 1);
  240. if (res == CS_ERR_TRY_AGAIN) {
  241. /* lets do this later */
  242. syslog (LOG_DEBUG, "%s() cpg_mcast_joined() says try again.",
  243. __func__);
  244. return;
  245. } else
  246. if (res != CS_OK) {
  247. syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
  248. __func__, res);
  249. exit (-2);
  250. }
  251. my_msgs_to_send--;
  252. }
  253. }
  254. static void msg_blaster (int sock, char* num_to_send_str)
  255. {
  256. my_msgs_to_send = atoi (num_to_send_str);
  257. my_seq = 1;
  258. my_pid = getpid();
  259. cpg_local_get (cpg_handle, &my_nodeid);
  260. /* control the limits */
  261. if (my_msgs_to_send <= 0)
  262. my_msgs_to_send = 1;
  263. if (my_msgs_to_send > 1000)
  264. my_msgs_to_send = 1000;
  265. send_some_more_messages ();
  266. }
  267. static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
  268. int fd,
  269. int revents,
  270. void *data)
  271. {
  272. cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  273. if (error == CS_ERR_LIBRARY) {
  274. syslog (LOG_ERR, "%s() got LIB error disconnecting from corosync.", __func__);
  275. poll_dispatch_delete (poll_handle, cpg_fd);
  276. close (cpg_fd);
  277. cpg_fd = -1;
  278. }
  279. return 0;
  280. }
  281. static void do_command (int sock, char* func, char*args[], int num_args)
  282. {
  283. int result;
  284. struct cpg_name group_name;
  285. if (parse_debug)
  286. syslog (LOG_DEBUG,"RPC:%s() called.", func);
  287. if (strcmp ("cpg_mcast_joined",func) == 0) {
  288. struct iovec iov[5];
  289. int a;
  290. for (a = 0; a < num_args; a++) {
  291. iov[a].iov_base = args[a];
  292. iov[a].iov_len = strlen(args[a])+1;
  293. }
  294. cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);
  295. } else if (strcmp ("cpg_join",func) == 0) {
  296. strcpy (group_name.value, args[0]);
  297. group_name.length = strlen(args[0]);
  298. result = cpg_join (cpg_handle, &group_name);
  299. if (result != CS_OK) {
  300. syslog (LOG_ERR,
  301. "Could not join process group, error %d\n", result);
  302. exit (1);
  303. }
  304. } else if (strcmp ("cpg_leave",func) == 0) {
  305. strcpy (group_name.value, args[0]);
  306. group_name.length = strlen(args[0]);
  307. result = cpg_leave (cpg_handle, &group_name);
  308. if (result != CS_OK) {
  309. syslog (LOG_ERR,
  310. "Could not leave process group, error %d\n", result);
  311. exit (1);
  312. }
  313. syslog (LOG_INFO, "called cpg_leave()!");
  314. } else if (strcmp ("cpg_initialize",func) == 0) {
  315. int retry_count = 0;
  316. result = cpg_initialize (&cpg_handle, &callbacks);
  317. while (result != CS_OK) {
  318. syslog (LOG_ERR,
  319. "cpg_initialize error %d (attempt %d)\n",
  320. result, retry_count);
  321. if (retry_count >= 3) {
  322. exit (1);
  323. }
  324. sleep(1);
  325. retry_count++;
  326. }
  327. cpg_fd_get (cpg_handle, &cpg_fd);
  328. poll_dispatch_add (poll_handle, cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn);
  329. } else if (strcmp ("cpg_local_get", func) == 0) {
  330. unsigned int local_nodeid;
  331. char response[100];
  332. cpg_local_get (cpg_handle, &local_nodeid);
  333. snprintf (response, 100, "%u",local_nodeid);
  334. send (sock, response, strlen (response) + 1, 0);
  335. } else if (strcmp ("cpg_finalize",func) == 0) {
  336. cpg_finalize (cpg_handle);
  337. poll_dispatch_delete (poll_handle, cpg_fd);
  338. cpg_fd = -1;
  339. } else if (strcmp ("record_config_events",func) == 0) {
  340. record_config_events ();
  341. } else if (strcmp ("record_messages",func) == 0) {
  342. record_messages ();
  343. } else if (strcmp ("read_config_event",func) == 0) {
  344. read_config_event (sock);
  345. } else if (strcmp ("read_messages",func) == 0) {
  346. read_messages (sock, args[0]);
  347. } else if (strcmp ("msg_blaster",func) == 0) {
  348. msg_blaster (sock, args[0]);
  349. } else {
  350. syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func);
  351. }
  352. }
  353. static void handle_command (int sock, char* msg)
  354. {
  355. int num_args;
  356. char *saveptr = NULL;
  357. char *str = strdup (msg);
  358. char *str_len;
  359. char *str_arg;
  360. char *args[5];
  361. int i = 0;
  362. int a = 0;
  363. char* func = NULL;
  364. if (parse_debug)
  365. syslog (LOG_DEBUG,"%s (MSG:%s)\n", __func__, msg);
  366. str_len = strtok_r (str, ":", &saveptr);
  367. assert (str_len);
  368. num_args = atoi (str_len) * 2;
  369. for (i = 0; i < num_args / 2; i++) {
  370. str_len = strtok_r (NULL, ":", &saveptr);
  371. str_arg = strtok_r (NULL, ":", &saveptr);
  372. if (func == NULL) {
  373. /* first "arg" is the function */
  374. if (parse_debug)
  375. syslog (LOG_DEBUG, "(LEN:%s, FUNC:%s)", str_len, str_arg);
  376. func = str_arg;
  377. a = 0;
  378. } else {
  379. args[a] = str_arg;
  380. a++;
  381. if (parse_debug)
  382. syslog (LOG_DEBUG, "(LEN:%s, ARG:%s)", str_len, str_arg);
  383. }
  384. }
  385. do_command (sock, func, args, a+1);
  386. free (str);
  387. }
  388. static int server_process_data_fn (hdb_handle_t handle,
  389. int fd,
  390. int revents,
  391. void *data)
  392. {
  393. char *saveptr;
  394. char *msg;
  395. char *cmd;
  396. int32_t nbytes;
  397. if ((nbytes = recv (fd, big_and_buf_rx, sizeof (big_and_buf_rx), 0)) <= 0) {
  398. /* got error or connection closed by client */
  399. if (nbytes == 0) {
  400. /* connection closed */
  401. syslog (LOG_WARNING, "socket %d hung up: exiting...\n", fd);
  402. } else {
  403. syslog (LOG_ERR,"recv() failed: %s", strerror(errno));
  404. }
  405. close (fd);
  406. exit (0);
  407. } else {
  408. if (my_msgs_to_send > 0)
  409. send_some_more_messages ();
  410. big_and_buf_rx[nbytes] = '\0';
  411. msg = strtok_r (big_and_buf_rx, ";", &saveptr);
  412. assert (msg);
  413. while (msg) {
  414. cmd = strdup (msg);
  415. handle_command (fd, cmd);
  416. free (cmd);
  417. msg = strtok_r (NULL, ";", &saveptr);
  418. }
  419. }
  420. return 0;
  421. }
  422. static int server_accept_fn (hdb_handle_t handle,
  423. int fd,
  424. int revents,
  425. void *data)
  426. {
  427. socklen_t addrlen;
  428. struct sockaddr_in in_addr;
  429. int new_fd;
  430. int res;
  431. addrlen = sizeof (struct sockaddr_in);
  432. retry_accept:
  433. new_fd = accept (fd, (struct sockaddr *)&in_addr, &addrlen);
  434. if (new_fd == -1 && errno == EINTR) {
  435. goto retry_accept;
  436. }
  437. if (new_fd == -1) {
  438. syslog (LOG_ERR,
  439. "Could not accept connection: %s\n", strerror (errno));
  440. return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
  441. }
  442. res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
  443. if (res == -1) {
  444. syslog (LOG_ERR,
  445. "Could not set non-blocking operation on connection: %s\n",
  446. strerror (errno));
  447. close (new_fd);
  448. return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
  449. }
  450. poll_dispatch_add (poll_handle, new_fd, POLLIN|POLLNVAL, NULL, server_process_data_fn);
  451. return 0;
  452. }
  /*
   * Create the listening TCP socket on SERVER_PORT.
   *
   * Tries each address returned by getaddrinfo() until one can be bound, then
   * starts listening with a backlog of 10.
   *
   * Returns the listening descriptor.  Does not return on failure: exits
   * with 1 (address lookup), 2 (nothing could be bound) or 3 (listen).
   */
  static int create_server_sockect (void)
  {
      struct addrinfo hints;
      struct addrinfo *candidates;
      struct addrinfo *cur;
      int listener = -1;
      int reuse = 1;
      int gai_rc;

      /* Any address family, stream socket, wildcard local address. */
      memset (&hints, 0, sizeof hints);
      hints.ai_family = AF_UNSPEC;
      hints.ai_socktype = SOCK_STREAM;
      hints.ai_flags = AI_PASSIVE;

      gai_rc = getaddrinfo (NULL, SERVER_PORT, &hints, &candidates);
      if (gai_rc != 0) {
          syslog (LOG_ERR, "%s\n", gai_strerror (gai_rc));
          exit (1);
      }

      for (cur = candidates; cur != NULL; cur = cur->ai_next) {
          listener = socket (cur->ai_family, cur->ai_socktype, cur->ai_protocol);
          if (listener < 0) {
              continue;
          }

          /* lose the pesky "address already in use" error message */
          if (setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
              &reuse, sizeof (int)) < 0) {
              syslog (LOG_ERR, "setsockopt() failed: %s\n", strerror (errno));
          }

          if (bind (listener, cur->ai_addr, cur->ai_addrlen) >= 0) {
              /* Bound: keep this socket. */
              break;
          }

          syslog (LOG_ERR, "bind() failed: %s\n", strerror (errno));
          close (listener);
      }

      if (cur == NULL) {
          syslog (LOG_ERR, "failed to bind\n");
          exit (2);
      }

      freeaddrinfo (candidates);

      if (listen (listener, 10) == -1) {
          syslog (LOG_ERR, "listen() failed: %s", strerror(errno));
          exit (3);
      }

      return listener;
  }
  498. int main (int argc, char *argv[])
  499. {
  500. int listener;
  501. openlog (NULL, LOG_CONS|LOG_PID, LOG_DAEMON);
  502. list_init (&msg_log_head);
  503. list_init (&config_chg_log_head);
  504. poll_handle = poll_create ();
  505. listener = create_server_sockect ();
  506. poll_dispatch_add (poll_handle, listener, POLLIN|POLLNVAL, NULL, server_accept_fn);
  507. poll_run (poll_handle);
  508. return -1;
  509. }