| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- /* This is the bit of VQSIM that runs in the forked process.
- It represents a single votequorum instance or, if you like,
- a 'node' in the cluster.
- */
- #include <sys/types.h>
- #include <qb/qblog.h>
- #include <qb/qbloop.h>
- #include <qb/qbipc_common.h>
- #include <netinet/in.h>
- #include <sys/poll.h>
- #include <sys/socket.h>
- #include <stdio.h>
- #include "../exec/votequorum.h"
- #include "../exec/service.h"
- #include "../include/corosync/corotypes.h"
- #include "../include/corosync/votequorum.h"
- #include "../include/corosync/ipc_votequorum.h"
- #include <corosync/logsys.h>
- #include <corosync/coroapi.h>
- #include "icmap.h"
- #include "vqsim.h"
- #define QDEVICE_NAME "VQsim_qdevice"
- /* Static variables here are per-instance because we are forked */
- static struct corosync_service_engine *engine;
- static int parent_socket; /* Our end of the socket */
- static char buffer[8192];
- static int our_nodeid;
- static char *private_data;
- static qb_loop_t *poll_loop;
- static qb_loop_timer_handle sync_timer;
- static qb_loop_timer_handle qdevice_timer;
- static int we_are_quorate;
- static void *fake_conn = (void*)1;
- static cs_error_t last_lib_error;
- static struct memb_ring_id current_ring_id;
- static int qdevice_registered;
- static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
- /* 'Keep the compiler happy' time */
- char *get_run_dir(void);
- int api_timer_add_duration (
- unsigned long long nanosec_duration,
- void *data,
- void (*timer_fn) (void *data),
- corosync_timer_handle_t *handle);
- static void api_error_memory_failure(void) __attribute__((noreturn));
- static void api_error_memory_failure()
- {
- fprintf(stderr, "Out of memory error\n");
- exit(-1);
- }
- static void api_timer_delete(corosync_timer_handle_t th)
- {
- qb_loop_timer_del(poll_loop, th);
- }
- int api_timer_add_duration (
- unsigned long long nanosec_duration,
- void *data,
- void (*timer_fn) (void *data),
- corosync_timer_handle_t *handle)
- {
- return qb_loop_timer_add(poll_loop,
- QB_LOOP_MED,
- nanosec_duration,
- data,
- timer_fn,
- handle);
- }
- static unsigned int api_totem_nodeid_get(void)
- {
- return our_nodeid;
- }
- static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type)
- {
- struct vqsim_msg_header header;
- struct iovec iovec[iovlen+1];
- int total = sizeof(header);
- int res;
- int i;
- header.type = VQMSG_EXEC;
- header.from_nodeid = our_nodeid;
- header.param = 0;
- iovec[0].iov_base = &header;
- iovec[0].iov_len = sizeof(header);
- for (i=0; i<iovlen; i++) {
- iovec[i+1].iov_base = iov[i].iov_base;
- iovec[i+1].iov_len = iov[i].iov_len;
- total += iov[i].iov_len;
- }
- res = writev(parent_socket, iovec, iovlen+1);
- if (res != total) {
- fprintf(stderr, "writev wrote only %d of %d bytes\n", res, total);
- }
- return 0;
- }
- static void *api_ipc_private_data_get(void *conn)
- {
- return private_data;
- }
- static int api_ipc_response_send(void *conn, const void *msg, size_t len)
- {
- struct qb_ipc_response_header *qb_header = (void*)msg;
- /* Save the error so we can return it */
- last_lib_error = qb_header->error;
- return 0;
- }
- static struct corosync_api_v1 corosync_api = {
- .error_memory_failure = api_error_memory_failure,
- .timer_delete = api_timer_delete,
- .timer_add_duration = api_timer_add_duration,
- .totem_nodeid_get = api_totem_nodeid_get,
- .totem_mcast = api_totem_mcast,
- .ipc_private_data_get = api_ipc_private_data_get,
- .ipc_response_send = api_ipc_response_send,
- };
- /* -------------------- Above is all for providing the corosync_api support routines --------------------------------------------*/
- /* They need to be in the same file as the engine as they use the local 'poll_loop' variable which is per-process */
- static void start_qdevice_poll(int longwait);
- static void start_sync_timer(void);
- /* Callback from Votequorum to tell us about the quorum state */
- static void quorum_fn(const unsigned int *view_list,
- size_t view_list_entries,
- int quorate, struct memb_ring_id *ring_id)
- {
- char msgbuf[8192];
- int len;
- struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf;
- we_are_quorate = quorate;
- /* Send back to parent */
- quorum_msg->header.type = VQMSG_QUORUM;
- quorum_msg->header.from_nodeid = our_nodeid;
- quorum_msg->header.param = 0;
- quorum_msg->quorate = quorate;
- memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id));
- quorum_msg->view_list_entries = view_list_entries;
- memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries);
- if ( (len=write(parent_socket, msgbuf, sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries)) <= 0) {
- perror("write (view list to parent) failed");
- }
- memcpy(¤t_ring_id, ring_id, sizeof(*ring_id));
- }
- char *corosync_service_link_and_init(struct corosync_api_v1 *api,
- struct default_service *service_engine)
- {
- /* dummy */
- return NULL;
- }
- /* For votequorum */
- char *get_run_dir()
- {
- static char cwd_buffer[PATH_MAX];
- return getcwd(cwd_buffer, PATH_MAX);
- }
- /* This is different to the one in totemconfig.c in that we already
- * know the 'local' node ID, so we can just search for that.
- * It needs to be here rather than at main config read time as it's
- * (obviously) going to be different for each instance.
- */
- static void set_local_node_pos(struct corosync_api_v1 *api)
- {
- icmap_iter_t iter;
- uint32_t node_pos;
- char name_str[ICMAP_KEYNAME_MAXLEN];
- uint32_t nodeid;
- const char *iter_key;
- int res;
- int found = 0;
- iter = icmap_iter_init("nodelist.node.");
- while ((iter_key = icmap_iter_next(iter, NULL, NULL)) != NULL) {
- res = sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, name_str);
- if (res != 2) {
- continue;
- }
- if (strcmp(name_str, "nodeid")) {
- continue;
- }
- res = icmap_get_uint32(iter_key, &nodeid);
- if (res == CS_OK) {
- if (nodeid == our_nodeid) {
- found = 1;
- res = icmap_set_uint32("nodelist.local_node_pos", node_pos);
- assert(res == CS_OK);
- }
- }
- }
- if (!found) {
- /* This probably indicates a dynamically-added node
- * set the pos to zero and use the votes of the
- * first node in corosync.conf
- */
- res = icmap_set_uint32("nodelist.local_node_pos", 0);
- assert(res == CS_OK);
- }
- }
- static int load_quorum_instance(struct corosync_api_v1 *api)
- {
- const char *error_string;
- int res;
- error_string = votequorum_init(api, quorum_fn);
- if (error_string) {
- fprintf(stderr, "Votequorum init failed: %s\n", error_string);
- return -1;
- }
- engine = votequorum_get_service_engine_ver0();
- error_string = engine->exec_init_fn(api);
- if (error_string) {
- fprintf(stderr, "votequorum exec init failed: %s\n", error_string);
- return -1;
- }
- private_data = malloc(engine->private_data_size);
- if (!private_data) {
- perror("Malloc in child failed");
- return -1;
- }
- res = engine->lib_init_fn(fake_conn);
- return res;
- }
- static void sync_dispatch_fn(void *data)
- {
- if (engine->sync_process()) {
- start_sync_timer();
- }
- else {
- engine->sync_activate();
- }
- }
- static void start_sync_timer()
- {
- qb_loop_timer_add(poll_loop,
- QB_LOOP_MED,
- 10000000,
- NULL,
- sync_dispatch_fn,
- &sync_timer);
- }
- static void send_sync(char *buf, int len)
- {
- struct vqsim_sync_msg *msg = (void*)buf;
- /* Votequorum doesn't use the transitional node list :-) */
- engine->sync_init(NULL, 0,
- msg->view_list, msg->view_list_entries,
- &msg->ring_id);
- start_sync_timer();
- }
- static void send_exec_msg(char *buf, int len)
- {
- struct vqsim_exec_msg *execmsg = (void*)buf;
- struct qb_ipc_request_header *qb_header = (void*)execmsg->execmsg;
- engine->exec_engine[qb_header->id & 0xFFFF].exec_handler_fn(execmsg->execmsg, execmsg->header.from_nodeid);
- }
- static int send_lib_msg(int type, void *msg)
- {
- /* Clear this as not all lib functions return a response immediately */
- last_lib_error = CS_OK;
- engine->lib_engine[type].lib_handler_fn(fake_conn, msg);
- return last_lib_error;
- }
- static int poll_qdevice(int onoff)
- {
- struct req_lib_votequorum_qdevice_poll pollmsg;
- int res;
- pollmsg.cast_vote = onoff;
- pollmsg.ring_id.nodeid = current_ring_id.nodeid;
- pollmsg.ring_id.seq = current_ring_id.seq;
- strcpy(pollmsg.name, QDEVICE_NAME);
- res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &pollmsg);
- if (res != CS_OK) {
- fprintf(stderr, CS_PRI_NODE_ID ": qdevice poll failed: %d\n", our_nodeid, res);
- }
- return res;
- }
- static void qdevice_dispatch_fn(void *data)
- {
- if (poll_qdevice(1) == CS_OK) {
- start_qdevice_poll(0);
- }
- }
- static void start_qdevice_poll(int longwait)
- {
- unsigned long long timeout;
- timeout = (unsigned long long)qdevice_timeout*500000; /* Half the corosync timeout */
- if (longwait) {
- timeout *= 2;
- }
- qb_loop_timer_add(poll_loop,
- QB_LOOP_MED,
- timeout,
- NULL,
- qdevice_dispatch_fn,
- &qdevice_timer);
- }
- static void stop_qdevice_poll(void)
- {
- qb_loop_timer_del(poll_loop, qdevice_timer);
- qdevice_timer = 0;
- }
- static void do_qdevice(int onoff)
- {
- int res;
- if (onoff) {
- if (!qdevice_registered) {
- struct req_lib_votequorum_qdevice_register regmsg;
- strcpy(regmsg.name, QDEVICE_NAME);
- if ( (res=send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, ®msg)) == CS_OK) {
- qdevice_registered = 1;
- start_qdevice_poll(1);
- }
- else {
- fprintf(stderr, CS_PRI_NODE_ID ": qdevice registration failed: %d\n", our_nodeid, res);
- }
- }
- else {
- if (!qdevice_timer) {
- start_qdevice_poll(0);
- }
- }
- }
- else {
- poll_qdevice(0);
- stop_qdevice_poll();
- }
- }
- /* From controller */
- static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data)
- {
- struct vqsim_msg_header *header = (void*)buffer;
- int len;
- len = read(fd, buffer, sizeof(buffer));
- if (len > 0) {
- /* Check header and route */
- switch (header->type) {
- case VQMSG_QUIT:
- exit(0);
- break;
- case VQMSG_EXEC: /* For votequorum exec messages */
- send_exec_msg(buffer, len);
- break;
- case VQMSG_SYNC:
- send_sync(buffer, len);
- break;
- case VQMSG_QDEVICE:
- do_qdevice(header->param);
- break;
- case VQMSG_QUORUMQUIT:
- if (!we_are_quorate) {
- exit(1);
- }
- break;
- case VQMSG_QUORUM:
- /* not used here */
- break;
- }
- }
- return 0;
- }
- static void initial_sync(int nodeid)
- {
- unsigned int trans_list[1] = {nodeid};
- unsigned int member_list[1] = {nodeid};
- struct memb_ring_id ring_id;
- ring_id.nodeid = our_nodeid;
- ring_id.seq = 1;
- /* cluster with just us in it */
- engine->sync_init(trans_list, 1,
- member_list, 1,
- &ring_id);
- start_sync_timer();
- }
- /* Return pipe FDs & child PID if sucessful */
- int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid)
- {
- int pipes[2];
- pid_t pid;
- if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) {
- return -1;
- }
- parent_socket = pipes[0];
- switch ( (pid=fork()) ) {
- case -1:
- perror("fork failed");
- return -1;
- case 0:
- /* child process - continue below */
- break;
- default:
- /* parent process */
- *vq_sock = pipes[1];
- *childpid = pid;
- return 0;
- }
- our_nodeid = nodeid;
- poll_loop = qb_loop_create();
- if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) {
- qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
- }
- set_local_node_pos(&corosync_api);
- load_quorum_instance(&corosync_api);
- qb_loop_poll_add(poll_loop,
- QB_LOOP_MED,
- parent_socket,
- POLLIN,
- NULL,
- parent_pipe_read_fn);
- /* Start it up! */
- initial_sync(nodeid);
- qb_loop_run(poll_loop);
- return 0;
- }
|