| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619 |
- '''CTS: Cluster Testing System: corosync...
- '''
- __copyright__='''
- Copyright (c) 2010 Red Hat, Inc.
- '''
- # All rights reserved.
- #
- # Author: Angus Salkeld <asalkeld@redhat.com>
- #
- # This software licensed under BSD license, the text of which follows:
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions are met:
- #
- # - Redistributions of source code must retain the above copyright notice,
- # this list of conditions and the following disclaimer.
- # - Redistributions in binary form must reproduce the above copyright notice,
- # this list of conditions and the following disclaimer in the documentation
- # and/or other materials provided with the distribution.
- # - Neither the name of the MontaVista Software, Inc. nor the names of its
- # contributors may be used to endorse or promote products derived from this
- # software without specific prior written permission.
- #
- # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- # THE POSSIBILITY OF SUCH DAMAGE.
- import os
- import sys
- import time
- import socket
- import shutil
- import string
- import augeas
- from cts.CTS import ClusterManager
- from cts.CTSscenarios import ScenarioComponent
- from cts.CTS import RemoteExec
- from cts.CTSvars import CTSvars
- ###################################################################
class CoroConfig(object):
    '''Staged copy of corosync.conf that is edited through Augeas.

    ``<corobase>/conf/corosync.conf.example`` is copied into a scratch root
    (``/tmp/aug-root/``) and loaded with the Augeas lenses found in
    ``<corobase>/conf/lenses``. Option names passed to get()/set() are
    Augeas paths relative to corosync.conf, e.g.
    ``totem/interface/bindnetaddr``.
    '''
    def __init__(self, corobase=None):
        '''Create the scratch root and load the example config.

        corobase -- source tree root; defaults to the parent directory of
        the current working directory.
        '''
        self.base = "/files/etc/corosync/corosync.conf/"
        # NOTE(review): fixed, shared path under /tmp -- two concurrent
        # runs on one host would remove each other's staged config.
        self.new_root = "/tmp/aug-root/"
        if corobase is None:
            self.corobase = os.getcwd() + "/.."
        else:
            self.corobase = corobase
        example = self.corobase + "/conf/corosync.conf.example"
        # Always start from a fresh copy of the example config.
        if os.path.isdir(self.new_root):
            shutil.rmtree(self.new_root)
        os.makedirs(self.new_root + "/etc/corosync")
        shutil.copy(example, self.get_filename())
        self.aug = augeas.Augeas(root=self.new_root,
                                 loadpath=self.corobase + "/conf/lenses")
        # Original totem values keyed by path relative to self.base, so that
        # apply_default_config() can restore them later.
        self.original = {}
        for pattern in ('totem/*', 'totem/interface/*'):
            for path in self.aug.match(self.base + pattern):
                self.original[path[len(self.base):]] = self.aug.get(path)

    def get(self, name):
        '''Return the current value of option *name* (relative path).'''
        return self.aug.get(self.base + name)

    def set(self, name, value):
        '''Set option *name* to ``str(value)``; call save() to persist.'''
        self.aug.set(self.base + name, str(value))

    def save(self):
        '''Write all pending Augeas changes to the staged file.'''
        self.aug.save()

    def get_filename(self):
        '''Return the path of the staged corosync.conf.'''
        return self.new_root + "/etc/corosync/corosync.conf"
- ###################################################################
class corosync_flatiron(ClusterManager):
    '''
    Cluster manager for the corosync "flatiron" test suite.

    Drives corosync through its init script (Start/Stop/Reread/Status
    commands below), keeps the per-node test agents (cpg, confdb, sam,
    votequorum) in step with corosync, and stages corosync.conf changes
    through a CoroConfig instance before copying the file to every node.
    '''
    def __init__(self, Environment, randseed=None):
        ClusterManager.__init__(self, Environment, randseed)
        self.update({
            "Name" : "corosync(flatiron)",
            "StartCmd" : CTSvars.INITDIR+"/corosync start",
            "StopCmd" : CTSvars.INITDIR+"/corosync stop",
            "RereadCmd" : CTSvars.INITDIR+"/corosync reload",
            "StatusCmd" : CTSvars.INITDIR+"/corosync status %s",
            "DeadTime" : 30,
            "StartTime" : 15, # Max time to start up
            "StableTime" : 10,
            "BreakCommCmd" : "/usr/share/corosync/tests/net_breaker.sh BreakCommCmd %s",
            "FixCommCmd" : "/usr/share/corosync/tests/net_breaker.sh FixCommCmd %s",
            "Pat:We_stopped" : "%s.*Corosync Cluster Engine exiting with status.*",
            "Pat:They_stopped" : "%s.*Member left:.*%s.*",
            "Pat:They_dead" : "corosync:.*Node %s is now: lost",
            "Pat:Local_starting" : "%s.*started and ready to provide service.",
            "Pat:Local_started" : "%s.*started and ready to provide service.",
            "Pat:Master_started" : "%s.*Completed service synchronization, ready to provide service.",
            "Pat:Slave_started" : "%s.*Completed service synchronization, ready to provide service.",
            "Pat:ChildKilled" : "%s corosync.*Child process %s terminated with signal 9",
            "Pat:ChildRespawn" : "%s corosync.*Respawning failed child process: %s",
            "Pat:ChildExit" : "Child process .* exited",
            "Pat:DC_IDLE" : ".*A processor joined or left the membership and a new membership was formed.",
            # Bad news Regexes. Should never occur.
            "BadRegexes" : (
                r"ERROR:",
                r"CRIT:",
                r"Shutting down\.",
                r"Forcing shutdown\.",
                r"core dump",
                r"Could not bind AF_UNIX",
            ),
            "LogFileName" : Environment["LogFileName"],
        })
        self.start_cpg = True
        # Per-node test agents, keyed by node name.
        self.cpg_agent = {}
        self.confdb_agent = {}
        self.sam_agent = {}
        self.votequorum_agent = {}
        self.config = CoroConfig()
        # Cache used by key_for_node().
        self.node_to_ip = {}

        # Options waiting to be written by install_config(), and options
        # that have already been written.
        self.new_config = {}
        self.new_config['service[1]/name'] = 'corosync_tst_sv2'
        self.new_config['service[1]/ver'] = '0'
        self.applied_config = {}
        # bindnetaddr = a node's IPv4 address with the last octet zeroed;
        # the last node in the list wins.
        # NOTE(review): assumes every node sits on the same /24 network.
        for n in self.Env["nodes"]:
            octets = socket.gethostbyname(n).split('.')
            octets[3] = '0'
            self.new_config['totem/interface/bindnetaddr'] = '.'.join(octets)

    def apply_default_config(self):
        '''Queue changed totem options for reset to their original values.

        If anything is queued, all nodes are stopped.
        '''
        for c in self.applied_config:
            if 'bindnetaddr' in c:
                # Computed from the test network; never reset.
                continue
            if c not in self.config.original:
                # new config option (non default); left in place.
                continue
            # Compare by value. The former ``is not`` compared object
            # identity, so equal strings could still trigger a reset.
            if self.applied_config[c] != self.config.original[c]:
                self.new_config[c] = self.config.original[c]
        if self.new_config:
            self.debug('applying default config')
            self.stopall()

    def apply_new_config(self):
        '''Restart every node if there are pending config options.'''
        if self.new_config:
            self.debug('applying new config')
            self.stopall()
            self.startall()

    def install_all_config(self):
        '''Write pending options to the staged file and copy it to all nodes.

        Each pending option moves from new_config to applied_config.
        '''
        for c in self.new_config:
            self.log('configuring: ' + c + ' = '+ str(self.new_config[c]))
            self.config.set(c, self.new_config[c])
            self.applied_config[c] = self.new_config[c]
        # Every pending option has now been applied.
        self.new_config.clear()
        self.config.save()
        src_file = self.config.get_filename()
        for node in self.Env["nodes"]:
            self.rsh.cp(src_file, "%s:%s" % (node, "/etc/corosync/"))

    def install_config(self, node):
        '''Install pending config (on all nodes) if there is any.

        install gets new_config and installs it, then moves the
        config to applied_config. *node* is not used.
        '''
        if self.new_config:
            self.install_all_config()

    def key_for_node(self, node):
        '''Return (and cache) the IPv4 address of *node*.'''
        if node not in self.node_to_ip:
            self.node_to_ip[node] = socket.gethostbyname(node)
        return self.node_to_ip[node]

    def StartaCM(self, node):
        '''Start corosync on *node* and (re)start its test agents.

        Returns 1 without doing anything if *node* is not expected to be
        down; otherwise returns the result of ClusterManager.StartaCM.
        '''
        if self.ShouldBeStatus.setdefault(node, "down") != "down":
            return 1
        self.debug('starting corosync on : ' + node)
        ret = ClusterManager.StartaCM(self, node)
        if self.start_cpg:
            if node in self.cpg_agent:
                self.cpg_agent[node].restart()
            else:
                self.cpg_agent[node] = CpgTestAgent(node, self.Env)
                self.cpg_agent[node].start()
        if node in self.confdb_agent:
            self.confdb_agent[node].restart()
        if node in self.sam_agent:
            self.sam_agent[node].restart()
        # votequorum agent started as needed.
        if 'quorum/provider' in self.applied_config:
            if node in self.votequorum_agent:
                self.votequorum_agent[node].restart()
            else:
                self.votequorum_agent[node] = VoteQuorumTestAgent(node, self.Env)
                self.votequorum_agent[node].start()
        return ret

    def StopaCM(self, node):
        '''Stop the test agents on *node*, then corosync itself.

        Returns 1 without doing anything if *node* is not expected to be
        up; otherwise returns the result of ClusterManager.StopaCM.
        '''
        # .get(): a node that never got a status entry is not "up";
        # indexing raised KeyError here.
        if self.ShouldBeStatus.get(node) != "up":
            return 1
        self.debug('stoping corosync on : ' + node)
        # Stop confdb too, matching StartaCM (which restarts it) and
        # TestAgentComponent.TearDown (which stops it).
        for agents in (self.cpg_agent, self.confdb_agent,
                       self.sam_agent, self.votequorum_agent):
            if node in agents:
                agents[node].stop()
        return ClusterManager.StopaCM(self, node)

    def test_node_CM(self, node):
        '''Probe corosync on *node* and record the result in ShouldBeStatus.

        Returns 2 (up and stable) if the status output mentions neither
        'stopped' nor 'dead', otherwise 0 (down). 1 (unstable) is never
        returned here. A mismatch with the expected state is logged.
        '''
        out = self.rsh(node, self["StatusCmd"], 1)
        # find() results compared by value; ``is -1`` relied on CPython
        # caching small ints.
        is_up = out.find('stopped') == -1 and out.find('dead') == -1
        expected = self.ShouldBeStatus.get(node)
        if is_up:
            if expected == "down":
                self.log(
                    "Node status for %s is %s but we think it should be %s"
                    % (node, "up", expected))
            self.ShouldBeStatus[node] = "up"
            return 2
        if expected == "up":
            self.log(
                "Node status for %s is %s but we think it should be %s"
                % (node, "down", expected))
        self.ShouldBeStatus[node] = "down"
        return 0

    def StataCM(self, node):
        '''Report the status of corosync on a given node: 1 if up, else None.'''
        if self.test_node_CM(node) > 0:
            return 1
        return None

    def RereadCM(self, node):
        '''Ask corosync on *node* to reload its configuration.'''
        self.log('reloading corosync on : ' + node)
        return ClusterManager.RereadCM(self, node)

    def find_partitions(self):
        '''Partition detection is not implemented; always returns [].'''
        return []

    def prepare(self):
        '''Finish the Initialization process. Prepare to test...

        Resets every node's expected status, unisolates it and probes it.
        '''
        self.partitions_expected = 1
        for node in self.Env["nodes"]:
            self.ShouldBeStatus[node] = ""
            self.unisolate_node(node)
            self.StataCM(node)

    def HasQuorum(self, node_list):
        '''Return 1 if the first up node in *node_list* reports quorum.

        If we are auditing a partition, one side will have quorum and the
        other not, so the caller says which nodes to check. An empty or
        missing node_list means all nodes. Returns 0 when the first
        conclusive answer is "0" or no node gives a conclusive answer.
        NOTE(review): "QuorumCmd" is not set in __init__; presumably
        ClusterManager supplies it -- confirm.
        '''
        if not node_list:
            node_list = self.Env["nodes"]
        for node in node_list:
            if self.ShouldBeStatus.get(node) != "up":
                continue
            quorum = self.rsh(node, self["QuorumCmd"], 1)
            if "1" in quorum:
                return 1
            if "0" in quorum:
                return 0
            self.log("WARN: Unexpected quorum test result from "+ node +":"+ quorum)
        return 0

    def Components(self):
        '''No extra scenario components are provided; returns None.'''
        return None
- ###################################################################
class TestAgentComponent(ScenarioComponent):
    '''Scenario component that starts the test agents on every node during
    SetUp() and stops them again during TearDown().'''
    def __init__(self, Env):
        self.Env = Env

    def IsApplicable(self):
        '''Return TRUE if the current ScenarioComponent is applicable
        in the given LabEnvironment given to the constructor.
        '''
        return True

    def SetUp(self, CM):
        '''Start the test agents on every node.

        The cpg agent is started if CM.start_cpg is set. The confdb and sam
        agents are always started. The votequorum agent is started only
        when 'quorum/provider' has been applied.
        Raises RuntimeError if corosync is not up on a node; returns 1.
        '''
        self.CM = CM
        for node in self.Env["nodes"]:
            if not CM.StataCM(node):
                raise RuntimeError ("corosync not up")
            if CM.start_cpg:
                CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
                CM.cpg_agent[node].start()
            CM.confdb_agent[node] = ConfdbTestAgent(node, CM.Env)
            CM.confdb_agent[node].start()
            CM.sam_agent[node] = SamTestAgent(node, CM.Env)
            CM.sam_agent[node].start()
            # votequorum agent started as needed.
            if 'quorum/provider' in CM.applied_config:
                CM.votequorum_agent[node] = VoteQuorumTestAgent(node, CM.Env)
                CM.votequorum_agent[node].start()
        return 1

    def TearDown(self, CM):
        '''Stop every test agent that exists for each node.'''
        self.CM = CM
        for node in self.Env["nodes"]:
            # Guard every lookup: SetUp may have aborted before creating
            # the confdb/sam agents for some nodes.
            for agents in (CM.cpg_agent, CM.confdb_agent,
                           CM.sam_agent, CM.votequorum_agent):
                if node in agents:
                    agents[node].stop()
- ###################################################################
class TestAgent(object):
    '''Client side of a test agent binary running on a cluster node.

    start() launches ``binary`` on ``node`` through the remote shell and
    connects to it on TCP ``port``. Requests are framed as
    ``<argc>:<len>:<arg>[:<len>:<arg>...];`` (see _frame_message()).
    Any attribute not defined on the object becomes a remote call named
    after that attribute (see __getattr__()).
    '''
    def __init__(self, binary, node, port, env=None):
        self.node = node
        self.node_address = None
        self.port = port
        self.sock = None
        self.binary = binary
        # Set by start(); cleared by stop() and by a failed status().
        self.started = False
        self.rsh = RemoteExec(Env=env)
        # Remote function picked by the most recent dynamic lookup.
        self.func_name = None
        # True once a request has been sent since the last start().
        self.used = False
        self.env = env
        # If True, dynamic calls wait for and return the agent's reply.
        self.send_recv = False

    def restart(self):
        '''Stop the agent, then start it again.'''
        self.stop()
        self.start()

    def clean_start(self):
        '''Restart the agent if it has been used or does not answer a ping.'''
        if self.used or not self.status():
            self.env.debug('test agent: cleaning %s on node %s' % (self.binary, self.node))
            self.stop()
            self.start()

    def status(self):
        '''Return True if the started agent answers a ping, else False.'''
        if not self.started:
            return False
        try:
            self.send(["are_you_ok_dude"])
            self.read()
        except RuntimeError:
            self.started = False
            return False
        self.started = True
        return True

    def start(self):
        '''Launch the agent binary on the node and connect to it.

        The connection is retried once per second until it succeeds;
        after more than 5 attempts each further failure is logged.
        '''
        self.env.debug('test agent: starting %s on node %s' % (self.binary, self.node))
        if self.sock is not None:
            # Don't leak a previous connection.
            self.sock.close()
        ip = socket.gethostbyname(self.node)
        self.rsh(self.node, self.binary, blocking=0)
        retries = 0
        while True:
            retries = retries + 1
            # POSIX leaves a socket's state unspecified after connect()
            # fails, so every attempt uses a fresh socket.
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self.sock.connect((ip, self.port))
                break
            except socket.error as msg:
                self.sock.close()
                if retries > 5:
                    self.env.log("Retried " + str(retries) + " times. Error: " + str(msg))
                time.sleep(1)
        self.started = True
        self.used = False

    def stop(self):
        '''Close the connection and kill the agent binary on the node.'''
        self.env.debug('test agent: stopping %s on node %s' % (self.binary, self.node))
        if self.sock is not None:
            self.sock.close()
        self.rsh(self.node, "killall " + self.binary + " 2>/dev/null")
        self.started = False

    @staticmethod
    def _frame_message(args):
        '''Encode *args* as ``<count>(:<len>:<str(arg)>)*;``.'''
        parts = [str(len(args))]
        for a in args:
            a_str = str(a)
            parts.append(str(len(a_str)))
            parts.append(a_str)
        return ":".join(parts) + ";"

    def _send_message(self, real_msg, label):
        '''Send a framed request in full; raise RuntimeError on failure.'''
        try:
            # sendall() completes short writes that send() would drop.
            self.sock.sendall(real_msg)
        except socket.error as msg:
            self.env.debug("%s(%s): %s; error: %s" % (label, self.node, real_msg, msg))
            raise RuntimeError("socket connection broken")
        self.used = True

    def send(self, args):
        '''Send the list *args* (function name first) without reading a reply.

        Starts the agent first if needed. Raises RuntimeError on failure.
        '''
        if not self.started:
            self.start()
        self._send_message(self._frame_message(args), "send")

    def __getattr__(self, name):
        '''Turn an unknown attribute into a remote call named *name*.

        Returns send_recv_dynamic if send_recv is set, else send_dynamic.
        '''
        # Dunder probes (copy, pickle, ...) must not become remote calls.
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        self.func_name = name
        if self.send_recv:
            return self.send_recv_dynamic
        return self.send_dynamic

    def send_recv_dynamic(self, *args):
        '''Send the pending call (self.func_name) and return the reply.

        Returns None, after logging, if the reply cannot be read.
        NOTE(review): *args* is forwarded as ONE tuple, so the agent
        receives ``str(args)`` (e.g. ``()``) as a single argument. This is
        kept because the agents' parsing of it is not visible here.
        '''
        self.send_dynamic(args)
        try:
            return self.read()
        except RuntimeError as msg:
            # The old handler used an undefined name (real_msg) and then
            # returned an unbound local.
            self.env.log("send_recv_dynamic: %s(); error: %s" % (self.func_name, msg))
            return None

    def send_dynamic(self, *args):
        '''Send self.func_name with *args* without reading a reply.

        Starts the agent first if needed. Raises RuntimeError on failure.
        '''
        if not self.started:
            self.start()
        # number of args+func
        real_msg = self._frame_message((self.func_name,) + args)
        self._send_message(real_msg, "send_dynamic")

    def read(self):
        '''Return one recv() (up to 4096 bytes) of reply data.

        Raises RuntimeError if the peer has closed the connection.
        '''
        msg = self.sock.recv(4096)
        # ``not msg`` matches both '' (Python 2) and b'' (Python 3).
        if not msg:
            raise RuntimeError("socket connection broken")
        return msg
class CpgConfigEvent:
    '''One CPG membership change as reported by cpg_test_agent.

    Built from a comma-separated string whose first three fields are the
    group name, node id and pid. The member counts as having left when the
    fourth field contains "left", and as joined otherwise.
    '''
    def __init__(self, msg):
        info = msg.split(',')
        self.group_name = info[0]
        self.node_id = info[1]
        # Not derived from msg; presumably filled in by callers.
        self.node = None
        self.pid = info[2]
        self.is_member = "left" not in info[3]

    def __str__(self):
        '''Render as ``group,nodeid,pid,joined|left``.'''
        # Named ``prefix`` so the builtin ``str`` is no longer shadowed.
        prefix = self.group_name + "," + self.node_id + "," + self.pid + ","
        if self.is_member:
            return prefix + "joined"
        return prefix + "left"
- ###################################################################
class CpgTestAgent(TestAgent):
    '''Client for cpg_test_agent on TCP port 9034.

    Calls not defined here are dispatched dynamically by TestAgent and,
    since send_recv stays False, do not read a reply.
    '''
    def __init__(self, node, Env=None):
        TestAgent.__init__(self, "cpg_test_agent", node, 9034, env=Env)
        self.initialized = False
        # Cached result of cpg_local_get().
        self.nodeid = None

    def start(self):
        '''Start the agent and send cpg_initialize, unless already started.'''
        if not self.started:
            TestAgent.start(self)
            self.cpg_initialize()
            self.used = False

    def stop(self):
        '''Send cpg_finalize (best effort) and stop the agent.'''
        try:
            if self.started:
                self.cpg_finalize()
        except RuntimeError as msg:
            # The agent may already be down; finalize failing is expected
            # then and must not prevent the stop.
            self.env.debug("CpgTestAgent::cpg_finalize() - %s" % msg)
        TestAgent.stop(self)

    def cpg_local_get(self):
        '''Return the agent's reply to cpg_local_get, cached after the first call.'''
        if self.nodeid is None:
            self.send(["cpg_local_get"])
            self.nodeid = self.read()
        return self.nodeid

    def record_config_events(self, truncate=True):
        '''Ask the agent to record config events ("truncate" or "append").'''
        if truncate:
            self.send(["record_config_events", "truncate"])
        else:
            self.send(["record_config_events", "append"])
        return self.read()

    def read_config_event(self):
        '''Return the next CpgConfigEvent, or None if the reply contains "None".'''
        self.send(["read_config_event"])
        msg = self.read()
        if "None" in msg:
            return None
        return CpgConfigEvent(msg)

    def read_messages(self, atmost):
        '''Return the raw reply to read_messages(atmost), or None if it contains "None".'''
        self.send(["read_messages", atmost])
        msg = self.read()
        if "None" in msg:
            return None
        return msg

    def context_test(self):
        '''Run the agent's context_test and return its raw reply.'''
        self.send(["context_test"])
        return self.read()
- ###################################################################
class ConfdbTestAgent(TestAgent):
    '''Client for confdb_test_agent, reached on TCP port 9035.'''
    def __init__(self, node, Env=None):
        binary, port = "confdb_test_agent", 9035
        TestAgent.__init__(self, binary, node, port, env=Env)
        # Dynamically dispatched calls wait for the agent's reply.
        self.send_recv = True
        self.nodeid = None
        self.initialized = False
- ###################################################################
class SamTestAgent(TestAgent):
    '''Client for sam_test_agent, reached on TCP port 9036.'''
    def __init__(self, node, Env=None):
        binary, port = "sam_test_agent", 9036
        TestAgent.__init__(self, binary, node, port, env=Env)
        # Dynamically dispatched calls wait for the agent's reply.
        self.send_recv = True
        self.nodeid = None
        self.initialized = False
- ###################################################################
class VoteQuorumTestAgent(TestAgent):
    '''Client for votequorum_test_agent, reached on TCP port 9037.'''
    def __init__(self, node, Env=None):
        binary, port = "votequorum_test_agent", 9037
        TestAgent.__init__(self, binary, node, port, env=Env)
        # Dynamically dispatched calls wait for the agent's reply.
        self.send_recv = True
        self.nodeid = None
        self.initialized = False

    def start(self):
        '''Start the agent and send it ``init``; no-op if already started.'''
        if self.started:
            return
        TestAgent.start(self)
        # Dispatched dynamically by TestAgent.
        self.init()
        self.used = False
|