vqsim_vq_engine.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. /* This is the bit of VQSIM that runs in the forked process.
  2. It represents a single votequorum instance or, if you like,
  3. a 'node' in the cluster.
  4. */
  5. #include <sys/types.h>
  6. #include <qb/qblog.h>
  7. #include <qb/qbloop.h>
  8. #include <qb/qbipc_common.h>
  9. #include <netinet/in.h>
  10. #include <sys/poll.h>
  11. #include <sys/socket.h>
  12. #include <stdio.h>
  13. #include "../exec/votequorum.h"
  14. #include "../exec/service.h"
  15. #include "../include/corosync/corotypes.h"
  16. #include "../include/corosync/votequorum.h"
  17. #include "../include/corosync/ipc_votequorum.h"
  18. #include <corosync/logsys.h>
  19. #include <corosync/coroapi.h>
  20. #include "icmap.h"
  21. #include "vqsim.h"
  22. #define QDEVICE_NAME "VQsim_qdevice"
  23. /* Static variables here are per-instance because we are forked */
  24. static struct corosync_service_engine *engine;
  25. static int parent_socket; /* Our end of the socket */
  26. static char buffer[8192];
  27. static int our_nodeid;
  28. static char *private_data;
  29. static qb_loop_t *poll_loop;
  30. static qb_loop_timer_handle sync_timer;
  31. static qb_loop_timer_handle qdevice_timer;
  32. static int we_are_quorate;
  33. static void *fake_conn = (void*)1;
  34. static cs_error_t last_lib_error;
  35. static struct memb_ring_id current_ring_id;
  36. static int qdevice_registered;
  37. static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
  38. /* 'Keep the compiler happy' time */
  39. char *get_run_dir(void);
  40. int api_timer_add_duration (
  41. unsigned long long nanosec_duration,
  42. void *data,
  43. void (*timer_fn) (void *data),
  44. corosync_timer_handle_t *handle);
  /*
   * corosync_api 'error_memory_failure' hook.
   * Prints a message on stderr and terminates the process; never returns.
   */
  static void api_error_memory_failure(void) __attribute__((noreturn));

  static void api_error_memory_failure(void)
  {
      fputs("Out of memory error\n", stderr);
      exit(-1);
  }
  51. static void api_timer_delete(corosync_timer_handle_t th)
  52. {
  53. qb_loop_timer_del(poll_loop, th);
  54. }
  55. int api_timer_add_duration (
  56. unsigned long long nanosec_duration,
  57. void *data,
  58. void (*timer_fn) (void *data),
  59. corosync_timer_handle_t *handle)
  60. {
  61. return qb_loop_timer_add(poll_loop,
  62. QB_LOOP_MED,
  63. nanosec_duration,
  64. data,
  65. timer_fn,
  66. handle);
  67. }
  68. static unsigned int api_totem_nodeid_get(void)
  69. {
  70. return our_nodeid;
  71. }
  72. static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type)
  73. {
  74. struct vqsim_msg_header header;
  75. struct iovec iovec[iovlen+1];
  76. int total = sizeof(header);
  77. int res;
  78. int i;
  79. header.type = VQMSG_EXEC;
  80. header.from_nodeid = our_nodeid;
  81. header.param = 0;
  82. iovec[0].iov_base = &header;
  83. iovec[0].iov_len = sizeof(header);
  84. for (i=0; i<iovlen; i++) {
  85. iovec[i+1].iov_base = iov[i].iov_base;
  86. iovec[i+1].iov_len = iov[i].iov_len;
  87. total += iov[i].iov_len;
  88. }
  89. res = writev(parent_socket, iovec, iovlen+1);
  90. if (res != total) {
  91. fprintf(stderr, "writev wrote only %d of %d bytes\n", res, total);
  92. }
  93. return 0;
  94. }
  95. static void *api_ipc_private_data_get(void *conn)
  96. {
  97. return private_data;
  98. }
  99. static int api_ipc_response_send(void *conn, const void *msg, size_t len)
  100. {
  101. struct qb_ipc_response_header *qb_header = (void*)msg;
  102. /* Save the error so we can return it */
  103. last_lib_error = qb_header->error;
  104. return 0;
  105. }
  106. static struct corosync_api_v1 corosync_api = {
  107. .error_memory_failure = api_error_memory_failure,
  108. .timer_delete = api_timer_delete,
  109. .timer_add_duration = api_timer_add_duration,
  110. .totem_nodeid_get = api_totem_nodeid_get,
  111. .totem_mcast = api_totem_mcast,
  112. .ipc_private_data_get = api_ipc_private_data_get,
  113. .ipc_response_send = api_ipc_response_send,
  114. };
  /*
   * ---- Everything above provides the corosync_api support routines ----
   *
   * They need to be in the same file as the engine because they use the
   * local 'poll_loop' variable, which is per-process.
   */

  /* Forward declarations for functions used before they are defined */
  static void start_sync_timer(void);
  static void start_qdevice_poll(int longwait);
  119. /* Callback from Votequorum to tell us about the quorum state */
  120. static void quorum_fn(const unsigned int *view_list,
  121. size_t view_list_entries,
  122. int quorate, struct memb_ring_id *ring_id)
  123. {
  124. char msgbuf[8192];
  125. int len;
  126. struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf;
  127. we_are_quorate = quorate;
  128. /* Send back to parent */
  129. quorum_msg->header.type = VQMSG_QUORUM;
  130. quorum_msg->header.from_nodeid = our_nodeid;
  131. quorum_msg->header.param = 0;
  132. quorum_msg->quorate = quorate;
  133. memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id));
  134. quorum_msg->view_list_entries = view_list_entries;
  135. memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries);
  136. if ( (len=write(parent_socket, msgbuf, sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries)) <= 0) {
  137. perror("write (view list to parent) failed");
  138. }
  139. memcpy(&current_ring_id, ring_id, sizeof(*ring_id));
  140. }
  /*
   * Dummy implementation: nothing is linked or initialised here.
   * Both arguments are ignored and NULL is always returned.
   */
  char *corosync_service_link_and_init(struct corosync_api_v1 *api,
                                       struct default_service *service_engine)
  {
      (void)api;
      (void)service_engine;

      return NULL;
  }
  /*
   * For votequorum: the 'run directory' of the simulator is simply the
   * current working directory.
   *
   * Returns a pointer to a static buffer holding the path, or NULL if
   * getcwd() fails. The buffer is overwritten by the next call.
   */
  char *get_run_dir(void)
  {
      static char cwd_buffer[PATH_MAX];
      char *cwd;

      cwd = getcwd(cwd_buffer, sizeof(cwd_buffer));
      return cwd;
  }
  153. /* This is different to the one in totemconfig.c in that we already
  154. * know the 'local' node ID, so we can just search for that.
  155. * It needs to be here rather than at main config read time as it's
  156. * (obviously) going to be different for each instance.
  157. */
  158. static void set_local_node_pos(struct corosync_api_v1 *api)
  159. {
  160. icmap_iter_t iter;
  161. uint32_t node_pos;
  162. char name_str[ICMAP_KEYNAME_MAXLEN];
  163. uint32_t nodeid;
  164. const char *iter_key;
  165. int res;
  166. iter = icmap_iter_init("nodelist.node.");
  167. while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) {
  168. res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, name_str);
  169. if (res != 2) {
  170. continue;
  171. }
  172. if (strcmp(name_str, "nodeid")) {
  173. continue;
  174. }
  175. res = icmap_get_uint32(iter_key, &nodeid);
  176. if (res == CS_OK) {
  177. if (nodeid == our_nodeid) {
  178. res = icmap_set_uint32("nodelist.local_node_pos", node_pos);
  179. if (res != CS_OK) {
  180. fprintf(stderr, "Failed to find node %d in corosync.conf. Quorum calculations may not be correct:\n", our_nodeid);
  181. }
  182. }
  183. }
  184. }
  185. }
  186. static int load_quorum_instance(struct corosync_api_v1 *api)
  187. {
  188. const char *error_string;
  189. int res;
  190. error_string = votequorum_init(api, quorum_fn);
  191. if (error_string) {
  192. fprintf(stderr, "Votequorum init failed: %s\n", error_string);
  193. return -1;
  194. }
  195. engine = votequorum_get_service_engine_ver0();
  196. error_string = engine->exec_init_fn(api);
  197. if (error_string) {
  198. fprintf(stderr, "votequorum exec init failed: %s\n", error_string);
  199. return -1;
  200. }
  201. private_data = malloc(engine->private_data_size);
  202. if (!private_data) {
  203. perror("Malloc in child failed");
  204. return -1;
  205. }
  206. res = engine->lib_init_fn(fake_conn);
  207. return res;
  208. }
  209. static void sync_dispatch_fn(void *data)
  210. {
  211. if (engine->sync_process()) {
  212. start_sync_timer();
  213. }
  214. else {
  215. engine->sync_activate();
  216. }
  217. }
  218. static void start_sync_timer()
  219. {
  220. qb_loop_timer_add(poll_loop,
  221. QB_LOOP_MED,
  222. 10000000,
  223. NULL,
  224. sync_dispatch_fn,
  225. &sync_timer);
  226. }
  227. static void send_sync(char *buf, int len)
  228. {
  229. struct vqsim_sync_msg *msg = (void*)buf;
  230. /* Votequorum doesn't use the transitional node list :-) */
  231. engine->sync_init(NULL, 0,
  232. msg->view_list, msg->view_list_entries,
  233. &msg->ring_id);
  234. start_sync_timer();
  235. }
  236. static void send_exec_msg(char *buf, int len)
  237. {
  238. struct vqsim_exec_msg *execmsg = (void*)buf;
  239. struct qb_ipc_request_header *qb_header = (void*)execmsg->execmsg;
  240. engine->exec_engine[qb_header->id & 0xFFFF].exec_handler_fn(execmsg->execmsg, execmsg->header.from_nodeid);
  241. }
  242. static int send_lib_msg(int type, void *msg)
  243. {
  244. /* Clear this as not all lib functions return a response immediately */
  245. last_lib_error = CS_OK;
  246. engine->lib_engine[type].lib_handler_fn(fake_conn, msg);
  247. return last_lib_error;
  248. }
  249. static int poll_qdevice(int onoff)
  250. {
  251. struct req_lib_votequorum_qdevice_poll pollmsg;
  252. int res;
  253. pollmsg.cast_vote = onoff;
  254. pollmsg.ring_id.nodeid = current_ring_id.nodeid;
  255. pollmsg.ring_id.seq = current_ring_id.seq;
  256. strcpy(pollmsg.name, QDEVICE_NAME);
  257. res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &pollmsg);
  258. if (res != CS_OK) {
  259. fprintf(stderr, "%d: qdevice poll failed: %d\n", our_nodeid, res);
  260. }
  261. return res;
  262. }
  263. static void qdevice_dispatch_fn(void *data)
  264. {
  265. if (poll_qdevice(1) == CS_OK) {
  266. start_qdevice_poll(0);
  267. }
  268. }
  269. static void start_qdevice_poll(int longwait)
  270. {
  271. unsigned long long timeout;
  272. timeout = (unsigned long long)qdevice_timeout*500000; /* Half the corosync timeout */
  273. if (longwait) {
  274. timeout *= 2;
  275. }
  276. qb_loop_timer_add(poll_loop,
  277. QB_LOOP_MED,
  278. timeout,
  279. NULL,
  280. qdevice_dispatch_fn,
  281. &qdevice_timer);
  282. }
  283. static void stop_qdevice_poll(void)
  284. {
  285. qb_loop_timer_del(poll_loop, qdevice_timer);
  286. qdevice_timer = 0;
  287. }
  288. static void do_qdevice(int onoff)
  289. {
  290. int res;
  291. if (onoff) {
  292. if (!qdevice_registered) {
  293. struct req_lib_votequorum_qdevice_register regmsg;
  294. strcpy(regmsg.name, QDEVICE_NAME);
  295. if ( (res=send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, &regmsg)) == CS_OK) {
  296. qdevice_registered = 1;
  297. start_qdevice_poll(1);
  298. }
  299. else {
  300. fprintf(stderr, "%d: qdevice registration failed: %d\n", our_nodeid, res);
  301. }
  302. }
  303. else {
  304. if (!qdevice_timer) {
  305. start_qdevice_poll(0);
  306. }
  307. }
  308. }
  309. else {
  310. poll_qdevice(0);
  311. stop_qdevice_poll();
  312. }
  313. }
  314. /* From controller */
  315. static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data)
  316. {
  317. struct vqsim_msg_header *header = (void*)buffer;
  318. int len;
  319. len = read(fd, buffer, sizeof(buffer));
  320. if (len > 0) {
  321. /* Check header and route */
  322. switch (header->type) {
  323. case VQMSG_QUIT:
  324. exit(0);
  325. break;
  326. case VQMSG_EXEC: /* For votequorum exec messages */
  327. send_exec_msg(buffer, len);
  328. break;
  329. case VQMSG_SYNC:
  330. send_sync(buffer, len);
  331. break;
  332. case VQMSG_QDEVICE:
  333. do_qdevice(header->param);
  334. break;
  335. case VQMSG_QUORUMQUIT:
  336. if (!we_are_quorate) {
  337. exit(1);
  338. }
  339. break;
  340. case VQMSG_QUORUM:
  341. /* not used here */
  342. break;
  343. }
  344. }
  345. return 0;
  346. }
  347. static void initial_sync(int nodeid)
  348. {
  349. unsigned int trans_list[1] = {nodeid};
  350. unsigned int member_list[1] = {nodeid};
  351. struct memb_ring_id ring_id;
  352. ring_id.nodeid = our_nodeid;
  353. ring_id.seq = 1;
  354. /* cluster with just us in it */
  355. engine->sync_init(trans_list, 1,
  356. member_list, 1,
  357. &ring_id);
  358. start_sync_timer();
  359. }
  360. /* Return pipe FDs & child PID if sucessful */
  361. int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid)
  362. {
  363. int pipes[2];
  364. pid_t pid;
  365. if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) {
  366. return -1;
  367. }
  368. parent_socket = pipes[0];
  369. switch ( (pid=fork()) ) {
  370. case -1:
  371. perror("fork failed");
  372. return -1;
  373. case 0:
  374. /* child process - continue below */
  375. break;
  376. default:
  377. /* parent process */
  378. *vq_sock = pipes[1];
  379. *childpid = pid;
  380. return 0;
  381. }
  382. our_nodeid = nodeid;
  383. poll_loop = qb_loop_create();
  384. if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) {
  385. qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
  386. }
  387. set_local_node_pos(&corosync_api);
  388. load_quorum_instance(&corosync_api);
  389. qb_loop_poll_add(poll_loop,
  390. QB_LOOP_MED,
  391. parent_socket,
  392. POLLIN,
  393. NULL,
  394. parent_pipe_read_fn);
  395. /* Start it up! */
  396. initial_sync(nodeid);
  397. qb_loop_run(poll_loop);
  398. return 0;
  399. }