corosync.py 22 KB


  1. '''CTS: Cluster Testing System: corosync...
  2. '''
  3. __copyright__='''
  4. Copyright (c) 2010 Red Hat, Inc.
  5. '''
  6. # All rights reserved.
  7. #
  8. # Author: Angus Salkeld <asalkeld@redhat.com>
  9. #
  10. # This software licensed under BSD license, the text of which follows:
  11. #
  12. # Redistribution and use in source and binary forms, with or without
  13. # modification, are permitted provided that the following conditions are met:
  14. #
  15. # - Redistributions of source code must retain the above copyright notice,
  16. # this list of conditions and the following disclaimer.
  17. # - Redistributions in binary form must reproduce the above copyright notice,
  18. # this list of conditions and the following disclaimer in the documentation
  19. # and/or other materials provided with the distribution.
  20. # - Neither the name of the MontaVista Software, Inc. nor the names of its
  21. # contributors may be used to endorse or promote products derived from this
  22. # software without specific prior written permission.
  23. #
  24. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25. # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27. # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28. # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29. # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30. # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31. # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32. # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33. # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  34. # THE POSSIBILITY OF SUCH DAMAGE.
  35. import os
  36. import sys
  37. import time
  38. import socket
  39. import shutil
  40. import string
  41. import augeas
  42. from cts.CTS import ClusterManager
  43. from cts.CTSscenarios import ScenarioComponent
  44. from cts.remote import RemoteExec
  45. from cts.remote import RemoteFactory
  46. from cts.logging import *
  47. from cts.CTSvars import CTSvars
  48. from cts.CTSaudits import ClusterAudit
  49. from cts.CTSaudits import LogAudit
  50. ###################################################################
  51. class CoroConfig(object):
  52. def __init__(self, corobase=None):
  53. self.base = "/files/etc/corosync/corosync.conf/"
  54. self.new_root = "/tmp/aug-root/"
  55. if corobase == None:
  56. self.corobase = os.getcwd() + "/.."
  57. else:
  58. self.corobase = corobase
  59. example = self.corobase + "/conf/corosync.conf.example"
  60. if os.path.isdir(self.new_root):
  61. shutil.rmtree (self.new_root)
  62. os.makedirs (self.new_root + "/etc/corosync")
  63. shutil.copy (example, self.new_root + "/etc/corosync/corosync.conf")
  64. self.aug = augeas.Augeas (root=self.new_root,
  65. loadpath=self.corobase + "/conf/lenses")
  66. self.original = {}
  67. # store the original values (of totem), so we can restore them in
  68. # apply_default_config()
  69. totem = self.aug.match('/files/etc/corosync/corosync.conf/totem/*')
  70. for c in totem:
  71. # /files/etc/corosync/corosync.conf/
  72. short_name = c[len(self.base):]
  73. self.original[short_name] = self.aug.get(c)
  74. interface = self.aug.match('/files/etc/corosync/corosync.conf/totem/interface/*')
  75. for c in interface:
  76. short_name = c[len(self.base):]
  77. self.original[short_name] = self.aug.get(c)
  78. def get (self, name):
  79. return self.aug.get (self.base + name)
  80. def set (self, name, value):
  81. token = self.aug.set (self.base + name, str(value))
  82. def save (self):
  83. self.aug.save()
  84. def get_filename(self):
  85. return self.new_root + "/etc/corosync/corosync.conf"
  86. ###################################################################
  87. class corosync_needle(ClusterManager):
  88. '''
  89. bla
  90. '''
  91. def __init__(self, Environment, randseed=None):
  92. ClusterManager.__init__(self, Environment, randseed)
  93. self.update({
  94. "Name" : "corosync(needle)",
  95. "StartCmd" : "service corosync start",
  96. "StopCmd" : "service corosync stop",
  97. "RereadCmd" : "service corosync reload",
  98. "StatusCmd" : "service corosync status",
  99. "DeadTime" : 30,
  100. "StartTime" : 15, # Max time to start up
  101. "StableTime" : 10,
  102. "BreakCommCmd" : "/usr/share/corosync/tests/net_breaker.sh BreakCommCmd %s",
  103. "FixCommCmd" : "/usr/share/corosync/tests/net_breaker.sh FixCommCmd %s",
  104. "QuorumCmd" : "corosync-quorumtool -s",
  105. "Pat:We_stopped" : "%s.*Corosync Cluster Engine exiting.*",
  106. "Pat:They_stopped" : "%s.*Member left:.*%s.*",
  107. "Pat:They_dead" : "corosync:.*Node %s is now: lost",
  108. "Pat:Local_starting" : "%s.*Initializing transport",
  109. "Pat:Local_started" : "%s.*Initializing transport",
  110. "Pat:Master_started" : "%s.*Completed service synchronization, ready to provide service.",
  111. "Pat:Slave_started" : "%s.*Completed service synchronization, ready to provide service.",
  112. "Pat:ChildKilled" : "%s corosync.*Child process %s terminated with signal 9",
  113. "Pat:ChildRespawn" : "%s corosync.*Respawning failed child process: %s",
  114. "Pat:ChildExit" : "Child process .* exited",
  115. "Pat:DC_IDLE" : ".*A new membership.*was formed.",
  116. # Bad news Regexes. Should never occur.
  117. "BadRegexes" : (
  118. r"ERROR:",
  119. r"CRIT:",
  120. r"Shutting down\.",
  121. r"Forcing shutdown\.",
  122. r"core dump",
  123. r"Could not bind AF_UNIX",
  124. r"Too many open files",
  125. r"Address already in use",
  126. ),
  127. "LogFileName" : Environment["LogFileName"],
  128. })
  129. self.start_cpg = True
  130. self.cpg_agent = {}
  131. self.sam_agent = {}
  132. self.votequorum_agent = {}
  133. self.config = CoroConfig ()
  134. self.node_to_ip = {}
  135. self.new_config = {}
  136. self.applied_config = {}
  137. for n in self.Env["nodes"]:
  138. ip = socket.gethostbyname(n)
  139. ips = ip.split('.')
  140. ips[3] = '0'
  141. ip_mask = '.'.join(ips)
  142. self.new_config['totem/interface/bindnetaddr'] = str(ip_mask)
  143. return
  144. def apply_default_config(self):
  145. for c in self.applied_config:
  146. if 'bindnetaddr' in c:
  147. continue
  148. elif c not in self.config.original:
  149. # new config option (non default)
  150. pass
  151. elif self.applied_config[c] is not self.config.original[c]:
  152. # reset to the original
  153. self.new_config[c] = self.config.original[c]
  154. if len(self.new_config) > 0:
  155. self.debug('applying default config')
  156. self.stopall()
  157. def apply_new_config(self, need_all_up=True):
  158. if len(self.new_config) > 0:
  159. self.debug('applying new config')
  160. self.stopall()
  161. if need_all_up:
  162. self.startall()
  163. def install_all_config(self):
  164. tmp1 = {}
  165. sorted_keys = sorted(self.new_config.keys())
  166. for c in sorted_keys:
  167. self.log('configuring: ' + c + ' = '+ str(self.new_config[c]))
  168. self.config.set (c, self.new_config[c])
  169. self.applied_config[c] = self.new_config[c]
  170. tmp1[c] = self.new_config[c]
  171. for c in tmp1:
  172. del self.new_config[c]
  173. self.config.save()
  174. src_file = self.config.get_filename()
  175. for node in self.Env["nodes"]:
  176. self.rsh.cp(src_file, "%s:%s" % (node, "/etc/corosync/"))
  177. def install_config(self, node):
  178. # install gets new_config and installs it, then moves the
  179. # config to applied_config
  180. if len(self.new_config) > 0:
  181. self.install_all_config()
  182. def key_for_node(self, node):
  183. if node not in self.node_to_ip:
  184. self.node_to_ip[node] = socket.gethostbyname (node)
  185. return self.node_to_ip[node]
  186. def StartaCM(self, node, verbose=False):
  187. if node not in self.ShouldBeStatus:
  188. self.ShouldBeStatus[node] = "down"
  189. if self.ShouldBeStatus[node] != "down":
  190. return 1
  191. self.debug('starting corosync on : ' + node)
  192. ret = ClusterManager.StartaCM(self, node)
  193. if self.start_cpg:
  194. if node in self.cpg_agent:
  195. self.cpg_agent[node].restart()
  196. else:
  197. self.cpg_agent[node] = CpgTestAgent(node, self.Env)
  198. self.cpg_agent[node].start()
  199. if node in self.sam_agent:
  200. self.sam_agent[node].restart()
  201. # votequorum agent started as needed.
  202. if 'quorum/provider' in self.applied_config:
  203. if self.applied_config['quorum/provider'] is 'corosync_votequorum':
  204. if node in self.votequorum_agent:
  205. self.votequorum_agent[node].restart()
  206. else:
  207. self.votequorum_agent[node] = VoteQuorumTestAgent(node, self.Env)
  208. self.votequorum_agent[node].start()
  209. return ret
  210. def StopaCM(self, node, verbose=False):
  211. if self.ShouldBeStatus[node] != "up":
  212. return 1
  213. self.debug('stoping corosync on : ' + node)
  214. if node in self.cpg_agent:
  215. self.cpg_agent[node].stop()
  216. if node in self.sam_agent:
  217. self.sam_agent[node].stop()
  218. if node in self.votequorum_agent:
  219. self.votequorum_agent[node].stop()
  220. return ClusterManager.StopaCM(self, node)
  221. def test_node_CM(self, node):
  222. # 2 - up and stable
  223. # 1 - unstable
  224. # 0 - down
  225. (rc, lines) = self.rsh(node, self["StatusCmd"], stdout=2)
  226. out = str(lines)
  227. if 'systemd' in out:
  228. if 'running' in out:
  229. ret = 2
  230. else:
  231. ret = 0
  232. else:
  233. is_stopped = string.find(out, 'stopped')
  234. is_dead = string.find(out, 'dead')
  235. ret = (is_dead is -1 and is_stopped is -1)
  236. try:
  237. if ret:
  238. ret = 2
  239. if self.ShouldBeStatus[node] == "down":
  240. self.log(
  241. "Node status for %s is %s but we think it should be %s"
  242. % (node, "up", self.ShouldBeStatus[node]))
  243. else:
  244. if self.ShouldBeStatus[node] == "up":
  245. self.log(
  246. "Node status for %s is %s but we think it should be %s"
  247. % (node, "down", self.ShouldBeStatus[node]))
  248. except KeyError: pass
  249. if ret: self.ShouldBeStatus[node] = "up"
  250. else: self.ShouldBeStatus[node] = "down"
  251. return ret
  252. def StataCM(self, node):
  253. '''Report the status of corosync on a given node'''
  254. if self.test_node_CM(node) > 0:
  255. return 1
  256. else:
  257. return None
  258. def RereadCM(self, node):
  259. self.log('reloading corosync on : ' + node)
  260. return ClusterManager.RereadCM(self, node)
  261. def find_partitions(self):
  262. ccm_partitions = []
  263. return ccm_partitions
  264. def prepare(self):
  265. '''Finish the Initialization process. Prepare to test...'''
  266. self.partitions_expected = 1
  267. for node in self.Env["nodes"]:
  268. self.ShouldBeStatus[node] = ""
  269. self.unisolate_node(node)
  270. self.StataCM(node)
  271. def HasQuorum(self, node_list):
  272. # If we are auditing a partition, then one side will
  273. # have quorum and the other not.
  274. # So the caller needs to tell us which we are checking
  275. # If no value for node_list is specified... assume all nodes
  276. if not node_list:
  277. node_list = self.Env["nodes"]
  278. for node in node_list:
  279. if self.ShouldBeStatus[node] == "up":
  280. (quorum, qout) = self.rsh(node, self["QuorumCmd"], stdout=2)
  281. if quorum == 1:
  282. return 1
  283. elif quorum == 0:
  284. return 0
  285. else:
  286. self.log("WARN: Unexpected quorum test result from %s : %d" % (node, quorum))
  287. return 0
  288. def Components(self):
  289. return None
  290. class ShmLeakAudit(ClusterAudit):
  291. def __init__(self, cm):
  292. self.CM = cm
  293. def name(self):
  294. return "ShmLeakAudit"
  295. def is_applicable(self):
  296. return 1
  297. def __call__(self):
  298. rc = 1
  299. for node in self.CM.Env["nodes"]:
  300. (res, lines) = self.CM.rsh(node, "/usr/share/corosync/tests/shm_leak_audit.sh", None)
  301. for line in lines:
  302. self.CM.log("%s leaked %s" % (node, line))
  303. rc = 0
  304. return rc
  305. ###################################################################
  306. class TestAgentComponent(ScenarioComponent):
  307. def __init__(self, Env):
  308. self.Env = Env
  309. def IsApplicable(self):
  310. '''Return TRUE if the current ScenarioComponent is applicable
  311. in the given LabEnvironment given to the constructor.
  312. '''
  313. return True
  314. def SetUp(self, CM):
  315. '''Set up the given ScenarioComponent'''
  316. self.CM = CM
  317. for node in self.Env["nodes"]:
  318. if not CM.StataCM(node):
  319. raise RuntimeError ("corosync not up")
  320. if self.CM.start_cpg:
  321. if node in self.CM.cpg_agent:
  322. self.CM.cpg_agent[node].clean_start()
  323. else:
  324. self.CM.cpg_agent[node] = CpgTestAgent(node, CM.Env)
  325. self.CM.cpg_agent[node].start()
  326. if node in self.CM.sam_agent:
  327. self.CM.sam_agent[node].clean_start()
  328. else:
  329. self.CM.sam_agent[node] = SamTestAgent(node, CM.Env)
  330. self.CM.sam_agent[node].start()
  331. # votequorum agent started as needed.
  332. if 'quorum/provider' in self.CM.applied_config:
  333. if CM.applied_config['quorum/provider'] is 'corosync_votequorum':
  334. self.CM.votequorum_agent[node] = VoteQuorumTestAgent(node, CM.Env)
  335. self.CM.votequorum_agent[node].start()
  336. return 1
  337. def TearDown(self, CM):
  338. '''Tear down (undo) the given ScenarioComponent'''
  339. self.CM = CM
  340. for node in self.Env["nodes"]:
  341. if node in self.CM.cpg_agent:
  342. self.CM.cpg_agent[node].stop()
  343. self.CM.sam_agent[node].stop()
  344. if node in self.CM.votequorum_agent:
  345. self.CM.votequorum_agent[node].stop()
  346. ###################################################################
  347. class TestAgent(object):
  348. def __init__(self, binary, node, port, Env=None):
  349. self.node = node
  350. self.node_address = None
  351. self.port = port
  352. self.sock = None
  353. self.binary = binary
  354. self.started = False
  355. resh = RemoteFactory.rsh
  356. self.rsh = RemoteExec(resh)
  357. self.__name__ = None
  358. self.used = False
  359. self.env = Env
  360. self.send_recv = False
  361. def restart(self):
  362. LogFactory().debug('%s:%s restarting' % (self.node, self.binary))
  363. self.stop()
  364. self.start()
  365. def clean_start(self):
  366. if self.used or not self.status():
  367. LogFactory().debug('%s:%s cleaning' % (self.node, self.binary))
  368. self.stop()
  369. self.start()
  370. def status(self):
  371. if not self.started:
  372. return False
  373. try:
  374. self.send_internal(["are_you_ok_dude"])
  375. self.read()
  376. self.started = True
  377. return True
  378. except RuntimeError as msg:
  379. self.started = False
  380. return False
  381. def start(self):
  382. '''Set up the given ScenarioComponent'''
  383. if self.status():
  384. return
  385. LogFactory().debug('%s:%s starting ' % (self.node, self.binary))
  386. self.rsh(self.node, self.binary, synchronous=False)
  387. self.sock = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
  388. ip = socket.gethostbyname(self.node)
  389. is_connected = False
  390. retries = 0
  391. while not is_connected:
  392. try:
  393. retries = retries + 1
  394. self.sock.connect ((ip, self.port))
  395. is_connected = True
  396. except socket.error as msg:
  397. if retries > 10:
  398. LogFactory().log("%s:%s Tried connecting %d times. %s" % (self.node, self.binary, retries, str(msg)))
  399. if retries > 30:
  400. raise RuntimeError("%s:%s can't connect" % (self.node, self.binary))
  401. time.sleep(1)
  402. self.started = True
  403. self.used = False
  404. def stop(self):
  405. '''Tear down (undo) the given ScenarioComponent'''
  406. LogFactory().debug('%s:%s stopping' % (self.binary, self.node))
  407. self.rsh(self.node, "killall " + self.binary + " 2>/dev/null")
  408. if self.sock:
  409. self.sock.close ()
  410. del self.sock
  411. self.sock = None
  412. while self.getpid() != '':
  413. time.sleep(1)
  414. self.started = False
  415. def kill(self):
  416. '''Tear down (undo) the given ScenarioComponent'''
  417. LogFactory().log('%s:%s killing' % (self.node, self.binary))
  418. self.rsh(self.node, "killall -9 " + self.binary + " 2>/dev/null")
  419. self.started = False
  420. def getpid(self):
  421. return self.rsh(self.node, 'pidof ' + self.binary, 1)
  422. def send_internal(self, args):
  423. real_msg = str (len (args))
  424. for a in args:
  425. a_str = str(a)
  426. real_msg += ":" + str (len (a_str)) + ":" + a_str
  427. real_msg += ";"
  428. if sys.version_info > (3,):
  429. real_msg = real_msg.encode("utf8")
  430. try:
  431. return self.sock.send (real_msg)
  432. except socket.error as msg:
  433. LogFactory().log("send(%s): %s; error: %s" % (self.node, real_msg, msg))
  434. return 0
  435. def send (self, args):
  436. if not self.started:
  437. self.start()
  438. sent = self.send_internal(args)
  439. if sent == 0:
  440. raise RuntimeError ("socket connection broken")
  441. self.used = True
  442. def __getattribute__(self,name):
  443. try:
  444. return object.__getattribute__(self, name)
  445. except:
  446. self.__name__ = name
  447. if self.send_recv:
  448. return self.send_recv_dynamic
  449. else:
  450. return self.send_dynamic
  451. def send_recv_dynamic (self, *args):
  452. self.send_dynamic (args)
  453. try:
  454. res = self.read ()
  455. except RuntimeError as msg:
  456. res = None
  457. LogFactory().log("send_recv_dynamic: %s(); error: %s" % (self.__name__, msg))
  458. return res
  459. def send_dynamic (self, *args):
  460. if not self.started:
  461. raise RuntimeError ("agent not started")
  462. # number of args+func
  463. real_msg = str (len (args) + 1) + ":" + str(len(self.__name__)) + ":" + self.__name__
  464. for a in args:
  465. a_str = str(a)
  466. real_msg += ":" + str (len (a_str)) + ":" + a_str
  467. real_msg += ";"
  468. sent = 0
  469. if sys.version_info > (3,):
  470. real_msg = bytes(real_msg, encoding = "utf8")
  471. try:
  472. sent = self.sock.send (real_msg)
  473. except socket.error as msg:
  474. LogFactory().log("send_dynamic(%s): %s; error: %s" % (self.node, real_msg, msg))
  475. if sent == 0:
  476. raise RuntimeError ("socket connection broken")
  477. self.used = True
  478. def read (self):
  479. try:
  480. msg = self.sock.recv (4096)
  481. except socket.error as msg:
  482. raise RuntimeError(msg)
  483. if sys.version_info > (3,):
  484. msg = msg.decode("utf8")
  485. if msg == '':
  486. raise RuntimeError("socket connection broken")
  487. return msg
  488. class CpgConfigEvent(object):
  489. def __init__(self, msg):
  490. info = msg.split(',')
  491. self.group_name = info[0]
  492. self.node_id = info[1]
  493. self.node = None
  494. self.pid = info[2]
  495. if "left" in info[3]:
  496. self.is_member = False
  497. else:
  498. self.is_member = True
  499. def __str__ (self):
  500. str = self.group_name + "," + self.node_id + "," + self.pid + ","
  501. if self.is_member:
  502. return str + "joined"
  503. else:
  504. return str + "left"
  505. ###################################################################
  506. class CpgTestAgent(TestAgent):
  507. def __init__(self, node, Env=None):
  508. TestAgent.__init__(self, "cpg_test_agent", node, 9034, Env)
  509. self.nodeid = None
  510. def start(self):
  511. if not self.status():
  512. TestAgent.start(self)
  513. self.cpg_initialize()
  514. self.used = False
  515. def cpg_local_get(self):
  516. if self.nodeid == None:
  517. self.send (["cpg_local_get"])
  518. self.nodeid = self.read ()
  519. return self.nodeid
  520. def record_config_events(self, truncate=True):
  521. if truncate:
  522. self.send (["record_config_events", "truncate"])
  523. else:
  524. self.send (["record_config_events", "append"])
  525. return self.read ()
  526. def read_config_event(self):
  527. self.send (["read_config_event"])
  528. msg = self.read ()
  529. if "None" in msg:
  530. return None
  531. else:
  532. return CpgConfigEvent(msg)
  533. def read_messages(self, atmost):
  534. self.send (["read_messages", atmost])
  535. msg = self.read ()
  536. if "None" in msg:
  537. return None
  538. else:
  539. return msg
  540. def context_test(self):
  541. self.send (["context_test"])
  542. return self.read ()
  543. ###################################################################
  544. class SamTestAgent(TestAgent):
  545. def __init__(self, node, Env=None):
  546. TestAgent.__init__(self, "sam_test_agent", node, 9036, Env)
  547. self.nodeid = None
  548. self.send_recv = True
  549. ###################################################################
  550. class VoteQuorumTestAgent(TestAgent):
  551. def __init__(self, node, Env=None):
  552. TestAgent.__init__(self, "votequorum_test_agent", node, 9037, Env)
  553. self.nodeid = None
  554. self.send_recv = True
  555. AllAuditClasses = []
  556. AllAuditClasses.append(LogAudit)
  557. AllAuditClasses.append(ShmLeakAudit)
  558. def CoroAuditList(cm):
  559. result = []
  560. for auditclass in AllAuditClasses:
  561. a = auditclass(cm)
  562. if a.is_applicable():
  563. result.append(a)
  564. return result