vqsim_vq_engine.c 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
/*
 * This is the part of VQSIM that runs in each forked child process.
 * Each child hosts a single votequorum instance -- in other words, one
 * simulated 'node' in the cluster.
 */
  5. #include <sys/types.h>
  6. #include <qb/qblog.h>
  7. #include <qb/qbloop.h>
  8. #include <qb/qbipc_common.h>
  9. #include <netinet/in.h>
  10. #include <sys/poll.h>
  11. #include <sys/socket.h>
  12. #include <stdio.h>
  13. #include "../exec/votequorum.h"
  14. #include "../exec/service.h"
  15. #include "../include/corosync/corotypes.h"
  16. #include "../include/corosync/votequorum.h"
  17. #include "../include/corosync/ipc_votequorum.h"
  18. #include <corosync/logsys.h>
  19. #include <corosync/coroapi.h>
  20. #include "icmap.h"
  21. #include "vqsim.h"
  22. #define QDEVICE_NAME "VQsim_qdevice"
  23. /* Static variables here are per-instance because we are forked */
  24. static struct corosync_service_engine *engine;
  25. static int parent_socket; /* Our end of the socket */
  26. static char buffer[8192];
  27. static int our_nodeid;
  28. static char *private_data;
  29. static qb_loop_t *poll_loop;
  30. static qb_loop_timer_handle sync_timer;
  31. static qb_loop_timer_handle qdevice_timer;
  32. static int we_are_quorate;
  33. static void *fake_conn = (void*)1;
  34. static cs_error_t last_lib_error;
  35. static struct memb_ring_id current_ring_id;
  36. static int qdevice_registered;
  37. static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
  38. /* 'Keep the compiler happy' time */
  39. char *get_run_dir(void);
  40. int api_timer_add_duration (
  41. unsigned long long nanosec_duration,
  42. void *data,
  43. void (*timer_fn) (void *data),
  44. corosync_timer_handle_t *handle);
  45. static void api_error_memory_failure(void) __attribute__((noreturn));
  46. static void api_error_memory_failure()
  47. {
  48. fprintf(stderr, "Out of memory error\n");
  49. exit(-1);
  50. }
  51. static void api_timer_delete(corosync_timer_handle_t th)
  52. {
  53. qb_loop_timer_del(poll_loop, th);
  54. }
  55. int api_timer_add_duration (
  56. unsigned long long nanosec_duration,
  57. void *data,
  58. void (*timer_fn) (void *data),
  59. corosync_timer_handle_t *handle)
  60. {
  61. return qb_loop_timer_add(poll_loop,
  62. QB_LOOP_MED,
  63. nanosec_duration,
  64. data,
  65. timer_fn,
  66. handle);
  67. }
  68. static unsigned int api_totem_nodeid_get(void)
  69. {
  70. return our_nodeid;
  71. }
  72. static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type)
  73. {
  74. struct vqsim_msg_header header;
  75. struct iovec iovec[iovlen+1];
  76. int total = sizeof(header);
  77. int res;
  78. int i;
  79. header.type = VQMSG_EXEC;
  80. header.from_nodeid = our_nodeid;
  81. header.param = 0;
  82. iovec[0].iov_base = &header;
  83. iovec[0].iov_len = sizeof(header);
  84. for (i=0; i<iovlen; i++) {
  85. iovec[i+1].iov_base = iov[i].iov_base;
  86. iovec[i+1].iov_len = iov[i].iov_len;
  87. total += iov[i].iov_len;
  88. }
  89. res = writev(parent_socket, iovec, iovlen+1);
  90. if (res != total) {
  91. fprintf(stderr, "writev wrote only %d of %d bytes\n", res, total);
  92. }
  93. return 0;
  94. }
  95. static void *api_ipc_private_data_get(void *conn)
  96. {
  97. return private_data;
  98. }
  99. static int api_ipc_response_send(void *conn, const void *msg, size_t len)
  100. {
  101. struct qb_ipc_response_header *qb_header = (void*)msg;
  102. /* Save the error so we can return it */
  103. last_lib_error = qb_header->error;
  104. return 0;
  105. }
  106. static struct corosync_api_v1 corosync_api = {
  107. .error_memory_failure = api_error_memory_failure,
  108. .timer_delete = api_timer_delete,
  109. .timer_add_duration = api_timer_add_duration,
  110. .totem_nodeid_get = api_totem_nodeid_get,
  111. .totem_mcast = api_totem_mcast,
  112. .ipc_private_data_get = api_ipc_private_data_get,
  113. .ipc_response_send = api_ipc_response_send,
  114. };
  115. /* -------------------- Above is all for providing the corosync_api support routines --------------------------------------------*/
  116. /* They need to be in the same file as the engine as they use the local 'poll_loop' variable which is per-process */
  117. static void start_qdevice_poll(int longwait);
  118. static void start_sync_timer(void);
  119. /* Callback from Votequorum to tell us about the quorum state */
  120. static void quorum_fn(const unsigned int *view_list,
  121. size_t view_list_entries,
  122. int quorate, struct memb_ring_id *ring_id)
  123. {
  124. char msgbuf[8192];
  125. int len;
  126. struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf;
  127. we_are_quorate = quorate;
  128. /* Send back to parent */
  129. quorum_msg->header.type = VQMSG_QUORUM;
  130. quorum_msg->header.from_nodeid = our_nodeid;
  131. quorum_msg->header.param = 0;
  132. quorum_msg->quorate = quorate;
  133. memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id));
  134. quorum_msg->view_list_entries = view_list_entries;
  135. memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries);
  136. if ( (len=write(parent_socket, msgbuf, sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries)) <= 0) {
  137. perror("write (view list to parent) failed");
  138. }
  139. memcpy(&current_ring_id, ring_id, sizeof(*ring_id));
  140. }
  141. char *corosync_service_link_and_init(struct corosync_api_v1 *api,
  142. struct default_service *service_engine)
  143. {
  144. /* dummy */
  145. return NULL;
  146. }
  147. /* For votequorum */
  148. char *get_run_dir()
  149. {
  150. static char cwd_buffer[PATH_MAX];
  151. return getcwd(cwd_buffer, PATH_MAX);
  152. }
  153. static int load_quorum_instance(struct corosync_api_v1 *api)
  154. {
  155. const char *error_string;
  156. int res;
  157. error_string = votequorum_init(api, quorum_fn);
  158. if (error_string) {
  159. fprintf(stderr, "Votequorum init failed: %s\n", error_string);
  160. return -1;
  161. }
  162. engine = votequorum_get_service_engine_ver0();
  163. error_string = engine->exec_init_fn(api);
  164. if (error_string) {
  165. fprintf(stderr, "votequorum exec init failed: %s\n", error_string);
  166. return -1;
  167. }
  168. private_data = malloc(engine->private_data_size);
  169. if (!private_data) {
  170. perror("Malloc in child failed");
  171. return -1;
  172. }
  173. res = engine->lib_init_fn(fake_conn);
  174. return res;
  175. }
  176. static void sync_dispatch_fn(void *data)
  177. {
  178. if (engine->sync_process()) {
  179. start_sync_timer();
  180. }
  181. else {
  182. engine->sync_activate();
  183. }
  184. }
  185. static void start_sync_timer()
  186. {
  187. qb_loop_timer_add(poll_loop,
  188. QB_LOOP_MED,
  189. 10000000,
  190. NULL,
  191. sync_dispatch_fn,
  192. &sync_timer);
  193. }
  194. static void send_sync(char *buf, int len)
  195. {
  196. struct vqsim_sync_msg *msg = (void*)buf;
  197. /* Votequorum doesn't use the transitional node list :-) */
  198. engine->sync_init(NULL, 0,
  199. msg->view_list, msg->view_list_entries,
  200. &msg->ring_id);
  201. start_sync_timer();
  202. }
  203. static void send_exec_msg(char *buf, int len)
  204. {
  205. struct vqsim_exec_msg *execmsg = (void*)buf;
  206. struct qb_ipc_request_header *qb_header = (void*)execmsg->execmsg;
  207. engine->exec_engine[qb_header->id & 0xFFFF].exec_handler_fn(execmsg->execmsg, execmsg->header.from_nodeid);
  208. }
  209. static int send_lib_msg(int type, void *msg)
  210. {
  211. /* Clear this as not all lib functions return a response immediately */
  212. last_lib_error = CS_OK;
  213. engine->lib_engine[type].lib_handler_fn(fake_conn, msg);
  214. return last_lib_error;
  215. }
  216. static int poll_qdevice(int onoff)
  217. {
  218. struct req_lib_votequorum_qdevice_poll pollmsg;
  219. int res;
  220. pollmsg.cast_vote = onoff;
  221. pollmsg.ring_id.nodeid = current_ring_id.rep.nodeid;
  222. pollmsg.ring_id.seq = current_ring_id.seq;
  223. strcpy(pollmsg.name, QDEVICE_NAME);
  224. res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &pollmsg);
  225. if (res != CS_OK) {
  226. fprintf(stderr, "%d: qdevice poll failed: %d\n", our_nodeid, res);
  227. }
  228. return res;
  229. }
  230. static void qdevice_dispatch_fn(void *data)
  231. {
  232. if (poll_qdevice(1) == CS_OK) {
  233. start_qdevice_poll(0);
  234. }
  235. }
  236. static void start_qdevice_poll(int longwait)
  237. {
  238. unsigned long long timeout;
  239. timeout = (unsigned long long)qdevice_timeout*500000; /* Half the corosync timeout */
  240. if (longwait) {
  241. timeout *= 2;
  242. }
  243. qb_loop_timer_add(poll_loop,
  244. QB_LOOP_MED,
  245. timeout,
  246. NULL,
  247. qdevice_dispatch_fn,
  248. &qdevice_timer);
  249. }
  250. static void stop_qdevice_poll(void)
  251. {
  252. qb_loop_timer_del(poll_loop, qdevice_timer);
  253. qdevice_timer = 0;
  254. }
  255. static void do_qdevice(int onoff)
  256. {
  257. int res;
  258. if (onoff) {
  259. if (!qdevice_registered) {
  260. struct req_lib_votequorum_qdevice_register regmsg;
  261. strcpy(regmsg.name, QDEVICE_NAME);
  262. if ( (res=send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, &regmsg)) == CS_OK) {
  263. qdevice_registered = 1;
  264. start_qdevice_poll(1);
  265. }
  266. else {
  267. fprintf(stderr, "%d: qdevice registration failed: %d\n", our_nodeid, res);
  268. }
  269. }
  270. else {
  271. if (!qdevice_timer) {
  272. start_qdevice_poll(0);
  273. }
  274. }
  275. }
  276. else {
  277. poll_qdevice(0);
  278. stop_qdevice_poll();
  279. }
  280. }
  281. /* From controller */
  282. static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data)
  283. {
  284. struct vqsim_msg_header *header = (void*)buffer;
  285. int len;
  286. len = read(fd, buffer, sizeof(buffer));
  287. if (len > 0) {
  288. /* Check header and route */
  289. switch (header->type) {
  290. case VQMSG_QUIT:
  291. exit(0);
  292. break;
  293. case VQMSG_EXEC: /* For votequorum exec messages */
  294. send_exec_msg(buffer, len);
  295. break;
  296. case VQMSG_SYNC:
  297. send_sync(buffer, len);
  298. break;
  299. case VQMSG_QDEVICE:
  300. do_qdevice(header->param);
  301. break;
  302. case VQMSG_QUORUMQUIT:
  303. if (!we_are_quorate) {
  304. exit(1);
  305. }
  306. break;
  307. case VQMSG_QUORUM:
  308. /* not used here */
  309. break;
  310. }
  311. }
  312. return 0;
  313. }
  314. static void initial_sync(int nodeid)
  315. {
  316. unsigned int trans_list[1] = {nodeid};
  317. unsigned int member_list[1] = {nodeid};
  318. struct memb_ring_id ring_id;
  319. ring_id.rep.nodeid = our_nodeid;
  320. ring_id.seq = 1;
  321. /* cluster with just us in it */
  322. engine->sync_init(trans_list, 1,
  323. member_list, 1,
  324. &ring_id);
  325. start_sync_timer();
  326. }
  327. /* Return pipe FDs & child PID if sucessful */
  328. int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid)
  329. {
  330. int pipes[2];
  331. pid_t pid;
  332. if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) {
  333. return -1;
  334. }
  335. parent_socket = pipes[0];
  336. switch ( (pid=fork()) ) {
  337. case -1:
  338. perror("fork failed");
  339. return -1;
  340. case 0:
  341. /* child process - continue below */
  342. break;
  343. default:
  344. /* parent process */
  345. *vq_sock = pipes[1];
  346. *childpid = pid;
  347. return 0;
  348. }
  349. our_nodeid = nodeid;
  350. poll_loop = qb_loop_create();
  351. if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) {
  352. qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
  353. }
  354. load_quorum_instance(&corosync_api);
  355. qb_loop_poll_add(poll_loop,
  356. QB_LOOP_MED,
  357. parent_socket,
  358. POLLIN,
  359. NULL,
  360. parent_pipe_read_fn);
  361. /* Start it up! */
  362. initial_sync(nodeid);
  363. qb_loop_run(poll_loop);
  364. return 0;
  365. }