vqsim_vq_engine.c 11 KB


  1. /* This is the bit of VQSIM that runs in the forked process.
  2. It represents a single votequorum instance or, if you like,
  3. a 'node' in the cluster.
  4. */
  5. #include <sys/types.h>
  6. #include <qb/qblog.h>
  7. #include <qb/qbloop.h>
  8. #include <qb/qbipc_common.h>
  9. #include <netinet/in.h>
  10. #include <sys/poll.h>
  11. #include <sys/socket.h>
  12. #include <stdio.h>
  13. #include "../exec/votequorum.h"
  14. #include "../exec/service.h"
  15. #include "../include/corosync/corotypes.h"
  16. #include "../include/corosync/votequorum.h"
  17. #include "../include/corosync/ipc_votequorum.h"
  18. #include <corosync/logsys.h>
  19. #include <corosync/coroapi.h>
  20. #include "icmap.h"
  21. #include "vqsim.h"
  22. #define QDEVICE_NAME "VQsim_qdevice"
  23. /* Static variables here are per-instance because we are forked */
  24. static struct corosync_service_engine *engine;
  25. static int parent_socket; /* Our end of the socket */
  26. static char buffer[8192];
  27. static int our_nodeid;
  28. static char *private_data;
  29. static qb_loop_t *poll_loop;
  30. static qb_loop_timer_handle sync_timer;
  31. static qb_loop_timer_handle qdevice_timer;
  32. static int we_are_quorate;
  33. static void *fake_conn = (void*)1;
  34. static cs_error_t last_lib_error;
  35. static struct memb_ring_id current_ring_id;
  36. static int qdevice_registered;
  37. static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
  38. /* 'Keep the compiler happy' time */
  39. char *get_run_dir(void);
  40. int api_timer_add_duration (
  41. unsigned long long nanosec_duration,
  42. void *data,
  43. void (*timer_fn) (void *data),
  44. corosync_timer_handle_t *handle);
  static void api_error_memory_failure(void) __attribute__((noreturn));

  /*
   * corosync_api 'error_memory_failure' hook: report the condition on
   * stderr and terminate this process. Never returns.
   */
  static void api_error_memory_failure(void)
  {
      fputs("Out of memory error\n", stderr);
      exit(-1);
  }
  51. static void api_timer_delete(corosync_timer_handle_t th)
  52. {
  53. qb_loop_timer_del(poll_loop, th);
  54. }
  55. int api_timer_add_duration (
  56. unsigned long long nanosec_duration,
  57. void *data,
  58. void (*timer_fn) (void *data),
  59. corosync_timer_handle_t *handle)
  60. {
  61. return qb_loop_timer_add(poll_loop,
  62. QB_LOOP_MED,
  63. nanosec_duration,
  64. data,
  65. timer_fn,
  66. handle);
  67. }
  68. static unsigned int api_totem_nodeid_get(void)
  69. {
  70. return our_nodeid;
  71. }
  72. static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type)
  73. {
  74. struct vqsim_msg_header header;
  75. struct iovec iovec[iovlen+1];
  76. int total = sizeof(header);
  77. int res;
  78. int i;
  79. header.type = VQMSG_EXEC;
  80. header.from_nodeid = our_nodeid;
  81. header.param = 0;
  82. iovec[0].iov_base = &header;
  83. iovec[0].iov_len = sizeof(header);
  84. for (i=0; i<iovlen; i++) {
  85. iovec[i+1].iov_base = iov[i].iov_base;
  86. iovec[i+1].iov_len = iov[i].iov_len;
  87. total += iov[i].iov_len;
  88. }
  89. res = writev(parent_socket, iovec, iovlen+1);
  90. if (res != total) {
  91. fprintf(stderr, "writev wrote only %d of %d bytes\n", res, total);
  92. }
  93. return 0;
  94. }
  95. static void *api_ipc_private_data_get(void *conn)
  96. {
  97. return private_data;
  98. }
  99. static int api_ipc_response_send(void *conn, const void *msg, size_t len)
  100. {
  101. struct qb_ipc_response_header *qb_header = (void*)msg;
  102. /* Save the error so we can return it */
  103. last_lib_error = qb_header->error;
  104. return 0;
  105. }
  106. static struct corosync_api_v1 corosync_api = {
  107. .error_memory_failure = api_error_memory_failure,
  108. .timer_delete = api_timer_delete,
  109. .timer_add_duration = api_timer_add_duration,
  110. .totem_nodeid_get = api_totem_nodeid_get,
  111. .totem_mcast = api_totem_mcast,
  112. .ipc_private_data_get = api_ipc_private_data_get,
  113. .ipc_response_send = api_ipc_response_send,
  114. };
  115. /* -------------------- Above is all for providing the corosync_api support routines --------------------------------------------*/
  116. /* They need to be in the same file as the engine as they use the local 'poll_loop' variable which is per-process */
  /* Forward declarations: both are called before they are defined */
  static void start_sync_timer(void);
  static void start_qdevice_poll(int longwait);
  119. /* Callback from Votequorum to tell us about the quorum state */
  120. static void quorum_fn(const unsigned int *view_list,
  121. size_t view_list_entries,
  122. int quorate, struct memb_ring_id *ring_id)
  123. {
  124. char msgbuf[8192];
  125. int len;
  126. struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf;
  127. we_are_quorate = quorate;
  128. /* Send back to parent */
  129. quorum_msg->header.type = VQMSG_QUORUM;
  130. quorum_msg->header.from_nodeid = our_nodeid;
  131. quorum_msg->header.param = 0;
  132. quorum_msg->quorate = quorate;
  133. memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id));
  134. quorum_msg->view_list_entries = view_list_entries;
  135. memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries);
  136. if ( (len=write(parent_socket, msgbuf, sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries)) <= 0) {
  137. perror("write (view list to parent) failed");
  138. }
  139. memcpy(&current_ring_id, ring_id, sizeof(*ring_id));
  140. }
  /*
   * Dummy implementation: nothing is linked or initialised here.
   * Both arguments are ignored and NULL is always returned.
   */
  char *corosync_service_link_and_init(struct corosync_api_v1 *api,
                                       struct default_service *service_engine)
  {
      (void)api;
      (void)service_engine;

      return NULL;
  }
  /*
   * For votequorum: the run directory is simply the current working
   * directory. The path is written into a static buffer, so the pointer
   * stays valid after return but is overwritten by the next call.
   * Returns getcwd()'s result (NULL if it fails).
   */
  char *get_run_dir(void)
  {
      static char cwd_buffer[PATH_MAX];
      char *cwd;

      cwd = getcwd(cwd_buffer, sizeof(cwd_buffer));
      return cwd;
  }
  153. /* This is different to the one in totemconfig.c in that we already
  154. * know the 'local' node ID, so we can just search for that.
  155. * It needs to be here rather than at main config read time as it's
  156. * (obviously) going to be different for each instance.
  157. */
  158. static void set_local_node_pos(struct corosync_api_v1 *api)
  159. {
  160. icmap_iter_t iter;
  161. uint32_t node_pos;
  162. char name_str[ICMAP_KEYNAME_MAXLEN];
  163. uint32_t nodeid;
  164. const char *iter_key;
  165. int res;
  166. int found = 0;
  167. iter = icmap_iter_init("nodelist.node.");
  168. while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) {
  169. res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, name_str);
  170. if (res != 2) {
  171. continue;
  172. }
  173. if (strcmp(name_str, "nodeid")) {
  174. continue;
  175. }
  176. res = icmap_get_uint32(iter_key, &nodeid);
  177. if (res == CS_OK) {
  178. if (nodeid == our_nodeid) {
  179. found = 1;
  180. res = icmap_set_uint32("nodelist.local_node_pos", node_pos);
  181. }
  182. }
  183. }
  184. if (!found) {
  185. /* This probably indicates a dynamically-added node
  186. * set the pos to zero and use the votes of the
  187. * first node in corosync.conf
  188. */
  189. res = icmap_set_uint32("nodelist.local_node_pos", 0);
  190. }
  191. }
  192. static int load_quorum_instance(struct corosync_api_v1 *api)
  193. {
  194. const char *error_string;
  195. int res;
  196. error_string = votequorum_init(api, quorum_fn);
  197. if (error_string) {
  198. fprintf(stderr, "Votequorum init failed: %s\n", error_string);
  199. return -1;
  200. }
  201. engine = votequorum_get_service_engine_ver0();
  202. error_string = engine->exec_init_fn(api);
  203. if (error_string) {
  204. fprintf(stderr, "votequorum exec init failed: %s\n", error_string);
  205. return -1;
  206. }
  207. private_data = malloc(engine->private_data_size);
  208. if (!private_data) {
  209. perror("Malloc in child failed");
  210. return -1;
  211. }
  212. res = engine->lib_init_fn(fake_conn);
  213. return res;
  214. }
  215. static void sync_dispatch_fn(void *data)
  216. {
  217. if (engine->sync_process()) {
  218. start_sync_timer();
  219. }
  220. else {
  221. engine->sync_activate();
  222. }
  223. }
  224. static void start_sync_timer()
  225. {
  226. qb_loop_timer_add(poll_loop,
  227. QB_LOOP_MED,
  228. 10000000,
  229. NULL,
  230. sync_dispatch_fn,
  231. &sync_timer);
  232. }
  233. static void send_sync(char *buf, int len)
  234. {
  235. struct vqsim_sync_msg *msg = (void*)buf;
  236. /* Votequorum doesn't use the transitional node list :-) */
  237. engine->sync_init(NULL, 0,
  238. msg->view_list, msg->view_list_entries,
  239. &msg->ring_id);
  240. start_sync_timer();
  241. }
  242. static void send_exec_msg(char *buf, int len)
  243. {
  244. struct vqsim_exec_msg *execmsg = (void*)buf;
  245. struct qb_ipc_request_header *qb_header = (void*)execmsg->execmsg;
  246. engine->exec_engine[qb_header->id & 0xFFFF].exec_handler_fn(execmsg->execmsg, execmsg->header.from_nodeid);
  247. }
  248. static int send_lib_msg(int type, void *msg)
  249. {
  250. /* Clear this as not all lib functions return a response immediately */
  251. last_lib_error = CS_OK;
  252. engine->lib_engine[type].lib_handler_fn(fake_conn, msg);
  253. return last_lib_error;
  254. }
  255. static int poll_qdevice(int onoff)
  256. {
  257. struct req_lib_votequorum_qdevice_poll pollmsg;
  258. int res;
  259. pollmsg.cast_vote = onoff;
  260. pollmsg.ring_id.nodeid = current_ring_id.nodeid;
  261. pollmsg.ring_id.seq = current_ring_id.seq;
  262. strcpy(pollmsg.name, QDEVICE_NAME);
  263. res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &pollmsg);
  264. if (res != CS_OK) {
  265. fprintf(stderr, "%d: qdevice poll failed: %d\n", our_nodeid, res);
  266. }
  267. return res;
  268. }
  269. static void qdevice_dispatch_fn(void *data)
  270. {
  271. if (poll_qdevice(1) == CS_OK) {
  272. start_qdevice_poll(0);
  273. }
  274. }
  275. static void start_qdevice_poll(int longwait)
  276. {
  277. unsigned long long timeout;
  278. timeout = (unsigned long long)qdevice_timeout*500000; /* Half the corosync timeout */
  279. if (longwait) {
  280. timeout *= 2;
  281. }
  282. qb_loop_timer_add(poll_loop,
  283. QB_LOOP_MED,
  284. timeout,
  285. NULL,
  286. qdevice_dispatch_fn,
  287. &qdevice_timer);
  288. }
  289. static void stop_qdevice_poll(void)
  290. {
  291. qb_loop_timer_del(poll_loop, qdevice_timer);
  292. qdevice_timer = 0;
  293. }
  294. static void do_qdevice(int onoff)
  295. {
  296. int res;
  297. if (onoff) {
  298. if (!qdevice_registered) {
  299. struct req_lib_votequorum_qdevice_register regmsg;
  300. strcpy(regmsg.name, QDEVICE_NAME);
  301. if ( (res=send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, &regmsg)) == CS_OK) {
  302. qdevice_registered = 1;
  303. start_qdevice_poll(1);
  304. }
  305. else {
  306. fprintf(stderr, "%d: qdevice registration failed: %d\n", our_nodeid, res);
  307. }
  308. }
  309. else {
  310. if (!qdevice_timer) {
  311. start_qdevice_poll(0);
  312. }
  313. }
  314. }
  315. else {
  316. poll_qdevice(0);
  317. stop_qdevice_poll();
  318. }
  319. }
  320. /* From controller */
  321. static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data)
  322. {
  323. struct vqsim_msg_header *header = (void*)buffer;
  324. int len;
  325. len = read(fd, buffer, sizeof(buffer));
  326. if (len > 0) {
  327. /* Check header and route */
  328. switch (header->type) {
  329. case VQMSG_QUIT:
  330. exit(0);
  331. break;
  332. case VQMSG_EXEC: /* For votequorum exec messages */
  333. send_exec_msg(buffer, len);
  334. break;
  335. case VQMSG_SYNC:
  336. send_sync(buffer, len);
  337. break;
  338. case VQMSG_QDEVICE:
  339. do_qdevice(header->param);
  340. break;
  341. case VQMSG_QUORUMQUIT:
  342. if (!we_are_quorate) {
  343. exit(1);
  344. }
  345. break;
  346. case VQMSG_QUORUM:
  347. /* not used here */
  348. break;
  349. }
  350. }
  351. return 0;
  352. }
  353. static void initial_sync(int nodeid)
  354. {
  355. unsigned int trans_list[1] = {nodeid};
  356. unsigned int member_list[1] = {nodeid};
  357. struct memb_ring_id ring_id;
  358. ring_id.nodeid = our_nodeid;
  359. ring_id.seq = 1;
  360. /* cluster with just us in it */
  361. engine->sync_init(trans_list, 1,
  362. member_list, 1,
  363. &ring_id);
  364. start_sync_timer();
  365. }
  366. /* Return pipe FDs & child PID if sucessful */
  367. int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid)
  368. {
  369. int pipes[2];
  370. pid_t pid;
  371. if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) {
  372. return -1;
  373. }
  374. parent_socket = pipes[0];
  375. switch ( (pid=fork()) ) {
  376. case -1:
  377. perror("fork failed");
  378. return -1;
  379. case 0:
  380. /* child process - continue below */
  381. break;
  382. default:
  383. /* parent process */
  384. *vq_sock = pipes[1];
  385. *childpid = pid;
  386. return 0;
  387. }
  388. our_nodeid = nodeid;
  389. poll_loop = qb_loop_create();
  390. if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) {
  391. qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
  392. }
  393. set_local_node_pos(&corosync_api);
  394. load_quorum_instance(&corosync_api);
  395. qb_loop_poll_add(poll_loop,
  396. QB_LOOP_MED,
  397. parent_socket,
  398. POLLIN,
  399. NULL,
  400. parent_pipe_read_fn);
  401. /* Start it up! */
  402. initial_sync(nodeid);
  403. qb_loop_run(poll_loop);
  404. return 0;
  405. }