cpg_test_agent.c 16 KB


  1. /*
  2. * Copyright (c) 2010 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Angus Salkeld (asalkeld@redhat.com)
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <errno.h>
  35. #include <unistd.h>
  36. #include <stdio.h>
  37. #include <stdlib.h>
  38. #include <assert.h>
  39. #include <string.h>
  40. #include <sys/types.h>
  41. #include <sys/socket.h>
  42. #include <netinet/in.h>
  43. #include <arpa/inet.h>
  44. #include <netdb.h>
  45. #include <syslog.h>
  46. #include <poll.h>
  47. #include <unistd.h>
  48. #include <fcntl.h>
  49. #include <corosync/totem/coropoll.h>
  50. #include <corosync/list.h>
  51. #include <corosync/cpg.h>
  52. #include "../../exec/crypto.h"
  /* Service (TCP port) string handed to getaddrinfo() for the listening socket. */
  #define SERVER_PORT "9034"

  /*
   * Outcome of validating one delivered CPG message.  The textual form of
   * each value is produced by err_status_string().
   */
  typedef enum {
      MSG_OK = 0,        /* no check failed */
      MSG_NODEID_ERR,    /* nodeid in the header differs from the one CPG reported */
      MSG_PID_ERR,       /* pid in the header differs from the one CPG reported */
      MSG_SEQ_ERR,       /* sequence error */
      MSG_SIZE_ERR,      /* size in the header differs from the delivered length */
      MSG_SHA1_ERR       /* payload hash differs from the hash in the header */
  } msg_status_t;
  /*
   * Header that precedes the payload of every message multicast by
   * send_some_more_messages() and checked by delivery_callback().
   */
  typedef struct {
      uint32_t nodeid;            /* sender's node id */
      pid_t pid;                  /* sender's process id */
      unsigned char sha1[20];     /* SHA1 digest of the payload bytes */
      uint32_t seq;               /* sequence number */
      size_t size;                /* sizeof (msg_t) plus the payload length */
      unsigned char payload[];    /* C99 flexible array member: payload follows the header */
  } msg_t;
  70. #define LOG_STR_SIZE 256
  71. typedef struct {
  72. char log[LOG_STR_SIZE];
  73. struct list_head list;
  74. } log_entry_t;
  75. #define HOW_BIG_AND_BUF 4096
  76. static char big_and_buf[HOW_BIG_AND_BUF];
  77. static char big_and_buf_rx[HOW_BIG_AND_BUF];
  78. static int32_t parse_debug = 0;
  79. static int32_t record_config_events_g = 0;
  80. static int32_t record_messages_g = 0;
  81. static cpg_handle_t cpg_handle = 0;
  82. static int32_t cpg_fd = -1;
  83. static struct list_head config_chg_log_head;
  84. static struct list_head msg_log_head;
  85. static pid_t my_pid;
  86. static uint32_t my_nodeid;
  87. static int32_t my_seq;
  88. static int32_t my_msgs_to_send;
  89. static int32_t total_stored_msgs = 0;
  90. static hdb_handle_t poll_handle;
  91. static char* err_status_string (char * buf, size_t buf_len, msg_status_t status)
  92. {
  93. switch (status) {
  94. case MSG_OK:
  95. strncpy (buf, "OK", buf_len);
  96. return buf;
  97. break;
  98. case MSG_NODEID_ERR:
  99. strncpy (buf, "NODEID_ERR", buf_len);
  100. return buf;
  101. break;
  102. case MSG_PID_ERR:
  103. strncpy (buf, "PID_ERR", buf_len);
  104. return buf;
  105. break;
  106. case MSG_SEQ_ERR:
  107. strncpy (buf, "SEQ_ERR", buf_len);
  108. return buf;
  109. break;
  110. case MSG_SIZE_ERR:
  111. strncpy (buf, "SIZE_ERR", buf_len);
  112. return buf;
  113. break;
  114. case MSG_SHA1_ERR:
  115. default:
  116. strncpy (buf, "SHA1_ERR", buf_len);
  117. return buf;
  118. break;
  119. }
  120. }
  121. static void delivery_callback (
  122. cpg_handle_t handle,
  123. const struct cpg_name *groupName,
  124. uint32_t nodeid,
  125. uint32_t pid,
  126. void *msg,
  127. size_t msg_len)
  128. {
  129. log_entry_t *log_pt;
  130. msg_t *msg_pt = (msg_t*)msg;
  131. msg_status_t status = MSG_OK;
  132. char status_buf[20];
  133. unsigned char sha1_compare[20];
  134. hash_state sha1_hash;
  135. if (record_messages_g == 0) {
  136. return;
  137. }
  138. msg_pt->seq = my_seq;
  139. my_seq++;
  140. if (nodeid != msg_pt->nodeid) {
  141. status = MSG_NODEID_ERR;
  142. }
  143. if (pid != msg_pt->pid) {
  144. status = MSG_PID_ERR;
  145. }
  146. if (msg_len != msg_pt->size) {
  147. status = MSG_SIZE_ERR;
  148. }
  149. sha1_init (&sha1_hash);
  150. sha1_process (&sha1_hash, msg_pt->payload, msg_pt->size);
  151. sha1_done (&sha1_hash, sha1_compare);
  152. if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) {
  153. syslog (LOG_ERR, "%s(); msg seq:%d; incorrect hash",
  154. __func__, msg_pt->seq);
  155. status = MSG_SHA1_ERR;
  156. }
  157. log_pt = malloc (sizeof(log_entry_t));
  158. list_init (&log_pt->list);
  159. snprintf (log_pt->log, LOG_STR_SIZE, "%d:%d:%d:%s;",
  160. msg_pt->nodeid, msg_pt->pid, msg_pt->seq,
  161. err_status_string (status_buf, 20, status));
  162. list_add_tail (&log_pt->list, &msg_log_head);
  163. total_stored_msgs++;
  164. }
  165. static void config_change_callback (
  166. cpg_handle_t handle,
  167. const struct cpg_name *groupName,
  168. const struct cpg_address *member_list, size_t member_list_entries,
  169. const struct cpg_address *left_list, size_t left_list_entries,
  170. const struct cpg_address *joined_list, size_t joined_list_entries)
  171. {
  172. int i;
  173. log_entry_t *log_pt;
  174. /* group_name,ip,pid,join|leave */
  175. if (record_config_events_g == 0) {
  176. return;
  177. }
  178. for (i = 0; i < left_list_entries; i++) {
  179. syslog (LOG_DEBUG, "%s() inserting leave event into list", __func__);
  180. log_pt = malloc (sizeof(log_entry_t));
  181. list_init (&log_pt->list);
  182. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,left",
  183. groupName->value, left_list[i].nodeid,left_list[i].pid);
  184. list_add_tail(&log_pt->list, &config_chg_log_head);
  185. }
  186. for (i = 0; i < joined_list_entries; i++) {
  187. syslog (LOG_DEBUG, "%s() inserting join event into list", __func__);
  188. log_pt = malloc (sizeof(log_entry_t));
  189. list_init (&log_pt->list);
  190. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,join",
  191. groupName->value, joined_list[i].nodeid,joined_list[i].pid);
  192. list_add_tail (&log_pt->list, &config_chg_log_head);
  193. }
  194. }
  195. static cpg_callbacks_t callbacks = {
  196. .cpg_deliver_fn = delivery_callback,
  197. .cpg_confchg_fn = config_change_callback,
  198. };
  199. static void record_messages (void)
  200. {
  201. record_messages_g = 1;
  202. syslog (LOG_DEBUG,"%s() record:%d", __func__, record_messages_g);
  203. }
  204. static void record_config_events (void)
  205. {
  206. record_config_events_g = 1;
  207. syslog (LOG_DEBUG,"%s() record:%d", __func__, record_config_events_g);
  208. }
  209. static void read_config_event (int sock)
  210. {
  211. const char *empty = "None";
  212. struct list_head * list = config_chg_log_head.next;
  213. log_entry_t *entry;
  214. if (list != &config_chg_log_head) {
  215. entry = list_entry (list, log_entry_t, list);
  216. send (sock, entry->log, strlen (entry->log) + 1, 0);
  217. list_del (&entry->list);
  218. free (entry);
  219. } else {
  220. syslog (LOG_DEBUG,"%s() no events in list", __func__);
  221. send (sock, empty, strlen (empty) + 1, 0);
  222. }
  223. }
  224. static void read_messages (int sock, char* atmost_str)
  225. {
  226. struct list_head * list;
  227. log_entry_t *entry;
  228. int atmost = atoi (atmost_str);
  229. int packed = 0;
  230. if (atmost == 0)
  231. atmost = 1;
  232. if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
  233. atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
  234. syslog (LOG_DEBUG, "%s() atmost %d; total_stored_msgs:%d",
  235. __func__, atmost, total_stored_msgs);
  236. big_and_buf[0] = '\0';
  237. for (list = msg_log_head.next;
  238. (!list_empty (&msg_log_head) && packed < atmost); ) {
  239. entry = list_entry (list, log_entry_t, list);
  240. strcat (big_and_buf, entry->log);
  241. packed++;
  242. list = list->next;
  243. list_del (&entry->list);
  244. free (entry);
  245. total_stored_msgs--;
  246. }
  247. syslog (LOG_DEBUG, "%s() sending %d; total_stored_msgs:%d; len:%d",
  248. __func__, packed, total_stored_msgs, (int)strlen (big_and_buf));
  249. if (packed == 0) {
  250. strcpy (big_and_buf, "None");
  251. }
  252. send (sock, big_and_buf, strlen (big_and_buf), 0);
  253. }
  254. static unsigned char buffer[200000];
  255. static void send_some_more_messages (void)
  256. {
  257. msg_t my_msg;
  258. struct iovec iov[2];
  259. int i;
  260. int send_now;
  261. size_t payload_size;
  262. hash_state sha1_hash;
  263. cs_error_t res;
  264. cpg_flow_control_state_t fc_state;
  265. if (cpg_fd < 0)
  266. return;
  267. send_now = my_msgs_to_send;
  268. syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
  269. my_msg.pid = my_pid;
  270. my_msg.nodeid = my_nodeid;
  271. payload_size = (rand() % 100000);
  272. my_msg.size = sizeof (msg_t) + payload_size;
  273. my_msg.seq = 0;
  274. for (i = 0; i < payload_size; i++) {
  275. buffer[i] = i;
  276. }
  277. sha1_init (&sha1_hash);
  278. sha1_process (&sha1_hash, buffer, payload_size);
  279. sha1_done (&sha1_hash, my_msg.sha1);
  280. iov[0].iov_len = sizeof (msg_t);
  281. iov[0].iov_base = &my_msg;
  282. iov[1].iov_len = payload_size;
  283. iov[1].iov_base = buffer;
  284. for (i = 0; i < send_now; i++) {
  285. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  286. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  287. /* lets do this later */
  288. syslog (LOG_INFO, "%s() flow control enabled.", __func__);
  289. return;
  290. }
  291. res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
  292. if (res == CS_ERR_TRY_AGAIN) {
  293. /* lets do this later */
  294. syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
  295. __func__);
  296. return;
  297. } else
  298. if (res != CS_OK) {
  299. syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
  300. __func__, res);
  301. exit (-2);
  302. }
  303. my_msgs_to_send--;
  304. }
  305. }
  306. static void msg_blaster (int sock, char* num_to_send_str)
  307. {
  308. my_msgs_to_send = atoi (num_to_send_str);
  309. my_seq = 1;
  310. my_pid = getpid();
  311. cpg_local_get (cpg_handle, &my_nodeid);
  312. /* control the limits */
  313. if (my_msgs_to_send <= 0)
  314. my_msgs_to_send = 1;
  315. if (my_msgs_to_send > 1000)
  316. my_msgs_to_send = 1000;
  317. send_some_more_messages ();
  318. }
  319. static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
  320. int fd,
  321. int revents,
  322. void *data)
  323. {
  324. cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  325. if (error == CS_ERR_LIBRARY) {
  326. syslog (LOG_ERR, "%s() got LIB error disconnecting from corosync.", __func__);
  327. poll_dispatch_delete (poll_handle, cpg_fd);
  328. close (cpg_fd);
  329. cpg_fd = -1;
  330. }
  331. return 0;
  332. }
  333. static void do_command (int sock, char* func, char*args[], int num_args)
  334. {
  335. int result;
  336. struct cpg_name group_name;
  337. if (parse_debug)
  338. syslog (LOG_DEBUG,"RPC:%s() called.", func);
  339. if (strcmp ("cpg_mcast_joined",func) == 0) {
  340. struct iovec iov[5];
  341. int a;
  342. for (a = 0; a < num_args; a++) {
  343. iov[a].iov_base = args[a];
  344. iov[a].iov_len = strlen(args[a])+1;
  345. }
  346. cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);
  347. } else if (strcmp ("cpg_join",func) == 0) {
  348. strcpy (group_name.value, args[0]);
  349. group_name.length = strlen(args[0]);
  350. result = cpg_join (cpg_handle, &group_name);
  351. if (result != CS_OK) {
  352. syslog (LOG_ERR,
  353. "Could not join process group, error %d\n", result);
  354. exit (1);
  355. }
  356. } else if (strcmp ("cpg_leave",func) == 0) {
  357. strcpy (group_name.value, args[0]);
  358. group_name.length = strlen(args[0]);
  359. result = cpg_leave (cpg_handle, &group_name);
  360. if (result != CS_OK) {
  361. syslog (LOG_ERR,
  362. "Could not leave process group, error %d\n", result);
  363. exit (1);
  364. }
  365. syslog (LOG_INFO, "called cpg_leave()!");
  366. } else if (strcmp ("cpg_initialize",func) == 0) {
  367. int retry_count = 0;
  368. result = cpg_initialize (&cpg_handle, &callbacks);
  369. while (result != CS_OK) {
  370. syslog (LOG_ERR,
  371. "cpg_initialize error %d (attempt %d)\n",
  372. result, retry_count);
  373. if (retry_count >= 3) {
  374. exit (1);
  375. }
  376. sleep(1);
  377. retry_count++;
  378. }
  379. cpg_fd_get (cpg_handle, &cpg_fd);
  380. poll_dispatch_add (poll_handle, cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn);
  381. } else if (strcmp ("cpg_local_get", func) == 0) {
  382. unsigned int local_nodeid;
  383. char response[100];
  384. cpg_local_get (cpg_handle, &local_nodeid);
  385. snprintf (response, 100, "%u",local_nodeid);
  386. send (sock, response, strlen (response) + 1, 0);
  387. } else if (strcmp ("cpg_finalize",func) == 0) {
  388. cpg_finalize (cpg_handle);
  389. poll_dispatch_delete (poll_handle, cpg_fd);
  390. cpg_fd = -1;
  391. } else if (strcmp ("record_config_events",func) == 0) {
  392. record_config_events ();
  393. } else if (strcmp ("record_messages",func) == 0) {
  394. record_messages ();
  395. } else if (strcmp ("read_config_event",func) == 0) {
  396. read_config_event (sock);
  397. } else if (strcmp ("read_messages",func) == 0) {
  398. read_messages (sock, args[0]);
  399. } else if (strcmp ("msg_blaster",func) == 0) {
  400. msg_blaster (sock, args[0]);
  401. } else {
  402. syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func);
  403. }
  404. }
  405. static void handle_command (int sock, char* msg)
  406. {
  407. int num_args;
  408. char *saveptr = NULL;
  409. char *str = strdup (msg);
  410. char *str_len;
  411. char *str_arg;
  412. char *args[5];
  413. int i = 0;
  414. int a = 0;
  415. char* func = NULL;
  416. if (parse_debug)
  417. syslog (LOG_DEBUG,"%s (MSG:%s)\n", __func__, msg);
  418. str_len = strtok_r (str, ":", &saveptr);
  419. assert (str_len);
  420. num_args = atoi (str_len) * 2;
  421. for (i = 0; i < num_args / 2; i++) {
  422. str_len = strtok_r (NULL, ":", &saveptr);
  423. str_arg = strtok_r (NULL, ":", &saveptr);
  424. if (func == NULL) {
  425. /* first "arg" is the function */
  426. if (parse_debug)
  427. syslog (LOG_DEBUG, "(LEN:%s, FUNC:%s)", str_len, str_arg);
  428. func = str_arg;
  429. a = 0;
  430. } else {
  431. args[a] = str_arg;
  432. a++;
  433. if (parse_debug)
  434. syslog (LOG_DEBUG, "(LEN:%s, ARG:%s)", str_len, str_arg);
  435. }
  436. }
  437. do_command (sock, func, args, a+1);
  438. free (str);
  439. }
  440. static int server_process_data_fn (hdb_handle_t handle,
  441. int fd,
  442. int revents,
  443. void *data)
  444. {
  445. char *saveptr;
  446. char *msg;
  447. char *cmd;
  448. int32_t nbytes;
  449. if ((nbytes = recv (fd, big_and_buf_rx, sizeof (big_and_buf_rx), 0)) <= 0) {
  450. /* got error or connection closed by client */
  451. if (nbytes == 0) {
  452. /* connection closed */
  453. syslog (LOG_WARNING, "socket %d hung up: exiting...\n", fd);
  454. } else {
  455. syslog (LOG_ERR,"recv() failed: %s", strerror(errno));
  456. }
  457. close (fd);
  458. exit (0);
  459. } else {
  460. if (my_msgs_to_send > 0)
  461. send_some_more_messages ();
  462. big_and_buf_rx[nbytes] = '\0';
  463. msg = strtok_r (big_and_buf_rx, ";", &saveptr);
  464. assert (msg);
  465. while (msg) {
  466. cmd = strdup (msg);
  467. handle_command (fd, cmd);
  468. free (cmd);
  469. msg = strtok_r (NULL, ";", &saveptr);
  470. }
  471. }
  472. return 0;
  473. }
  474. static int server_accept_fn (hdb_handle_t handle,
  475. int fd,
  476. int revents,
  477. void *data)
  478. {
  479. socklen_t addrlen;
  480. struct sockaddr_in in_addr;
  481. int new_fd;
  482. int res;
  483. addrlen = sizeof (struct sockaddr_in);
  484. retry_accept:
  485. new_fd = accept (fd, (struct sockaddr *)&in_addr, &addrlen);
  486. if (new_fd == -1 && errno == EINTR) {
  487. goto retry_accept;
  488. }
  489. if (new_fd == -1) {
  490. syslog (LOG_ERR,
  491. "Could not accept connection: %s\n", strerror (errno));
  492. return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
  493. }
  494. res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
  495. if (res == -1) {
  496. syslog (LOG_ERR,
  497. "Could not set non-blocking operation on connection: %s\n",
  498. strerror (errno));
  499. close (new_fd);
  500. return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
  501. }
  502. poll_dispatch_add (poll_handle, new_fd, POLLIN|POLLNVAL, NULL, server_process_data_fn);
  503. return 0;
  504. }
  /*
   * Create the listening TCP socket of the agent.
   *
   * The wildcard address is resolved for SERVER_PORT and the first candidate
   * that can be both created and bound is used, with SO_REUSEADDR requested
   * on it.  The process exits with status 1 when the address cannot be
   * resolved, 2 when no candidate could be bound and 3 when listen() fails.
   *
   * Returns the listening socket descriptor.
   */
  static int create_server_sockect (void)
  {
      struct addrinfo hints;
      struct addrinfo *candidates;
      struct addrinfo *cur;
      int listener;
      int reuse = 1;
      int gai_rc;

      /* get a socket and bind it
       */
      memset (&hints, 0, sizeof hints);
      hints.ai_family = AF_UNSPEC;
      hints.ai_socktype = SOCK_STREAM;
      hints.ai_flags = AI_PASSIVE;

      gai_rc = getaddrinfo (NULL, SERVER_PORT, &hints, &candidates);
      if (gai_rc != 0) {
          syslog (LOG_ERR, "%s\n", gai_strerror (gai_rc));
          exit (1);
      }

      cur = candidates;
      while (cur != NULL) {
          listener = socket (cur->ai_family, cur->ai_socktype, cur->ai_protocol);
          if (listener >= 0) {
              /* lose the pesky "address already in use" error message
               */
              if (setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
                      &reuse, sizeof(int)) < 0) {
                  syslog (LOG_ERR, "setsockopt() failed: %s\n", strerror (errno));
              }

              if (bind (listener, cur->ai_addr, cur->ai_addrlen) >= 0) {
                  /* bound: cur stays non-NULL to signal success */
                  break;
              }
              syslog (LOG_ERR, "bind() failed: %s\n", strerror (errno));
              close (listener);
          }
          cur = cur->ai_next;
      }

      if (cur == NULL) {
          syslog (LOG_ERR, "failed to bind\n");
          exit (2);
      }

      freeaddrinfo (candidates);

      if (listen (listener, 10) == -1) {
          syslog (LOG_ERR, "listen() failed: %s", strerror(errno));
          exit (3);
      }

      return listener;
  }
  550. int main (int argc, char *argv[])
  551. {
  552. int listener;
  553. openlog (NULL, LOG_CONS|LOG_PID, LOG_DAEMON);
  554. list_init (&msg_log_head);
  555. list_init (&config_chg_log_head);
  556. poll_handle = poll_create ();
  557. listener = create_server_sockect ();
  558. poll_dispatch_add (poll_handle, listener, POLLIN|POLLNVAL, NULL, server_accept_fn);
  559. poll_run (poll_handle);
  560. return -1;
  561. }