vqmain.c 17 KB


  1. #include <config.h>
  2. #include <stdio.h>
  3. #include <sys/types.h>
  4. #include <sys/wait.h>
  5. #include <qb/qblog.h>
  6. #include <qb/qbloop.h>
  7. #include <sys/poll.h>
  8. #include <netinet/in.h>
  9. #include <sys/queue.h>
  10. #ifdef HAVE_READLINE_READLINE_H
  11. #include <readline/readline.h>
  12. #else
  13. #include <unistd.h> /* isatty */
  14. #endif
  15. #include "../exec/votequorum.h"
  16. #include "../exec/service.h"
  17. #include <corosync/logsys.h>
  18. #include <corosync/coroapi.h>
  19. #include "icmap.h"
  20. #include "vqsim.h"
/*
 * Prototypes for the corosync config-parsing entry points used by
 * read_corosync_conf(). They are declared here directly because that is
 * easier than including the config file headers, which pull in a ton of
 * conflicting dependencies.
 */
extern int coroparse_configparse (icmap_map_t config_map, const char **error_string);
extern int corosync_log_config_read (const char **error_string);
/* One of these per partition */
struct vq_partition {
	/* Nodes currently in this partition; votequorum exec messages are
	   only forwarded between nodes on the same list */
	TAILQ_HEAD(, vq_node) nodelist;
	/* Ring id handed to every member; send_partition_to_nodes() advances
	   seq when asked for a new ring and sets rep to the first member */
	struct memb_ring_id ring_id;
	/* Index of this partition in partitions[] */
	int num;
};
/* One of these per node */
struct vq_node {
	/* Simulated votequorum instance; it runs as a child process
	   (see vq_get_pid() and the SIGCHLD handling) */
	vq_object_t instance;
	unsigned int nodeid;
	/* Parent-side fd from vq_get_parent_fd(), polled in the main loop */
	int fd;
	/* Partition whose nodelist currently holds this node */
	struct vq_partition *partition;
	TAILQ_ENTRY(vq_node) entries;
	/* Last status, as reported in the node's most recent quorum message */
	/* -1 until the first quorum message arrives */
	int last_quorate;
	struct memb_ring_id last_ring_id;
	int last_view_list[MAX_NODES];
	int last_view_list_entries;
};
/* All partitions; every node is on exactly one partition's nodelist */
static struct vq_partition partitions[MAX_PARTITIONS];
static qb_loop_t *poll_loop;
/* Non-zero: once a node reports quorate, inquorate nodes are told to quit */
static int autofence;
/* Set by cmd_update_all_partitions(); cleared by save_quorum_state() after
   it has run autofencing once */
static int check_for_quorum;
/* Destination for node state reports (stdout unless -o is given) */
static FILE *output_file;
/* -n: do not hold keyboard input back while new nodes report in */
static int nosync;
/* Failsafe timer that re-enables keyboard input */
static qb_loop_timer_handle kb_timer;
/* Node count used by create_node() to size wait_count_to_unblock */
static ssize_t wait_count;
/* Quorum messages still expected before keyboard input is re-enabled;
   reset to 0 by start_kb_input() */
static ssize_t wait_count_to_unblock;
static struct vq_node *find_by_pid(pid_t pid);
static void send_partition_to_nodes(struct vq_partition *partition, int newring);
static void start_kb_input(void);
static void start_kb_input_timeout(void *data);
#ifndef HAVE_READLINE_READLINE_H
/* Line buffer for the fallback stdin reader used without readline */
#define INPUT_BUF_SIZE 1024
static char input_buf[INPUT_BUF_SIZE];
static size_t input_buf_term = 0;
/* Whether stdin is a terminal (decides if a prompt is printed) */
static int is_tty;
#endif
  62. /* Tell all non-quorate nodes to quit */
  63. static void force_fence(void)
  64. {
  65. int i;
  66. struct vq_node *vqn;
  67. for (i=0; i<MAX_PARTITIONS; i++) {
  68. TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
  69. vq_quit_if_inquorate(vqn->instance);
  70. }
  71. }
  72. }
  73. /* Save quorum state from the incoming message */
  74. static void save_quorum_state(struct vq_node *node, struct vqsim_quorum_msg *qmsg)
  75. {
  76. node->last_quorate = qmsg->quorate;
  77. memcpy(&node->last_ring_id, &qmsg->ring_id, sizeof(struct memb_ring_id));
  78. memcpy(node->last_view_list, qmsg->view_list, sizeof(int) * qmsg->view_list_entries);
  79. node->last_view_list_entries = qmsg->view_list_entries;
  80. /* If at least one node is quorate and autofence is enabled, then fence everyone who is not quorate */
  81. if (check_for_quorum && qmsg->quorate & autofence) {
  82. check_for_quorum = 0;
  83. force_fence();
  84. }
  85. }
  86. /* Print current node state */
  87. static void print_quorum_state(struct vq_node *node)
  88. {
  89. int i;
  90. if (node->last_quorate < 0) {
  91. fprintf(output_file, "%d:%02d: q=UNINITIALIZED\n",
  92. node->partition->num, node->nodeid);
  93. return;
  94. }
  95. fprintf(output_file, "%d:%02d: q=%d ring=[%d/%lld] ", node->partition->num, node->nodeid, node->last_quorate,
  96. node->last_ring_id.rep.nodeid, node->last_ring_id.seq);
  97. fprintf(output_file, "nodes=[");
  98. for (i = 0; i < node->last_view_list_entries; i++) {
  99. if (i) {
  100. fprintf(output_file, " ");
  101. }
  102. fprintf(output_file, "%d", node->last_view_list[i]);
  103. }
  104. fprintf(output_file, "]\n");
  105. }
  106. static void propogate_vq_message(struct vq_node *vqn, const char *msg, int len)
  107. {
  108. struct vq_node *other_vqn;
  109. /* Send it to everyone in that node's partition (including itself) */
  110. TAILQ_FOREACH(other_vqn, &vqn->partition->nodelist, entries) {
  111. write(other_vqn->fd, msg, len);
  112. }
  113. }
  114. static int vq_parent_read_fn(int32_t fd, int32_t revents, void *data)
  115. {
  116. char msgbuf[8192];
  117. int msglen;
  118. struct vqsim_msg_header *msg;
  119. struct vqsim_quorum_msg *qmsg;
  120. struct vq_node *vqn = data;
  121. if (revents == POLLIN) {
  122. msglen = read(fd, msgbuf, sizeof(msgbuf));
  123. if (msglen < 0) {
  124. perror("read failed");
  125. }
  126. if (msglen > 0) {
  127. msg = (void*)msgbuf;
  128. switch (msg->type) {
  129. case VQMSG_QUORUM:
  130. if (!nosync && --wait_count_to_unblock <= 0)
  131. qb_loop_timer_del(poll_loop, kb_timer);
  132. qmsg = (void*)msgbuf;
  133. save_quorum_state(vqn, qmsg);
  134. print_quorum_state(vqn);
  135. if (!nosync && wait_count_to_unblock <= 0)
  136. start_kb_input();
  137. break;
  138. case VQMSG_EXEC:
  139. /* Message from votequorum, pass around the partition */
  140. propogate_vq_message(vqn, msgbuf, msglen);
  141. break;
  142. case VQMSG_QUIT:
  143. case VQMSG_SYNC:
  144. case VQMSG_QDEVICE:
  145. case VQMSG_QUORUMQUIT:
  146. /* not used here */
  147. break;
  148. }
  149. }
  150. }
  151. if (revents == POLLERR) {
  152. fprintf(stderr, "pollerr on %d\n", vqn->nodeid);
  153. }
  154. return 0;
  155. }
  156. static int read_corosync_conf(void)
  157. {
  158. int res;
  159. const char *error_string;
  160. int err = icmap_init();
  161. if (!err) {
  162. fprintf(stderr, "icmap_init failed\n");
  163. }
  164. /* Load corosync.conf */
  165. logsys_format_set(NULL);
  166. res = coroparse_configparse(icmap_get_global_map(), &error_string);
  167. if (res == -1) {
  168. log_printf (LOGSYS_LEVEL_INFO, "Error loading corosyc.conf %s", error_string);
  169. return -1;
  170. }
  171. else {
  172. res = corosync_log_config_read (&error_string);
  173. if (res < 0) {
  174. log_printf (LOGSYS_LEVEL_INFO, "error reading log config %s", error_string);
  175. syslog (LOGSYS_LEVEL_INFO, "error reading log config %s", error_string);
  176. }
  177. else {
  178. logsys_config_apply();
  179. }
  180. }
  181. if (logsys_thread_start() != 0) {
  182. log_printf (LOGSYS_LEVEL_ERROR, "Can't initialize log thread");
  183. return -1;
  184. }
  185. return 0;
  186. }
  187. static void remove_node(struct vq_node *node)
  188. {
  189. struct vq_partition *part;
  190. part = node->partition;
  191. /* Remove from partition list */
  192. TAILQ_REMOVE(&part->nodelist, node, entries);
  193. free(node);
  194. wait_count--;
  195. /* Rebuild quorum */
  196. send_partition_to_nodes(part, 1);
  197. }
  198. static int32_t sigchld_handler(int32_t sig, void *data)
  199. {
  200. pid_t pid;
  201. int status;
  202. struct vq_node *vqn;
  203. const char *exit_status="";
  204. char text[132];
  205. pid = wait(&status);
  206. if (WIFEXITED(status)) {
  207. vqn = find_by_pid(pid);
  208. if (vqn) {
  209. switch (WEXITSTATUS(status)) {
  210. case 0:
  211. exit_status = "(on request)";
  212. break;
  213. case 1:
  214. exit_status = "(autofenced)";
  215. break;
  216. default:
  217. sprintf(text, "(exit code %d)", WEXITSTATUS(status));
  218. break;
  219. }
  220. printf("%d:%02d Quit %s\n", vqn->partition->num, vqn->nodeid, exit_status);
  221. remove_node(vqn);
  222. }
  223. else {
  224. fprintf(stderr, "Unknown child %d exited with status %d\n", pid, WEXITSTATUS(status));
  225. }
  226. }
  227. if (WIFSIGNALED(status)) {
  228. vqn = find_by_pid(pid);
  229. if (vqn) {
  230. printf("%d:%02d exited on signal %d%s\n", vqn->partition->num, vqn->nodeid, WTERMSIG(status), WCOREDUMP(status)?" (core dumped)":"");
  231. remove_node(vqn);
  232. }
  233. else {
  234. fprintf(stderr, "Unknown child %d exited with status %d%s\n", pid, WTERMSIG(status), WCOREDUMP(status)?" (core dumped)":"");
  235. }
  236. }
  237. return 0;
  238. }
  239. static void send_partition_to_nodes(struct vq_partition *partition, int newring)
  240. {
  241. struct vq_node *vqn;
  242. int nodelist[MAX_NODES];
  243. int nodes = 0;
  244. int first = 1;
  245. if (newring) {
  246. /* Simulate corosync incrementing the seq by 4 for added authenticity */
  247. partition->ring_id.seq += 4;
  248. }
  249. /* Build the node list */
  250. TAILQ_FOREACH(vqn, &partition->nodelist, entries) {
  251. nodelist[nodes++] = vqn->nodeid;
  252. if (first) {
  253. partition->ring_id.rep.nodeid = vqn->nodeid;
  254. first = 0;
  255. }
  256. }
  257. TAILQ_FOREACH(vqn, &partition->nodelist, entries) {
  258. vq_set_nodelist(vqn->instance, &partition->ring_id, nodelist, nodes);
  259. }
  260. }
  261. static void init_partitions(void)
  262. {
  263. int i;
  264. for (i=0; i<MAX_PARTITIONS; i++) {
  265. TAILQ_INIT(&partitions[i].nodelist);
  266. partitions[i].ring_id.rep.nodeid = 1000+i;
  267. partitions[i].ring_id.seq = 0;
  268. partitions[i].num = i;
  269. }
  270. }
  271. static pid_t create_node(int nodeid, int partno)
  272. {
  273. struct vq_node *newvq;
  274. newvq = malloc(sizeof(struct vq_node));
  275. if (newvq) {
  276. if (!nosync) {
  277. /* Number of expected "quorum" vq messages is a square
  278. of the total nodes count, so increment the node
  279. counter and set new square of this value as
  280. a "to observe" counter */
  281. wait_count++;
  282. wait_count_to_unblock = wait_count * wait_count;
  283. }
  284. newvq->last_quorate = -1; /* mark "uninitialized" */
  285. newvq->instance = vq_create_instance(poll_loop, nodeid);
  286. if (!newvq->instance) {
  287. fprintf(stderr,
  288. "ERR: could not create vq instance nodeid %d\n",
  289. nodeid);
  290. return (pid_t) -1;
  291. }
  292. newvq->partition = &partitions[partno];
  293. newvq->nodeid = nodeid;
  294. newvq->fd = vq_get_parent_fd(newvq->instance);
  295. TAILQ_INSERT_TAIL(&partitions[partno].nodelist, newvq, entries);
  296. if (qb_loop_poll_add(poll_loop,
  297. QB_LOOP_MED,
  298. newvq->fd,
  299. POLLIN | POLLERR,
  300. newvq,
  301. vq_parent_read_fn)) {
  302. perror("qb_loop_poll_add returned error");
  303. return (pid_t) -1;
  304. }
  305. /* Send sync with all the nodes so far in it. */
  306. send_partition_to_nodes(&partitions[partno], 1);
  307. return vq_get_pid(newvq->instance);
  308. }
  309. return (pid_t) -1;
  310. }
  311. static size_t create_nodes_from_config(void)
  312. {
  313. icmap_iter_t iter;
  314. char tmp_key[ICMAP_KEYNAME_MAXLEN];
  315. uint32_t node_pos;
  316. uint32_t nodeid;
  317. const char *iter_key;
  318. int res;
  319. pid_t pid;
  320. size_t ret = 0;
  321. init_partitions();
  322. iter = icmap_iter_init("nodelist.node.");
  323. while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) {
  324. res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, tmp_key);
  325. if (res != 2) {
  326. continue;
  327. }
  328. if (strcmp(tmp_key, "ring0_addr") != 0) {
  329. continue;
  330. }
  331. snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", node_pos);
  332. if (icmap_get_uint32(tmp_key, &nodeid) == CS_OK) {
  333. pid = create_node(nodeid, 0);
  334. if (pid == (pid_t) -1) {
  335. fprintf(stderr,
  336. "ERR: nodeid %d could not be spawned\n",
  337. nodeid);
  338. exit(1);
  339. }
  340. ret++;
  341. }
  342. }
  343. icmap_iter_finalize(iter);
  344. return ret;
  345. }
  346. static struct vq_node *find_node(int nodeid)
  347. {
  348. int i;
  349. struct vq_node *vqn;
  350. for (i=0; i<MAX_PARTITIONS; i++) {
  351. TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
  352. if (vqn->nodeid == nodeid) {
  353. return vqn;
  354. }
  355. }
  356. }
  357. return NULL;
  358. }
  359. static struct vq_node *find_by_pid(pid_t pid)
  360. {
  361. int i;
  362. struct vq_node *vqn;
  363. for (i=0; i<MAX_PARTITIONS; i++) {
  364. TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
  365. if (vq_get_pid(vqn->instance) == pid) {
  366. return vqn;
  367. }
  368. }
  369. }
  370. return NULL;
  371. }
  372. /* Routines called from the parser */
  373. void cmd_start_new_node(int nodeid, int partition)
  374. {
  375. struct vq_node *node;
  376. node = find_node(nodeid);
  377. if (node) {
  378. fprintf(stderr, "ERR: nodeid %d already exists in partition %d\n", nodeid, node->partition->num);
  379. return;
  380. }
  381. qb_loop_poll_del(poll_loop, STDIN_FILENO);
  382. create_node(nodeid, partition);
  383. if (!nosync) {
  384. /* Delay kb input handling by 0.25 second when we've just
  385. added a node; expect that the delay will be cancelled
  386. substantially earlier once it has reported its quorum info
  387. (the delay is in fact a failsafe input enabler here) */
  388. qb_loop_timer_add(poll_loop,
  389. QB_LOOP_MED,
  390. 250000000,
  391. NULL,
  392. start_kb_input_timeout,
  393. &kb_timer);
  394. }
  395. }
  396. void cmd_stop_all_nodes()
  397. {
  398. int i;
  399. struct vq_node *vqn;
  400. for (i=0; i<MAX_PARTITIONS; i++) {
  401. TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
  402. vq_quit(vqn->instance);
  403. }
  404. }
  405. }
  406. void cmd_show_node_states()
  407. {
  408. int i;
  409. struct vq_node *vqn;
  410. for (i=0; i<MAX_PARTITIONS; i++) {
  411. TAILQ_FOREACH(vqn, &partitions[i].nodelist, entries) {
  412. print_quorum_state(vqn);
  413. }
  414. }
  415. fprintf(output_file, "#autofence: %s\n", autofence?"on":"off");
  416. }
  417. void cmd_stop_node(int nodeid)
  418. {
  419. struct vq_node *node;
  420. node = find_node(nodeid);
  421. if (!node) {
  422. fprintf(stderr, "ERR: nodeid %d is not up\n", nodeid);
  423. return;
  424. }
  425. /* Remove processor */
  426. vq_quit(node->instance);
  427. /* Node will be removed when the child process exits */
  428. }
  429. /* Move all nodes in 'nodelist' into partition 'partition' */
  430. void cmd_move_nodes(int partition, int num_nodes, int *nodelist)
  431. {
  432. int i;
  433. struct vq_node *node;
  434. for (i=0; i<num_nodes; i++) {
  435. node = find_node(nodelist[i]);
  436. if (node) {
  437. /* Remove it from the current partition */
  438. TAILQ_REMOVE(&node->partition->nodelist, node, entries);
  439. /* Add it to the new partition */
  440. TAILQ_INSERT_TAIL(&partitions[partition].nodelist, node, entries);
  441. node->partition = &partitions[partition];
  442. }
  443. else {
  444. printf("ERR: node %d does not exist\n", nodelist[i]);
  445. }
  446. }
  447. }
  448. /* Take all the nodes in part2 and join them to part1 */
  449. void cmd_join_partitions(int part1, int part2)
  450. {
  451. struct vq_node *vqn;
  452. /* TAILQ_FOREACH is not delete safe *sigh* */
  453. retry:
  454. TAILQ_FOREACH(vqn, &partitions[part2].nodelist, entries) {
  455. TAILQ_REMOVE(&vqn->partition->nodelist, vqn, entries);
  456. TAILQ_INSERT_TAIL(&partitions[part1].nodelist, vqn, entries);
  457. vqn->partition = &partitions[part1];
  458. goto retry;
  459. }
  460. }
  461. void cmd_set_autofence(int onoff)
  462. {
  463. autofence = onoff;
  464. fprintf(output_file, "#autofence: %s\n", onoff?"on":"off");
  465. }
  466. void cmd_update_all_partitions(int newring)
  467. {
  468. int i;
  469. check_for_quorum = 1;
  470. for (i=0; i<MAX_PARTITIONS; i++) {
  471. send_partition_to_nodes(&partitions[i], newring);
  472. }
  473. }
  474. void cmd_qdevice_poll(int nodeid, int onoff)
  475. {
  476. struct vq_node *node;
  477. node = find_node(nodeid);
  478. if (node) {
  479. vq_set_qdevice(node->instance, &node->partition->ring_id, onoff);
  480. }
  481. }
  482. /* ---------------------------------- */
  483. #ifndef HAVE_READLINE_READLINE_H
  484. static void dummy_read_char(void);
  485. static void dummy_read_char()
  486. {
  487. int c, flush = 0;
  488. while (!flush) {
  489. c = getchar();
  490. if (++input_buf_term >= INPUT_BUF_SIZE) {
  491. if (c != '\n' && c != EOF)
  492. fprintf(stderr, "User input overflows the limit: %zu\n",
  493. (size_t) INPUT_BUF_SIZE);
  494. input_buf[INPUT_BUF_SIZE - 1] = '\0';
  495. flush = 1;
  496. } else if (c == '\n' || c == EOF) {
  497. input_buf[input_buf_term - 1] = '\0';
  498. flush = 1;
  499. } else {
  500. input_buf[input_buf_term - 1] = c;
  501. }
  502. }
  503. parse_input_command((c == EOF) ? NULL : input_buf);
  504. input_buf_term = 0;
  505. if (is_tty) {
  506. printf("vqsim> ");
  507. fflush(stdout);
  508. }
  509. }
  510. #endif
  511. static int stdin_read_fn(int32_t fd, int32_t revents, void *data)
  512. {
  513. #ifdef HAVE_READLINE_READLINE_H
  514. /* Send it to readline */
  515. rl_callback_read_char();
  516. #else
  517. dummy_read_char();
  518. #endif
  519. return 0;
  520. }
  521. static void start_kb_input(void)
  522. {
  523. wait_count_to_unblock = 0;
  524. #ifdef HAVE_READLINE_READLINE_H
  525. /* Readline will deal with completed lines when they arrive */
  526. rl_callback_handler_install("vqsim> ", parse_input_command);
  527. #else
  528. if (is_tty) {
  529. printf("vqsim> ");
  530. fflush(stdout);
  531. }
  532. #endif
  533. /* Send stdin to readline */
  534. if (qb_loop_poll_add(poll_loop,
  535. QB_LOOP_MED,
  536. STDIN_FILENO,
  537. POLLIN | POLLERR,
  538. NULL,
  539. stdin_read_fn)) {
  540. if (errno != EEXIST) {
  541. perror("qb_loop_poll_add1 returned error");
  542. }
  543. }
  544. }
  545. static void start_kb_input_timeout(void *data)
  546. {
  547. // fprintf(stderr, "Waiting for nodes to report status timed out\n");
  548. start_kb_input();
  549. }
  550. static void usage(char *program)
  551. {
  552. printf("Usage:\n");
  553. printf("\n");
  554. printf("%s [-f <config-file>] [-o <output-file>]\n", program);
  555. printf("\n");
  556. printf(" -f config file. defaults to /etc/corosync/corosync.conf\n");
  557. printf(" -o output file. defaults to stdout\n");
  558. printf(" -n no synchronization (on adding a node)\n");
  559. printf(" -h display this help text\n");
  560. printf("\n");
  561. }
  562. int main(int argc, char **argv)
  563. {
  564. qb_loop_signal_handle sigchld_qb_handle;
  565. int ch;
  566. char *config_file_name = NULL;
  567. char *output_file_name = NULL;
  568. char envstring[PATH_MAX];
  569. while ((ch = getopt (argc, argv, "f:o:nh")) != EOF) {
  570. switch (ch) {
  571. case 'f':
  572. config_file_name = optarg;
  573. break;
  574. case 'o':
  575. output_file_name = optarg;
  576. break;
  577. case 'n':
  578. nosync = 1;
  579. break;
  580. default:
  581. usage(argv[0]);
  582. exit(0);
  583. }
  584. }
  585. if (config_file_name) {
  586. sprintf(envstring, "COROSYNC_MAIN_CONFIG_FILE=%s", config_file_name);
  587. putenv(envstring);
  588. }
  589. if (output_file_name) {
  590. output_file = fopen(output_file_name, "w");
  591. if (!output_file) {
  592. fprintf(stderr, "Unable to open %s for output: %s\n", output_file_name, strerror(errno));
  593. exit(-1);
  594. }
  595. }
  596. else {
  597. output_file = stdout;
  598. }
  599. #ifndef HAVE_READLINE_READLINE_H
  600. is_tty = isatty(STDIN_FILENO);
  601. #endif
  602. qb_log_filter_ctl(QB_LOG_SYSLOG, QB_LOG_FILTER_ADD,
  603. QB_LOG_FILTER_FUNCTION, "*", LOG_DEBUG);
  604. qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
  605. qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
  606. QB_LOG_FILTER_FUNCTION, "*", LOG_DEBUG);
  607. poll_loop = qb_loop_create();
  608. /* SIGCHLD handler to reap sub-processes and reconfigure the cluster */
  609. qb_loop_signal_add(poll_loop,
  610. QB_LOOP_MED,
  611. SIGCHLD,
  612. NULL,
  613. sigchld_handler,
  614. &sigchld_qb_handle);
  615. /* Create a full cluster of nodes from corosync.conf */
  616. read_corosync_conf();
  617. if (create_nodes_from_config() && !nosync) {
  618. /* Delay kb input handling by 1 second when we've just
  619. added the nodes from corosync.conf; expect that
  620. the delay will be cancelled substantially earlier
  621. once they all have reported their quorum info
  622. (the delay is in fact a failsafe input enabler here) */
  623. qb_loop_timer_add(poll_loop,
  624. QB_LOOP_MED,
  625. 1000000000,
  626. NULL,
  627. start_kb_input_timeout,
  628. &kb_timer);
  629. } else {
  630. start_kb_input();
  631. }
  632. qb_loop_run(poll_loop);
  633. return 0;
  634. }