corosync.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630
  1. '''CTS: Cluster Testing System: corosync...
  2. '''
  3. __copyright__='''
  4. Copyright (c) 2010 Red Hat, Inc.
  5. '''
  6. # All rights reserved.
  7. #
  8. # Author: Angus Salkeld <asalkeld@redhat.com>
  9. #
  10. # This software licensed under BSD license, the text of which follows:
  11. #
  12. # Redistribution and use in source and binary forms, with or without
  13. # modification, are permitted provided that the following conditions are met:
  14. #
  15. # - Redistributions of source code must retain the above copyright notice,
  16. # this list of conditions and the following disclaimer.
  17. # - Redistributions in binary form must reproduce the above copyright notice,
  18. # this list of conditions and the following disclaimer in the documentation
  19. # and/or other materials provided with the distribution.
  20. # - Neither the name of the MontaVista Software, Inc. nor the names of its
  21. # contributors may be used to endorse or promote products derived from this
  22. # software without specific prior written permission.
  23. #
  24. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25. # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27. # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28. # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29. # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30. # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31. # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32. # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33. # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  34. # THE POSSIBILITY OF SUCH DAMAGE.
  35. import os
  36. import sys
  37. import time
  38. import socket
  39. import shutil
  40. import string
  41. import augeas
  42. from cts.CTS import ClusterManager
  43. from cts.CTSscenarios import ScenarioComponent
  44. from cts.CTS import RemoteExec
  45. from cts.CTSvars import CTSvars
  46. ###################################################################
  47. class CoroConfig(object):
  48. def __init__(self, corobase=None):
  49. self.base = "/files/etc/corosync/corosync.conf/"
  50. self.new_root = "/tmp/aug-root/"
  51. if corobase == None:
  52. self.corobase = os.getcwd() + "/.."
  53. else:
  54. self.corobase = corobase
  55. example = self.corobase + "/conf/corosync.conf.example"
  56. if os.path.isdir(self.new_root):
  57. shutil.rmtree (self.new_root)
  58. os.makedirs (self.new_root + "/etc/corosync")
  59. shutil.copy (example, self.new_root + "/etc/corosync/corosync.conf")
  60. self.aug = augeas.Augeas (root=self.new_root,
  61. loadpath=self.corobase + "/conf/lenses")
  62. self.original = {}
  63. # store the original values (of totem), so we can restore them in
  64. # apply_default_config()
  65. totem = self.aug.match('/files/etc/corosync/corosync.conf/totem/*')
  66. for c in totem:
  67. # /files/etc/corosync/corosync.conf/
  68. short_name = c[len(self.base):]
  69. self.original[short_name] = self.aug.get(c)
  70. interface = self.aug.match('/files/etc/corosync/corosync.conf/totem/interface/*')
  71. for c in interface:
  72. short_name = c[len(self.base):]
  73. self.original[short_name] = self.aug.get(c)
  74. def get (self, name):
  75. return self.aug.get (self.base + name)
  76. def set (self, name, value):
  77. token = self.aug.set (self.base + name, str(value))
  78. def save (self):
  79. self.aug.save()
  80. def get_filename(self):
  81. return self.new_root + "/etc/corosync/corosync.conf"
  82. ###################################################################
  83. class corosync_flatiron(ClusterManager):
  84. '''
  85. bla
  86. '''
  87. def __init__(self, Environment, randseed=None):
  88. ClusterManager.__init__(self, Environment, randseed)
  89. self.update({
  90. "Name" : "corosync(flatiron)",
  91. "StartCmd" : CTSvars.INITDIR+"/corosync start",
  92. "StopCmd" : CTSvars.INITDIR+"/corosync stop",
  93. "RereadCmd" : CTSvars.INITDIR+"/corosync reload",
  94. "StatusCmd" : CTSvars.INITDIR+"/corosync status %s",
  95. "DeadTime" : 30,
  96. "StartTime" : 15, # Max time to start up
  97. "StableTime" : 10,
  98. "BreakCommCmd" : "/usr/share/corosync/tests/net_breaker.sh BreakCommCmd %s",
  99. "FixCommCmd" : "/usr/share/corosync/tests/net_breaker.sh FixCommCmd %s",
  100. "Pat:We_stopped" : "%s.*Corosync Cluster Engine exiting with status.*",
  101. "Pat:They_stopped" : "%s.*Member left:.*%s.*",
  102. "Pat:They_dead" : "corosync:.*Node %s is now: lost",
  103. "Pat:Local_starting" : "%s.*started and ready to provide service.",
  104. "Pat:Local_started" : "%s.*started and ready to provide service.",
  105. "Pat:Master_started" : "%s.*Completed service synchronization, ready to provide service.",
  106. "Pat:Slave_started" : "%s.*Completed service synchronization, ready to provide service.",
  107. "Pat:ChildKilled" : "%s corosync.*Child process %s terminated with signal 9",
  108. "Pat:ChildRespawn" : "%s corosync.*Respawning failed child process: %s",
  109. "Pat:ChildExit" : "Child process .* exited",
  110. "Pat:DC_IDLE" : ".*A processor joined or left the membership and a new membership was formed.",
  111. # Bad news Regexes. Should never occur.
  112. "BadRegexes" : (
  113. r"ERROR:",
  114. r"CRIT:",
  115. r"Shutting down\.",
  116. r"Forcing shutdown\.",
  117. r"core dump",
  118. r"Could not bind AF_UNIX",
  119. ),
  120. "LogFileName" : Environment["LogFileName"],
  121. })
  122. self.start_cpg = True
  123. self.cpg_agent = {}
  124. self.confdb_agent = {}
  125. self.sam_agent = {}
  126. self.votequorum_agent = {}
  127. self.config = CoroConfig ()
  128. self.node_to_ip = {}
  129. self.new_config = {}
  130. self.new_config['service[1]/name'] = 'corosync_tst_sv2'
  131. self.new_config['service[1]/ver'] = '0'
  132. self.applied_config = {}
  133. for n in self.Env["nodes"]:
  134. ip = socket.gethostbyname(n)
  135. ips = ip.split('.')
  136. ips[3] = '0'
  137. ip_mask = '.'.join(ips)
  138. self.new_config['totem/interface/bindnetaddr'] = str(ip_mask)
  139. return
  140. def apply_default_config(self):
  141. for c in self.applied_config:
  142. if 'bindnetaddr' in c:
  143. continue
  144. elif not self.config.original.has_key(c):
  145. # new config option (non default)
  146. pass
  147. elif self.applied_config[c] is not self.config.original[c]:
  148. # reset to the original
  149. self.new_config[c] = self.config.original[c]
  150. if len(self.new_config) > 0:
  151. self.debug('applying default config')
  152. self.stopall()
  153. def apply_new_config(self):
  154. if len(self.new_config) > 0:
  155. self.debug('applying new config')
  156. self.stopall()
  157. self.startall()
  158. def install_all_config(self):
  159. tmp1 = {}
  160. for c in self.new_config:
  161. self.log('configuring: ' + c + ' = '+ str(self.new_config[c]))
  162. self.config.set (c, self.new_config[c])
  163. self.applied_config[c] = self.new_config[c]
  164. tmp1[c] = self.new_config[c]
  165. for c in tmp1:
  166. del self.new_config[c]
  167. self.config.save()
  168. src_file = self.config.get_filename()
  169. for node in self.Env["nodes"]:
  170. self.rsh.cp(src_file, "%s:%s" % (node, "/etc/corosync/"))
  171. def install_config(self, node):
  172. # install gets new_config and installs it, then moves the
  173. # config to applied_config
  174. if len(self.new_config) > 0:
  175. self.install_all_config()
  176. def key_for_node(self, node):
  177. if not self.node_to_ip.has_key(node):
  178. self.node_to_ip[node] = socket.gethostbyname (node)
  179. return self.node_to_ip[node]
  180. def StartaCM(self, node):
  181. if not self.ShouldBeStatus.has_key(node):
  182. self.ShouldBeStatus[node] = "down"
  183. if self.ShouldBeStatus[node] != "down":
  184. return 1
  185. self.debug('starting corosync on : ' + node)
  186. ret = ClusterManager.StartaCM(self, node)
  187. if self.start_cpg:
  188. if self.cpg_agent.has_key(node):
  189. self.cpg_agent[node].restart()
  190. else:
  191. self.cpg_agent[node] = CpgTestAgent(node, self.Env)
  192. self.cpg_agent[node].start()
  193. if self.confdb_agent.has_key(node):
  194. self.confdb_agent[node].restart()
  195. if self.sam_agent.has_key(node):
  196. self.sam_agent[node].restart()
  197. # votequorum agent started as needed.
  198. if self.applied_config.has_key('quorum/provider'):
  199. if self.applied_config['quorum/provider'] is 'corosync_votequorum':
  200. if self.votequorum_agent.has_key(node):
  201. self.votequorum_agent[node].restart()
  202. else:
  203. self.votequorum_agent[node] = VoteQuorumTestAgent(node, self.Env)
  204. self.votequorum_agent[node].start()
  205. return ret
  206. def StopaCM(self, node):
  207. if self.ShouldBeStatus[node] != "up":
  208. return 1
  209. self.debug('stoping corosync on : ' + node)
  210. if self.cpg_agent.has_key(node):
  211. self.cpg_agent[node].stop()
  212. if self.sam_agent.has_key(node):
  213. self.sam_agent[node].stop()
  214. if self.votequorum_agent.has_key(node):
  215. self.votequorum_agent[node].stop()
  216. return ClusterManager.StopaCM(self, node)
  217. def test_node_CM(self, node):
  218. # 2 - up and stable
  219. # 1 - unstable
  220. # 0 - down
  221. out = self.rsh(node, self["StatusCmd"], 1)
  222. is_stopped = string.find(out, 'stopped')
  223. is_dead = string.find(out, 'dead')
  224. ret = (is_dead is -1 and is_stopped is -1)
  225. try:
  226. if ret:
  227. ret = 2
  228. if self.ShouldBeStatus[node] == "down":
  229. self.log(
  230. "Node status for %s is %s but we think it should be %s"
  231. % (node, "up", self.ShouldBeStatus[node]))
  232. else:
  233. if self.ShouldBeStatus[node] == "up":
  234. self.log(
  235. "Node status for %s is %s but we think it should be %s"
  236. % (node, "down", self.ShouldBeStatus[node]))
  237. except KeyError: pass
  238. if ret: self.ShouldBeStatus[node] = "up"
  239. else: self.ShouldBeStatus[node] = "down"
  240. return ret
  241. def StataCM(self, node):
  242. '''Report the status of corosync on a given node'''
  243. if self.test_node_CM(node) > 0:
  244. return 1
  245. else:
  246. return None
  247. def RereadCM(self, node):
  248. self.log('reloading corosync on : ' + node)
  249. return ClusterManager.RereadCM(self, node)
  250. def find_partitions(self):
  251. ccm_partitions = []
  252. return ccm_partitions
  253. def prepare(self):
  254. '''Finish the Initialization process. Prepare to test...'''
  255. self.partitions_expected = 1
  256. for node in self.Env["nodes"]:
  257. self.ShouldBeStatus[node] = ""
  258. self.unisolate_node(node)
  259. self.StataCM(node)
  260. def HasQuorum(self, node_list):
  261. # If we are auditing a partition, then one side will
  262. # have quorum and the other not.
  263. # So the caller needs to tell us which we are checking
  264. # If no value for node_list is specified... assume all nodes
  265. if not node_list:
  266. node_list = self.Env["nodes"]
  267. for node in node_list:
  268. if self.ShouldBeStatus[node] == "up":
  269. quorum = self.rsh(node, self["QuorumCmd"], 1)
  270. if string.find(quorum, "1") != -1:
  271. return 1
  272. elif string.find(quorum, "0") != -1:
  273. return 0
  274. else:
  275. self.log("WARN: Unexpected quorum test result from "+ node +":"+ quorum)
  276. return 0
  277. def Components(self):
  278. return None
  279. ###################################################################
  280. class TestAgentComponent(ScenarioComponent):
  281. def __init__(self, Env):
  282. self.Env = Env
  283. def IsApplicable(self):
  284. '''Return TRUE if the current ScenarioComponent is applicable
  285. in the given LabEnvironment given to the constructor.
  286. '''
  287. return True
  288. def SetUp(self, CM):
  289. '''Set up the given ScenarioComponent'''
  290. self.CM = CM
  291. for node in self.Env["nodes"]:
  292. if not CM.StataCM(node):
  293. raise RuntimeError ("corosync not up")
  294. if self.CM.start_cpg:
  295. self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
  296. self.CM.cpg_agent[node].start()
  297. self.CM.confdb_agent[node] = ConfdbTestAgent(node, CM.Env)
  298. self.CM.confdb_agent[node].start()
  299. self.CM.sam_agent[node] = SamTestAgent(node, CM.Env)
  300. self.CM.sam_agent[node].start()
  301. # votequorum agent started as needed.
  302. if self.CM.applied_config.has_key('quorum/provider'):
  303. if CM.applied_config['quorum/provider'] is 'corosync_votequorum':
  304. self.CM.votequorum_agent[node] = VoteQuorumTestAgent(node, CM.Env)
  305. self.CM.votequorum_agent[node].start()
  306. return 1
  307. def TearDown(self, CM):
  308. '''Tear down (undo) the given ScenarioComponent'''
  309. self.CM = CM
  310. for node in self.Env["nodes"]:
  311. if self.CM.cpg_agent.has_key(node):
  312. self.CM.cpg_agent[node].stop()
  313. self.CM.confdb_agent[node].stop()
  314. self.CM.sam_agent[node].stop()
  315. if self.CM.votequorum_agent.has_key(node):
  316. self.CM.votequorum_agent[node].stop()
  317. ###################################################################
  318. class TestAgent(object):
  319. def __init__(self, binary, node, port, env=None):
  320. self.node = node
  321. self.node_address = None
  322. self.port = port
  323. self.sock = None
  324. self.binary = binary
  325. self.started = False
  326. self.rsh = RemoteExec(Env=env)
  327. self.func_name = None
  328. self.used = False
  329. self.env = env
  330. self.send_recv = False
  331. def restart(self):
  332. self.stop()
  333. self.start()
  334. def clean_start(self):
  335. if self.used or not self.status():
  336. self.env.debug('test agent: cleaning %s on node %s' % (self.binary, self.node))
  337. self.stop()
  338. self.start()
  339. def status(self):
  340. if not self.started:
  341. return False
  342. try:
  343. self.send (["are_you_ok_dude"])
  344. self.read ()
  345. self.started = True
  346. return True
  347. except RuntimeError, msg:
  348. self.started = False
  349. return False
  350. def start(self):
  351. '''Set up the given ScenarioComponent'''
  352. self.env.debug('test agent: starting %s on node %s' % (self.binary, self.node))
  353. self.sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
  354. ip = socket.gethostbyname(self.node)
  355. self.rsh(self.node, self.binary, blocking=0)
  356. is_connected = False
  357. retries = 0
  358. while not is_connected:
  359. try:
  360. retries = retries + 1
  361. self.sock.connect ((ip, self.port))
  362. is_connected = True
  363. except socket.error, msg:
  364. if retries > 5:
  365. self.env.log("Retried " + str(retries) + " times. Error: " + str(msg))
  366. time.sleep(1)
  367. self.started = True
  368. self.used = False
  369. def stop(self):
  370. '''Tear down (undo) the given ScenarioComponent'''
  371. self.env.debug('test agent: stopping %s on node %s' % (self.binary, self.node))
  372. self.sock.close ()
  373. self.rsh(self.node, "killall " + self.binary + " 2>/dev/null")
  374. self.started = False
  375. def kill(self):
  376. '''Tear down (undo) the given ScenarioComponent'''
  377. self.env.debug('test agent: killing %s on node %s' % (self.binary, self.node))
  378. self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null")
  379. self.started = False
  380. def getpid(self):
  381. return self.rsh(self.node, 'pidof ' + self.binary, 1)
  382. def send (self, args):
  383. if not self.started:
  384. self.start()
  385. real_msg = str (len (args))
  386. for a in args:
  387. a_str = str(a)
  388. real_msg += ":" + str (len (a_str)) + ":" + a_str
  389. real_msg += ";"
  390. sent = 0
  391. try:
  392. sent = self.sock.send (real_msg)
  393. except socket.error, msg:
  394. self.env.debug("send(%s): %s; error: %s" % (self.node, real_msg, msg))
  395. if sent == 0:
  396. raise RuntimeError ("socket connection broken")
  397. self.used = True
  398. def __getattribute__(self,name):
  399. try:
  400. return object.__getattribute__(self, name)
  401. except:
  402. self.func_name = name
  403. if self.send_recv:
  404. return self.send_recv_dynamic
  405. else:
  406. return self.send_dynamic
  407. def send_recv_dynamic (self, *args):
  408. self.send_dynamic (args)
  409. try:
  410. res = self.read ()
  411. except RuntimeError, msg:
  412. self.env.log("send_recv_dynamic: %s(); error: %s" % (self.func_name, msg))
  413. return res
  414. def send_dynamic (self, *args):
  415. if not self.started:
  416. self.start()
  417. # number of args+func
  418. real_msg = str (len (args) + 1) + ":" + str(len(self.func_name)) + ":" + self.func_name
  419. for a in args:
  420. a_str = str(a)
  421. real_msg += ":" + str (len (a_str)) + ":" + a_str
  422. real_msg += ";"
  423. sent = 0
  424. try:
  425. sent = self.sock.send (real_msg)
  426. except socket.error, msg:
  427. self.env.debug("send_dynamic(%s): %s; error: %s" % (self.node, real_msg, msg))
  428. if sent == 0:
  429. raise RuntimeError ("socket connection broken")
  430. self.used = True
  431. def read (self):
  432. msg = self.sock.recv (4096)
  433. if msg == '':
  434. raise RuntimeError("socket connection broken")
  435. return msg
  436. class CpgConfigEvent:
  437. def __init__(self, msg):
  438. info = msg.split(',')
  439. self.group_name = info[0]
  440. self.node_id = info[1]
  441. self.node = None
  442. self.pid = info[2]
  443. if "left" in info[3]:
  444. self.is_member = False
  445. else:
  446. self.is_member = True
  447. def __str__ (self):
  448. str = self.group_name + "," + self.node_id + "," + self.pid + ","
  449. if self.is_member:
  450. return str + "joined"
  451. else:
  452. return str + "left"
  453. ###################################################################
  454. class CpgTestAgent(TestAgent):
  455. def __init__(self, node, Env=None):
  456. TestAgent.__init__(self, "cpg_test_agent", node, 9034, env=Env)
  457. self.initialized = False
  458. self.nodeid = None
  459. def start(self):
  460. if not self.started:
  461. TestAgent.start(self)
  462. self.cpg_initialize()
  463. self.used = False
  464. def stop(self):
  465. try:
  466. if self.started:
  467. self.cpg_finalize()
  468. except RuntimeError, msg:
  469. # if cpg_agent is down, we are not going to stress
  470. self.env.debug("CpgTestAgent::cpg_finalize() - %s" % msg)
  471. TestAgent.stop(self)
  472. def cpg_local_get(self):
  473. if self.nodeid == None:
  474. self.send (["cpg_local_get"])
  475. self.nodeid = self.read ()
  476. return self.nodeid
  477. def record_config_events(self, truncate=True):
  478. if truncate:
  479. self.send (["record_config_events", "truncate"])
  480. else:
  481. self.send (["record_config_events", "append"])
  482. return self.read ()
  483. def read_config_event(self):
  484. self.send (["read_config_event"])
  485. msg = self.read ()
  486. if "None" in msg:
  487. return None
  488. else:
  489. return CpgConfigEvent(msg)
  490. def read_messages(self, atmost):
  491. self.send (["read_messages", atmost])
  492. msg = self.read ()
  493. if "None" in msg:
  494. return None
  495. else:
  496. return msg
  497. def context_test(self):
  498. self.send (["context_test"])
  499. return self.read ()
  500. ###################################################################
  501. class ConfdbTestAgent(TestAgent):
  502. def __init__(self, node, Env=None):
  503. TestAgent.__init__(self, "confdb_test_agent", node, 9035, env=Env)
  504. self.initialized = False
  505. self.nodeid = None
  506. self.send_recv = True
  507. ###################################################################
  508. class SamTestAgent(TestAgent):
  509. def __init__(self, node, Env=None):
  510. TestAgent.__init__(self, "sam_test_agent", node, 9036, env=Env)
  511. self.initialized = False
  512. self.nodeid = None
  513. self.send_recv = True
  514. ###################################################################
  515. class VoteQuorumTestAgent(TestAgent):
  516. def __init__(self, node, Env=None):
  517. TestAgent.__init__(self, "votequorum_test_agent", node, 9037, env=Env)
  518. self.initialized = False
  519. self.nodeid = None
  520. self.send_recv = True
  521. def start(self):
  522. if not self.started:
  523. TestAgent.start(self)
  524. self.init()
  525. self.used = False