4
0

vqsim_vq_engine.c 11 KB


/*
 * This is the part of VQSIM that runs in the forked child process.
 * Each child represents a single votequorum instance or, if you like,
 * a 'node' in the simulated cluster.
 */
#include <sys/types.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <assert.h>
#include <limits.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <qb/qblog.h>
#include <qb/qbloop.h>
#include <qb/qbipc_common.h>
#include "../exec/votequorum.h"
#include "../exec/service.h"
#include "../include/corosync/corotypes.h"
#include "../include/corosync/votequorum.h"
#include "../include/corosync/ipc_votequorum.h"
#include <corosync/logsys.h>
#include <corosync/coroapi.h>
#include "icmap.h"
#include "vqsim.h"
  22. #define QDEVICE_NAME "VQsim_qdevice"
  23. /* Static variables here are per-instance because we are forked */
  24. static struct corosync_service_engine *engine;
  25. static int parent_socket; /* Our end of the socket */
  26. static char buffer[8192];
  27. static int our_nodeid;
  28. static char *private_data;
  29. static qb_loop_t *poll_loop;
  30. static qb_loop_timer_handle sync_timer;
  31. static qb_loop_timer_handle qdevice_timer;
  32. static int we_are_quorate;
  33. static void *fake_conn = (void*)1;
  34. static cs_error_t last_lib_error;
  35. static struct memb_ring_id current_ring_id;
  36. static int qdevice_registered;
  37. static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
  38. /* 'Keep the compiler happy' time */
  39. char *get_run_dir(void);
  40. int api_timer_add_duration (
  41. unsigned long long nanosec_duration,
  42. void *data,
  43. void (*timer_fn) (void *data),
  44. corosync_timer_handle_t *handle);
  45. static void api_error_memory_failure(void) __attribute__((noreturn));
  46. static void api_error_memory_failure()
  47. {
  48. fprintf(stderr, "Out of memory error\n");
  49. exit(-1);
  50. }
  51. static void api_timer_delete(corosync_timer_handle_t th)
  52. {
  53. qb_loop_timer_del(poll_loop, th);
  54. }
  55. int api_timer_add_duration (
  56. unsigned long long nanosec_duration,
  57. void *data,
  58. void (*timer_fn) (void *data),
  59. corosync_timer_handle_t *handle)
  60. {
  61. return qb_loop_timer_add(poll_loop,
  62. QB_LOOP_MED,
  63. nanosec_duration,
  64. data,
  65. timer_fn,
  66. handle);
  67. }
  68. static unsigned int api_totem_nodeid_get(void)
  69. {
  70. return our_nodeid;
  71. }
  72. static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type)
  73. {
  74. struct vqsim_msg_header header;
  75. struct iovec iovec[iovlen+1];
  76. int total = sizeof(header);
  77. int res;
  78. int i;
  79. header.type = VQMSG_EXEC;
  80. header.from_nodeid = our_nodeid;
  81. header.param = 0;
  82. iovec[0].iov_base = &header;
  83. iovec[0].iov_len = sizeof(header);
  84. for (i=0; i<iovlen; i++) {
  85. iovec[i+1].iov_base = iov[i].iov_base;
  86. iovec[i+1].iov_len = iov[i].iov_len;
  87. total += iov[i].iov_len;
  88. }
  89. res = writev(parent_socket, iovec, iovlen+1);
  90. if (res != total) {
  91. fprintf(stderr, "writev wrote only %d of %d bytes\n", res, total);
  92. }
  93. return 0;
  94. }
  95. static void *api_ipc_private_data_get(void *conn)
  96. {
  97. return private_data;
  98. }
  99. static int api_ipc_response_send(void *conn, const void *msg, size_t len)
  100. {
  101. struct qb_ipc_response_header *qb_header = (void*)msg;
  102. /* Save the error so we can return it */
  103. last_lib_error = qb_header->error;
  104. return 0;
  105. }
  106. static struct corosync_api_v1 corosync_api = {
  107. .error_memory_failure = api_error_memory_failure,
  108. .timer_delete = api_timer_delete,
  109. .timer_add_duration = api_timer_add_duration,
  110. .totem_nodeid_get = api_totem_nodeid_get,
  111. .totem_mcast = api_totem_mcast,
  112. .ipc_private_data_get = api_ipc_private_data_get,
  113. .ipc_response_send = api_ipc_response_send,
  114. };
  115. /* -------------------- Above is all for providing the corosync_api support routines --------------------------------------------*/
  116. /* They need to be in the same file as the engine as they use the local 'poll_loop' variable which is per-process */
  117. static void start_qdevice_poll(int longwait);
  118. static void start_sync_timer(void);
  119. /* Callback from Votequorum to tell us about the quorum state */
  120. static void quorum_fn(const unsigned int *view_list,
  121. size_t view_list_entries,
  122. int quorate, struct memb_ring_id *ring_id)
  123. {
  124. char msgbuf[8192];
  125. int len;
  126. struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf;
  127. we_are_quorate = quorate;
  128. /* Send back to parent */
  129. quorum_msg->header.type = VQMSG_QUORUM;
  130. quorum_msg->header.from_nodeid = our_nodeid;
  131. quorum_msg->header.param = 0;
  132. quorum_msg->quorate = quorate;
  133. memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id));
  134. quorum_msg->view_list_entries = view_list_entries;
  135. memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries);
  136. if ( (len=write(parent_socket, msgbuf, sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries)) <= 0) {
  137. perror("write (view list to parent) failed");
  138. }
  139. memcpy(&current_ring_id, ring_id, sizeof(*ring_id));
  140. }
  141. char *corosync_service_link_and_init(struct corosync_api_v1 *api,
  142. struct default_service *service_engine)
  143. {
  144. /* dummy */
  145. return NULL;
  146. }
  147. /* For votequorum */
  148. char *get_run_dir()
  149. {
  150. static char cwd_buffer[PATH_MAX];
  151. return getcwd(cwd_buffer, PATH_MAX);
  152. }
  153. /* This is different to the one in totemconfig.c in that we already
  154. * know the 'local' node ID, so we can just search for that.
  155. * It needs to be here rather than at main config read time as it's
  156. * (obviously) going to be different for each instance.
  157. */
  158. static void set_local_node_pos(struct corosync_api_v1 *api)
  159. {
  160. icmap_iter_t iter;
  161. uint32_t node_pos;
  162. char name_str[ICMAP_KEYNAME_MAXLEN];
  163. uint32_t nodeid;
  164. const char *iter_key;
  165. int res;
  166. int found = 0;
  167. iter = icmap_iter_init("nodelist.node.");
  168. while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) {
  169. res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, name_str);
  170. if (res != 2) {
  171. continue;
  172. }
  173. if (strcmp(name_str, "nodeid")) {
  174. continue;
  175. }
  176. res = icmap_get_uint32(iter_key, &nodeid);
  177. if (res == CS_OK) {
  178. if (nodeid == our_nodeid) {
  179. found = 1;
  180. res = icmap_set_uint32("nodelist.local_node_pos", node_pos);
  181. assert(res == CS_OK);
  182. }
  183. }
  184. }
  185. if (!found) {
  186. /* This probably indicates a dynamically-added node
  187. * set the pos to zero and use the votes of the
  188. * first node in corosync.conf
  189. */
  190. res = icmap_set_uint32("nodelist.local_node_pos", 0);
  191. assert(res == CS_OK);
  192. }
  193. }
  194. static int load_quorum_instance(struct corosync_api_v1 *api)
  195. {
  196. const char *error_string;
  197. int res;
  198. error_string = votequorum_init(api, quorum_fn);
  199. if (error_string) {
  200. fprintf(stderr, "Votequorum init failed: %s\n", error_string);
  201. return -1;
  202. }
  203. engine = votequorum_get_service_engine_ver0();
  204. error_string = engine->exec_init_fn(api);
  205. if (error_string) {
  206. fprintf(stderr, "votequorum exec init failed: %s\n", error_string);
  207. return -1;
  208. }
  209. private_data = malloc(engine->private_data_size);
  210. if (!private_data) {
  211. perror("Malloc in child failed");
  212. return -1;
  213. }
  214. res = engine->lib_init_fn(fake_conn);
  215. return res;
  216. }
  217. static void sync_dispatch_fn(void *data)
  218. {
  219. if (engine->sync_process()) {
  220. start_sync_timer();
  221. }
  222. else {
  223. engine->sync_activate();
  224. }
  225. }
  226. static void start_sync_timer()
  227. {
  228. qb_loop_timer_add(poll_loop,
  229. QB_LOOP_MED,
  230. 10000000,
  231. NULL,
  232. sync_dispatch_fn,
  233. &sync_timer);
  234. }
  235. static void send_sync(char *buf, int len)
  236. {
  237. struct vqsim_sync_msg *msg = (void*)buf;
  238. /* Votequorum doesn't use the transitional node list :-) */
  239. engine->sync_init(NULL, 0,
  240. msg->view_list, msg->view_list_entries,
  241. &msg->ring_id);
  242. start_sync_timer();
  243. }
  244. static void send_exec_msg(char *buf, int len)
  245. {
  246. struct vqsim_exec_msg *execmsg = (void*)buf;
  247. struct qb_ipc_request_header *qb_header = (void*)execmsg->execmsg;
  248. engine->exec_engine[qb_header->id & 0xFFFF].exec_handler_fn(execmsg->execmsg, execmsg->header.from_nodeid);
  249. }
  250. static int send_lib_msg(int type, void *msg)
  251. {
  252. /* Clear this as not all lib functions return a response immediately */
  253. last_lib_error = CS_OK;
  254. engine->lib_engine[type].lib_handler_fn(fake_conn, msg);
  255. return last_lib_error;
  256. }
  257. static int poll_qdevice(int onoff)
  258. {
  259. struct req_lib_votequorum_qdevice_poll pollmsg;
  260. int res;
  261. pollmsg.cast_vote = onoff;
  262. pollmsg.ring_id.nodeid = current_ring_id.nodeid;
  263. pollmsg.ring_id.seq = current_ring_id.seq;
  264. strcpy(pollmsg.name, QDEVICE_NAME);
  265. res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &pollmsg);
  266. if (res != CS_OK) {
  267. fprintf(stderr, CS_PRI_NODE_ID ": qdevice poll failed: %d\n", our_nodeid, res);
  268. }
  269. return res;
  270. }
  271. static void qdevice_dispatch_fn(void *data)
  272. {
  273. if (poll_qdevice(1) == CS_OK) {
  274. start_qdevice_poll(0);
  275. }
  276. }
  277. static void start_qdevice_poll(int longwait)
  278. {
  279. unsigned long long timeout;
  280. timeout = (unsigned long long)qdevice_timeout*500000; /* Half the corosync timeout */
  281. if (longwait) {
  282. timeout *= 2;
  283. }
  284. qb_loop_timer_add(poll_loop,
  285. QB_LOOP_MED,
  286. timeout,
  287. NULL,
  288. qdevice_dispatch_fn,
  289. &qdevice_timer);
  290. }
  291. static void stop_qdevice_poll(void)
  292. {
  293. qb_loop_timer_del(poll_loop, qdevice_timer);
  294. qdevice_timer = 0;
  295. }
  296. static void do_qdevice(int onoff)
  297. {
  298. int res;
  299. if (onoff) {
  300. if (!qdevice_registered) {
  301. struct req_lib_votequorum_qdevice_register regmsg;
  302. strcpy(regmsg.name, QDEVICE_NAME);
  303. if ( (res=send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, &regmsg)) == CS_OK) {
  304. qdevice_registered = 1;
  305. start_qdevice_poll(1);
  306. }
  307. else {
  308. fprintf(stderr, CS_PRI_NODE_ID ": qdevice registration failed: %d\n", our_nodeid, res);
  309. }
  310. }
  311. else {
  312. if (!qdevice_timer) {
  313. start_qdevice_poll(0);
  314. }
  315. }
  316. }
  317. else {
  318. poll_qdevice(0);
  319. stop_qdevice_poll();
  320. }
  321. }
  322. /* From controller */
  323. static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data)
  324. {
  325. struct vqsim_msg_header *header = (void*)buffer;
  326. int len;
  327. len = read(fd, buffer, sizeof(buffer));
  328. if (len > 0) {
  329. /* Check header and route */
  330. switch (header->type) {
  331. case VQMSG_QUIT:
  332. exit(0);
  333. break;
  334. case VQMSG_EXEC: /* For votequorum exec messages */
  335. send_exec_msg(buffer, len);
  336. break;
  337. case VQMSG_SYNC:
  338. send_sync(buffer, len);
  339. break;
  340. case VQMSG_QDEVICE:
  341. do_qdevice(header->param);
  342. break;
  343. case VQMSG_QUORUMQUIT:
  344. if (!we_are_quorate) {
  345. exit(1);
  346. }
  347. break;
  348. case VQMSG_QUORUM:
  349. /* not used here */
  350. break;
  351. }
  352. }
  353. return 0;
  354. }
  355. static void initial_sync(int nodeid)
  356. {
  357. unsigned int trans_list[1] = {nodeid};
  358. unsigned int member_list[1] = {nodeid};
  359. struct memb_ring_id ring_id;
  360. ring_id.nodeid = our_nodeid;
  361. ring_id.seq = 1;
  362. /* cluster with just us in it */
  363. engine->sync_init(trans_list, 1,
  364. member_list, 1,
  365. &ring_id);
  366. start_sync_timer();
  367. }
  368. /* Return pipe FDs & child PID if sucessful */
  369. int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid)
  370. {
  371. int pipes[2];
  372. pid_t pid;
  373. if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) {
  374. return -1;
  375. }
  376. parent_socket = pipes[0];
  377. switch ( (pid=fork()) ) {
  378. case -1:
  379. perror("fork failed");
  380. return -1;
  381. case 0:
  382. /* child process - continue below */
  383. break;
  384. default:
  385. /* parent process */
  386. *vq_sock = pipes[1];
  387. *childpid = pid;
  388. return 0;
  389. }
  390. our_nodeid = nodeid;
  391. poll_loop = qb_loop_create();
  392. if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) {
  393. qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
  394. }
  395. set_local_node_pos(&corosync_api);
  396. load_quorum_instance(&corosync_api);
  397. qb_loop_poll_add(poll_loop,
  398. QB_LOOP_MED,
  399. parent_socket,
  400. POLLIN,
  401. NULL,
  402. parent_pipe_read_fn);
  403. /* Start it up! */
  404. initial_sync(nodeid);
  405. qb_loop_run(poll_loop);
  406. return 0;
  407. }