cpg_test_agent.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. /*
  2. * Copyright (c) 2010 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Angus Salkeld (asalkeld@redhat.com)
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <errno.h>
  35. #include <unistd.h>
  36. #include <stdio.h>
  37. #include <stdlib.h>
  38. #include <assert.h>
  39. #include <string.h>
  40. #include <sys/types.h>
  41. #include <sys/socket.h>
  42. #include <netinet/in.h>
  43. #include <arpa/inet.h>
  44. #include <netdb.h>
  45. #include <syslog.h>
  46. #include <poll.h>
  47. #include <unistd.h>
  48. #include <fcntl.h>
  49. #include <corosync/totem/coropoll.h>
  50. #include <corosync/list.h>
  51. #include <corosync/cpg.h>
  52. #include "../../exec/crypto.h"
  53. #include "common_test_agent.h"
  /*
   * Verdict attached to every message recorded by delivery_callback() and
   * turned into text by err_status_string().
   */
  typedef enum {
      MSG_OK         = 0,  /* no check failed */
      MSG_NODEID_ERR = 1,  /* header nodeid differs from the one CPG reported */
      MSG_PID_ERR    = 2,  /* header pid differs from the one CPG reported */
      MSG_SEQ_ERR    = 3,  /* sequence error (not assigned anywhere in this file) */
      MSG_SIZE_ERR   = 4,  /* header size differs from the delivered length */
      MSG_SHA1_ERR   = 5,  /* payload digest differs from the header digest */
  } msg_status_t;
  /*
   * Header placed in front of every blaster message.  The payload bytes
   * follow the header directly; "size" counts header plus payload.
   *
   * The trailing member is a C99 flexible array member (the original used
   * the GNU zero-length-array extension); sizeof (msg_t) is still the size
   * of the header alone.
   */
  typedef struct {
      uint32_t nodeid;          /* sender's node id */
      pid_t pid;                /* sender's process id */
      unsigned char sha1[20];   /* SHA1 digest of the payload */
      uint32_t seq;             /* overwritten by the receiver in delivery_callback() */
      size_t size;              /* sizeof (msg_t) + payload length */
      unsigned char payload[];  /* payload bytes, not counted by sizeof */
  } msg_t;
  70. #define LOG_STR_SIZE 256
  71. typedef struct {
  72. char log[LOG_STR_SIZE];
  73. struct list_head list;
  74. } log_entry_t;
  75. static char big_and_buf[HOW_BIG_AND_BUF];
  76. static int32_t record_config_events_g = 0;
  77. static int32_t record_messages_g = 0;
  78. static cpg_handle_t cpg_handle = 0;
  79. static int32_t cpg_fd = -1;
  80. static struct list_head config_chg_log_head;
  81. static struct list_head msg_log_head;
  82. static pid_t my_pid;
  83. static uint32_t my_nodeid;
  84. static int32_t my_seq;
  85. static int32_t use_zcb = 0;
  86. static int32_t my_msgs_to_send;
  87. static int32_t total_stored_msgs = 0;
  88. static void send_some_more_messages (void * unused);
  89. static char* err_status_string (char * buf, size_t buf_len, msg_status_t status)
  90. {
  91. switch (status) {
  92. case MSG_OK:
  93. strncpy (buf, "OK", buf_len);
  94. break;
  95. case MSG_NODEID_ERR:
  96. strncpy (buf, "NODEID_ERR", buf_len);
  97. break;
  98. case MSG_PID_ERR:
  99. strncpy (buf, "PID_ERR", buf_len);
  100. break;
  101. case MSG_SEQ_ERR:
  102. strncpy (buf, "SEQ_ERR", buf_len);
  103. break;
  104. case MSG_SIZE_ERR:
  105. strncpy (buf, "SIZE_ERR", buf_len);
  106. break;
  107. case MSG_SHA1_ERR:
  108. strncpy (buf, "SHA1_ERR", buf_len);
  109. break;
  110. default:
  111. strncpy (buf, "UNKNOWN_ERR", buf_len);
  112. break;
  113. }
  114. return buf;
  115. }
  116. static void delivery_callback (
  117. cpg_handle_t handle,
  118. const struct cpg_name *groupName,
  119. uint32_t nodeid,
  120. uint32_t pid,
  121. void *msg,
  122. size_t msg_len)
  123. {
  124. log_entry_t *log_pt;
  125. msg_t *msg_pt = (msg_t*)msg;
  126. msg_status_t status = MSG_OK;
  127. char status_buf[20];
  128. unsigned char sha1_compare[20];
  129. hash_state sha1_hash;
  130. if (record_messages_g == 0) {
  131. return;
  132. }
  133. msg_pt->seq = my_seq;
  134. my_seq++;
  135. if (nodeid != msg_pt->nodeid) {
  136. status = MSG_NODEID_ERR;
  137. }
  138. if (pid != msg_pt->pid) {
  139. status = MSG_PID_ERR;
  140. }
  141. if (msg_len != msg_pt->size) {
  142. status = MSG_SIZE_ERR;
  143. }
  144. sha1_init (&sha1_hash);
  145. sha1_process (&sha1_hash, msg_pt->payload, (msg_pt->size - sizeof (msg_t)));
  146. sha1_done (&sha1_hash, sha1_compare);
  147. if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) {
  148. syslog (LOG_ERR, "%s(); msg seq:%d; incorrect hash",
  149. __func__, msg_pt->seq);
  150. status = MSG_SHA1_ERR;
  151. }
  152. log_pt = malloc (sizeof(log_entry_t));
  153. list_init (&log_pt->list);
  154. snprintf (log_pt->log, LOG_STR_SIZE, "%d:%d:%d:%s;",
  155. msg_pt->nodeid, msg_pt->pid, msg_pt->seq,
  156. err_status_string (status_buf, 20, status));
  157. list_add_tail (&log_pt->list, &msg_log_head);
  158. total_stored_msgs++;
  159. }
  160. static void config_change_callback (
  161. cpg_handle_t handle,
  162. const struct cpg_name *groupName,
  163. const struct cpg_address *member_list, size_t member_list_entries,
  164. const struct cpg_address *left_list, size_t left_list_entries,
  165. const struct cpg_address *joined_list, size_t joined_list_entries)
  166. {
  167. int i;
  168. log_entry_t *log_pt;
  169. /* group_name,ip,pid,join|leave */
  170. if (record_config_events_g == 0) {
  171. return;
  172. }
  173. for (i = 0; i < left_list_entries; i++) {
  174. syslog (LOG_DEBUG, "%s() inserting leave event into list", __func__);
  175. log_pt = malloc (sizeof(log_entry_t));
  176. list_init (&log_pt->list);
  177. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,left",
  178. groupName->value, left_list[i].nodeid,left_list[i].pid);
  179. list_add_tail(&log_pt->list, &config_chg_log_head);
  180. }
  181. for (i = 0; i < joined_list_entries; i++) {
  182. syslog (LOG_DEBUG, "%s() inserting join event into list", __func__);
  183. log_pt = malloc (sizeof(log_entry_t));
  184. list_init (&log_pt->list);
  185. snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,join",
  186. groupName->value, joined_list[i].nodeid,joined_list[i].pid);
  187. list_add_tail (&log_pt->list, &config_chg_log_head);
  188. }
  189. }
  190. static cpg_callbacks_t callbacks = {
  191. .cpg_deliver_fn = delivery_callback,
  192. .cpg_confchg_fn = config_change_callback,
  193. };
  194. static void record_messages (void)
  195. {
  196. record_messages_g = 1;
  197. syslog (LOG_DEBUG,"%s() record:%d", __func__, record_messages_g);
  198. }
  199. static void record_config_events (void)
  200. {
  201. record_config_events_g = 1;
  202. syslog (LOG_DEBUG,"%s() record:%d", __func__, record_config_events_g);
  203. }
  204. static void read_config_event (int sock)
  205. {
  206. const char *empty = "None";
  207. struct list_head * list = config_chg_log_head.next;
  208. log_entry_t *entry;
  209. if (list != &config_chg_log_head) {
  210. entry = list_entry (list, log_entry_t, list);
  211. send (sock, entry->log, strlen (entry->log) + 1, 0);
  212. list_del (&entry->list);
  213. free (entry);
  214. } else {
  215. syslog (LOG_DEBUG,"%s() no events in list", __func__);
  216. send (sock, empty, strlen (empty) + 1, 0);
  217. }
  218. }
  219. static void read_messages (int sock, char* atmost_str)
  220. {
  221. struct list_head * list;
  222. log_entry_t *entry;
  223. int atmost = atoi (atmost_str);
  224. int packed = 0;
  225. if (atmost == 0)
  226. atmost = 1;
  227. if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
  228. atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
  229. syslog (LOG_DEBUG, "%s() atmost %d; total_stored_msgs:%d",
  230. __func__, atmost, total_stored_msgs);
  231. big_and_buf[0] = '\0';
  232. for (list = msg_log_head.next;
  233. (!list_empty (&msg_log_head) && packed < atmost); ) {
  234. entry = list_entry (list, log_entry_t, list);
  235. strcat (big_and_buf, entry->log);
  236. packed++;
  237. list = list->next;
  238. list_del (&entry->list);
  239. free (entry);
  240. total_stored_msgs--;
  241. }
  242. syslog (LOG_DEBUG, "%s() sending %d; total_stored_msgs:%d; len:%d",
  243. __func__, packed, total_stored_msgs, (int)strlen (big_and_buf));
  244. if (packed == 0) {
  245. strcpy (big_and_buf, "None");
  246. }
  247. send (sock, big_and_buf, strlen (big_and_buf), 0);
  248. }
  249. static void send_some_more_messages_later (void)
  250. {
  251. poll_timer_handle timer_handle;
  252. cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  253. poll_timer_add (
  254. ta_poll_handle_get(),
  255. 300, NULL,
  256. send_some_more_messages,
  257. &timer_handle);
  258. }
  259. static void send_some_more_messages_zcb (void)
  260. {
  261. msg_t *my_msg;
  262. int i;
  263. int send_now;
  264. size_t payload_size;
  265. size_t total_size;
  266. hash_state sha1_hash;
  267. cs_error_t res;
  268. cpg_flow_control_state_t fc_state;
  269. void *zcb_buffer;
  270. if (cpg_fd < 0)
  271. return;
  272. send_now = my_msgs_to_send;
  273. payload_size = (rand() % 100000);
  274. total_size = payload_size + sizeof (msg_t);
  275. cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer);
  276. my_msg = (msg_t*)zcb_buffer;
  277. //syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
  278. my_msg->pid = my_pid;
  279. my_msg->nodeid = my_nodeid;
  280. my_msg->size = sizeof (msg_t) + payload_size;
  281. my_msg->seq = 0;
  282. for (i = 0; i < payload_size; i++) {
  283. my_msg->payload[i] = i;
  284. }
  285. sha1_init (&sha1_hash);
  286. sha1_process (&sha1_hash, my_msg->payload, payload_size);
  287. sha1_done (&sha1_hash, my_msg->sha1);
  288. for (i = 0; i < send_now; i++) {
  289. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  290. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  291. /* lets do this later */
  292. send_some_more_messages_later ();
  293. syslog (LOG_INFO, "%s() flow control enabled.", __func__);
  294. goto free_buffer;
  295. }
  296. res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size);
  297. if (res == CS_ERR_TRY_AGAIN) {
  298. /* lets do this later */
  299. send_some_more_messages_later ();
  300. syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
  301. __func__);
  302. goto free_buffer;
  303. } else if (res != CS_OK) {
  304. syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
  305. __func__, res);
  306. exit (-2);
  307. }
  308. my_msgs_to_send--;
  309. }
  310. free_buffer:
  311. cpg_zcb_free (cpg_handle, zcb_buffer);
  312. }
  313. static unsigned char buffer[200000];
  314. static void send_some_more_messages_normal (void)
  315. {
  316. msg_t my_msg;
  317. struct iovec iov[2];
  318. int i;
  319. int send_now;
  320. size_t payload_size;
  321. hash_state sha1_hash;
  322. cs_error_t res;
  323. cpg_flow_control_state_t fc_state;
  324. if (cpg_fd < 0)
  325. return;
  326. send_now = my_msgs_to_send;
  327. //syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
  328. my_msg.pid = my_pid;
  329. my_msg.nodeid = my_nodeid;
  330. payload_size = (rand() % 100000);
  331. my_msg.size = sizeof (msg_t) + payload_size;
  332. my_msg.seq = 0;
  333. for (i = 0; i < payload_size; i++) {
  334. buffer[i] = i;
  335. }
  336. sha1_init (&sha1_hash);
  337. sha1_process (&sha1_hash, buffer, payload_size);
  338. sha1_done (&sha1_hash, my_msg.sha1);
  339. iov[0].iov_len = sizeof (msg_t);
  340. iov[0].iov_base = &my_msg;
  341. iov[1].iov_len = payload_size;
  342. iov[1].iov_base = buffer;
  343. for (i = 0; i < send_now; i++) {
  344. res = cpg_flow_control_state_get (cpg_handle, &fc_state);
  345. if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
  346. /* lets do this later */
  347. send_some_more_messages_later ();
  348. syslog (LOG_INFO, "%s() flow control enabled.", __func__);
  349. return;
  350. }
  351. res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
  352. if (res == CS_ERR_TRY_AGAIN) {
  353. /* lets do this later */
  354. send_some_more_messages_later ();
  355. syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
  356. __func__);
  357. return;
  358. } else
  359. if (res != CS_OK) {
  360. syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
  361. __func__, res);
  362. exit (-2);
  363. }
  364. my_msgs_to_send--;
  365. }
  366. }
  367. static void send_some_more_messages (void * unused)
  368. {
  369. if (use_zcb) {
  370. send_some_more_messages_zcb ();
  371. } else {
  372. send_some_more_messages_normal ();
  373. }
  374. }
  375. static void msg_blaster (int sock, char* num_to_send_str)
  376. {
  377. my_msgs_to_send = atoi (num_to_send_str);
  378. my_seq = 1;
  379. my_pid = getpid();
  380. use_zcb = 0;
  381. cpg_local_get (cpg_handle, &my_nodeid);
  382. /* control the limits */
  383. if (my_msgs_to_send <= 0)
  384. my_msgs_to_send = 1;
  385. if (my_msgs_to_send > 10000)
  386. my_msgs_to_send = 10000;
  387. send_some_more_messages_normal ();
  388. }
  389. static void msg_blaster_zcb (int sock, char* num_to_send_str)
  390. {
  391. my_msgs_to_send = atoi (num_to_send_str);
  392. my_seq = 1;
  393. my_pid = getpid();
  394. use_zcb = 1;
  395. cpg_local_get (cpg_handle, &my_nodeid);
  396. /* control the limits */
  397. if (my_msgs_to_send <= 0)
  398. my_msgs_to_send = 1;
  399. if (my_msgs_to_send > 10000)
  400. my_msgs_to_send = 10000;
  401. send_some_more_messages_zcb ();
  402. }
  403. static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
  404. int fd,
  405. int revents,
  406. void *data)
  407. {
  408. cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
  409. if (error == CS_ERR_LIBRARY) {
  410. syslog (LOG_ERR, "%s() got LIB error disconnecting from corosync.", __func__);
  411. poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
  412. close (cpg_fd);
  413. cpg_fd = -1;
  414. }
  415. return 0;
  416. }
  417. static void do_command (int sock, char* func, char*args[], int num_args)
  418. {
  419. int result;
  420. struct cpg_name group_name;
  421. if (parse_debug)
  422. syslog (LOG_DEBUG,"RPC:%s() called.", func);
  423. if (strcmp ("cpg_mcast_joined",func) == 0) {
  424. struct iovec iov[5];
  425. int a;
  426. for (a = 0; a < num_args; a++) {
  427. iov[a].iov_base = args[a];
  428. iov[a].iov_len = strlen(args[a])+1;
  429. }
  430. cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);
  431. } else if (strcmp ("cpg_join",func) == 0) {
  432. strcpy (group_name.value, args[0]);
  433. group_name.length = strlen(args[0]);
  434. result = cpg_join (cpg_handle, &group_name);
  435. if (result != CS_OK) {
  436. syslog (LOG_ERR,
  437. "Could not join process group, error %d\n", result);
  438. exit (1);
  439. }
  440. } else if (strcmp ("cpg_leave",func) == 0) {
  441. strcpy (group_name.value, args[0]);
  442. group_name.length = strlen(args[0]);
  443. result = cpg_leave (cpg_handle, &group_name);
  444. if (result != CS_OK) {
  445. syslog (LOG_ERR,
  446. "Could not leave process group, error %d\n", result);
  447. exit (1);
  448. }
  449. syslog (LOG_INFO, "called cpg_leave()!");
  450. } else if (strcmp ("cpg_initialize",func) == 0) {
  451. int retry_count = 0;
  452. result = cpg_initialize (&cpg_handle, &callbacks);
  453. while (result != CS_OK) {
  454. syslog (LOG_ERR,
  455. "cpg_initialize error %d (attempt %d)\n",
  456. result, retry_count);
  457. if (retry_count >= 3) {
  458. exit (1);
  459. }
  460. sleep(1);
  461. retry_count++;
  462. }
  463. cpg_fd_get (cpg_handle, &cpg_fd);
  464. poll_dispatch_add (ta_poll_handle_get(), cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn);
  465. } else if (strcmp ("cpg_local_get", func) == 0) {
  466. unsigned int local_nodeid;
  467. char response[100];
  468. cpg_local_get (cpg_handle, &local_nodeid);
  469. snprintf (response, 100, "%u",local_nodeid);
  470. send (sock, response, strlen (response) + 1, 0);
  471. } else if (strcmp ("cpg_finalize",func) == 0) {
  472. cpg_finalize (cpg_handle);
  473. poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
  474. cpg_fd = -1;
  475. } else if (strcmp ("record_config_events",func) == 0) {
  476. record_config_events ();
  477. } else if (strcmp ("record_messages",func) == 0) {
  478. record_messages ();
  479. } else if (strcmp ("read_config_event",func) == 0) {
  480. read_config_event (sock);
  481. } else if (strcmp ("read_messages",func) == 0) {
  482. read_messages (sock, args[0]);
  483. } else if (strcmp ("msg_blaster_zcb", func) == 0) {
  484. msg_blaster_zcb (sock, args[0]);
  485. } else if (strcmp ("msg_blaster",func) == 0) {
  486. msg_blaster (sock, args[0]);
  487. } else {
  488. syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func);
  489. }
  490. }
  491. int main (int argc, char *argv[])
  492. {
  493. openlog (NULL, LOG_CONS|LOG_PID, LOG_DAEMON);
  494. list_init (&msg_log_head);
  495. list_init (&config_chg_log_head);
  496. return test_agent_run (9034, do_command);
  497. }