Prechádzať zdrojové kódy

vqsim: Add Quorum simulator program

vqsim is a small program that allows node up/down/split/join
operations to be simulated without the use of an actual cluster.

Signed-off-by: Christine Caulfield <ccaulfie@redhat.com>
Christine Caulfield 9 rokov pred
rodič
commit
71e885d7f9
8 zmenil súbory, kde vykonal 1768 pridanie a 1 odobranie
  1. 1 1
      Makefile.am
  2. 24 0
      configure.ac
  3. 47 0
      vqsim/Makefile.am
  4. 332 0
      vqsim/parser.c
  5. 121 0
      vqsim/vq_object.c
  6. 733 0
      vqsim/vqmain.c
  7. 77 0
      vqsim/vqsim.h
  8. 433 0
      vqsim/vqsim_vq_engine.c

+ 1 - 1
Makefile.am

@@ -49,7 +49,7 @@ MAINTAINERCLEANFILES	= Makefile.in aclocal.m4 configure depcomp \
 dist_doc_DATA		= LICENSE INSTALL README.recovery SECURITY AUTHORS
 dist_doc_DATA		= LICENSE INSTALL README.recovery SECURITY AUTHORS
 
 
 SUBDIRS			= include common_lib lib exec tools test cts pkgconfig \
 SUBDIRS			= include common_lib lib exec tools test cts pkgconfig \
-			  man init conf qdevices
+			  man init conf qdevices vqsim
 
 
 coverity:
 coverity:
 	rm -rf cov
 	rm -rf cov

+ 24 - 0
configure.ac

@@ -204,6 +204,7 @@ AC_CONFIG_FILES([Makefile
 		 tools/Makefile
 		 tools/Makefile
 		 conf/Makefile
 		 conf/Makefile
 		 qdevices/Makefile
 		 qdevices/Makefile
+		 vqsim/Makefile
 		 Doxyfile
 		 Doxyfile
 		 conf/logrotate/Makefile
 		 conf/logrotate/Makefile
 		 conf/tmpfiles.d/Makefile])
 		 conf/tmpfiles.d/Makefile])
@@ -418,6 +419,11 @@ AC_ARG_ENABLE([qnetd],
 	[ enable_qnetd="no" ])
 	[ enable_qnetd="no" ])
 AM_CONDITIONAL(BUILD_QNETD, test x$enable_qnetd = xyes)
 AM_CONDITIONAL(BUILD_QNETD, test x$enable_qnetd = xyes)
 
 
+AC_ARG_ENABLE([vqsim],
+	[  --enable-vqsim               : Quorum simulator support ],,
+	[ enable_vqsim="no" ])
+AM_CONDITIONAL(BUILD_VQSIM, test x$enable_vqsim = xyes)
+
 # *FLAGS handling goes here
 # *FLAGS handling goes here
 
 
 ENV_CFLAGS="$CFLAGS"
 ENV_CFLAGS="$CFLAGS"
@@ -454,6 +460,14 @@ if test "x${enable_testagents}" = xyes; then
 	WITH_LIST="$WITH_LIST --with testagents"
 	WITH_LIST="$WITH_LIST --with testagents"
 fi
 fi
 
 
+if test "x${enable_rdma}" = xyes; then
+	PKG_CHECK_MODULES([rdmacm],[rdmacm])
+	PKG_CHECK_MODULES([ibverbs],[ibverbs])
+	AC_DEFINE_UNQUOTED([HAVE_RDMA], 1, [have rdmacm])
+	PACKAGE_FEATURES="$PACKAGE_FEATURES rdma"
+	WITH_LIST="$WITH_LIST --with rdma"
+fi
+
 if test "x${enable_monitoring}" = xyes; then
 if test "x${enable_monitoring}" = xyes; then
 	PKG_CHECK_MODULES([statgrab], [libstatgrab])
 	PKG_CHECK_MODULES([statgrab], [libstatgrab])
 	PKG_CHECK_MODULES([statgrabge090], [libstatgrab >= 0.90],
 	PKG_CHECK_MODULES([statgrabge090], [libstatgrab >= 0.90],
@@ -490,6 +504,16 @@ fi
 if test "x${enable_qdevices}" = xyes; then
 if test "x${enable_qdevices}" = xyes; then
 	PACKAGE_FEATURES="$PACKAGE_FEATURES qdevices"
 	PACKAGE_FEATURES="$PACKAGE_FEATURES qdevices"
 fi
 fi
+
+if test "x${enable_vqsim}" = xyes; then
+	vqsim_readline=no
+	AC_CHECK_HEADERS([readline/readline.h readline/history.h],
+			 [],
+                         AC_MSG_WARN([vqsim will lack readline support]))
+	PACKAGE_FEATURES="$PACKAGE_FEATURES vqsim"
+fi
+AM_CONDITIONAL(VQSIM_READLINE, [test "x${ac_cv_header_readline_readline_h}" = xyes])
+
 if test "x${enable_qnetd}" = xyes; then
 if test "x${enable_qnetd}" = xyes; then
 	PACKAGE_FEATURES="$PACKAGE_FEATURES qnetd"
 	PACKAGE_FEATURES="$PACKAGE_FEATURES qnetd"
 fi
 fi

+ 47 - 0
vqsim/Makefile.am

@@ -0,0 +1,47 @@
+#
+# Copyright (c) 2009 Red Hat, Inc.
+#
+# Authors: Andrew Beekhof
+#	   Steven Dake (sdake@redhat.com)
+#
+# This software licensed under BSD license, the text of which follows:
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# - Redistributions of source code must retain the above copyright notice,
+#   this list of conditions and the following disclaimer.
+# - Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+# - Neither the name of the MontaVista Software, Inc. nor the names of its
+#   contributors may be used to endorse or promote products derived from this
+#   software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+# THE POSSIBILITY OF SUCH DAMAGE.
+
+MAINTAINERCLEANFILES	= Makefile.in
+
+noinst_PROGRAMS		= vqsim
+
+vqsim_LDADD		= $(top_builddir)/common_lib/libcorosync_common.la \
+			  ../exec/corosync-votequorum.o ../exec/corosync-icmap.o ../exec/corosync-logsys.o \
+			  ../exec/corosync-coroparse.o ../exec/corosync-logconfig.o \
+			  $(LIBQB_LIBS)
+if VQSIM_READLINE
+vqsim_LDADD		+= -lreadline
+endif
+
+vqsim_DEPENDENCIES	= $(top_builddir)/common_lib/libcorosync_common.la
+
+vqsim_SOURCES	        = vqmain.c parser.c vq_object.c vqsim_vq_engine.c

+ 332 - 0
vqsim/parser.c

@@ -0,0 +1,332 @@
+/* Parses the interactive commands */
+
+#include <config.h>
+
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <netinet/in.h>
+#ifdef HAVE_READLINE_HISTORY_H
+#include <readline/history.h>
+#endif
+
+#include <corosync/coroapi.h>
+#include "vqsim.h"
+
+/* Print the help text for every interactive command on stdout. */
+static void do_usage(void)
+{
+	static const char *const help_lines[] = {
+		"  All node IDs in the cluster are unique and belong to a numbered 'partition' (default=0)\n",
+		"\n",
+		"up         [<partition>:][<nodeid>[,<nodeid>] ...] [[<partition>:][<nodeid>...]] [...]\n",
+		"           bring node(s) online in the specified partition(s)\n",
+		"down       <nodeid>,[<nodeid>...]\n",
+		"           send nodes offline (shut them down)\n",
+		"move/split [<partition>:][<nodeid>[,<nodeid>] ...] [[<partition>:][<nodeid>...]] [...]\n",
+		"           Move nodes from one partition to another (netsplit)\n",
+		"           <partition> here is the partition to move the nodes to\n",
+		"join       <partition> <partition> [<partition>] ... \n",
+		"           Join partitions together (reverse of a netsplit)\n",
+		"qdevice    on|off [<partition>:][<nodeid>[,<nodeid>] ...] [[<partition>:][<nodeid>...]] [...]\n",
+		"           Enable quorum device in specified nodes\n",
+		"autofence  on|off\n",
+		"           automatically 'down' nodes on inquorate side on netsplit\n",
+		"show       Show current nodes status\n",
+		"exit\n\n",
+	};
+	size_t line;
+
+	/* None of the lines contains a conversion, so fputs() emits what printf() did */
+	for (line = 0; line < sizeof(help_lines) / sizeof(help_lines[0]); line++) {
+		fputs(help_lines[line], stdout);
+	}
+}
+
+
+typedef void (*cmd_routine_t)(int argc, char **argv);
+
+static void run_up_cmd(int argc, char **argv);
+static void run_down_cmd(int argc, char **argv);
+static void run_join_cmd(int argc, char **argv);
+static void run_move_cmd(int argc, char **argv);
+static void run_exit_cmd(int argc, char **argv);
+static void run_show_cmd(int argc, char **argv);
+static void run_autofence_cmd(int argc, char **argv);
+static void run_qdevice_cmd(int argc, char **argv);
+
+/*
+ * Dispatch table for the interactive commands.
+ *
+ *   cmd        - command word; parse_input_command() matches it
+ *                case-insensitively against the first word typed
+ *   min_args   - parse_input_command() shows the usage text instead of
+ *                running the command when the word count is below this.
+ *                The count it compares against includes the command word
+ *                itself.
+ *   cmd_runner - handler invoked with the split words
+ *
+ * "move"/"split" share one handler, as do "exit"/"quit"/"q".
+ */
+static struct cmd_list_struct {
+	const char *cmd;
+	int min_args;
+	cmd_routine_t cmd_runner;
+} cmd_list[] = {
+	{ "up", 1, run_up_cmd},
+	{ "down", 1, run_down_cmd},
+	{ "move", 2, run_move_cmd},
+	{ "split", 2, run_move_cmd},
+	{ "join", 2, run_join_cmd},
+	{ "autofence", 1, run_autofence_cmd},
+	{ "qdevice", 1, run_qdevice_cmd},
+	{ "show", 0, run_show_cmd},
+	{ "exit", 0, run_exit_cmd},
+	{ "quit", 0, run_exit_cmd},
+	{ "q", 0, run_exit_cmd},
+};
+/* Number of entries in cmd_list */
+static int num_cmds = (sizeof(cmd_list)) / sizeof(struct cmd_list_struct);
+#define MAX_ARGS 1024
+
+/*
+ * Split a "[<partition>:]<node>[,<node>...]" word into its parts.
+ *
+ * string    - word to parse; it is modified in place (':' and ',' become NUL)
+ * partition - receives the partition number, 0 when no "<partition>:" prefix
+ * num_nodes - receives the number of entries stored in *retnodes
+ * retnodes  - receives a malloc()ed array of node IDs; the caller frees it
+ *
+ * Returns 0 on success, -1 if the list has more than MAX_NODES entries or
+ * memory could not be allocated (the outputs other than *partition are then
+ * left untouched).
+ */
+static int parse_partition_nodelist(char *string, int *partition, int *num_nodes, int **retnodes)
+{
+	char *separator = strchr(string, ':');
+	char *list;
+	char *cursor;
+	char *token;
+	int *ids;
+	int expected = 1;	/* one entry more than there are commas */
+	int stored = 0;
+
+	if (separator == NULL) {
+		/* Default to partition 0 */
+		*partition = 0;
+		list = string;
+	}
+	else {
+		*separator = '\0';
+		list = separator + 1;
+		*partition = atoi(string);
+	}
+
+	/* Size the result array from the number of separators */
+	for (cursor = list; *cursor != '\0'; cursor++) {
+		if (*cursor == ',') {
+			expected++;
+		}
+	}
+	if (expected < 1 || expected > MAX_NODES) {
+		return -1;
+	}
+
+	ids = malloc(sizeof(int) * expected);
+	if (ids == NULL) {
+		return -1;
+	}
+
+	/* Terminate each field in place and convert it */
+	token = list;
+	for (cursor = list; ; cursor++) {
+		if (*cursor == ',') {
+			*cursor = '\0';
+			ids[stored++] = atoi(token);
+			token = cursor + 1;
+		}
+		else if (*cursor == '\0') {
+			ids[stored++] = atoi(token);
+			break;
+		}
+	}
+
+	*num_nodes = stored;
+	*retnodes = ids;
+
+	return 0;
+}
+
+/*
+ * Split one line of user input into space-separated words and run the
+ * matching command from cmd_list.
+ *
+ * rl_cmd - the input line; NULL (end of input, e.g. ^D) runs the exit command.
+ *          The line itself is not modified, a private copy is split instead.
+ *
+ * A blank line is ignored.  An unknown command, or one with fewer words than
+ * its min_args, prints the usage text.
+ */
+void parse_input_command(char *rl_cmd)
+{
+	int i;
+	int argc = 0;
+	int valid_cmd = 0;
+	char *argv[MAX_ARGS];
+	int last_arg_start = 0;
+	int last_was_space = 0;
+	int len;
+	char *cmd;
+
+	/* ^D quits */
+	if (rl_cmd == NULL) {
+		run_exit_cmd(0, NULL);
+		return;
+	}
+
+	cmd = strdup(rl_cmd);
+	if (cmd == NULL) {
+		perror("strdup failed");
+		return;
+	}
+
+	/* Split the copy up into args, overwriting separators with NULs */
+	len = strlen(cmd);
+
+	/* Span leading spaces */
+	for (i=0; cmd[i] == ' '; i++)
+		;
+	last_arg_start = i;
+
+	for (; i<=len; i++) {
+		if (cmd[i] == ' ' || cmd[i] == '\0') {
+
+			/* Allow multiple spaces */
+			if (last_was_space) {
+				continue;
+			}
+
+			/* Never write past the end of argv[] */
+			if (argc == MAX_ARGS) {
+				fprintf(stderr, "ERR: too many arguments (limit is %d)\n", MAX_ARGS);
+				free(cmd);
+				return;
+			}
+
+			cmd[i] = '\0';
+			last_was_space = 1;
+
+			argv[argc] = &cmd[last_arg_start];
+			argc++;
+		}
+		else {
+			if (last_was_space) {
+				last_arg_start = i;
+			}
+			last_was_space = 0;
+		}
+	}
+
+	/* Ignore null commands (argv[0] is always set, possibly to "") */
+	if (strlen(argv[0]) == 0) {
+		free(cmd);
+		return;
+	}
+#ifdef HAVE_READLINE_HISTORY_H
+	add_history(rl_cmd);
+#endif
+
+	/* Dispatch command */
+	for (i=0; i<num_cmds; i++) {
+		if (strcasecmp(argv[0], cmd_list[i].cmd) == 0) {
+
+			if (argc >= cmd_list[i].min_args) {
+				cmd_list[i].cmd_runner(argc, argv);
+				valid_cmd = 1;
+			}
+			/* Command words are unique, so stop at the first match */
+			break;
+		}
+	}
+	if (!valid_cmd) {
+		do_usage();
+	}
+	free(cmd);
+}
+
+
+
+/*
+ * "up": start every node named in each "[<partition>:]<nodeid>[,...]" word.
+ * Words that fail to parse are skipped.
+ */
+static void run_up_cmd(int argc, char **argv)
+{
+	int arg;
+
+	/* argv[0] is the command word; with no further words the loop is empty */
+	for (arg = 1; arg < argc; arg++) {
+		int part;
+		int count;
+		int *ids;
+		int n;
+
+		if (parse_partition_nodelist(argv[arg], &part, &count, &ids) != 0) {
+			continue;
+		}
+		for (n = 0; n < count; n++) {
+			cmd_start_new_node(ids[n], part);
+		}
+		free(ids);
+	}
+}
+
+/*
+ * "down": ask each listed node to shut down.
+ * Every word after the command is taken as one node ID.
+ */
+static void run_down_cmd(int argc, char **argv)
+{
+	int nodeid;
+	int i;
+
+	for (i=1; i<argc; i++) {
+		/* Use the current word; indexing argv[1] here stopped the first node repeatedly */
+		nodeid = atoi(argv[i]);
+		cmd_stop_node(nodeid);
+	}
+}
+
+/*
+ * "join": merge the second and later partitions into the first one named,
+ * then send every partition its membership with a new ring.
+ */
+static void run_join_cmd(int argc, char **argv)
+{
+	int i;
+
+	/* argv[0] is the command word, so two partition numbers means argc >= 3 */
+	if (argc < 3) {
+		printf("join needs at least two partition numbers\n");
+		return;
+	}
+
+	for (i=2; i<argc; i++) {
+		cmd_join_partitions(atoi(argv[1]), atoi(argv[i]));
+	}
+	cmd_update_all_partitions(1);
+}
+
+/*
+ * "move"/"split": move the nodes in each "[<partition>:]<nodeid>[,...]" word
+ * to the partition it names, then send every partition its membership with a
+ * new ring.  Words that fail to parse are skipped.
+ */
+static void run_move_cmd(int argc, char **argv)
+{
+	int arg;
+
+	for (arg = 1; arg < argc; arg++) {
+		int target;
+		int count;
+		int *ids;
+
+		if (parse_partition_nodelist(argv[arg], &target, &count, &ids) != 0) {
+			continue;
+		}
+		cmd_move_nodes(target, count, ids);
+		free(ids);
+	}
+
+	cmd_update_all_partitions(1);
+}
+
+/*
+ * "autofence on|off": switch automatic fencing on or off.
+ * Anything other than "on"/"off" (case-insensitive), including a missing
+ * argument, is reported on stderr and changes nothing.
+ */
+static void run_autofence_cmd(int argc, char **argv)
+{
+	int onoff = -1;
+
+	/* The dispatcher lets a bare "autofence" through, so argv[1] may not exist */
+	if (argc >= 2) {
+		if (strcasecmp(argv[1], "on") == 0) {
+			onoff = 1;
+		}
+		if (strcasecmp(argv[1], "off") == 0) {
+			onoff = 0;
+		}
+	}
+	if (onoff == -1) {
+		fprintf(stderr, "ERR: autofence value must be 'on' or 'off'\n");
+	}
+	else {
+		cmd_set_autofence(onoff);
+	}
+}
+
+/*
+ * "qdevice on|off <nodelist>...": send the quorum-device setting to every
+ * node named in the "[<partition>:]<nodeid>[,...]" words, then resend each
+ * partition its membership without starting a new ring.
+ * A missing or invalid on/off word is reported on stderr and nothing is sent.
+ */
+static void run_qdevice_cmd(int argc, char **argv)
+{
+	int i,j;
+	int partition;
+	int num_nodes;
+	int *nodelist;
+	int onoff = -1;
+
+	/* The dispatcher lets a bare "qdevice" through, so argv[1] may not exist */
+	if (argc >= 2) {
+		if (strcasecmp(argv[1], "on") == 0) {
+			onoff = 1;
+		}
+		if (strcasecmp(argv[1], "off") == 0) {
+			onoff = 0;
+		}
+	}
+
+	if (onoff == -1) {
+		fprintf(stderr, "ERR: qdevice should be 'on' or 'off'\n");
+		return;
+	}
+
+	for (i=2; i<argc; i++) {
+		if (parse_partition_nodelist(argv[i], &partition, &num_nodes, &nodelist) == 0) {
+			for (j=0; j<num_nodes; j++) {
+				cmd_qdevice_poll(nodelist[j], onoff);
+			}
+			free(nodelist);
+		}
+	}
+	cmd_update_all_partitions(0);
+}
+
+/* "show": print the last known state of every node; arguments are ignored. */
+static void run_show_cmd(int argc, char **argv)
+{
+	(void)argc;
+	(void)argv;
+
+	cmd_show_node_states();
+}
+
+/*
+ * "exit"/"quit"/"q": tell every node to quit, then terminate the simulator
+ * with status 0.  Arguments are ignored; this function does not return.
+ */
+static void run_exit_cmd(int argc, char **argv)
+{
+	(void)argc;
+	(void)argv;
+
+	cmd_stop_all_nodes();
+	exit(0);
+}
+
+

+ 121 - 0
vqsim/vq_object.c

@@ -0,0 +1,121 @@
+/*
+  This is a Votequorum object in the parent process. it's really just a conduit for the forked
+  votequorum entity
+*/
+
+#include <qb/qblog.h>
+#include <qb/qbloop.h>
+#include <qb/qbipcc.h>
+#include <netinet/in.h>
+
+#include "../exec/votequorum.h"
+#include "vqsim.h"
+
+/* Parent-side record of one forked votequorum instance */
+struct vq_instance
+{
+	int nodeid;	/* node ID given to vq_create_instance() */
+	int vq_socket;	/* descriptor filled in by fork_new_instance(); all messages are written to it */
+	pid_t pid;	/* child PID filled in by fork_new_instance() */
+};
+
+/*
+ * Allocate an instance record for 'nodeid' and fork its votequorum process.
+ *
+ * poll_loop is not used here.
+ * Returns the new instance, or NULL if allocation or forking failed.
+ */
+vq_object_t vq_create_instance(qb_loop_t *poll_loop, int nodeid)
+{
+	struct vq_instance *vqi;
+
+	vqi = malloc(sizeof(struct vq_instance));
+	if (vqi == NULL) {
+		return NULL;
+	}
+	vqi->nodeid = nodeid;
+
+	/* fork_new_instance() fills in the socket and the child's PID */
+	if (fork_new_instance(nodeid, &vqi->vq_socket, &vqi->pid) != 0) {
+		free(vqi);
+		return NULL;
+	}
+
+	return vqi;
+}
+
+/* Return the PID recorded for the instance's child process. */
+pid_t vq_get_pid(vq_object_t instance)
+{
+	return ((struct vq_instance *)instance)->pid;
+}
+
+/*
+ * Send a VQMSG_QUIT message to the instance's socket.
+ * A failed write is reported with perror() and otherwise ignored.
+ */
+void vq_quit(vq_object_t instance)
+{
+	struct vq_instance *vqi = instance;
+	struct vqsim_msg_header quit_msg;
+
+	quit_msg.type = VQMSG_QUIT;
+	quit_msg.from_nodeid = 0;
+	quit_msg.param = 0;
+
+	if (write(vqi->vq_socket, &quit_msg, sizeof(quit_msg)) <= 0) {
+		perror("Quit write failed");
+	}
+}
+
+/*
+ * Send a VQMSG_QUORUMQUIT message ("quit if you don't have quorum") to the
+ * instance's socket.  A failed write is reported with perror().
+ * Always returns 0.
+ */
+int vq_quit_if_inquorate(vq_object_t instance)
+{
+	struct vq_instance *vqi = instance;
+	struct vqsim_msg_header fence_msg;
+
+	fence_msg.type = VQMSG_QUORUMQUIT;
+	fence_msg.from_nodeid = 0;
+	fence_msg.param = 0;
+
+	if (write(vqi->vq_socket, &fence_msg, sizeof(fence_msg)) <= 0) {
+		perror("Quit write failed");
+	}
+	return 0;
+}
+
+/*
+ * Send a VQMSG_SYNC message carrying a ring ID and a membership list to the
+ * instance's socket.
+ *
+ * ring_id         - ring ID copied into the message
+ * nodeids         - array of node IDs copied into the message's view_list
+ * nodeids_entries - number of entries in 'nodeids'
+ *
+ * Returns 0 on success, -1 if the write failed (reported with perror()).
+ */
+int vq_set_nodelist(vq_object_t instance, struct memb_ring_id *ring_id, int *nodeids, int nodeids_entries)
+{
+	struct vq_instance *vqi = instance;
+	/* Variable-length buffer: fixed part followed by one int per node */
+	/* NOTE(review): a char array is accessed through a struct pointer below;
+	   nothing in the declaration guarantees suitable alignment - confirm */
+	char msgbuf[sizeof(int)*nodeids_entries + sizeof(struct vqsim_sync_msg)];
+	struct vqsim_sync_msg *msg = (void*)msgbuf;
+	int res;
+
+	msg->header.type = VQMSG_SYNC;
+	msg->header.from_nodeid = 0;
+	msg->header.param = 0;
+	msg->view_list_entries = nodeids_entries;
+	memcpy(&msg->view_list, nodeids, nodeids_entries*sizeof(int));
+	memcpy(&msg->ring_id, ring_id, sizeof(struct memb_ring_id));
+
+	/* The whole message goes out in a single write */
+	res = write(vqi->vq_socket, msgbuf, sizeof(msgbuf));
+	if (res <= 0) {
+		perror("Sync write failed");
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Send a VQMSG_QDEVICE message with 'onoff' as its parameter to the
+ * instance's socket.  ring_id is not used.
+ *
+ * Returns 0 on success, -1 if the write failed (reported with perror()).
+ */
+int vq_set_qdevice(vq_object_t instance, struct memb_ring_id *ring_id, int onoff)
+{
+	struct vq_instance *vqi = instance;
+	struct vqsim_msg_header qdev_msg;
+
+	qdev_msg.type = VQMSG_QDEVICE;
+	qdev_msg.from_nodeid = 0;
+	qdev_msg.param = onoff;
+
+	if (write(vqi->vq_socket, &qdev_msg, sizeof(qdev_msg)) <= 0) {
+		perror("qdevice register write failed");
+		return -1;
+	}
+	return 0;
+}
+
+/* Return the socket descriptor recorded for the instance. */
+int vq_get_parent_fd(vq_object_t instance)
+{
+	return ((struct vq_instance *)instance)->vq_socket;
+}

+ 733 - 0
vqsim/vqmain.c

@@ -0,0 +1,733 @@
+#include <config.h>
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <wait.h>
+#include <qb/qblog.h>
+#include <qb/qbloop.h>
+#include <sys/poll.h>
+#include <netinet/in.h>
+#include <sys/queue.h>
+#ifdef HAVE_READLINE_READLINE_H
+#include <readline/readline.h>
+#else
+#include <unistd.h>  /* isatty */
+#endif
+
+#include "../exec/votequorum.h"
+#include "../exec/service.h"
+#include <corosync/logsys.h>
+#include <corosync/coroapi.h>
+
+#include "icmap.h"
+#include "vqsim.h"
+
+/* Easier than including the config file with a ton of conflicting dependencies */
+extern int coroparse_configparse (icmap_map_t config_map, const char **error_string);
+extern int corosync_log_config_read (const char **error_string);
+
+/* One of these per partition */
+struct vq_partition {
+	TAILQ_HEAD(, vq_node) nodelist;	/* nodes currently in this partition */
+	struct memb_ring_id ring_id;	/* ring ID sent to the nodes; seq is advanced by 4 for each new ring */
+	int num;			/* index of this partition in partitions[] */
+};
+
+/* One of these per node */
+struct vq_node {
+	vq_object_t instance;		/* handle returned by vq_create_instance() */
+	unsigned int nodeid;
+	int fd;				/* descriptor from vq_get_parent_fd(), polled for the node's messages */
+	struct vq_partition *partition;	/* partition the node currently belongs to */
+	TAILQ_ENTRY(vq_node) entries;	/* linkage in partition->nodelist */
+
+	/* Last status, copied from the most recent VQMSG_QUORUM message */
+	int last_quorate;		/* -1 until the first status arrives */
+	struct memb_ring_id last_ring_id;
+	int last_view_list[MAX_NODES];
+	int last_view_list_entries;
+};
+
+/* All partitions; set up by init_partitions() */
+static struct vq_partition partitions[MAX_PARTITIONS];
+/* Main loop, created in main() */
+static qb_loop_t *poll_loop;
+/* Non-zero when autofencing is on; set by cmd_set_autofence() */
+static int autofence;
+/* Set by cmd_update_all_partitions(), cleared once save_quorum_state() has fenced */
+static int check_for_quorum;
+/* Where node states are printed: stdout or the -o file */
+static FILE *output_file;
+/* Set by -n: do not hold back keyboard input while new nodes report in */
+static int nosync;
+/* Failsafe timer that re-enables keyboard input */
+static qb_loop_timer_handle kb_timer;
+/* Nodes counted by create_node() (when !nosync) minus those removed by remove_node() */
+static ssize_t wait_count;
+/* VQMSG_QUORUM messages still awaited before keyboard input is re-enabled */
+static ssize_t wait_count_to_unblock;
+
+static struct vq_node *find_by_pid(pid_t pid);
+static void send_partition_to_nodes(struct vq_partition *partition, int newring);
+static void start_kb_input(void);
+static void start_kb_input_timeout(void *data);
+
+#ifndef HAVE_READLINE_READLINE_H
+#define INPUT_BUF_SIZE 1024
+static char input_buf[INPUT_BUF_SIZE];
+static size_t input_buf_term = 0;
+static int is_tty;
+#endif
+
+/* Send every node, in every partition, the "quit if inquorate" request */
+static void force_fence(void)
+{
+	struct vq_partition *part;
+	struct vq_node *node;
+
+	for (part = partitions; part < &partitions[MAX_PARTITIONS]; part++) {
+		TAILQ_FOREACH(node, &part->nodelist, entries) {
+			vq_quit_if_inquorate(node->instance);
+		}
+	}
+}
+
+/*
+ * Save quorum state from the incoming message into the node's "last status"
+ * fields, and trigger autofencing when appropriate.
+ *
+ * node - node the message came from
+ * qmsg - the VQMSG_QUORUM message; at most MAX_NODES view-list entries are kept
+ */
+static void save_quorum_state(struct vq_node *node, struct vqsim_quorum_msg *qmsg)
+{
+	size_t entries = qmsg->view_list_entries;
+
+	/* last_view_list[] holds MAX_NODES entries; never copy more than that */
+	if (entries > MAX_NODES) {
+		entries = MAX_NODES;
+	}
+
+	node->last_quorate = qmsg->quorate;
+	memcpy(&node->last_ring_id, &qmsg->ring_id, sizeof(struct memb_ring_id));
+	memcpy(node->last_view_list, qmsg->view_list, sizeof(int) * entries);
+	node->last_view_list_entries = (int)entries;
+
+	/* If at least one node is quorate and autofence is enabled, then fence everyone who is not quorate */
+	if (check_for_quorum && qmsg->quorate && autofence) {
+		check_for_quorum = 0;
+		force_fence();
+	}
+}
+
+/*
+ * Write one line describing the node's last saved state to output_file:
+ * partition, node ID, quorate flag, ring ID and view list.  A node that has
+ * not reported yet (last_quorate < 0) is shown as UNINITIALIZED.
+ */
+static void print_quorum_state(struct vq_node *node)
+{
+	int idx;
+
+	if (node->last_quorate < 0) {
+		fprintf(output_file, "%d:%02d: q=UNINITIALIZED\n",
+			node->partition->num, node->nodeid);
+		return;
+	}
+
+	fprintf(output_file, "%d:%02d: q=%d ring=[%d/%lld] ",
+		node->partition->num,
+		node->nodeid,
+		node->last_quorate,
+		node->last_ring_id.rep.nodeid,
+		node->last_ring_id.seq);
+
+	/* Space-separated member list */
+	fprintf(output_file, "nodes=[");
+	for (idx = 0; idx < node->last_view_list_entries; idx++) {
+		if (idx != 0) {
+			fprintf(output_file, " ");
+		}
+		fprintf(output_file, "%d", node->last_view_list[idx]);
+	}
+	fprintf(output_file, "]\n");
+}
+
+/*
+ * Relay a message to every node in the sender's partition, the sender
+ * included.
+ *
+ * vqn - node the message came from
+ * msg - raw message bytes
+ * len - number of bytes in 'msg'
+ *
+ * A failed or short write to one node is reported on stderr; the remaining
+ * nodes are still sent the message.
+ */
+static void propogate_vq_message(struct vq_node *vqn, const char *msg, int len)
+{
+	struct vq_node *other_vqn;
+	ssize_t sent;
+
+	/* Send it to everyone in that node's partition (including itself) */
+	TAILQ_FOREACH(other_vqn, &vqn->partition->nodelist, entries) {
+		sent = write(other_vqn->fd, msg, len);
+		if (sent < 0) {
+			perror("propagate write failed");
+		}
+		else if (sent != len) {
+			fprintf(stderr, "short write to node %u (%zd of %d bytes)\n",
+				other_vqn->nodeid, sent, len);
+		}
+	}
+}
+
+/*
+ * Poll callback for a node's descriptor: read one message and act on its type.
+ *
+ * fd      - descriptor that became ready
+ * revents - poll events reported for it
+ * data    - the struct vq_node registered with the descriptor
+ *
+ * Always returns 0.
+ */
+static int vq_parent_read_fn(int32_t fd, int32_t revents, void *data)
+{
+	char msgbuf[8192];
+	int msglen;
+	struct vqsim_msg_header *msg;
+	struct vqsim_quorum_msg *qmsg;
+	struct vq_node *vqn = data;
+
+	/* Exact comparison: a read only happens when POLLIN is the sole event */
+	if (revents == POLLIN) {
+		msglen = read(fd, msgbuf, sizeof(msgbuf));
+		if (msglen < 0) {
+			perror("read failed");
+		}
+
+		if (msglen > 0) {
+			msg = (void*)msgbuf;
+			switch (msg->type) {
+			case VQMSG_QUORUM:
+				/* Count down the expected status reports; the last one cancels the failsafe timer */
+				if (!nosync && --wait_count_to_unblock <= 0)
+					qb_loop_timer_del(poll_loop, kb_timer);
+				qmsg = (void*)msgbuf;
+				save_quorum_state(vqn, qmsg);
+				print_quorum_state(vqn);
+				/* ...and re-enables keyboard input once the state has been printed */
+				if (!nosync && wait_count_to_unblock <= 0)
+					start_kb_input();
+				break;
+			case VQMSG_EXEC:
+				/* Message from votequorum, pass around the partition */
+				propogate_vq_message(vqn, msgbuf, msglen);
+				break;
+			case VQMSG_QUIT:
+			case VQMSG_SYNC:
+			case VQMSG_QDEVICE:
+			case VQMSG_QUORUMQUIT:
+				/* not used here */
+				break;
+			}
+		}
+	}
+	if (revents == POLLERR) {
+		fprintf(stderr, "pollerr on %d\n", vqn->nodeid);
+	}
+	return 0;
+}
+
+
+/*
+ * Initialise icmap, load corosync.conf into it, apply the logging
+ * configuration and start the logging thread.
+ *
+ * A failure to read the log configuration is logged but is not fatal.
+ * Returns 0 on success, -1 if icmap could not be initialised, the
+ * configuration could not be parsed, or the log thread could not be started.
+ */
+static int read_corosync_conf(void)
+{
+	int res;
+	const char *error_string;
+
+	/* Compare against CS_OK, as the other icmap call in this file does;
+	   testing the result for zero never detected a failure */
+	if (icmap_init() != CS_OK) {
+		fprintf(stderr, "icmap_init failed\n");
+		return -1;
+	}
+
+	/* Load corosync.conf */
+	logsys_format_set(NULL);
+	res = coroparse_configparse(icmap_get_global_map(), &error_string);
+	if (res == -1) {
+		log_printf (LOGSYS_LEVEL_INFO, "Error loading corosyc.conf %s", error_string);
+		return -1;
+	}
+	else {
+		res = corosync_log_config_read (&error_string);
+		if (res < 0) {
+			log_printf (LOGSYS_LEVEL_INFO, "error reading log config %s", error_string);
+			syslog (LOGSYS_LEVEL_INFO, "error reading log config %s", error_string);
+		}
+		else {
+			logsys_config_apply();
+		}
+	}
+	if (logsys_thread_start() != 0) {
+		log_printf (LOGSYS_LEVEL_ERROR, "Can't initialize log thread");
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Drop a node whose process has gone: unlink it from its partition, free
+ * the node record, and send the partition's remaining members a new ring.
+ */
+static void remove_node(struct vq_node *node)
+{
+	struct vq_partition *owner = node->partition;
+
+	/* One node fewer to expect status reports from */
+	wait_count--;
+
+	/* Remove from partition list */
+	TAILQ_REMOVE(&owner->nodelist, node, entries);
+	free(node);
+
+	/* Rebuild quorum */
+	send_partition_to_nodes(owner, 1);
+}
+
+/*
+ * SIGCHLD callback: reap every child that has terminated, report why it
+ * went away and remove the matching node from the simulation.
+ *
+ * Exit status 0 is reported as "(on request)", 1 as "(autofenced)", any
+ * other value as "(exit code N)".  Children killed by a signal are reported
+ * with the signal number.
+ *
+ * sig, data - unused.  Always returns 0.
+ */
+static int32_t sigchld_handler(int32_t sig, void *data)
+{
+	pid_t pid;
+	int status;
+	struct vq_node *vqn;
+	const char *exit_status;
+	char text[132];
+
+	/* Several children can exit before the signal is handled, so keep
+	   reaping without blocking until there is nothing left to collect */
+	while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
+		vqn = find_by_pid(pid);
+
+		if (WIFEXITED(status)) {
+			if (vqn) {
+				switch (WEXITSTATUS(status)) {
+				case 0:
+					exit_status = "(on request)";
+					break;
+				case 1:
+					exit_status = "(autofenced)";
+					break;
+				default:
+					snprintf(text, sizeof(text), "(exit code %d)", WEXITSTATUS(status));
+					/* Actually print the text that was just formatted */
+					exit_status = text;
+					break;
+				}
+				printf("%d:%02d Quit %s\n", vqn->partition->num, vqn->nodeid, exit_status);
+
+				remove_node(vqn);
+			}
+			else {
+				fprintf(stderr, "Unknown child %d exited with status %d\n", pid, WEXITSTATUS(status));
+			}
+		}
+		else if (WIFSIGNALED(status)) {
+			if (vqn) {
+				printf("%d:%02d exited on signal %d%s\n", vqn->partition->num, vqn->nodeid, WTERMSIG(status), WCOREDUMP(status)?" (core dumped)":"");
+				remove_node(vqn);
+			}
+			else {
+				fprintf(stderr, "Unknown child %d exited with status %d%s\n", pid, WTERMSIG(status), WCOREDUMP(status)?" (core dumped)":"");
+			}
+		}
+	}
+	return 0;
+}
+
+/*
+ * Send every node in 'partition' the partition's ring ID and member list.
+ *
+ * newring - when non-zero the ring sequence number is advanced by 4 first
+ *
+ * The first node in the list becomes the ring's representative node ID.
+ */
+static void send_partition_to_nodes(struct vq_partition *partition, int newring)
+{
+	struct vq_node *member;
+	int members[MAX_NODES];
+	int count = 0;
+
+	if (newring) {
+		/* Simulate corosync incrementing the seq by 4 for added authenticity */
+		partition->ring_id.seq += 4;
+	}
+
+	/* Build the node list */
+	TAILQ_FOREACH(member, &partition->nodelist, entries) {
+		if (count == 0) {
+			partition->ring_id.rep.nodeid = member->nodeid;
+		}
+		members[count++] = member->nodeid;
+	}
+
+	/* ...and hand it to each member */
+	TAILQ_FOREACH(member, &partition->nodelist, entries) {
+		vq_set_nodelist(member->instance, &partition->ring_id, members, count);
+	}
+}
+
+/*
+ * Reset every partition: empty node list, ring sequence 0, representative
+ * node ID 1000 + partition number.
+ */
+static void init_partitions(void)
+{
+	int num;
+
+	for (num = 0; num < MAX_PARTITIONS; num++) {
+		struct vq_partition *part = &partitions[num];
+
+		TAILQ_INIT(&part->nodelist);
+		part->num = num;
+		part->ring_id.seq = 0;
+		part->ring_id.rep.nodeid = 1000 + num;
+	}
+}
+
+/*
+ * Create a node, add it to a partition and start polling its descriptor.
+ *
+ * nodeid - ID of the new node
+ * partno - index of the partition to put it in (0 .. MAX_PARTITIONS-1)
+ *
+ * Returns the PID of the node's process, or (pid_t) -1 if the partition
+ * number is out of range, memory ran out, the instance could not be created
+ * or the descriptor could not be added to the poll loop.
+ */
+static pid_t create_node(int nodeid, int partno)
+{
+	struct vq_node *newvq;
+
+	/* partno comes from user input and indexes partitions[] */
+	if (partno < 0 || partno >= MAX_PARTITIONS) {
+		fprintf(stderr,
+			"ERR: partition %d is out of range (0-%d)\n",
+			partno, MAX_PARTITIONS - 1);
+		return (pid_t) -1;
+	}
+
+	newvq = malloc(sizeof(struct vq_node));
+	if (!newvq) {
+		return (pid_t) -1;
+	}
+
+	newvq->last_quorate = -1;  /* mark "uninitialized" */
+	newvq->last_view_list_entries = 0;
+	newvq->instance = vq_create_instance(poll_loop, nodeid);
+	if (!newvq->instance) {
+		fprintf(stderr,
+			"ERR: could not create vq instance nodeid %d\n",
+			nodeid);
+		free(newvq);
+		return (pid_t) -1;
+	}
+
+	if (!nosync) {
+		/* Number of expected "quorum" vq messages is a square
+		   of the total nodes count, so increment the node
+		   counter and set new square of this value as
+		   a "to observe" counter.  Only done once the instance
+		   exists, so a failed creation does not inflate the count */
+		wait_count++;
+		wait_count_to_unblock = wait_count * wait_count;
+	}
+
+	newvq->partition = &partitions[partno];
+	newvq->nodeid = nodeid;
+	newvq->fd = vq_get_parent_fd(newvq->instance);
+	TAILQ_INSERT_TAIL(&partitions[partno].nodelist, newvq, entries);
+
+	if (qb_loop_poll_add(poll_loop,
+			     QB_LOOP_MED,
+			     newvq->fd,
+			     POLLIN | POLLERR,
+			     newvq,
+			     vq_parent_read_fn)) {
+		/* The node stays in the partition list: its process is running
+		   and will be cleaned up when it exits */
+		perror("qb_loop_poll_add returned error");
+		return (pid_t) -1;
+	}
+
+	/* Send sync with all the nodes so far in it. */
+	send_partition_to_nodes(&partitions[partno], 1);
+	return vq_get_pid(newvq->instance);
+}
+
+/*
+ * Initialise the partitions and create, in partition 0, one node for every
+ * nodelist entry that has a "ring0_addr" key and a readable "nodeid".
+ *
+ * Exits the program with status 1 if a node cannot be created.
+ * Returns the number of nodes created.
+ */
+static size_t create_nodes_from_config(void)
+{
+	icmap_iter_t iter;
+	char tmp_key[ICMAP_KEYNAME_MAXLEN];
+	uint32_t node_pos;
+	uint32_t nodeid;
+	const char *iter_key;
+	size_t created = 0;
+
+	init_partitions();
+
+	iter = icmap_iter_init("nodelist.node.");
+	for (;;) {
+		iter_key = icmap_iter_next(iter, NULL, NULL);
+		if (iter_key == NULL) {
+			break;
+		}
+
+		/* Only the ring0_addr key of each entry triggers a node */
+		if (sscanf(iter_key, "nodelist.node.%u.%s", &node_pos, tmp_key) != 2) {
+			continue;
+		}
+		if (strcmp(tmp_key, "ring0_addr") != 0) {
+			continue;
+		}
+
+		snprintf(tmp_key, ICMAP_KEYNAME_MAXLEN, "nodelist.node.%u.nodeid", node_pos);
+		if (icmap_get_uint32(tmp_key, &nodeid) != CS_OK) {
+			continue;
+		}
+
+		if (create_node(nodeid, 0) == (pid_t) -1) {
+			fprintf(stderr,
+				"ERR: nodeid %d could not be spawned\n",
+				nodeid);
+			exit(1);
+		}
+		created++;
+	}
+	icmap_iter_finalize(iter);
+
+	return created;
+}
+
+/* Search every partition for a node with the given ID; NULL if there is none. */
+static struct vq_node *find_node(int nodeid)
+{
+	struct vq_node *candidate;
+	int part;
+
+	for (part = 0; part < MAX_PARTITIONS; part++) {
+		TAILQ_FOREACH(candidate, &partitions[part].nodelist, entries) {
+			if (candidate->nodeid != nodeid) {
+				continue;
+			}
+			return candidate;
+		}
+	}
+	return NULL;
+}
+
+/* Search every partition for the node whose process has the given PID; NULL if there is none. */
+static struct vq_node *find_by_pid(pid_t pid)
+{
+	struct vq_node *candidate;
+	int part;
+
+	for (part = 0; part < MAX_PARTITIONS; part++) {
+		TAILQ_FOREACH(candidate, &partitions[part].nodelist, entries) {
+			if (vq_get_pid(candidate->instance) != pid) {
+				continue;
+			}
+			return candidate;
+		}
+	}
+	return NULL;
+}
+
+/* Routines called from the parser */
+
+/*
+ * Start a new node in the given partition.
+ *
+ * Reports an error on stderr and does nothing if the node ID is already in
+ * use.  Unless -n was given, keyboard input is held back until the nodes
+ * have reported their quorum state, or for 0.25 seconds at most.
+ */
+void cmd_start_new_node(int nodeid, int partition)
+{
+	struct vq_node *node;
+
+	node = find_node(nodeid);
+	if (node) {
+		fprintf(stderr, "ERR: nodeid %d already exists in partition %d\n", nodeid, node->partition->num);
+		return;
+	}
+
+	/* Only suspend keyboard input when something will resume it: with
+	   nosync neither the timer below nor the quorum-message path
+	   re-adds stdin to the poll loop */
+	if (!nosync) {
+		qb_loop_poll_del(poll_loop, STDIN_FILENO);
+	}
+	create_node(nodeid, partition);
+	if (!nosync) {
+		/* Delay kb input handling by 0.25 second when we've just
+		   added a node; expect that the delay will be cancelled
+		   substantially earlier once it has reported its quorum info
+		   (the delay is in fact a failsafe input enabler here) */
+		qb_loop_timer_add(poll_loop,
+				  QB_LOOP_MED,
+				  250000000,
+				  NULL,
+				  start_kb_input_timeout,
+				  &kb_timer);
+	}
+}
+
+/* Send a quit request to every node in every partition. */
+void cmd_stop_all_nodes()
+{
+	struct vq_partition *part;
+	struct vq_node *node;
+
+	for (part = partitions; part < &partitions[MAX_PARTITIONS]; part++) {
+		TAILQ_FOREACH(node, &part->nodelist, entries) {
+			vq_quit(node->instance);
+		}
+	}
+}
+
+/*
+ * Print the last saved state of every node, partition by partition,
+ * followed by the autofence setting.
+ */
+void cmd_show_node_states()
+{
+	struct vq_partition *part;
+	struct vq_node *node;
+
+	for (part = partitions; part < &partitions[MAX_PARTITIONS]; part++) {
+		TAILQ_FOREACH(node, &part->nodelist, entries) {
+			print_quorum_state(node);
+		}
+	}
+
+	fprintf(output_file, "#autofence: %s\n", autofence?"on":"off");
+}
+
+/*
+ * Ask one node to quit.  Reports an error on stderr if no node has that ID.
+ * The node record itself is removed later, when its process exits.
+ */
+void cmd_stop_node(int nodeid)
+{
+	struct vq_node *target = find_node(nodeid);
+
+	if (target == NULL) {
+		fprintf(stderr, "ERR: nodeid %d is not up\n", nodeid);
+		return;
+	}
+
+	/* Remove processor; the SIGCHLD handler finishes the job */
+	vq_quit(target->instance);
+}
+
+/*
+ * Move all nodes in 'nodelist' into partition 'partition'.
+ *
+ * partition - index of the destination partition (0 .. MAX_PARTITIONS-1)
+ * num_nodes - number of entries in 'nodelist'
+ * nodelist  - IDs of the nodes to move
+ *
+ * An out-of-range partition is reported and nothing is moved.  Unknown node
+ * IDs are reported individually and skipped.
+ */
+void cmd_move_nodes(int partition, int num_nodes, int *nodelist)
+{
+	int i;
+	struct vq_node *node;
+
+	/* 'partition' comes from user input and indexes partitions[] */
+	if (partition < 0 || partition >= MAX_PARTITIONS) {
+		fprintf(stderr, "ERR: partition %d is out of range (0-%d)\n",
+			partition, MAX_PARTITIONS - 1);
+		return;
+	}
+
+	for (i=0; i<num_nodes; i++) {
+		node = find_node(nodelist[i]);
+		if (node) {
+
+			/* Remove it from the current partition */
+			TAILQ_REMOVE(&node->partition->nodelist, node, entries);
+
+			/* Add it to the new partition */
+			TAILQ_INSERT_TAIL(&partitions[partition].nodelist, node, entries);
+			node->partition = &partitions[partition];
+		}
+		else {
+			printf("ERR: node %d does not exist\n", nodelist[i]);
+		}
+	}
+}
+
+/*
+ * Take all the nodes in part2 and join them to part1.
+ *
+ * Both arguments are partition indexes (0 .. MAX_PARTITIONS-1); an
+ * out-of-range value is reported and nothing is moved.  Joining a partition
+ * to itself is a no-op.
+ */
+void cmd_join_partitions(int part1, int part2)
+{
+	struct vq_node *vqn;
+
+	/* Both values come from user input and index partitions[] */
+	if (part1 < 0 || part1 >= MAX_PARTITIONS ||
+	    part2 < 0 || part2 >= MAX_PARTITIONS) {
+		fprintf(stderr, "ERR: partition numbers must be in the range 0-%d\n",
+			MAX_PARTITIONS - 1);
+		return;
+	}
+
+	/* Moving nodes from a list onto itself would never empty it */
+	if (part1 == part2) {
+		return;
+	}
+
+	/* TAILQ_FOREACH is not delete safe, so drain from the head instead */
+	while ((vqn = TAILQ_FIRST(&partitions[part2].nodelist)) != NULL) {
+		TAILQ_REMOVE(&partitions[part2].nodelist, vqn, entries);
+		TAILQ_INSERT_TAIL(&partitions[part1].nodelist, vqn, entries);
+		vqn->partition = &partitions[part1];
+	}
+}
+
+/* Record the autofence setting and echo it to output_file. */
+void cmd_set_autofence(int onoff)
+{
+	const char *state = onoff ? "on" : "off";
+
+	autofence = onoff;
+	fprintf(output_file, "#autofence: %s\n", state);
+}
+
+/*
+ * Send every partition its membership and arm the autofence check for the
+ * status reports that follow.
+ *
+ * newring - passed through to send_partition_to_nodes()
+ */
+void cmd_update_all_partitions(int newring)
+{
+	struct vq_partition *part;
+
+	check_for_quorum = 1;
+
+	for (part = partitions; part < &partitions[MAX_PARTITIONS]; part++) {
+		send_partition_to_nodes(part, newring);
+	}
+}
+
+/* Send the quorum-device setting to one node; unknown node IDs are silently ignored. */
+void cmd_qdevice_poll(int nodeid, int onoff)
+{
+	struct vq_node *target = find_node(nodeid);
+
+	if (target == NULL) {
+		return;
+	}
+	vq_set_qdevice(target->instance, &target->partition->ring_id, onoff);
+}
+
+/* ---------------------------------- */
+
+#ifndef HAVE_READLINE_READLINE_H
+static void dummy_read_char(void);
+
+/*
+ * Line reader used when readline is not available: collect characters from
+ * stdin into input_buf until a newline, EOF or a full buffer, then hand the
+ * line to parse_input_command().  EOF is passed on as NULL.  The prompt is
+ * printed again afterwards when stdin is a terminal.
+ */
+static void dummy_read_char()
+{
+	int c, flush = 0;
+
+	while (!flush) {
+		c = getchar();
+		/* input_buf_term counts characters read so far, this one included */
+		if (++input_buf_term >= INPUT_BUF_SIZE) {
+			/* Buffer full: complain unless the line happened to end right here */
+			if (c != '\n' && c != EOF)
+				fprintf(stderr, "User input overflows the limit: %zu\n",
+						(size_t) INPUT_BUF_SIZE);
+			input_buf[INPUT_BUF_SIZE - 1] = '\0';
+			flush = 1;
+		} else if (c == '\n' || c == EOF) {
+			/* End of line: terminate the string in place of the newline */
+			input_buf[input_buf_term - 1] = '\0';
+			flush = 1;
+		} else {
+			input_buf[input_buf_term - 1] = c;
+		}
+	}
+
+	parse_input_command((c == EOF) ? NULL : input_buf);
+	/* Start the next line from an empty buffer */
+	input_buf_term = 0;
+
+	if (is_tty) {
+		printf("vqsim> ");
+		fflush(stdout);
+	}
+}
+#endif
+
+/*
+ * Poll callback for stdin: pass the pending input to readline when it is
+ * compiled in, otherwise to the built-in line reader.
+ * The arguments are not used.  Always returns 0.
+ */
+static int stdin_read_fn(int32_t fd, int32_t revents, void *data)
+{
+#ifdef HAVE_READLINE_READLINE_H
+	/* Send it to readline */
+	rl_callback_read_char();
+#else
+	dummy_read_char();
+#endif
+	return 0;
+}
+
+/*
+ * (Re-)enable keyboard input: clear the count of awaited status reports,
+ * show the prompt and add stdin to the poll loop.
+ */
+static void start_kb_input(void)
+{
+	wait_count_to_unblock = 0;
+
+#ifdef HAVE_READLINE_READLINE_H
+	/* Readline will deal with completed lines when they arrive */
+	rl_callback_handler_install("vqsim> ", parse_input_command);
+#else
+	/* Without readline the prompt is only printed for a terminal */
+	if (is_tty) {
+		printf("vqsim> ");
+		fflush(stdout);
+	}
+#endif
+
+	/* Send stdin to readline */
+	if (qb_loop_poll_add(poll_loop,
+			     QB_LOOP_MED,
+			     STDIN_FILENO,
+			     POLLIN | POLLERR,
+			     NULL,
+			     stdin_read_fn)) {
+		/* EEXIST is not reported: stdin may already be in the loop */
+		if (errno != EEXIST) {
+			perror("qb_loop_poll_add1 returned error");
+		}
+	}
+}
+
+/*
+ * Timer callback for kb_timer: the failsafe that re-enables keyboard input
+ * when the timer was not cancelled first.  'data' is not used.
+ */
+static void start_kb_input_timeout(void *data)
+{
+//	fprintf(stderr, "Waiting for nodes to report status timed out\n");
+	start_kb_input();
+}
+
+/*
+ * Print the command-line help on stdout.
+ *
+ * program - name to show in the synopsis line
+ */
+static void usage(char *program)
+{
+	fputs("Usage:\n"
+	      "\n", stdout);
+	printf("%s [-f <config-file>] [-o <output-file>]\n", program);
+	fputs("\n"
+	      "    -f     config file. defaults to /etc/corosync/corosync.conf\n"
+	      "    -o     output file. defaults to stdout\n"
+	      "    -n     no synchronization (on adding a node)\n"
+	      "    -h     display this help text\n"
+	      "\n", stdout);
+}
+
+/*
+ * Entry point: parse the options, set up logging and the main loop, create
+ * the nodes listed in corosync.conf and run the interactive simulator.
+ *
+ *   -f <file>  config file, exported as COROSYNC_MAIN_CONFIG_FILE
+ *   -o <file>  write node states to <file> instead of stdout
+ *   -n         do not hold back keyboard input while nodes report in
+ *   -h         (or any unknown option) print the usage text and exit
+ */
+int main(int argc, char **argv)
+{
+	qb_loop_signal_handle sigchld_qb_handle;
+	int ch;
+	int env_len;
+	char *config_file_name = NULL;
+	char *output_file_name = NULL;
+	/* Handed to putenv(), so it must stay valid for the life of the process */
+	char envstring[PATH_MAX];
+
+	while ((ch = getopt (argc, argv, "f:o:nh")) != EOF) {
+		switch (ch) {
+		case 'f':
+			config_file_name = optarg;
+			break;
+		case 'o':
+			output_file_name = optarg;
+			break;
+		case 'n':
+			nosync = 1;
+			break;
+		default:
+			usage(argv[0]);
+			exit(0);
+		}
+	}
+
+	if (config_file_name) {
+		/* Bounded formatting: the name comes from the command line */
+		env_len = snprintf(envstring, sizeof(envstring),
+				   "COROSYNC_MAIN_CONFIG_FILE=%s", config_file_name);
+		if (env_len < 0 || (size_t)env_len >= sizeof(envstring)) {
+			fprintf(stderr, "Config file name is too long\n");
+			exit(-1);
+		}
+		putenv(envstring);
+	}
+	if (output_file_name) {
+		output_file = fopen(output_file_name, "w");
+		if (!output_file) {
+			fprintf(stderr, "Unable to open %s for output: %s\n", output_file_name, strerror(errno));
+			exit(-1);
+		}
+	}
+	else {
+		output_file = stdout;
+	}
+#ifndef HAVE_READLINE_READLINE_H
+	is_tty = isatty(STDIN_FILENO);
+#endif
+
+	qb_log_filter_ctl(QB_LOG_SYSLOG, QB_LOG_FILTER_ADD,
+			  QB_LOG_FILTER_FUNCTION, "*", LOG_DEBUG);
+
+	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
+	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+			  QB_LOG_FILTER_FUNCTION, "*", LOG_DEBUG);
+
+	poll_loop = qb_loop_create();
+	if (!poll_loop) {
+		fprintf(stderr, "Unable to create the main loop\n");
+		exit(-1);
+	}
+
+	/* SIGCHLD handler to reap sub-processes and reconfigure the cluster */
+	qb_loop_signal_add(poll_loop,
+			   QB_LOOP_MED,
+			   SIGCHLD,
+			   NULL,
+			   sigchld_handler,
+			   &sigchld_qb_handle);
+
+	/* Create a full cluster of nodes from corosync.conf */
+	read_corosync_conf();
+	if (create_nodes_from_config() && !nosync) {
+		/* Delay kb input handling by 1 second when we've just
+		   added the nodes from corosync.conf; expect that
+		   the delay will be cancelled substantially earlier
+		   once they all have reported their quorum info
+		   (the delay is in fact a failsafe input enabler here) */
+		qb_loop_timer_add(poll_loop,
+				  QB_LOOP_MED,
+				  1000000000,
+				  NULL,
+				  start_kb_input_timeout,
+				  &kb_timer);
+	} else {
+		start_kb_input();
+	}
+
+	qb_loop_run(poll_loop);
+	return 0;
+}

+ 77 - 0
vqsim/vqsim.h

@@ -0,0 +1,77 @@
+
+typedef enum {VQMSG_QUIT=1, /* receiving instance exits with status 0 */
+	      VQMSG_SYNC,    /* set nodelist */
+	      VQMSG_QUORUM,  /* quorum state of this 'node' */
+	      VQMSG_EXEC,    /* message for exec_handler */
+	      VQMSG_QDEVICE, /* quorum device enable/disable */
+	      VQMSG_QUORUMQUIT, /* quit if you don't have quorum */
+} vqsim_msg_type_t;
+
+typedef struct vq_instance *vq_object_t; /* handle taken by the vq_*() calls declared below */
+
+struct vqsim_msg_header
+{
+	vqsim_msg_type_t type;	/* the receiver routes the message on this */
+	int from_nodeid;	/* node ID of the sender */
+	int param;	/* on/off flag for VQMSG_QDEVICE; messages built by the engine set it to 0 */
+};
+
+/* This is the sync sent from the controller process.
+ * The engine passes view_list / view_list_entries and ring_id to the
+ * votequorum sync_init call as the new member list and ring ID. */
+struct vqsim_sync_msg
+{
+	struct vqsim_msg_header header;
+	struct memb_ring_id ring_id;
+	size_t view_list_entries;
+	unsigned int view_list[];
+};
+
+/* This is just info sent from each VQ instance.
+ * It is filled in by the engine's quorum callback: quorate flag, ring ID
+ * and view_list_entries node IDs in the trailing view_list. */
+struct vqsim_quorum_msg
+{
+	struct vqsim_msg_header header;
+	int quorate;
+	struct memb_ring_id ring_id;
+	size_t view_list_entries;
+	unsigned int view_list[];
+};
+
+struct vqsim_exec_msg
+{
+	struct vqsim_msg_header header;
+	char execmsg[];	/* raw exec message; the engine reads it as starting with a qb_ipc_request_header */
+};
+
+struct vqsim_lib_msg
+{
+	struct vqsim_msg_header header;
+	char libmsg[];	/* NOTE(review): no user of this struct is visible in vqsim_vq_engine.c - confirm it is needed */
+};
+
+#define MAX_NODES 1024 /* NOTE(review): presumably the most nodes the controller tracks - confirm in vqmain.c */
+#define MAX_PARTITIONS 16 /* NOTE(review): presumably the most partitions the controller tracks - confirm in vqmain.c */
+
+/* In vq_object.c */
+vq_object_t vq_create_instance(qb_loop_t *poll_loop, int nodeid);
+void vq_quit(vq_object_t instance);
+int vq_set_nodelist(vq_object_t instance, struct memb_ring_id *ring_id, int *nodeids, int nodeids_entries);
+int vq_get_parent_fd(vq_object_t instance);
+int vq_set_qdevice(vq_object_t instance, struct memb_ring_id *ring_id, int onoff);
+int vq_quit_if_inquorate(vq_object_t instance);
+pid_t vq_get_pid(vq_object_t instance);
+
+/* in vqsim_vq_engine.c - effectively the constructor.
+ * Forks a child that runs a votequorum instance for 'nodeid'. In the
+ * parent it returns 0 with one end of the socket pair in *vq_sock and the
+ * child's PID in *child_pid, or -1 if the socket pair or fork failed. */
+int fork_new_instance(int nodeid, int *vq_sock, pid_t *child_pid);
+
+/* In parser.c */
+void parse_input_command(char *cmd);
+
+/* These are in vqmain.c */
+void cmd_stop_node(int nodeid);
+void cmd_stop_all_nodes(void);
+void cmd_start_new_node(int nodeid, int partition);
+void cmd_set_autofence(int onoff);
+void cmd_move_nodes(int partition, int num_nodes, int *nodelist);
+void cmd_join_partitions(int part1, int part2);
+void cmd_update_all_partitions(int newring);
+void cmd_qdevice_poll(int nodeid, int onoff);
+void cmd_show_node_states(void);

+ 433 - 0
vqsim/vqsim_vq_engine.c

@@ -0,0 +1,433 @@
+
+/* This is the bit of VQSIM that runs in the forked process.
+   It represents a single votequorum instance or, if you like,
+   a 'node' in the cluster.
+*/
+
+#include <sys/types.h>
+#include <qb/qblog.h>
+#include <qb/qbloop.h>
+#include <qb/qbipc_common.h>
+#include <netinet/in.h>
+#include <sys/poll.h>
+#include <sys/socket.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include "../exec/votequorum.h"
+#include "../exec/service.h"
+#include "../include/corosync/corotypes.h"
+#include "../include/corosync/votequorum.h"
+#include "../include/corosync/ipc_votequorum.h"
+#include <corosync/logsys.h>
+#include <corosync/coroapi.h>
+
+#include "icmap.h"
+#include "vqsim.h"
+
+#define QDEVICE_NAME "VQsim_qdevice"
+
+/* Static variables here are per-instance because we are forked */
+static struct corosync_service_engine *engine; /* set by load_quorum_instance() */
+static int parent_socket; /* Our end of the socket */
+static char buffer[8192]; /* receive buffer for messages read from the controller */
+static int our_nodeid; /* node ID given to fork_new_instance() */
+static char *private_data; /* engine->private_data_size bytes, returned by the ipc_private_data_get hook */
+static qb_loop_t *poll_loop; /* this process's main loop */
+static qb_loop_timer_handle sync_timer;
+static qb_loop_timer_handle qdevice_timer; /* reset to 0 by stop_qdevice_poll() */
+static int we_are_quorate; /* last 'quorate' value passed to quorum_fn() */
+static void *fake_conn = (void*)1; /* dummy connection pointer handed to the lib handlers */
+static cs_error_t last_lib_error; /* error code captured by the ipc_response_send hook */
+static struct memb_ring_id current_ring_id; /* last ring ID passed to quorum_fn() */
+static int qdevice_registered; /* set once qdevice registration has returned CS_OK */
+static unsigned int qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
+
+/* 'Keep the compiler happy' time: prototypes for the non-static
+ * functions that are defined further down in this file */
+char *get_run_dir(void);
+int api_timer_add_duration (
+        unsigned long long nanosec_duration,
+        void *data,
+        void (*timer_fn) (void *data),
+        corosync_timer_handle_t *handle);
+
+/* error_memory_failure hook: report on stderr and terminate the process. */
+static void api_error_memory_failure(void) __attribute__((noreturn));
+static void api_error_memory_failure(void)
+{
+	fputs("Out of memory error\n", stderr);
+	exit(-1);
+}
+/* timer_delete hook: removes timer 'th' from this process's poll loop. */
+static void api_timer_delete(corosync_timer_handle_t th)
+{
+	(void)qb_loop_timer_del(poll_loop, th);
+}
+
+/*
+ * timer_add_duration hook: arms a medium-priority timer on this process's
+ * poll loop that calls timer_fn(data) after nanosec_duration nanoseconds.
+ * The timer handle is stored in *handle; the qb_loop_timer_add() result is
+ * returned unchanged.
+ */
+int api_timer_add_duration (
+	unsigned long long nanosec_duration,
+	void *data,
+	void (*timer_fn) (void *data),
+	corosync_timer_handle_t *handle)
+{
+	int rc;
+
+	rc = qb_loop_timer_add(poll_loop, QB_LOOP_MED, nanosec_duration,
+			       data, timer_fn, handle);
+	return rc;
+}
+
+/* totem_nodeid_get hook: the node ID this forked instance simulates. */
+static unsigned int api_totem_nodeid_get(void)
+{
+	unsigned int nodeid = our_nodeid;
+
+	return nodeid;
+}
+
+/*
+ * totem_mcast hook: instead of multicasting, write the message to
+ * parent_socket, prefixed with a VQMSG_EXEC header carrying our node ID.
+ *
+ * iov, iovlen: fragments of the message to send.
+ * type:        unused here.
+ *
+ * Always returns 0; a failed or short write is only reported on stderr.
+ */
+static int api_totem_mcast(const struct iovec *iov, unsigned int iovlen, unsigned int type)
+{
+	struct vqsim_msg_header header;
+	struct iovec iovec[iovlen+1];
+	size_t total = sizeof(header);
+	ssize_t res;
+	unsigned int i;
+
+	header.type = VQMSG_EXEC;
+	header.from_nodeid = our_nodeid;
+	header.param = 0;
+
+	/* Slot 0 is our header, the caller's fragments follow */
+	iovec[0].iov_base = &header;
+	iovec[0].iov_len = sizeof(header);
+	for (i=0; i<iovlen; i++) {
+		iovec[i+1].iov_base = iov[i].iov_base;
+		iovec[i+1].iov_len = iov[i].iov_len;
+		total += iov[i].iov_len;
+	}
+
+	res = writev(parent_socket, iovec, (int)(iovlen+1));
+	if (res < 0) {
+		/* Show the real cause instead of "wrote only -1 bytes" */
+		perror("writev (exec message to parent) failed");
+	} else if ((size_t)res != total) {
+		fprintf(stderr, "writev wrote only %zd of %zu bytes\n", res, total);
+	}
+	return 0;
+}
+/* ipc_private_data_get hook: every connection shares the one private_data
+ * area allocated by load_quorum_instance(); 'conn' is ignored. */
+static void *api_ipc_private_data_get(void *conn)
+{
+	(void)conn;
+
+	return private_data;
+}
+static int api_ipc_response_send(void *conn, const void *msg, size_t len)
+{
+	struct qb_ipc_response_header *qb_header = (void*)msg;
+
+	/* Save the error so we can return it */
+	last_lib_error = qb_header->error;
+	return 0;
+}
+
+static struct corosync_api_v1 corosync_api = { /* only the hooks named here are set; every other member is zero-initialised */
+	.error_memory_failure = api_error_memory_failure,
+	.timer_delete = api_timer_delete,
+	.timer_add_duration = api_timer_add_duration,
+	.totem_nodeid_get = api_totem_nodeid_get,
+	.totem_mcast = api_totem_mcast,
+	.ipc_private_data_get = api_ipc_private_data_get,
+	.ipc_response_send = api_ipc_response_send,
+};
+
+/* -------------------- Above is all for providing the corosync_api support routines --------------------------------------------*/
+/* They need to be in the same file as the engine as they use the local 'poll_loop' variable which is per-process */
+
+static void start_qdevice_poll(int longwait); /* forward declaration: called by qdevice_dispatch_fn() before its definition */
+static void start_sync_timer(void); /* forward declaration: called by sync_dispatch_fn() before its definition */
+
+/*
+ * Callback from Votequorum to tell us about the quorum state.
+ *
+ * Remembers the quorate flag and ring ID for this instance and forwards
+ * the state, together with the view list, to the parent as a
+ * VQMSG_QUORUM message.
+ *
+ * view_list, view_list_entries: node IDs in the current view.
+ * quorate:                      quorate flag reported by votequorum.
+ * ring_id:                      ring ID the state applies to.
+ */
+static void quorum_fn(const unsigned int *view_list,
+		      size_t view_list_entries,
+		      int quorate, struct memb_ring_id *ring_id)
+{
+	char msgbuf[8192];
+	struct vqsim_quorum_msg *quorum_msg = (void*) msgbuf;
+	const size_t max_entries = (sizeof(msgbuf) - sizeof(*quorum_msg)) / sizeof(unsigned int);
+	size_t msg_len;
+
+	/* Local state is updated even if the report cannot be sent */
+	we_are_quorate = quorate;
+	memcpy(&current_ring_id, ring_id, sizeof(*ring_id));
+
+	if (view_list_entries > max_entries) {
+		/* Never copy a view list that would run past the end of msgbuf */
+		fprintf(stderr, "%d: view list of %zu entries is too large to send to parent\n",
+			our_nodeid, view_list_entries);
+		return;
+	}
+	msg_len = sizeof(*quorum_msg) + sizeof(unsigned int)*view_list_entries;
+
+	/* Send back to parent */
+	quorum_msg->header.type = VQMSG_QUORUM;
+	quorum_msg->header.from_nodeid = our_nodeid;
+	quorum_msg->header.param = 0;
+	quorum_msg->quorate = quorate;
+	memcpy(&quorum_msg->ring_id, ring_id, sizeof(*ring_id));
+	quorum_msg->view_list_entries = view_list_entries;
+
+	memcpy(quorum_msg->view_list, view_list, sizeof(unsigned int)*view_list_entries);
+
+	if (write(parent_socket, msgbuf, msg_len) <= 0) {
+		perror("write (view list to parent) failed");
+	}
+}
+
+/* Dummy implementation: ignores both arguments and always returns NULL. */
+char *corosync_service_link_and_init(struct corosync_api_v1 *api,
+				     struct default_service *service_engine)
+{
+	(void)api;
+	(void)service_engine;
+
+	return NULL;
+}
+
+/* For votequorum: the "run directory" is the current working directory.
+ * Returns a pointer to a static buffer (overwritten on each call), or
+ * whatever getcwd() returns on failure. */
+char *get_run_dir(void)
+{
+	static char cwd_buffer[PATH_MAX];
+	char *cwd;
+
+	cwd = getcwd(cwd_buffer, sizeof(cwd_buffer));
+	return cwd;
+}
+
+/*
+ * Bring up votequorum inside this process: initialise it with quorum_fn as
+ * the quorum callback, fetch the service engine, run its exec init,
+ * allocate the engine's private data and run its lib init on fake_conn.
+ *
+ * Returns -1 if any of the first three steps fails (after printing the
+ * reason), otherwise the result of the engine's lib_init_fn.
+ */
+static int load_quorum_instance(struct corosync_api_v1 *api)
+{
+	const char *failure;
+
+	failure = votequorum_init(api, quorum_fn);
+	if (failure != NULL) {
+		fprintf(stderr, "Votequorum init failed: %s\n", failure);
+		return -1;
+	}
+
+	engine = votequorum_get_service_engine_ver0();
+	failure = engine->exec_init_fn(api);
+	if (failure != NULL) {
+		fprintf(stderr, "votequorum exec init failed: %s\n", failure);
+		return -1;
+	}
+
+	private_data = malloc(engine->private_data_size);
+	if (private_data == NULL) {
+		perror("Malloc in child failed");
+		return -1;
+	}
+
+	return engine->lib_init_fn(fake_conn);
+}
+
+/* Sync timer callback: runs one sync_process() step. A zero result
+ * activates the sync; a non-zero result re-arms the timer for another step. */
+static void sync_dispatch_fn(void *data)
+{
+	if (!engine->sync_process()) {
+		engine->sync_activate();
+		return;
+	}
+
+	start_sync_timer();
+}
+
+/* Arm sync_timer so that sync_dispatch_fn() runs once after 10000000ns. */
+static void start_sync_timer(void)
+{
+	const uint64_t sync_step_delay_ns = 10000000;
+
+	qb_loop_timer_add(poll_loop, QB_LOOP_MED, sync_step_delay_ns,
+			  NULL, sync_dispatch_fn, &sync_timer);
+}
+
+/* Handle a VQMSG_SYNC from the controller: start a votequorum sync with the
+ * member list and ring ID carried in the message, then start the sync
+ * timer. 'len' is not used. */
+static void send_sync(char *buf, int len)
+{
+	struct vqsim_sync_msg *sync = (void*)buf;
+	const unsigned int *members = sync->view_list;
+	size_t member_count = sync->view_list_entries;
+
+	/* Votequorum doesn't use the transitional node list :-) */
+	engine->sync_init(NULL, 0, members, member_count, &sync->ring_id);
+
+	start_sync_timer();
+}
+
+/* Handle a VQMSG_EXEC from the controller: pick the exec handler from the
+ * low 16 bits of the embedded request header's id and call it with the
+ * payload and the sender's node ID. 'len' is not used. */
+static void send_exec_msg(char *buf, int len)
+{
+	struct vqsim_exec_msg *wrapper = (void*)buf;
+	struct qb_ipc_request_header *request = (void*)wrapper->execmsg;
+	int handler = request->id & 0xFFFF;
+
+	engine->exec_engine[handler].exec_handler_fn(wrapper->execmsg,
+						     wrapper->header.from_nodeid);
+}
+
+/* Call the engine's lib handler number 'type' with 'msg' on fake_conn and
+ * return the error code its response left in last_lib_error. */
+static int send_lib_msg(int type, void *msg)
+{
+	int result;
+
+	/* Not every lib function responds immediately, so start from CS_OK */
+	last_lib_error = CS_OK;
+
+	engine->lib_engine[type].lib_handler_fn(fake_conn, msg);
+
+	result = last_lib_error;
+	return result;
+}
+
+/* Send a qdevice poll for QDEVICE_NAME on the current ring ID, with 'onoff'
+ * as the cast_vote value. Returns the lib error code; anything other than
+ * CS_OK is also reported on stderr. */
+static int poll_qdevice(int onoff)
+{
+	struct req_lib_votequorum_qdevice_poll request;
+	int status;
+
+	strcpy(request.name, QDEVICE_NAME);
+	request.ring_id.seq = current_ring_id.seq;
+	request.ring_id.nodeid = current_ring_id.rep.nodeid;
+	request.cast_vote = onoff;
+
+	status = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_POLL, &request);
+	if (status != CS_OK) {
+		fprintf(stderr, "%d: qdevice poll failed: %d\n", our_nodeid, status);
+	}
+
+	return status;
+}
+
+/* qdevice timer callback: cast the qdevice vote and, if the poll was
+ * accepted, schedule the next one. */
+static void qdevice_dispatch_fn(void *data)
+{
+	/* The timer that called us has expired, so forget its handle. If the
+	   poll below fails nothing is re-armed, and do_qdevice() tests
+	   qdevice_timer to decide whether polling needs restarting. */
+	qdevice_timer = 0;
+
+	if (poll_qdevice(1) == CS_OK) {
+		start_qdevice_poll(0);
+	}
+}
+
+/* Arm qdevice_timer to run qdevice_dispatch_fn() after qdevice_timeout*500000
+ * nanoseconds, or twice that when 'longwait' is non-zero. */
+static void start_qdevice_poll(int longwait)
+{
+	/* Half the corosync timeout */
+	unsigned long long delay = (unsigned long long)qdevice_timeout*500000;
+
+	delay *= longwait ? 2 : 1;
+
+	qb_loop_timer_add(poll_loop, QB_LOOP_MED, delay,
+			  NULL, qdevice_dispatch_fn, &qdevice_timer);
+}
+
+/* Cancel the qdevice poll timer and clear its handle, so that do_qdevice()
+ * knows no poll is pending. */
+static void stop_qdevice_poll(void)
+{
+	(void)qb_loop_timer_del(poll_loop, qdevice_timer);
+	qdevice_timer = 0;
+}
+
+/*
+ * Handle a VQMSG_QDEVICE request.
+ *
+ * onoff == 0: send a poll with cast_vote 0 and stop the poll timer.
+ * onoff != 0: register the qdevice on first use and start polling with the
+ *             long wait; if it is already registered, restart polling only
+ *             when no poll timer is set.
+ */
+static void do_qdevice(int onoff)
+{
+	struct req_lib_votequorum_qdevice_register regmsg;
+	int res;
+
+	if (!onoff) {
+		poll_qdevice(0);
+		stop_qdevice_poll();
+		return;
+	}
+
+	if (qdevice_registered) {
+		if (!qdevice_timer) {
+			start_qdevice_poll(0);
+		}
+		return;
+	}
+
+	strcpy(regmsg.name, QDEVICE_NAME);
+	res = send_lib_msg(MESSAGE_REQ_VOTEQUORUM_QDEVICE_REGISTER, &regmsg);
+	if (res != CS_OK) {
+		fprintf(stderr, "%d: qdevice registration failed: %d\n", our_nodeid, res);
+		return;
+	}
+
+	qdevice_registered = 1;
+	start_qdevice_poll(1);
+}
+
+
+/*
+ * From controller.
+ *
+ * Poll callback for parent_socket: reads one message into 'buffer' and
+ * routes it on the header type. 'revents' and 'data' are not used.
+ * Always returns 0.
+ */
+static int parent_pipe_read_fn(int32_t fd, int32_t revents, void *data)
+{
+	struct vqsim_msg_header *header = (void*)buffer;
+	int len;
+
+	len = read(fd, buffer, sizeof(buffer));
+	if (len <= 0) {
+		return 0;
+	}
+	if (len < (int)sizeof(*header)) {
+		/* Too short to hold a header: routing on it would act on
+		   whatever an earlier message left in the buffer */
+		fprintf(stderr, "%d: short message (%d bytes) from parent ignored\n", our_nodeid, len);
+		return 0;
+	}
+
+	/* Check header and route */
+	switch (header->type) {
+	case VQMSG_QUIT:
+		exit(0);
+		break;
+	case VQMSG_EXEC: /* For votequorum exec messages */
+		send_exec_msg(buffer, len);
+		break;
+	case VQMSG_SYNC:
+		send_sync(buffer, len);
+		break;
+	case VQMSG_QDEVICE:
+		do_qdevice(header->param);
+		break;
+	case VQMSG_QUORUMQUIT:
+		if (!we_are_quorate) {
+			exit(1);
+		}
+		break;
+	case VQMSG_QUORUM:
+		/* not used here */
+		break;
+	default:
+		/* unknown message type: ignore */
+		break;
+	}
+	return 0;
+}
+
+/* Start the first sync: a cluster whose only member is 'nodeid', on a ring
+ * with our node ID as representative and sequence number 1. */
+static void initial_sync(int nodeid)
+{
+	struct memb_ring_id first_ring;
+	unsigned int transitional[1];
+	unsigned int members[1];
+
+	transitional[0] = nodeid;
+	members[0] = nodeid;
+
+	first_ring.rep.nodeid = our_nodeid;
+	first_ring.seq = 1;
+
+	/* cluster with just us in it */
+	engine->sync_init(transitional, 1, members, 1, &first_ring);
+	start_sync_timer();
+}
+
+/*
+ * Return pipe FDs & child PID if successful.
+ *
+ * Creates a non-blocking SOCK_SEQPACKET socket pair and forks.
+ *
+ * Parent: stores its end of the pair in *vq_sock and the child's PID in
+ * *childpid, then returns 0. Returns -1 if the socket pair or the fork
+ * could not be created.
+ *
+ * Child: becomes the votequorum instance for 'nodeid'. It loads the engine,
+ * polls parent_socket for controller messages, starts the initial sync and
+ * runs its own main loop. It exits with status 1 if the engine cannot be
+ * loaded.
+ */
+int fork_new_instance(int nodeid, int *vq_sock, pid_t *childpid)
+{
+	int pipes[2];
+	pid_t pid;
+
+	if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_NONBLOCK, 0, pipes)) {
+		return -1;
+	}
+	parent_socket = pipes[0];
+
+	switch ( (pid=fork()) ) {
+	case -1:
+		perror("fork failed");
+		/* No child was created, so nobody will ever use the pair */
+		close(pipes[0]);
+		close(pipes[1]);
+		return -1;
+	case 0:
+		/* child process - continue below */
+		break;
+	default:
+		/* parent process */
+		*vq_sock = pipes[1];
+		*childpid = pid;
+		return 0;
+	}
+
+	our_nodeid = nodeid;
+	poll_loop = qb_loop_create();
+
+	if (icmap_get_uint32("quorum.device.timeout", &qdevice_timeout) != CS_OK) {
+		qdevice_timeout = VOTEQUORUM_QDEVICE_DEFAULT_TIMEOUT;
+	}
+
+	if (load_quorum_instance(&corosync_api) != 0) {
+		/* initial_sync() calls through 'engine', which may not have
+		   been set. Exit rather than return: returning here would
+		   carry on running the caller's code in the child. */
+		fprintf(stderr, "%d: unable to load votequorum instance\n", our_nodeid);
+		exit(1);
+	}
+
+	qb_loop_poll_add(poll_loop,
+			 QB_LOOP_MED,
+			 parent_socket,
+			 POLLIN,
+			 NULL,
+			 parent_pipe_read_fn);
+
+	/* Start it up! */
+	initial_sync(nodeid);
+	qb_loop_run(poll_loop);
+
+	return 0;
+}