Explorar el Código

Add a test harness to corosync that uses CTS from pacemaker.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2668 fd59a12c-fef9-0310-b244-a6a79926bd2f
Angus Salkeld hace 16 años
padre
commit
824fd8e82a
Se han modificado 13 ficheros con 2135 adiciones y 3 borrados
  1. 1 1
      Makefile.am
  2. 13 0
      configure.ac
  3. 3 0
      cts/CTSvars.py.in
  4. 34 0
      cts/Makefile.am
  5. 80 0
      cts/README
  6. 51 0
      cts/agents/Makefile.am
  7. 616 0
      cts/agents/cpg_test_agent.c
  8. 114 0
      cts/agents/mem_leak_test.sh
  9. 19 0
      cts/agents/net_breaker.sh
  10. 306 0
      cts/corolab.py
  11. 510 0
      cts/corosync.py
  12. 386 0
      cts/corotests.py
  13. 2 2
      exec/main.c

+ 1 - 1
Makefile.am

@@ -56,7 +56,7 @@ corolenstestdir		= ${corolensdir}/tests
 corolenstest_DATA	= conf/lenses/tests/test_corosync.aug
 endif
 
-SUBDIRS			= include lcr lib exec services tools test pkgconfig \
+SUBDIRS			= include lcr lib exec services tools test cts pkgconfig \
 			  man init
 
 install-exec-local:

+ 13 - 0
configure.ac

@@ -128,6 +128,9 @@ AC_CONFIG_FILES([Makefile
 		 pkgconfig/Makefile
 		 services/Makefile
 		 test/Makefile
+		 cts/Makefile
+		 cts/agents/Makefile
+		 cts/CTSvars.py
 		 tools/Makefile])
 
 ### Local business
@@ -199,6 +202,10 @@ AC_ARG_ENABLE([nss],
 	[  --enable-nss            : Network Security Services encryption. ],,
 	[ enable_nss="yes" ])
 
+AC_ARG_ENABLE([testagents],
+	[  --enable-testagents            : Install Test Agents. ],,
+	[ default="no" ])
+
 AC_ARG_ENABLE([rdma],
 	[  --enable-rdma           : Infiniband RDMA transport support ],,
 	[ enable_rdma="no" ])
@@ -327,6 +334,11 @@ if test "x${enable_nss}" = xyes; then
 	PACKAGE_FEATURES="$PACKAGE_FEATURES nss"
 fi
 
+if test "x${enable_testagents}" = xyes; then
+	AC_DEFINE_UNQUOTED([HAVE_TESTAGENTS], 1, [have testagents])
+	PACKAGE_FEATURES="$PACKAGE_FEATURES testagents"
+fi
+
 if test "x${enable_rdma}" = xyes; then
 	PKG_CHECK_MODULES([rdmacm],[rdmacm])
 	PKG_CHECK_MODULES([ibverbs],[ibverbs])
@@ -425,6 +437,7 @@ AC_SUBST([CPG_SONAME])
 AC_SUBST([OS_DYFLAGS])
 
 AC_SUBST([OS_LDL])
+AM_CONDITIONAL(INSTALL_TESTAGENTS, test -n "${enable_testagents}")
 
 AC_SUBST([NSS_LDFLAGS])
 

+ 3 - 0
cts/CTSvars.py.in

@@ -0,0 +1,3 @@
+class CTSvars:
+    CTS_home="@prefix@/share/pacemaker/tests/cts"
+    INITDIR="/etc/init.d"

+ 34 - 0
cts/Makefile.am

@@ -0,0 +1,34 @@
+# Copyright (c) 2010 Red Hat, Inc.
+#
+# Authors: Angus Salkeld (asalkeld@redhat.com)
+#
+# This software licensed under BSD license, the text of which follows:
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# - Redistributions of source code must retain the above copyright notice,
+#   this list of conditions and the following disclaimer.
+# - Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+# - Neither the name of the MontaVista Software, Inc. nor the names of its
+#   contributors may be used to endorse or promote products derived from this
+#   software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+# THE POSSIBILITY OF SUCH DAMAGE.
+
+MAINTAINERCLEANFILES    = Makefile.in
+
+SUBDIRS = agents
+

+ 80 - 0
cts/README

@@ -0,0 +1,80 @@
+Quick start guide.
+==================
+
+CTS: Cluster Test System
+The CTS uses a test driver node(TDN) to drive the execution of the test
+software.  The CTS also uses 2 or more test target nodes(TTN) to run the test
+cases.  The CTS software requires atleast 3 nodes 1 of which acts as a TDN and
+the remaining acting as TTNs.
+
+The dependencies of the TDN include Pacemaker and syslog-ng.
+The dependencies of the TTN include syslog-ng.
+
+On the single TDN install syslog-ng.
+On the single TDN, it is recommended at this time to install pacemaker from
+source so that the CTS from Pacemaker remains compatible with the current
+working version of the CTS components within Corosync.
+
+On the 2 or more TTNs install syslog-ng.
+
+2] ssh access
+--------------------------------------------------------------------------------
+CTS requires login-less root access to the CNs.
+so if my CN is called "node32"
+# ssh node32
+should not ask for a password and result in a root shell.
+
+To enable this behavior, create a ssh key with the command ssh-keygen.  When it
+prompts for a password, enter an empty field.  It will create a file called
+~/.ssh/id_dsa.pub.  Copy that file to the TTNs into /root/.ssh/authorized_keys.
+Ensure permissions are 700 on /root/.ssh.
+
+Test ssh into the machine works without a password from the TDN.
+
+3] Redirect corosync logging from the TTN to the TDN
+--------------------------------------------------------------------------------
+a) install & enable syslog-ng on the EN and CNs.
+b) put the following config into /etc/syslog-ng/syslog-ng.conf
+
+For the below configuration content, substitute the following with your
+environment:
+Note: obviously change the following to real values:
+@THE-LOG-FILE-HERE@ 		A log file name on the TTN
+@EXERCISER-HOSTNAME-HERE@	The host name of the TTN.  To avoid DNS issues
+				IP addresses also work and are more convient.
+
+Place this anywhere in the configuration file of the TDN:
+
+source s_tcp { tcp(port(9191) max-connections(99999)); };
+filter f_ha  { facility(daemon); };
+destination ha_local { file(@THE-LOG-FILE-HERE@ perm(0644)); };
+log { source(s_sys); source(s_tcp); filter(f_ha); destination(ha_local); };
+
+Place this anywhere in the configuration file on the TTN:
+
+destination ha_tcp { tcp(@EXERCISER-HOSTNAME-HERE@ port(9191));};
+filter f_ha_tcp  { facility(daemon); };
+log { source(s_sys); filter(f_ha_tcp); destination(ha_tcp); };
+
+c) Test that syslog-ng is working properly by using the logger command
+From the TTN:
+logger -p daemon.err "Hello from $(hostname)"
+
+Then on the TDN:
+tail @THE-LOG-FILE-HERE@
+
+4] Install augeas on the TTN
+--------------------------------------------------------------------------------
+yum install augeas
+
+5] Configuring corosync for CTSs testing
+--------------------------------------------------------------------------------
+
+./configure --enable-testagents --enable-augeas
+
+and then install it on the TTN.
+
+6] run CTS
+--------------------------------------------------------------------------------
+cd <your-corosync-src-dir>/cts
+python ./CoroLab.py -L @THE-LOG-FILE-HERE@ --at-boot 0  --nodes "n1 n2"

+ 51 - 0
cts/agents/Makefile.am

@@ -0,0 +1,51 @@
+#
+# Copyright (c) 2010 Red Hat, Inc.
+#
+# Authors: Angus Salkeld (asalkeld@redhat.com)
+#
+# This software licensed under BSD license, the text of which follows:
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# - Redistributions of source code must retain the above copyright notice,
+#   this list of conditions and the following disclaimer.
+# - Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+# - Neither the name of the MontaVista Software, Inc. nor the names of its
+#   contributors may be used to endorse or promote products derived from this
+#   software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+# THE POSSIBILITY OF SUCH DAMAGE.
+
+MAINTAINERCLEANFILES = Makefile.in
+INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include
+
+TEST_AGENTS = cpg_test_agent
+
+if INSTALL_TESTAGENTS
+agentdir = $(datadir)/$(PACKAGE)/tests
+bin_PROGRAMS = $(TEST_AGENTS)
+dist_agent_SCRIPTS = mem_leak_test.sh net_breaker.sh
+else
+noinst_PROGRAMS = $(TEST_AGENTS)
+noinst_SCRIPTS = mem_leak_test.sh net_breaker.sh
+endif
+
+cpg_test_agent_LDADD =  -lcpg -lcoroipcc ../../exec/coropoll.o
+cpg_test_agent_LDFLAGS =  -L../../lib
+
+lint:
+	-splint $(LINT_FLAGS) $(CFLAGS) *.c
+

+ 616 - 0
cts/agents/cpg_test_agent.c

@@ -0,0 +1,616 @@
+/*
+ * Copyright (c) 2010 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Angus Salkeld (asalkeld@redhat.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <errno.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <syslog.h>
+#include <poll.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+#include <corosync/totem/coropoll.h>
+#include <corosync/list.h>
+#include <corosync/cpg.h>
+
+
+#define SERVER_PORT "9034"
+
+typedef enum {
+	MSG_OK,
+	MSG_NODEID_ERR,
+	MSG_PID_ERR,
+	MSG_SEQ_ERR,
+	MSG_SIZE_ERR,
+	MSG_HASH_ERR,
+} msg_status_t;
+
+typedef struct {
+	uint32_t nodeid;
+	pid_t   pid;
+	uint32_t hash;
+	uint32_t seq;
+	size_t size;
+	char payload[1];
+} msg_t;
+
+#define LOG_STR_SIZE 128
+typedef struct {
+	char log[LOG_STR_SIZE];
+	struct list_head list;
+} log_entry_t;
+
+#define HOW_BIG_AND_BUF 4096
+static char big_and_buf[HOW_BIG_AND_BUF];
+static char big_and_buf_rx[HOW_BIG_AND_BUF];
+static int32_t parse_debug = 0;
+static int32_t record_config_events_g = 0;
+static int32_t record_messages_g = 0;
+static cpg_handle_t cpg_handle = 0;
+static int32_t cpg_fd = -1;
+static struct list_head config_chg_log_head;
+static struct list_head msg_log_head;
+static pid_t my_pid;
+static uint32_t my_nodeid;
+static int32_t my_seq;
+static int32_t my_msgs_to_send;
+static int32_t total_stored_msgs = 0;
+static hdb_handle_t poll_handle;
+
+
+static void delivery_callback (
+	cpg_handle_t handle,
+	const struct cpg_name *groupName,
+	uint32_t nodeid,
+	uint32_t pid,
+	void *msg,
+	size_t msg_len)
+{
+	log_entry_t *log_pt;
+	msg_t *msg_pt = (msg_t*)msg;
+	msg_status_t status = MSG_OK;
+
+	if (record_messages_g == 0) {
+		return;
+	}
+
+	msg_pt->seq = my_seq;
+	my_seq++;
+
+	if (nodeid != msg_pt->nodeid) {
+		status = MSG_NODEID_ERR;
+	}
+	if (pid != msg_pt->pid) {
+		status = MSG_PID_ERR;
+	}
+	if (msg_len != msg_pt->size) {
+		status = MSG_SIZE_ERR;
+	}
+	/* TODO: check hash here.
+	*/
+
+	log_pt = malloc (sizeof(log_entry_t));
+	list_init (&log_pt->list);
+	snprintf (log_pt->log, 128, "%d:%d:%d:%d;",
+		msg_pt->nodeid, msg_pt->pid, msg_pt->seq, status);
+	list_add_tail (&log_pt->list, &msg_log_head);
+	total_stored_msgs++;
+}
+
+static void config_change_callback (
+	cpg_handle_t handle,
+	const struct cpg_name *groupName,
+	const struct cpg_address *member_list, size_t member_list_entries,
+	const struct cpg_address *left_list, size_t left_list_entries,
+	const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+	int i;
+	log_entry_t *log_pt;
+
+	/* group_name,ip,pid,join|leave */
+
+	if (record_config_events_g == 0) {
+		return;
+	}
+	for (i = 0; i < left_list_entries; i++) {
+		syslog (LOG_DEBUG, "%s() inserting leave event into list", __func__);
+
+		log_pt = malloc (sizeof(log_entry_t));
+		list_init (&log_pt->list);
+		snprintf (log_pt->log, 256, "%s,%d,%d,left",
+			groupName->value, left_list[i].nodeid,left_list[i].pid);
+		list_add_tail(&log_pt->list, &config_chg_log_head);
+	}
+	for (i = 0; i < joined_list_entries; i++) {
+		syslog (LOG_DEBUG, "%s() inserting join event into list", __func__);
+
+		log_pt = malloc (sizeof(log_entry_t));
+		list_init (&log_pt->list);
+		snprintf (log_pt->log, 256, "%s,%d,%d,join",
+			groupName->value, joined_list[i].nodeid,joined_list[i].pid);
+		list_add_tail (&log_pt->list, &config_chg_log_head);
+	}
+}
+
+static cpg_callbacks_t callbacks = {
+	.cpg_deliver_fn = delivery_callback,
+	.cpg_confchg_fn = config_change_callback,
+};
+
+static void record_messages (void)
+{
+	record_messages_g = 1;
+	syslog (LOG_DEBUG,"%s() record:%d", __func__, record_messages_g);
+}
+
+static void record_config_events (void)
+{
+	record_config_events_g = 1;
+	syslog (LOG_DEBUG,"%s() record:%d", __func__, record_config_events_g);
+}
+
+static void read_config_event (int sock)
+{
+	const char *empty = "None";
+	struct list_head * list = config_chg_log_head.next;
+	log_entry_t *entry;
+
+	if (list != &config_chg_log_head) {
+		entry = list_entry (list, log_entry_t, list);
+		send (sock, entry->log,	strlen (entry->log) + 1, 0);
+		list_del (&entry->list);
+		free (entry);
+	} else {
+		syslog (LOG_DEBUG,"%s() no events in list", __func__);
+		send (sock, empty, strlen (empty) + 1, 0);
+	}
+}
+
+static void read_messages (int sock, char* atmost_str)
+{
+	struct list_head * list;
+	log_entry_t *entry;
+	int atmost = atoi (atmost_str);
+	int packed = 0;
+
+	if (atmost == 0)
+		atmost = 1;
+	if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
+		atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
+
+	syslog (LOG_DEBUG, "%s() atmost %d; total_stored_msgs:%d",
+		__func__, atmost, total_stored_msgs);
+	big_and_buf[0] = '\0';
+
+	for (list = msg_log_head.next;
+		(!list_empty (&msg_log_head) && packed < atmost); ) {
+
+		entry = list_entry (list, log_entry_t, list);
+
+		strcat (big_and_buf, entry->log);
+		packed++;
+
+		list = list->next;
+		list_del (&entry->list);
+		free (entry);
+
+		total_stored_msgs--;
+	}
+	syslog (LOG_DEBUG, "%s() sending %d; total_stored_msgs:%d; len:%d",
+		__func__, packed, total_stored_msgs, (int)strlen (big_and_buf));
+	if (packed == 0) {
+		strcpy (big_and_buf, "None");
+	}
+	send (sock, big_and_buf, strlen (big_and_buf), 0);
+}
+
+static void send_some_more_messages (void)
+{
+	msg_t my_msg;
+	struct iovec iov[1];
+	int i;
+	int send_now;
+	cs_error_t res;
+	cpg_flow_control_state_t fc_state;
+
+	if (cpg_fd < 0)
+		return;
+
+	send_now = my_msgs_to_send;
+
+	syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
+	my_msg.pid = my_pid;
+	my_msg.nodeid = my_nodeid;
+	my_msg.hash = 0;
+	my_msg.size = sizeof (msg_t);
+	my_msg.seq = 0;
+
+	iov[0].iov_len = my_msg.size;
+	iov[0].iov_base = &my_msg;
+
+	for (i = 0; i < send_now; i++) {
+
+		res = cpg_flow_control_state_get (cpg_handle, &fc_state);
+		if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
+			/* lets do this later */
+			syslog (LOG_DEBUG, "%s() flow control enabled.", __func__);
+			return;
+		}
+
+		res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 1);
+		if (res == CS_ERR_TRY_AGAIN) {
+			/* lets do this later */
+			syslog (LOG_DEBUG, "%s() cpg_mcast_joined() says try again.",
+				__func__);
+			return;
+		} else
+			if (res != CS_OK) {
+				syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
+					__func__, res);
+				exit (-2);
+			}
+
+		my_msgs_to_send--;
+	}
+}
+
+static void msg_blaster (int sock, char* num_to_send_str)
+{
+	my_msgs_to_send = atoi (num_to_send_str);
+	my_seq = 1;
+	my_pid = getpid();
+
+	cpg_local_get (cpg_handle, &my_nodeid);
+
+	/* control the limits */
+	if (my_msgs_to_send <= 0)
+		my_msgs_to_send = 1;
+	if (my_msgs_to_send > 1000)
+		my_msgs_to_send = 1000;
+
+	send_some_more_messages ();
+}
+
+
+static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
+	int fd,
+	int revents,
+	void *data)
+{
+	cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
+	if (error == CS_ERR_LIBRARY) {
+		syslog (LOG_ERR, "%s() got LIB error disconnecting from corosync.", __func__);
+		poll_dispatch_delete (poll_handle, cpg_fd);
+		close (cpg_fd);
+		cpg_fd = -1;
+	}
+	return 0;
+}
+
+static void do_command (int sock, char* func, char*args[], int num_args)
+{
+	int result;
+	struct cpg_name group_name;
+
+	if (parse_debug)
+		syslog (LOG_DEBUG,"RPC:%s() called.", func);
+
+	if (strcmp ("cpg_mcast_joined",func) == 0) {
+		struct iovec iov[5];
+		int a;
+
+		for (a = 0; a < num_args; a++) {
+			iov[a].iov_base = args[a];
+			iov[a].iov_len = strlen(args[a])+1;
+		}
+		cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);
+
+	} else if (strcmp ("cpg_join",func) == 0) {
+
+		strcpy (group_name.value, args[0]);
+		group_name.length = strlen(args[0]);
+
+		result = cpg_join (cpg_handle, &group_name);
+		if (result != CS_OK) {
+			syslog (LOG_ERR,
+				"Could not join process group, error %d\n", result);
+			exit (1);
+		}
+
+	} else if (strcmp ("cpg_leave",func) == 0) {
+
+		strcpy (group_name.value, args[0]);
+		group_name.length = strlen(args[0]);
+
+		result = cpg_leave (cpg_handle, &group_name);
+		if (result != CS_OK) {
+			syslog (LOG_ERR,
+				"Could not leave process group, error %d\n", result);
+			exit (1);
+		}
+		syslog (LOG_INFO, "called cpg_leave()!");
+
+	} else if (strcmp ("cpg_initialize",func) == 0) {
+		int retry_count = 0;
+
+		result = cpg_initialize (&cpg_handle, &callbacks);
+		while (result != CS_OK) {
+			syslog (LOG_ERR,
+				"cpg_initialize error %d (attempt %d)\n",
+				result, retry_count);
+			if (retry_count >= 3) {
+				exit (1);
+			}
+			sleep(1);
+			retry_count++;
+		}
+
+		cpg_fd_get (cpg_handle, &cpg_fd);
+		poll_dispatch_add (poll_handle, cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn);
+
+	} else if (strcmp ("cpg_local_get", func) == 0) {
+		unsigned int local_nodeid;
+		char response[100];
+
+		cpg_local_get (cpg_handle, &local_nodeid);
+		snprintf (response, 100, "%u",local_nodeid);
+		send (sock, response, strlen (response) + 1, 0);
+
+	} else if (strcmp ("cpg_finalize",func) == 0) {
+
+		cpg_finalize (cpg_handle);
+		poll_dispatch_delete (poll_handle, cpg_fd);
+		cpg_fd = -1;
+
+	} else if (strcmp ("record_config_events",func) == 0) {
+
+		record_config_events ();
+
+	} else if (strcmp ("record_messages",func) == 0) {
+
+		record_messages ();
+
+	} else if (strcmp ("read_config_event",func) == 0) {
+
+		read_config_event (sock);
+
+	} else if (strcmp ("read_messages",func) == 0) {
+
+		read_messages (sock, args[0]);
+
+	} else if (strcmp ("msg_blaster",func) == 0) {
+
+		msg_blaster (sock, args[0]);
+
+	} else {
+		syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func);
+	}
+}
+
+static void handle_command (int sock, char* msg)
+{
+	int num_args;
+	char *saveptr = NULL;
+	char *str = strdup (msg);
+	char *str_len;
+	char *str_arg;
+	char *args[5];
+	int i = 0;
+	int a = 0;
+	char* func = NULL;
+
+	if (parse_debug)
+		syslog (LOG_DEBUG,"%s (MSG:%s)\n", __func__, msg);
+
+	str_len = strtok_r (str, ":", &saveptr);
+	assert (str_len);
+
+	num_args = atoi (str_len) * 2;
+	for (i = 0; i < num_args / 2; i++) {
+		str_len = strtok_r (NULL, ":", &saveptr);
+		str_arg = strtok_r (NULL, ":", &saveptr);
+		if (func == NULL) {
+			/* first "arg" is the function */
+			if (parse_debug)
+				syslog (LOG_DEBUG, "(LEN:%s, FUNC:%s)", str_len, str_arg);
+			func = str_arg;
+			a = 0;
+		} else {
+			args[a] = str_arg;
+			a++;
+			if (parse_debug)
+				syslog (LOG_DEBUG, "(LEN:%s, ARG:%s)", str_len, str_arg);
+		}
+	}
+	do_command (sock, func, args, a+1);
+
+	free (str);
+}
+
+static int server_process_data_fn (hdb_handle_t handle,
+	int fd,
+	int revents,
+	void *data)
+{
+	char *saveptr;
+	char *msg;
+	char *cmd;
+	int32_t nbytes;
+
+	if ((nbytes = recv (fd, big_and_buf_rx, sizeof (big_and_buf_rx), 0)) <= 0) {
+		/* got error or connection closed by client */
+		if (nbytes == 0) {
+			/* connection closed */
+			syslog (LOG_WARNING, "socket %d hung up: exiting...\n", fd);
+		} else {
+			syslog (LOG_ERR,"recv() failed: %s", strerror(errno));
+		}
+		close (fd);
+		exit (0);
+	} else {
+		if (my_msgs_to_send > 0)
+			send_some_more_messages ();
+
+		big_and_buf_rx[nbytes] = '\0';
+
+		msg = strtok_r (big_and_buf_rx, ";", &saveptr);
+		assert (msg);
+		while (msg) {
+			cmd = strdup (msg);
+			handle_command (fd, cmd);
+			free (cmd);
+			msg = strtok_r (NULL, ";", &saveptr);
+		}
+	}
+
+	return 0;
+}
+
+static int server_accept_fn (hdb_handle_t handle,
+	int fd,
+	int revents,
+	void *data)
+{
+	socklen_t addrlen;
+	struct sockaddr_in in_addr;
+	int new_fd;
+	int res;
+
+	addrlen = sizeof (struct sockaddr_in);
+
+retry_accept:
+	new_fd = accept (fd, (struct sockaddr *)&in_addr, &addrlen);
+	if (new_fd == -1 && errno == EINTR) {
+		goto retry_accept;
+	}
+
+	if (new_fd == -1) {
+		syslog (LOG_ERR,
+			"Could not accept connection: %s\n", strerror (errno));
+		return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
+	}
+
+	res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
+	if (res == -1) {
+		syslog (LOG_ERR,
+			"Could not set non-blocking operation on connection: %s\n",
+			strerror (errno));
+		close (new_fd);
+		return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
+	}
+
+	poll_dispatch_add (poll_handle, new_fd, POLLIN|POLLNVAL, NULL, server_process_data_fn);
+	return 0;
+}
+
+static int create_server_sockect (void)
+{
+	int listener;
+	int yes = 1;
+	int rv;
+	struct addrinfo hints, *ai, *p;
+
+	/* get a socket and bind it
+	 */
+	memset (&hints, 0, sizeof hints);
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE;
+	if ((rv = getaddrinfo (NULL, SERVER_PORT, &hints, &ai)) != 0) {
+		syslog (LOG_ERR, "%s\n", gai_strerror (rv));
+		exit (1);
+	}
+
+	for (p = ai; p != NULL; p = p->ai_next) {
+		listener = socket (p->ai_family, p->ai_socktype, p->ai_protocol);
+		if (listener < 0) {
+			continue;
+		}
+
+		/* lose the pesky "address already in use" error message
+		 */
+		if (setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
+				&yes, sizeof(int)) < 0) {
+			syslog (LOG_ERR, "setsockopt() failed: %s\n", strerror (errno));
+		}
+
+		if (bind (listener, p->ai_addr, p->ai_addrlen) < 0) {
+			syslog (LOG_ERR, "bind() failed: %s\n", strerror (errno));
+			close (listener);
+			continue;
+		}
+
+		break;
+	}
+
+	if (p == NULL) {
+		syslog (LOG_ERR, "failed to bind\n");
+		exit (2);
+	}
+
+	freeaddrinfo (ai);
+
+	if (listen (listener, 10) == -1) {
+		syslog (LOG_ERR, "listen() failed: %s", strerror(errno));
+		exit (3);
+	}
+
+	return listener;
+}
+
+int main (int argc, char *argv[])
+{
+	int listener;
+
+	openlog (NULL, LOG_CONS|LOG_PID, LOG_DAEMON);
+
+	list_init (&msg_log_head);
+	list_init (&config_chg_log_head);
+
+	poll_handle = poll_create ();
+
+	listener = create_server_sockect ();
+	poll_dispatch_add (poll_handle, listener, POLLIN|POLLNVAL, NULL, server_accept_fn);
+
+	poll_run (poll_handle);
+	return -1;
+}
+

+ 114 - 0
cts/agents/mem_leak_test.sh

@@ -0,0 +1,114 @@
+#!/bin/sh
+
+_usage_()
+{
+  echo bla bla
+
+  exit 0
+}
+
+get_mem()
+{
+  if [ -z "$1" ]
+  then
+    type=Data
+  else
+    type=$1
+  fi
+  MEM=$(cat /proc/$(pidof corosync)/status | grep Vm$type | sed "s/Vm$type:\(.*\) kB/\1/")
+  echo $MEM
+}
+
+#
+# create and destroy a lot of objects
+#
+_object_test_()
+{
+  TYPE=RSS
+  temp_file=/tmp/object.txt
+  COUNT=1
+
+  corosync-objctl -c usr
+  corosync-objctl -w usr.angus=456
+  corosync-objctl -d usr
+
+  BEFORE=$(get_mem $TYPE)
+  # this loop is just to ignore the first iteration
+  for f in /usr/share/man /usr/lib /usr/bin /usr/local ;
+  do
+    rm -f $temp_file
+
+    find $f | sed "s|\.|_|g" | sed "s|/|.|g" | while read l
+    do 
+      echo $l.count=$count >> $temp_file
+      let COUNT="$COUNT+1"
+    done
+
+    corosync-objctl -p $temp_file
+    corosync-objctl -d usr
+  done
+  AFTER=$(get_mem $TYPE)
+  let DIFF="$AFTER - $BEFORE"
+  rm -f $temp_file
+  #echo $f diff $TYPE $DIFF
+  echo $DIFF
+
+  exit 0
+}
+
+#
+# load and unload a service a bunch of times
+#
+_service_test_()
+{
+  echo _service_test_
+
+  exit 0
+}
+
+#
+# run the corosync tools to cause IPC sessions to created/destroyed
+#
+_session_test_()
+{
+  echo _session_test_
+  COUNT=1
+
+  find /usr/bin | sed "s|\.|_|g" | sed "s|/|.|g" | while read l
+  do 
+    corosync-objctl -c $l
+    corosync-objctl -w $l.value=$COUNT
+    let COUNT="$COUNT+1"
+  done
+  corosync-objctl -d usr
+
+  exit 0
+}
+
+# Note that we use `"$@"' to let each command-line parameter expand to a 
+# separate word. The quotes around `$@' are essential!
+# We need TEMP as the `eval set --' would nuke the return value of getopt.
+TEMP=`getopt -o u123 --long help,object,session,service \
+     -n '$0' -- "$@"`
+
+if [ $? != 0 ] ; then echo "Incorrect arguments..." >&2 ; _usage_ ; exit 1 ; fi
+
+# Note the quotes around `$TEMP': they are essential!
+eval set -- "$TEMP"
+
+while true ; do
+        case "$1" in
+                -u|--help) _usage_ ;;
+                -1|--object) _object_test_ ;;
+                -2|--session) _session_test_ ;;
+                -3|--service) _service_test_ ;;
+                --) shift ; break ;;
+                *) echo "Internal error!" ; exit 1 ;;
+        esac
+done
+echo "Remaining arguments:"
+for arg do echo '--> '"\`$arg'" ; done
+
+
+
+

+ 19 - 0
cts/agents/net_breaker.sh

@@ -0,0 +1,19 @@
+#!/bin/sh
+
+set -e
+
+if [ $1 = "BreakCommCmd" ]
+then
+  iptables -A INPUT -s $2 -j DROP >/dev/null 2>&1
+  iptables -A OUTPUT -s $2 -j DROP >/dev/null 2>&1
+  iptables -A INPUT -m pkttype --pkt-type multicast -j DROP
+fi
+if [ $1 = "FixCommCmd" ]
+then
+  iptables -D INPUT -s $2 -j DROP >/dev/null 2>&1
+  iptables -D OUTPUT -s $2 -j DROP >/dev/null 2>&1
+  iptables -D INPUT -m pkttype --pkt-type multicast -j DROP
+fi
+
+exit 0
+

+ 306 - 0
cts/corolab.py

@@ -0,0 +1,306 @@
+#!/usr/bin/python
+
+'''CTS: Cluster Testing System: Lab environment module
+ '''
+
+__copyright__='''
+Copyright (c) 2010 Red Hat, Inc.
+
+'''
+
+# All rights reserved.
+# 
+# Author: Angus Salkeld <asalkeld@redhat.com>
+#
+# This software licensed under BSD license, the text of which follows:
+# 
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+# 
+# - Redistributions of source code must retain the above copyright notice,
+#   this list of conditions and the following disclaimer.
+# - Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+# - Neither the name of the MontaVista Software, Inc. nor the names of its
+#   contributors may be used to endorse or promote products derived from this
+#   software without specific prior written permission.
+# 
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+# THE POSSIBILITY OF SUCH DAMAGE.
+
+import sys
+from cts.CTSaudits import AuditList
+from cts.CTS import Scenario, InitClusterManager
+from corotests import CoroTestList, AllTests
+from corosync import *
+
+
+sys.path.append("/usr/share/pacemaker/tests/cts") # So that things work from the source directory
+
+try:
+    from CTSlab import *
+
+except ImportError:
+    sys.stderr.write("abort: couldn't find CTSLab in [%s]\n" %
+                     ' '.join(sys.path))
+    sys.stderr.write("(check your install and PYTHONPATH)\n")
+    sys.exit(-1)
+
+tests = None
+cm = None
+old_handler = None
+DefaultFacility = "daemon"
+
+
+def usage(arg):
+    print "Illegal argument " + arg
+    print "usage: " + sys.argv[0] +" [options] number-of-iterations" 
+    print "\nCommon options: "  
+    print "\t [--at-boot (1|0)],         does the cluster software start at boot time" 
+    print "\t [--nodes 'node list'],     list of cluster nodes separated by whitespace" 
+    print "\t [--limit-nodes max],       only use the first 'max' cluster nodes supplied with --nodes" 
+    print "\t [--logfile path],          where should the test software look for logs from cluster nodes" 
+    print "\t [--outputfile path],       optional location for the test software to write logs to" 
+    print "\t [--syslog-facility name],  which syslog facility should the test software log to" 
+    print "\t [--choose testcase-name],  run only the named test" 
+    print "\t [--list-tests],            list the valid tests" 
+    print "\t [--benchmark],             add the timing information" 
+    print "\t "
+    print "Additional (less common) options: "  
+    print "\t [--trunc (truncate logfile before starting)]" 
+    print "\t [--xmit-loss lost-rate(0.0-1.0)]" 
+    print "\t [--recv-loss lost-rate(0.0-1.0)]" 
+    print "\t [--standby (1 | 0 | yes | no)]" 
+    print "\t [--fencing (1 | 0 | yes | no)]" 
+    print "\t [--once],                 run all valid tests once" 
+    print "\t [--no-loop-tests],        dont run looping/time-based tests" 
+    print "\t [--no-unsafe-tests],      dont run tests that are unsafe for use with ocfs2/drbd" 
+    print "\t [--valgrind-tests],       include tests using valgrind" 
+    print "\t [--experimental-tests],   include experimental tests" 
+    print "\t [--oprofile 'node list'], list of cluster nodes to run oprofile on]" 
+    print "\t [--qarsh]                 Use the QARSH backdoor to access nodes instead of SSH"
+    print "\t [--seed random_seed]"
+    print "\t [--set option=value]"
+    sys.exit(1)
+
+    
+#
+# Main entry into the test system.
+#
+if __name__ == '__main__': 
+    Environment = CtsLab()
+
+    NumIter = 0
+    Version = 1
+    LimitNodes = 0
+    TestCase = None
+    TruncateLog = 0
+    ListTests = 0
+    HaveSeed = 0
+    node_list = ''
+
+    #
+    # The values of the rest of the parameters are now properly derived from
+    # the configuration files.
+    #
+    # Set the signal handler
+    signal.signal(15, sig_handler)
+    signal.signal(10, sig_handler)
+    
+    # Process arguments...
+
+    skipthis=None
+    args=sys.argv[1:]
+    for i in range(0, len(args)):
+       if skipthis:
+           skipthis=None
+           continue
+
+       elif args[i] == "-l" or args[i] == "--limit-nodes":
+           skipthis=1
+           LimitNodes = int(args[i+1])
+
+       elif args[i] == "-L" or args[i] == "--logfile":
+           skipthis=1
+           Environment["LogFileName"] = args[i+1]
+
+       elif args[i] == "--outputfile":
+           skipthis=1
+           Environment["OutputFile"] = args[i+1]
+
+       elif args[i] == "--oprofile":
+           skipthis=1
+           Environment["oprofile"] = args[i+1].split(' ')
+
+       elif args[i] == "--trunc":
+           Environment["TruncateLog"]=1
+
+       elif args[i] == "--list-tests":
+           Environment["ListTests"]=1
+
+       elif args[i] == "--benchmark":
+           Environment["benchmark"]=1
+
+       elif args[i] == "--qarsh":
+           Environment.rsh.enable_qarsh()
+
+       elif args[i] == "--fencing":
+           skipthis=1
+           if args[i+1] == "1" or args[i+1] == "yes":
+               Environment["DoFencing"] = 1
+           elif args[i+1] == "0" or args[i+1] == "no":
+               Environment["DoFencing"] = 0
+           else:
+               usage(args[i+1])
+
+       elif args[i] == "--xmit-loss":
+           try:
+               float(args[i+1])
+           except ValueError:
+               print ("--xmit-loss parameter should be float")
+               usage(args[i+1])
+           skipthis=1
+           Environment["XmitLoss"] = args[i+1]
+
+       elif args[i] == "--recv-loss":
+           try:
+               float(args[i+1])
+           except ValueError:
+               print ("--recv-loss parameter should be float")
+               usage(args[i+1])
+           skipthis=1
+           Environment["RecvLoss"] = args[i+1]
+
+       elif args[i] == "--choose":
+           skipthis=1
+           TestCase = args[i+1]
+
+       elif args[i] == "--nodes":
+           skipthis=1
+           node_list = args[i+1].split(' ')
+
+       elif args[i] == "--at-boot" or args[i] == "--cluster-starts-at-boot":
+           skipthis=1
+           if args[i+1] == "1" or args[i+1] == "yes":
+               Environment["at-boot"] = 1
+           elif args[i+1] == "0" or args[i+1] == "no":
+               Environment["at-boot"] = 0
+           else:
+               usage(args[i+1])
+
+       elif args[i] == "--set":
+           skipthis=1
+           (name, value) = args[i+1].split('=')
+           Environment[name] = value
+
+       else:
+           try:
+               NumIter=int(args[i])
+           except ValueError:
+               usage(args[i])
+
+    Environment["remote_logwatch"]  = True
+    Environment["SyslogFacility"] = DefaultFacility
+    Environment["loop-minutes"] = int(Environment["loop-minutes"])
+    Environment["Stack"]    = "corosync (flatiron)"
+    Environment['CMclass']  = corosync_flatiron
+    Environment["use_logd"] = 0
+    if Environment["OutputFile"]:
+        Environment["logger"].append(FileLog(Environment, Environment["OutputFile"]))
+
+    if len(node_list) < 1:
+        print "No nodes specified!"
+        sys.exit(1)
+
+    if LimitNodes > 0:
+        if len(node_list) > LimitNodes:
+            print("Limiting the number of nodes configured=%d (max=%d)"
+                  %(len(node_list), LimitNodes))
+            while len(node_list) > LimitNodes:
+                node_list.pop(len(node_list)-1)
+
+    Environment["nodes"] = node_list
+
+    # Create the Cluster Manager object
+    cm = Environment['CMclass'](Environment)
+
+    Audits = AuditList(cm)
+    Tests = []
+        
+    # Your basic start up the world type of test scenario...
+
+    # Scenario selection
+    scenario = Scenario(
+        [ InitClusterManager(Environment), TestAgentComponent(Environment)])
+
+
+    if Environment["ListTests"] == 1 :
+        Tests = CoroTestList(cm, Audits)
+        cm.log("Total %d tests"%len(Tests))
+        for test in Tests :
+            cm.log(str(test.name));
+        sys.exit(0)
+
+    if TruncateLog:
+        cm.log("Truncating %s" % LogFile)
+        lf = open(LogFile, "w");
+        if lf != None:
+            lf.truncate(0)
+            lf.close()
+
+    keys = []
+    for key in Environment.keys():
+        keys.append(key)
+
+    keys.sort()
+    for key in keys:
+        cm.debug("Environment["+key+"]:\t"+str(Environment[key]))
+
+    cm.log(">>>>>>>>>>>>>>>> BEGINNING " + repr(NumIter) + " TESTS ")
+    cm.log("System log files: %s" % Environment["LogFileName"])
+    cm.ns.WaitForAllNodesToComeUp(Environment["nodes"])
+    cm.log("Cluster nodes: ")
+    for node in Environment["nodes"]:
+        cm.log("    * %s" % (node))
+
+    if TestCase != None:
+        for test in CoroTestList(cm, Audits):
+            if test.name == TestCase:
+                Tests.append(test)
+        if Tests == []:
+            usage("--choose: No applicable/valid tests chosen")        
+    else:
+        Tests = CoroTestList(cm, Audits)
+    
+    if Environment["benchmark"]:
+        Environment.ScenarioTests = BenchTests(scenario, cm, Tests, Audits)
+    elif Environment["all-once"] or NumIter == 0:
+        Environment.ScenarioTests = AllTests(scenario, cm, Tests, Audits)
+    else:
+        Environment.ScenarioTests = RandomTests(scenario, cm, Tests, Audits)
+
+    try :
+        overall, detailed = Environment.ScenarioTests.run(NumIter)
+    except :
+        cm.Env.log("Exception by %s" % sys.exc_info()[0])
+        for logmethod in Environment["logger"]:
+          traceback.print_exc(50, logmethod)
+        
+    Environment.ScenarioTests.summarize()
+    if Environment.ScenarioTests.Stats["failure"] > 0:
+        sys.exit(Environment.ScenarioTests.Stats["failure"])
+
+    elif Environment.ScenarioTests.Stats["success"] != NumIter:
+        cm.Env.log("No failure count but success != requested iterations")
+        sys.exit(1)
+        

+ 510 - 0
cts/corosync.py

@@ -0,0 +1,510 @@
+'''CTS: Cluster Testing System: corosync...
+'''
+
+__copyright__='''
+Copyright (c) 2010 Red Hat, Inc.
+'''
+
+# All rights reserved.
+# 
+# Author: Angus Salkeld <asalkeld@redhat.com>
+#
+# This software licensed under BSD license, the text of which follows:
+# 
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+# 
+# - Redistributions of source code must retain the above copyright notice,
+#   this list of conditions and the following disclaimer.
+# - Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+# - Neither the name of the MontaVista Software, Inc. nor the names of its
+#   contributors may be used to endorse or promote products derived from this
+#   software without specific prior written permission.
+# 
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+# THE POSSIBILITY OF SUCH DAMAGE.
+
+import os
+import sys
+import time
+import socket
+import shutil
+import string
+
+import augeas
+from cts.CTS import ClusterManager
+from cts.CTS import ScenarioComponent
+from cts.CTS import RemoteExec
+from cts.CTSvars import CTSvars
+
+
+###################################################################
+class CoroConfig(object):
+    def __init__(self, corobase=None):
+        self.base = "/files/etc/corosync/corosync.conf/"
+        self.new_root = "/tmp/aug-root/"
+        if corobase == None: 
+            self.corobase = os.getcwd() + "/.."
+        else:
+            self.corobase = corobase
+        example = self.corobase + "/conf/corosync.conf.example"
+
+        shutil.rmtree (self.new_root)
+        os.makedirs (self.new_root + "/etc/corosync")
+        shutil.copy (example, self.new_root + "/etc/corosync/corosync.conf")
+
+        self.aug = augeas.Augeas (root=self.new_root,
+                loadpath=self.corobase + "/conf/lenses")
+
+        self.original = {}
+        # store the original values (of totem), so we can restore them in
+        # apply_default_config()
+        totem = self.aug.match('/files/etc/corosync/corosync.conf/totem/*')
+        for c in totem:
+            # /files/etc/corosync/corosync.conf/
+            short_name = c[len(self.base):]
+            self.original[short_name] = self.aug.get(c)
+        interface = self.aug.match('/files/etc/corosync/corosync.conf/totem/interface/*')
+        for c in interface:
+            short_name = c[len(self.base):]
+            self.original[short_name] = self.aug.get(c)
+
+    def get (self, name):
+        return self.aug.get (self.base + name)
+
+    def set (self, name, value):
+        token = self.aug.set (self.base + name, str(value))
+
+    def save (self):
+        self.aug.save()
+
+    def get_filename(self):
+        return self.new_root + "/etc/corosync/corosync.conf"
+
+###################################################################
+class corosync_flatiron(ClusterManager):
+    '''
+     bla
+    '''
+    def __init__(self, Environment, randseed=None):
+        ClusterManager.__init__(self, Environment, randseed)
+
+        self.update({
+            "Name"           : "corosync(flatiron)",
+            "StartCmd"       : CTSvars.INITDIR+"/corosync start",
+            "StopCmd"        : CTSvars.INITDIR+"/corosync stop",
+            "RereadCmd"      : CTSvars.INITDIR+"/corosync reload",
+            "StatusCmd"      : CTSvars.INITDIR+"/corosync status %s",
+            "DeadTime"       : 30,
+            "StartTime"      : 15,        # Max time to start up
+            "StableTime"     : 10,
+            "BreakCommCmd"   : "/usr/share/corosync/tests/net_breaker.sh BreakCommCmd %s",
+            "FixCommCmd"     : "/usr/share/corosync/tests/net_breaker.sh FixCommCmd %s",
+
+            "Pat:We_stopped"   : "%s.*Corosync Cluster Engine exiting with status.*",
+            "Pat:They_stopped" : "%s.*Member left:.*%s.*",
+            "Pat:They_dead"    : "corosync:.*Node %s is now: lost",
+            "Pat:Local_starting" : "%s.*started and ready to provide service.",
+            "Pat:Local_started" :  "%s.*started and ready to provide service.",
+            "Pat:Master_started" : "%s.*Completed service synchronization, ready to provide service.",
+            "Pat:Slave_started" :  "%s.*Completed service synchronization, ready to provide service.",
+            "Pat:ChildKilled"  : "%s corosync.*Child process %s terminated with signal 9",
+            "Pat:ChildRespawn" : "%s corosync.*Respawning failed child process: %s",
+            "Pat:ChildExit"    : "Child process .* exited",
+            "Pat:DC_IDLE"      : ".*A processor joined or left the membership and a new membership was formed.",
+            # Bad news Regexes.  Should never occur.
+            "BadRegexes"   : (
+                r"ERROR:",
+                r"CRIT:",
+                r"Shutting down\.",
+                r"Forcing shutdown\.",
+                r"core dump",
+            ),
+            "LogFileName"    : Environment["LogFileName"],
+            })
+        self.agent={}
+        self.config = CoroConfig ()
+        self.node_to_ip = {}
+        
+        self.new_config = {}
+        self.applied_config = {}
+        for n in self.Env["nodes"]:
+            ip = socket.gethostbyname(n)
+            ips = ip.split('.')
+            ips[3] = '0'
+            ip_mask = '.'.join(ips)
+            self.new_config['totem/interface/bindnetaddr'] = str(ip_mask)
+            return
+
+    def apply_default_config(self):
+
+        for c in self.applied_config:
+            if 'bindnetaddr' in c:
+                continue
+            elif not self.config.original.has_key(c):
+                # new config option (non default)
+                pass
+            elif self.applied_config[c] is not self.config.original[c]:
+                # reset to the original
+                self.new_config[c] = self.config.original[c]
+
+        if len(self.new_config) > 0:
+            self.debug('applying default config')
+            self.stopall()
+
+    def apply_new_config(self):
+
+        if len(self.new_config) > 0:
+            self.debug('applying new config')
+            self.stopall()
+            self.startall()
+
+    def install_all_config(self):
+        tmp1 = {}
+        for c in self.new_config:
+            self.log('configuring: ' + c + ' = '+ str(self.new_config[c]))
+            self.config.set (c, self.new_config[c])
+            self.applied_config[c] = self.new_config[c]
+            tmp1[c] = self.new_config[c]
+
+        for c in tmp1:
+            del self.new_config[c]
+
+        self.config.save()
+        src_file = self.config.get_filename()
+        for node in self.Env["nodes"]:
+            self.rsh.cp(src_file, "%s:%s" % (node, "/etc/corosync/"))
+
+    def install_config(self, node):
+        # install gets new_config and installs it, then moves the
+        # config to applied_config
+        if len(self.new_config) > 0:
+            self.install_all_config()
+
+    def key_for_node(self, node):
+        if not self.node_to_ip.has_key(node):
+            self.node_to_ip[node] = socket.gethostbyname (node)
+        return self.node_to_ip[node]
+
+    def StartaCM(self, node):
+
+        if not self.ShouldBeStatus.has_key(node):
+            self.ShouldBeStatus[node] = "down"
+
+        if self.ShouldBeStatus[node] != "down":
+            return 1
+
+        self.debug('starting corosync on : ' + node)
+        ret = ClusterManager.StartaCM(self, node)
+        if self.agent.has_key(node):
+            self.agent[node].restart()
+        return ret
+
+    def StopaCM(self, node):
+        if self.ShouldBeStatus[node] != "up":
+            return 1
+
+        self.debug('stoping corosync on : ' + node)
+        if self.agent.has_key(node):
+            self.agent[node].stop()
+        return ClusterManager.StopaCM(self, node)
+
+
+    def StataCM(self, node):
+
+        '''Report the status of corosync on a given node'''
+
+        out=self.rsh(node, self["StatusCmd"], 1)
+        is_stopped = string.find(out, 'stopped')
+        is_dead = string.find(out, 'dead')
+
+        ret = (is_dead is -1 and is_stopped is -1)
+
+        try:
+            if ret:
+                if self.ShouldBeStatus[node] == "down":
+                    self.log(
+                    "Node status for %s is %s but we think it should be %s"
+                    %        (node, "up", self.ShouldBeStatus[node]))
+            else:
+                if self.ShouldBeStatus[node] == "up":
+                    self.log(
+                    "Node status for %s is %s but we think it should be %s"
+                    %        (node, "down", self.ShouldBeStatus[node]))
+        except KeyError:        pass
+
+        if ret:        self.ShouldBeStatus[node]="up"
+        else:        self.ShouldBeStatus[node]="down"
+        return ret
+
+
+    def RereadCM(self, node):
+        self.log('reloading corosync on : ' + node)
+        return ClusterManager.RereadCM(self, node)
+
+    def prepare(self):
+        '''Finish the Initialization process. Prepare to test...'''
+
+        self.partitions_expected = 1
+        for node in self.Env["nodes"]:
+            self.ShouldBeStatus[node] = ""
+            self.unisolate_node(node)
+            self.rsh(node, 'service abrtd stop', 1)
+            self.StataCM(node)
+
+    def HasQuorum(self, node_list):
+        # If we are auditing a partition, then one side will
+        #   have quorum and the other not.
+        # So the caller needs to tell us which we are checking
+        # If no value for node_list is specified... assume all nodes  
+        if not node_list:
+            node_list = self.Env["nodes"]
+
+        for node in node_list:
+            if self.ShouldBeStatus[node] == "up":
+                quorum = self.rsh(node, self["QuorumCmd"], 1)
+                if string.find(quorum, "1") != -1:
+                    return 1
+                elif string.find(quorum, "0") != -1:
+                    return 0
+                else:
+                    self.log("WARN: Unexpected quorum test result from "+ node +":"+ quorum)
+
+        return 0
+
+    def Components(self):    
+        return None
+
+
+###################################################################
+class TestAgentComponent(ScenarioComponent):
+    def __init__(self, Env):
+        self.Env = Env
+
+    def IsApplicable(self):
+        '''Return TRUE if the current ScenarioComponent is applicable
+        in the given LabEnvironment given to the constructor.
+        '''
+        return True
+
+    def SetUp(self, CM):
+        '''Set up the given ScenarioComponent'''
+        self.CM = CM
+        
+        for node in self.Env["nodes"]:
+            if not CM.StataCM(node):
+                raise RuntimeError ("corosync not up")
+
+            self.CM.agent[node] = CpgTestAgent(node, CM.Env)
+            self.CM.agent[node].start()
+        return 1
+
+    def TearDown(self, CM):
+        '''Tear down (undo) the given ScenarioComponent'''
+        self.CM = CM
+        for node in self.Env["nodes"]:
+            self.CM.agent[node].stop()
+
+###################################################################
+class TestAgent(object):
+
+    def __init__(self, binary, node, port, env=None):
+        self.node = node
+        self.node_address = None
+        self.port = port
+        self.sock = None
+        self.binary = binary
+        self.started = False
+        self.rsh = RemoteExec(Env=env)
+        self.func_name = None
+        self.used = False
+        self.env = env
+
+    def restart(self):
+        self.stop()
+        self.start()
+
+    def clean_start(self):
+        if self.used or not self.status():
+            self.env.debug('test agent: clean_start (' + self.node + ')')
+            self.stop()
+            self.start()
+
+    def status(self):
+        if not self.started:
+            return False
+
+        try:
+            self.send (["cpg_local_get"])  
+            self.nodeid = self.read ()
+            return True
+        except RuntimeError, msg:
+            return False
+    
+    def start(self):
+        '''Set up the given ScenarioComponent'''
+
+        self.env.debug('test agent: start (' + self.node + ')')
+        self.sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+        ip = socket.gethostbyname(self.node)
+        self.rsh(self.node, self.binary, blocking=0)
+        is_connected = False
+        retries = 0
+        while not is_connected:
+            try:
+                retries = retries + 1
+                self.sock.connect ((ip, self.port))
+                is_connected = True
+            except socket.error, msg:
+                if retries > 5:
+                    self.env.debug( "Retried " + str(retries) + " times. Error: " + msg )
+                time.sleep(1)
+        self.started = True
+        self.used = False
+
+    def stop(self):
+        '''Tear down (undo) the given ScenarioComponent'''
+        self.env.debug('test agent: stop (' + self.node + ')')
+        self.sock.close ()
+        self.rsh(self.node, "killall " + self.binary + " 2>/dev/null")
+        self.started = False
+
+    def send (self, args):
+        if not self.started:
+            self.start()
+
+        real_msg = str (len (args))
+        for a in args:
+            a_str = str(a)
+            real_msg += ":" + str (len (a_str)) + ":" + a_str
+        real_msg += ";"
+        sent = 0
+        try:
+            sent = self.sock.send (real_msg)
+        except socket.error, msg:
+            print msg
+
+        if sent == 0:
+            raise RuntimeError ("socket connection broken")
+        self.used = True
+
+    def __getattribute__(self,name):
+
+        try:
+            return object.__getattribute__(self, name)
+        except:
+            self.func_name = name
+            return self.send_dynamic
+
+    def send_dynamic (self, *args):
+        if not self.started:
+            self.start()
+
+        # number of args+func
+        real_msg = str (len (args) + 1) + ":" + str(len(self.func_name)) + ":" + self.func_name
+        for a in args:
+            a_str = str(a)
+            real_msg += ":" + str (len (a_str)) + ":" + a_str
+        real_msg += ";"
+        #print "CLIENT:" + real_msg
+        sent = 0
+        try:
+            sent = self.sock.send (real_msg)
+        except socket.error, msg:
+            print msg
+
+        if sent == 0:
+            raise RuntimeError ("socket connection broken")
+        self.used = True
+
+    def read (self):
+
+		msg = self.sock.recv (4096)
+		if msg == '':
+			raise RuntimeError("socket connection broken")
+		return msg
+
+
+class CpgConfigEvent:
+    def __init__(self, msg):
+        info = msg.split(',')
+        self.group_name = info[0]
+        self.node_id = info[1]
+        self.node = None
+        self.pid = info[2]
+        if "left" in info[3]:
+            self.is_member = False
+        else:
+            self.is_member = True
+    
+    def __str__ (self):
+        
+        str = self.group_name + "," + self.node_id + "," + self.pid + ","
+        if self.is_member:
+            return str + "joined"
+        else:
+            return str + "left"
+
+###################################################################
+class CpgTestAgent(TestAgent):
+
+    def __init__(self, node, Env=None):
+        TestAgent.__init__(self, "cpg_test_agent", node, 9034, env=Env)
+        self.initialized = False
+        self.nodeid = None
+
+    def start(self):
+        TestAgent.start(self)
+        self.send(["cpg_initialize"])
+        self.used = False
+
+    def stop(self):
+        try:
+            self.send(["cpg_finalize"])
+        except RuntimeError, msg:
+            # if agent is down, we are not going to stress
+            print msg
+
+        TestAgent.stop(self)
+
+    def cpg_local_get(self):
+        if self.nodeid == None:
+            self.send (["cpg_local_get"])  
+            self.nodeid = self.read ()
+        return self.nodeid
+
+    def record_config_events(self, truncate=True):
+        if truncate:
+            self.send (["record_config_events", "truncate"])  
+        else:
+            self.send (["record_config_events", "append"])  
+
+    def read_config_event(self):
+        self.send (["read_config_event"])  
+        msg = self.read ()
+
+        if "None" in msg:
+            return None
+        else:
+            return CpgConfigEvent(msg)
+
+    def read_messages(self, atmost):
+        self.send (["read_messages", atmost])  
+        msg = self.read ()
+
+        if "None" in msg:
+            return None
+        else:
+            return msg
+

+ 386 - 0
cts/corotests.py

@@ -0,0 +1,386 @@
+__copyright__='''
+Copyright (c) 2010 Red Hat, Inc.
+'''
+
+# All rights reserved.
+# 
+# Author: Angus Salkeld <asalkeld@redhat.com>
+#
+# This software licensed under BSD license, the text of which follows:
+# 
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+# 
+# - Redistributions of source code must retain the above copyright notice,
+#   this list of conditions and the following disclaimer.
+# - Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+# - Neither the name of the MontaVista Software, Inc. nor the names of its
+#   contributors may be used to endorse or promote products derived from this
+#   software without specific prior written permission.
+# 
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+# THE POSSIBILITY OF SUCH DAMAGE.
+
+from cts.CTStests import *
+
+###################################################################
+class CoroTest(CTSTest):
+    '''
+    basic class to make sure that new configuration is applied
+    and old configuration is removed.
+    '''
+    def __init__(self, cm):
+        CTSTest.__init__(self,cm)
+        self.start = StartTest(cm)
+        self.stop = StopTest(cm)
+
+    def setup(self, node):
+        ret = CTSTest.setup(self, node)
+        self.CM.apply_new_config()
+
+        for n in self.CM.Env["nodes"]:
+            if not self.CM.StataCM(n):
+                self.incr("started")
+                self.start(n)
+        return ret
+
+    def teardown(self, node):
+        self.CM.apply_default_config()
+        return CTSTest.teardown(self, node)
+
+
+###################################################################
+class CpgConfigChangeBase(CoroTest):
+    '''
+    join a cpg group on each node, and test that the following 
+    causes a leave event:
+    - a call to cpg_leave()
+    - app exit
+    - node leave
+    - node leave (with large token timeout)
+    '''
+
+    def setup(self, node):
+        ret = CoroTest.setup(self, node)
+
+        self.listener = None
+        self.wobbly = None
+        for n in self.CM.Env["nodes"]:
+            self.CM.agent[n].clean_start()
+            self.CM.agent[n].cpg_join(self.name)
+            if self.listener is None:
+                self.listener = n
+            elif self.wobbly is None:
+                self.wobbly = n
+
+        self.wobbly_id = self.CM.agent[self.wobbly].cpg_local_get()
+        self.CM.agent[self.listener].record_config_events(truncate=True)
+
+        return ret
+
+    def wait_for_config_change(self):
+        found = False
+        max_timeout = 5 * 60
+        waited = 0
+        printit = 0
+        self.CM.log("Waiting for config change on " + self.listener)
+        while not found:
+            event = self.CM.agent[self.listener].read_config_event()
+            if not event == None:
+                self.CM.debug("RECEIVED: " + str(event))
+            if event == None:
+                if waited >= max_timeout:
+                    return self.failure("timedout(" + str(waited) + " sec) == no event!")
+                else:
+                    time.sleep(1)
+                    waited = waited + 1
+                    printit = printit + 1
+                    if printit is 60:
+                        print 'waited 60 seconds'
+                        printit = 0
+                
+            elif str(event.node_id) in str(self.wobbly_id) and not event.is_member:
+                self.CM.log("Got the config change in " + str(waited) + " seconds")
+                found = True
+            else:
+                self.CM.debug("No match")
+                self.CM.debug("wobbly nodeid:" + str(self.wobbly_id))
+                self.CM.debug("event nodeid:" + str(event.node_id))
+                self.CM.debug("event.is_member:" + str(event.is_member))
+
+        if found:
+            return self.success()
+
+
+###################################################################
+class CpgCfgChgOnGroupLeave(CpgConfigChangeBase):
+
+    def __init__(self, cm):
+        CpgConfigChangeBase.__init__(self,cm)
+        self.name="CpgCfgChgOnGroupLeave"
+
+    def failure_action(self):
+        self.CM.log("calling cpg_leave() on " + self.wobbly)
+        self.CM.agent[self.wobbly].cpg_leave(self.name)
+
+    def __call__(self, node):
+        self.incr("calls")
+        self.failure_action()
+        return self.wait_for_config_change()
+
+###################################################################
+class CpgCfgChgOnNodeLeave(CpgConfigChangeBase):
+
+    def __init__(self, cm):
+        CpgConfigChangeBase.__init__(self,cm)
+        self.name="CpgCfgChgOnNodeLeave"
+
+    def failure_action(self):
+        self.CM.log("stopping corosync on " + self.wobbly)
+        self.stop(self.wobbly)
+
+    def __call__(self, node):
+        self.incr("calls")
+        self.failure_action()
+        return self.wait_for_config_change()
+
+###################################################################
+class CpgCfgChgOnExecCrash(CpgConfigChangeBase):
+
+    def __init__(self, cm):
+        CpgConfigChangeBase.__init__(self,cm)
+        self.name="CpgCfgChgOnExecCrash"
+
+    def failure_action(self):
+        self.CM.log("sending SIGSEGV to corosync on " + self.wobbly)
+        self.CM.rsh(self.wobbly, "killall -SIGSEGV corosync")
+        self.CM.rsh(self.wobbly, "rm -f /var/run/corosync.pid")
+
+    def __call__(self, node):
+        self.incr("calls")
+        self.failure_action()
+        return self.wait_for_config_change()
+
+
+###################################################################
+class CpgCfgChgOnNodeLeave_v2(CpgConfigChangeBase):
+
+    def __init__(self, cm):
+        CpgConfigChangeBase.__init__(self,cm)
+        self.name="CpgCfgChgOnNodeLeave_v2"
+       
+    def setup(self, node):
+        self.CM.new_config['compatibility'] = 'none'
+        self.CM.new_config['totem/token'] = 10000
+        return CpgConfigChangeBase.setup(self, node)
+
+    def failure_action(self):
+        self.CM.log("isolating node " + self.wobbly)
+        self.CM.isolate_node(self.wobbly)
+
+    def __call__(self, node):
+        self.incr("calls")
+        self.failure_action()
+        return self.wait_for_config_change()
+
+    def teardown(self, node):
+        self.CM.unisolate_node (self.wobbly)
+        return CpgConfigChangeBase.teardown(self, node)
+
+###################################################################
+class CpgCfgChgOnNodeLeave_v1(CpgConfigChangeBase):
+
+    def __init__(self, cm):
+        CpgConfigChangeBase.__init__(self,cm)
+        self.name="CpgCfgChgOnNodeLeave_v1"
+
+    def setup(self, node):
+        self.CM.new_config['compatibility'] = 'whitetank'
+        self.CM.new_config['totem/token'] = 10000
+        return CpgConfigChangeBase.setup(self, node)
+        
+    def failure_action(self):
+        self.CM.log("isolating node " + self.wobbly)
+        self.CM.isolate_node(self.wobbly)
+
+    def __call__(self, node):
+        self.incr("calls")
+        self.failure_action()
+        return self.wait_for_config_change()
+
+    def teardown(self, node):
+        self.CM.unisolate_node (self.wobbly)
+        return CpgConfigChangeBase.teardown(self, node)
+
+###################################################################
+class CpgMsgOrderBase(CoroTest):
+
+    def __init__(self, cm):
+        CoroTest.__init__(self,cm)
+        self.num_msgs_per_node = 0
+        self.total_num_msgs = 0
+
+    def setup(self, node):
+        ret = CoroTest.setup(self, node)
+
+        for n in self.CM.Env["nodes"]:
+            self.total_num_msgs = self.total_num_msgs + self.num_msgs_per_node
+            self.CM.agent[n].clean_start()
+            self.CM.agent[n].cpg_join(self.name)
+            self.CM.agent[n].record_messages()
+
+        time.sleep(1)
+        return ret
+
+    def cpg_msg_blaster(self):
+        for n in self.CM.Env["nodes"]:
+            self.CM.agent[n].msg_blaster(self.num_msgs_per_node)
+        
+    def wait_and_validate_order(self):
+        msgs = {}
+
+        for n in self.CM.Env["nodes"]:
+            msgs[n] = []
+            got = False
+            stopped = False
+            self.CM.debug( " getting messages from " + n )
+
+            while len(msgs[n]) < self.total_num_msgs and not stopped:
+
+                msg = self.CM.agent[n].read_messages(25)
+                if not msg == None:
+                    got = True
+                    msgl = msg.split(";")
+
+                    # remove empty entries
+                    not_done=True
+                    while not_done:
+                        try:
+                            msgl.remove('')
+                        except:
+                            not_done = False
+
+                    msgs[n].extend(msgl)
+                elif msg == None and got:
+                    self.CM.debug(" done getting messages from " + n)
+                    stopped = True
+
+                if not got:
+                    time.sleep(1)
+
+        fail = False
+        for i in range(0, self.total_num_msgs):
+            first = None
+            for n in self.CM.Env["nodes"]:
+                if first == None:
+                    first = n
+                else:
+                    if not msgs[first][i] == msgs[n][i]:
+                        # message order not the same!
+                        fail = True
+                        self.CM.log(msgs[first][i] + " != " + msgs[n][i])
+                
+        if fail:
+            return self.failure()
+        else:
+            return self.success()
+
+###################################################################
+class CpgMsgOrderBasic(CpgMsgOrderBase):
+    '''
+    each sends & logs 100 messages
+    '''
+    def __init__(self, cm):
+        CpgMsgOrderBase.__init__(self,cm)
+        self.name="CpgMsgOrderBasic"
+
+    def __call__(self, node):
+        self.incr("calls")
+
+        # o > reconfigure corosync
+        # o > reconfigure interfaces (mtu)
+        # o > restart corosync
+        # o > set node to die after x msgs
+        self.num_msgs_per_node = 100
+        self.cpg_msg_blaster()
+        return self.wait_and_validate_order()
+
+
+###################################################################
+class MemLeakObject(CoroTest):
+    '''
+    run mem_leak_test.sh -1
+    '''
+    def __init__(self, cm):
+        CoroTest.__init__(self,cm)
+        self.name="MemLeakObject"
+
+    def __call__(self, node):
+        self.incr("calls")
+
+        mem_leaked = self.CM.rsh(node, "/usr/share/corosync/tests/mem_leak_test.sh -1")
+        if mem_leaked is 0:
+            return self.success()
+        else:
+            return self.failure(str(mem_leaked) + 'kB memory leaked.')
+
+###################################################################
+class MemLeakSession(CoroTest):
+    '''
+    run mem_leak_test.sh -2
+    '''
+    def __init__(self, cm):
+        CoroTest.__init__(self,cm)
+        self.name="MemLeakSession"
+
+    def __call__(self, node):
+        self.incr("calls")
+
+        mem_leaked = self.CM.rsh(node, "/usr/share/corosync/tests/mem_leak_test.sh -2")
+        if mem_leaked is 0:
+            return self.success()
+        else:
+            return self.failure(str(mem_leaked) + 'kB memory leaked.')
+
+
+
+AllTestClasses = []
+AllTestClasses.append(MemLeakObject)
+AllTestClasses.append(MemLeakSession)
+AllTestClasses.append(CpgCfgChgOnGroupLeave)
+AllTestClasses.append(CpgCfgChgOnNodeLeave)
+AllTestClasses.append(CpgCfgChgOnNodeLeave_v1)
+AllTestClasses.append(CpgCfgChgOnNodeLeave_v2)
+AllTestClasses.append(CpgCfgChgOnExecCrash)
+AllTestClasses.append(CpgMsgOrderBasic)
+
+AllTestClasses.append(FlipTest)
+AllTestClasses.append(RestartTest)
+AllTestClasses.append(StartOnebyOne)
+AllTestClasses.append(SimulStart)
+AllTestClasses.append(StopOnebyOne)
+AllTestClasses.append(SimulStop)
+AllTestClasses.append(RestartOnebyOne)
+#AllTestClasses.append(PartialStart)
+
+
+def CoroTestList(cm, audits):
+    result = []
+    for testclass in AllTestClasses:
+        bound_test = testclass(cm)
+        if bound_test.is_applicable():
+            bound_test.Audits = audits
+            result.append(bound_test)
+    return result
+

+ 2 - 2
exec/main.c

@@ -354,11 +354,11 @@ static void confchg_fn (
 	memcpy (&corosync_ring_id, ring_id, sizeof (struct memb_ring_id));
 
 	for (i = 0; i < left_list_entries; i++) {
-		log_printf (LOGSYS_LEVEL_DEBUG,
+		log_printf (LOGSYS_LEVEL_INFO,
 			"Member left: %s\n", api->totem_ifaces_print (left_list[i]));
 	}
 	for (i = 0; i < joined_list_entries; i++) {
-		log_printf (LOGSYS_LEVEL_DEBUG,
+		log_printf (LOGSYS_LEVEL_INFO,
 			"Member joined: %s\n", api->totem_ifaces_print (joined_list[i]));
 	}
 	/*