Просмотр исходного кода

CTS: Add a confdb test agent and python tests.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2735 fd59a12c-fef9-0310-b244-a6a79926bd2f
Angus Salkeld 16 лет назад
Родитель
Сommit
8df2e184f9
4 измененных файлов с 133 добавлено и 216 удалено
  1. 8 2
      cts/agents/Makefile.am
  2. 6 189
      cts/agents/cpg_test_agent.c
  3. 38 10
      cts/corosync.py
  4. 81 15
      cts/corotests.py

+ 8 - 2
cts/agents/Makefile.am

@@ -32,7 +32,7 @@
 MAINTAINERCLEANFILES = Makefile.in
 INCLUDES = -I$(top_builddir)/include -I$(top_srcdir)/include
 
-TEST_AGENTS = cpg_test_agent
+TEST_AGENTS = cpg_test_agent confdb_test_agent
 
 if INSTALL_TESTAGENTS
 agentdir = $(datadir)/$(PACKAGE)/tests
@@ -43,8 +43,14 @@ noinst_PROGRAMS = $(TEST_AGENTS)
 noinst_SCRIPTS = mem_leak_test.sh net_breaker.sh
 endif
 
+
+cpg_test_agent_SOURCES = cpg_test_agent.c common_test_agent.c
 cpg_test_agent_LDADD =  -lcpg -lcoroipcc ../../exec/coropoll.o ../../exec/crypto.o
-cpg_test_agent_LDFLAGS =  -L../../lib
+cpg_test_agent_LDFLAGS =  -L../../lib -L.
+
+confdb_test_agent_SOURCES = confdb_test_agent.c common_test_agent.c
+confdb_test_agent_LDADD =  -lconfdb -lcoroipcc ../../exec/coropoll.o
+confdb_test_agent_LDFLAGS =  -L../../lib 
 
 lint:
 	-splint $(LINT_FLAGS) $(CFLAGS) *.c

+ 6 - 189
cts/agents/cpg_test_agent.c

@@ -51,10 +51,9 @@
 #include <corosync/list.h>
 #include <corosync/cpg.h>
 #include "../../exec/crypto.h"
+#include "common_test_agent.h"
 
 
-#define SERVER_PORT "9034"
-
 typedef enum {
 	MSG_OK,
 	MSG_NODEID_ERR,
@@ -79,10 +78,7 @@ typedef struct {
 	struct list_head list;
 } log_entry_t;
 
-#define HOW_BIG_AND_BUF 4096
 static char big_and_buf[HOW_BIG_AND_BUF];
-static char big_and_buf_rx[HOW_BIG_AND_BUF];
-static int32_t parse_debug = 0;
 static int32_t record_config_events_g = 0;
 static int32_t record_messages_g = 0;
 static cpg_handle_t cpg_handle = 0;
@@ -94,7 +90,6 @@ static uint32_t my_nodeid;
 static int32_t my_seq;
 static int32_t my_msgs_to_send;
 static int32_t total_stored_msgs = 0;
-static hdb_handle_t poll_handle;
 
 
 static void send_some_more_messages (void * unused);
@@ -289,7 +284,7 @@ static void send_some_more_messages_later (void)
 	poll_timer_handle timer_handle;
 	cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
 	poll_timer_add (
-		poll_handle,
+		ta_poll_handle_get(),
 		100, NULL,
 		send_some_more_messages,
 		&timer_handle);
@@ -384,7 +379,7 @@ static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
 	cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
 	if (error == CS_ERR_LIBRARY) {
 		syslog (LOG_ERR, "%s() got LIB error disconnecting from corosync.", __func__);
-		poll_dispatch_delete (poll_handle, cpg_fd);
+		poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
 		close (cpg_fd);
 		cpg_fd = -1;
 	}
@@ -450,7 +445,7 @@ static void do_command (int sock, char* func, char*args[], int num_args)
 		}
 
 		cpg_fd_get (cpg_handle, &cpg_fd);
-		poll_dispatch_add (poll_handle, cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn);
+		poll_dispatch_add (ta_poll_handle_get(), cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn);
 
 	} else if (strcmp ("cpg_local_get", func) == 0) {
 		unsigned int local_nodeid;
@@ -463,7 +458,7 @@ static void do_command (int sock, char* func, char*args[], int num_args)
 	} else if (strcmp ("cpg_finalize",func) == 0) {
 
 		cpg_finalize (cpg_handle);
-		poll_dispatch_delete (poll_handle, cpg_fd);
+		poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
 		cpg_fd = -1;
 
 	} else if (strcmp ("record_config_events",func) == 0) {
@@ -491,192 +486,14 @@ static void do_command (int sock, char* func, char*args[], int num_args)
 	}
 }
 
-static void handle_command (int sock, char* msg)
-{
-	int num_args;
-	char *saveptr = NULL;
-	char *str = strdup (msg);
-	char *str_len;
-	char *str_arg;
-	char *args[5];
-	int i = 0;
-	int a = 0;
-	char* func = NULL;
-
-	if (parse_debug)
-		syslog (LOG_DEBUG,"%s (MSG:%s)\n", __func__, msg);
-
-	str_len = strtok_r (str, ":", &saveptr);
-	assert (str_len);
-
-	num_args = atoi (str_len) * 2;
-	for (i = 0; i < num_args / 2; i++) {
-		str_len = strtok_r (NULL, ":", &saveptr);
-		str_arg = strtok_r (NULL, ":", &saveptr);
-		if (func == NULL) {
-			/* first "arg" is the function */
-			if (parse_debug)
-				syslog (LOG_DEBUG, "(LEN:%s, FUNC:%s)", str_len, str_arg);
-			func = str_arg;
-			a = 0;
-		} else {
-			args[a] = str_arg;
-			a++;
-			if (parse_debug)
-				syslog (LOG_DEBUG, "(LEN:%s, ARG:%s)", str_len, str_arg);
-		}
-	}
-	do_command (sock, func, args, a+1);
-
-	free (str);
-}
-
-static int server_process_data_fn (hdb_handle_t handle,
-	int fd,
-	int revents,
-	void *data)
-{
-	char *saveptr;
-	char *msg;
-	char *cmd;
-	int32_t nbytes;
-
-	if ((nbytes = recv (fd, big_and_buf_rx, sizeof (big_and_buf_rx), 0)) <= 0) {
-		/* got error or connection closed by client */
-		if (nbytes == 0) {
-			/* connection closed */
-			syslog (LOG_WARNING, "socket %d hung up: exiting...\n", fd);
-		} else {
-			syslog (LOG_ERR,"recv() failed: %s", strerror(errno));
-		}
-		close (fd);
-		exit (0);
-	} else {
-		if (my_msgs_to_send > 0)
-			send_some_more_messages (NULL);
-
-		big_and_buf_rx[nbytes] = '\0';
-
-		msg = strtok_r (big_and_buf_rx, ";", &saveptr);
-		assert (msg);
-		while (msg) {
-			cmd = strdup (msg);
-			handle_command (fd, cmd);
-			free (cmd);
-			msg = strtok_r (NULL, ";", &saveptr);
-		}
-	}
-
-	return 0;
-}
-
-static int server_accept_fn (hdb_handle_t handle,
-	int fd,
-	int revents,
-	void *data)
-{
-	socklen_t addrlen;
-	struct sockaddr_in in_addr;
-	int new_fd;
-	int res;
-
-	addrlen = sizeof (struct sockaddr_in);
-
-retry_accept:
-	new_fd = accept (fd, (struct sockaddr *)&in_addr, &addrlen);
-	if (new_fd == -1 && errno == EINTR) {
-		goto retry_accept;
-	}
-
-	if (new_fd == -1) {
-		syslog (LOG_ERR,
-			"Could not accept connection: %s\n", strerror (errno));
-		return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
-	}
-
-	res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
-	if (res == -1) {
-		syslog (LOG_ERR,
-			"Could not set non-blocking operation on connection: %s\n",
-			strerror (errno));
-		close (new_fd);
-		return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
-	}
-
-	poll_dispatch_add (poll_handle, new_fd, POLLIN|POLLNVAL, NULL, server_process_data_fn);
-	return 0;
-}
-
-static int create_server_sockect (void)
-{
-	int listener;
-	int yes = 1;
-	int rv;
-	struct addrinfo hints, *ai, *p;
-
-	/* get a socket and bind it
-	 */
-	memset (&hints, 0, sizeof hints);
-	hints.ai_family = AF_UNSPEC;
-	hints.ai_socktype = SOCK_STREAM;
-	hints.ai_flags = AI_PASSIVE;
-	if ((rv = getaddrinfo (NULL, SERVER_PORT, &hints, &ai)) != 0) {
-		syslog (LOG_ERR, "%s\n", gai_strerror (rv));
-		exit (1);
-	}
-
-	for (p = ai; p != NULL; p = p->ai_next) {
-		listener = socket (p->ai_family, p->ai_socktype, p->ai_protocol);
-		if (listener < 0) {
-			continue;
-		}
-
-		/* lose the pesky "address already in use" error message
-		 */
-		if (setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
-				&yes, sizeof(int)) < 0) {
-			syslog (LOG_ERR, "setsockopt() failed: %s\n", strerror (errno));
-		}
-
-		if (bind (listener, p->ai_addr, p->ai_addrlen) < 0) {
-			syslog (LOG_ERR, "bind() failed: %s\n", strerror (errno));
-			close (listener);
-			continue;
-		}
-
-		break;
-	}
-
-	if (p == NULL) {
-		syslog (LOG_ERR, "failed to bind\n");
-		exit (2);
-	}
-
-	freeaddrinfo (ai);
-
-	if (listen (listener, 10) == -1) {
-		syslog (LOG_ERR, "listen() failed: %s", strerror(errno));
-		exit (3);
-	}
-
-	return listener;
-}
 
 int main (int argc, char *argv[])
 {
-	int listener;
-
 	openlog (NULL, LOG_CONS|LOG_PID, LOG_DAEMON);
 
 	list_init (&msg_log_head);
 	list_init (&config_chg_log_head);
 
-	poll_handle = poll_create ();
-
-	listener = create_server_sockect ();
-	poll_dispatch_add (poll_handle, listener, POLLIN|POLLNVAL, NULL, server_accept_fn);
-
-	poll_run (poll_handle);
-	return -1;
+	return test_agent_run (9034, do_command);
 }
 

+ 38 - 10
cts/corosync.py

@@ -135,7 +135,8 @@ class corosync_flatiron(ClusterManager):
             ),
             "LogFileName"    : Environment["LogFileName"],
             })
-        self.agent={}
+        self.cpg_agent={}
+        self.confdb_agent={}
         self.config = CoroConfig ()
         self.node_to_ip = {}
         
@@ -209,8 +210,10 @@ class corosync_flatiron(ClusterManager):
 
         self.debug('starting corosync on : ' + node)
         ret = ClusterManager.StartaCM(self, node)
-        if self.agent.has_key(node):
-            self.agent[node].restart()
+        if self.cpg_agent.has_key(node):
+            self.cpg_agent[node].restart()
+        if self.confdb_agent.has_key(node):
+            self.confdb_agent[node].restart()
         return ret
 
     def StopaCM(self, node):
@@ -218,8 +221,10 @@ class corosync_flatiron(ClusterManager):
             return 1
 
         self.debug('stoping corosync on : ' + node)
-        if self.agent.has_key(node):
-            self.agent[node].stop()
+        if self.cpg_agent.has_key(node):
+            self.cpg_agent[node].stop()
+        if self.confdb_agent.has_key(node):
+            self.confdb_agent[node].stop()
         return ClusterManager.StopaCM(self, node)
 
     def test_node_CM(self, node):
@@ -318,15 +323,18 @@ class TestAgentComponent(ScenarioComponent):
             if not CM.StataCM(node):
                 raise RuntimeError ("corosync not up")
 
-            self.CM.agent[node] = CpgTestAgent(node, CM.Env)
-            self.CM.agent[node].start()
+            self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
+            self.CM.cpg_agent[node].start()
+            self.CM.confdb_agent[node] = ConfdbTestAgent(node, CM.Env)
+            self.CM.confdb_agent[node].start()
         return 1
 
     def TearDown(self, CM):
         '''Tear down (undo) the given ScenarioComponent'''
         self.CM = CM
         for node in self.Env["nodes"]:
-            self.CM.agent[node].stop()
+            self.CM.cpg_agent[node].stop()
+            self.CM.confdb_agent[node].stop()
 
 ###################################################################
 class TestAgent(object):
@@ -342,6 +350,7 @@ class TestAgent(object):
         self.func_name = None
         self.used = False
         self.env = env
+        self.send_recv = False
 
     def restart(self):
         self.stop()
@@ -416,7 +425,14 @@ class TestAgent(object):
             return object.__getattribute__(self, name)
         except:
             self.func_name = name
-            return self.send_dynamic
+            if self.send_recv:
+                return self.send_recv_dynamic
+            else:
+                return self.send_dynamic
+
+    def send_recv_dynamic (self, *args):
+        self.send_dynamic (args)
+        return self.read()
 
     def send_dynamic (self, *args):
         if not self.started:
@@ -484,7 +500,7 @@ class CpgTestAgent(TestAgent):
         try:
             self.send(["cpg_finalize"])
         except RuntimeError, msg:
-            # if agent is down, we are not going to stress
+            # if cpg_agent is down, we are not going to stress
             print msg
 
         TestAgent.stop(self)
@@ -519,3 +535,15 @@ class CpgTestAgent(TestAgent):
         else:
             return msg
 
+###################################################################
+class ConfdbTestAgent(TestAgent):
+
+    def __init__(self, node, Env=None):
+        TestAgent.__init__(self, "confdb_test_agent", node, 9035, env=Env)
+        self.initialized = False
+        self.nodeid = None
+        self.send_recv = True
+
+    def cpg_local_get(self):
+        return 1
+

+ 81 - 15
cts/corotests.py

@@ -97,15 +97,15 @@ class CpgConfigChangeBase(CoroTest):
         self.listener = None
         self.wobbly = None
         for n in self.CM.Env["nodes"]:
-            self.CM.agent[n].clean_start()
-            self.CM.agent[n].cpg_join(self.name)
+            self.CM.cpg_agent[n].clean_start()
+            self.CM.cpg_agent[n].cpg_join(self.name)
             if self.listener is None:
                 self.listener = n
             elif self.wobbly is None:
                 self.wobbly = n
 
-        self.wobbly_id = self.CM.agent[self.wobbly].cpg_local_get()
-        self.CM.agent[self.listener].record_config_events(truncate=True)
+        self.wobbly_id = self.CM.cpg_agent[self.wobbly].cpg_local_get()
+        self.CM.cpg_agent[self.listener].record_config_events(truncate=True)
 
         return ret
 
@@ -117,9 +117,9 @@ class CpgConfigChangeBase(CoroTest):
         self.CM.log("Waiting for config change on " + self.listener)
         while not found:
             try:
-                event = self.CM.agent[self.listener].read_config_event()
+                event = self.CM.cpg_agent[self.listener].read_config_event()
             except:
-                return self.failure('connection to test agent failed.')
+                return self.failure('connection to test cpg_agent failed.')
             if not event == None:
                 self.CM.debug("RECEIVED: " + str(event))
             if event == None:
@@ -155,7 +155,7 @@ class CpgCfgChgOnGroupLeave(CpgConfigChangeBase):
 
     def failure_action(self):
         self.CM.log("calling cpg_leave() on " + self.wobbly)
-        self.CM.agent[self.wobbly].cpg_leave(self.name)
+        self.CM.cpg_agent[self.wobbly].cpg_leave(self.name)
 
     def __call__(self, node):
         self.incr("calls")
@@ -230,16 +230,16 @@ class CpgMsgOrderBase(CoroTest):
 
         for n in self.CM.Env["nodes"]:
             self.total_num_msgs = self.total_num_msgs + self.num_msgs_per_node
-            self.CM.agent[n].clean_start()
-            self.CM.agent[n].cpg_join(self.name)
-            self.CM.agent[n].record_messages()
+            self.CM.cpg_agent[n].clean_start()
+            self.CM.cpg_agent[n].cpg_join(self.name)
+            self.CM.cpg_agent[n].record_messages()
 
         time.sleep(1)
         return ret
 
     def cpg_msg_blaster(self):
         for n in self.CM.Env["nodes"]:
-            self.CM.agent[n].msg_blaster(self.num_msgs_per_node)
+            self.CM.cpg_agent[n].msg_blaster(self.num_msgs_per_node)
         
     def wait_and_validate_order(self):
         msgs = {}
@@ -251,7 +251,7 @@ class CpgMsgOrderBase(CoroTest):
 
             while len(msgs[n]) < self.total_num_msgs and waited < 60:
 
-                msg = self.CM.agent[n].read_messages(25)
+                msg = self.CM.cpg_agent[n].read_messages(25)
                 if not msg == None:
                     msgl = msg.split(";")
 
@@ -428,6 +428,67 @@ class ServiceLoadTest(CoroTest):
 
         return self.success()
 
+
+###################################################################
+class ConfdbReplaceTest(CoroTest):
+    def __init__(self, cm):
+        CoroTest.__init__(self, cm)
+        self.name="ConfdbReplaceTest"
+
+    def __call__(self, node):
+        self.incr("calls")
+        res = self.CM.confdb_agent[node].set_get_test()
+        if 'OK' in res:
+            return self.success()
+        else:
+            return self.failure('set_get_test failed')
+
+
+###################################################################
+class ConfdbIncrementTest(CoroTest):
+    def __init__(self, cm):
+        CoroTest.__init__(self, cm)
+        self.name="ConfdbIncrementTest"
+
+    def __call__(self, node):
+        self.incr("calls")
+        res = self.CM.confdb_agent[node].increment_decrement_test()
+        if 'OK' in res:
+            return self.success()
+        else:
+            return self.failure('increment_decrement_test failed')
+
+
+###################################################################
+class ConfdbObjectFindTest(CoroTest):
+    def __init__(self, cm):
+        CoroTest.__init__(self, cm)
+        self.name="ConfdbObjectFindTest"
+
+    def __call__(self, node):
+        self.incr("calls")
+        res = self.CM.confdb_agent[node].object_find_test()
+        if 'OK' in res:
+            return self.success()
+        else:
+            return self.failure('object_find_test failed')
+
+
+###################################################################
+class ConfdbNotificationTest(CoroTest):
+    def __init__(self, cm):
+        CoroTest.__init__(self, cm)
+        self.name="ConfdbNotificationTest"
+
+    def __call__(self, node):
+        self.incr("calls")
+        res = self.CM.confdb_agent[node].notification_test()
+        if 'OK' in res:
+            return self.success()
+        else:
+            return self.failure('notification_test failed')
+
+
 GenTestClasses = []
 GenTestClasses.append(CpgMsgOrderBasic)
 GenTestClasses.append(CpgCfgChgOnExecCrash)
@@ -436,6 +497,11 @@ GenTestClasses.append(CpgCfgChgOnNodeLeave)
 GenTestClasses.append(CpgCfgChgOnNodeIsolate)
 
 AllTestClasses = []
+AllTestClasses.append(ConfdbReplaceTest)
+AllTestClasses.append(ConfdbIncrementTest)
+AllTestClasses.append(ConfdbObjectFindTest)
+AllTestClasses.append(ConfdbNotificationTest)
+
 AllTestClasses.append(ServiceLoadTest)
 AllTestClasses.append(MemLeakObject)
 AllTestClasses.append(MemLeakSession)
@@ -488,9 +554,9 @@ def CoroTestList(cm, audits):
     configs.append(e)
 
     #quorum/provider=
-    f = {}
-    f['quorum/provider'] = 'corosync_quorum_ykd'
-    configs.append(f)
+    #f = {}
+    #f['quorum/provider'] = 'corosync_quorum_ykd'
+    #configs.append(f)
 
     num=1
     for cfg in configs: