cpg.rs 19 KB


  1. // libcpg interface for Rust
  2. // Copyright (c) 2020 Red Hat, Inc.
  3. //
  4. // All rights reserved.
  5. //
  6. // Author: Christine Caulfield (ccaulfi@redhat.com)
  7. //
  8. #![allow(clippy::single_match)]
  9. #![allow(clippy::needless_range_loop)]
  10. #![allow(clippy::type_complexity)]
  11. // For the code generated by bindgen
  12. use crate::sys::cpg as ffi;
  13. use std::collections::HashMap;
  14. use std::ffi::{CStr, CString};
  15. use std::fmt;
  16. use std::os::raw::{c_int, c_void};
  17. use std::ptr::copy_nonoverlapping;
  18. use std::slice;
  19. use std::string::String;
  20. use std::sync::Mutex;
  21. // General corosync things
  22. use crate::string_from_bytes;
  23. use crate::{CsError, DispatchFlags, NodeId, Result};
  /// Size of the fixed `value` buffer inside a `cpg_name`; group names handed
  /// to libcpg must be strictly shorter than this.
  const CPG_NAMELEN_MAX: usize = 128;
  /// Number of `cpg_address` slots reserved when querying a group's membership.
  const CPG_MEMBERS_MAX: usize = 128;
  26. /// RingId returned by totem_confchg_fn
  27. #[derive(Copy, Clone)]
  28. pub struct RingId {
  29. pub nodeid: NodeId,
  30. pub seq: u64,
  31. }
  /// Totem delivery guarantee options for [mcast_joined]
  // The C enum does not spell out numeric values, so the variants carry no
  // discriminants here; conversion to the cpg.h constants is done by `to_c`.
  #[derive(Copy, Clone)]
  pub enum Guarantee {
      /// Maps to `CPG_TYPE_UNORDERED`
      TypeUnordered,
      /// Maps to `CPG_TYPE_FIFO`
      TypeFifo,
      /// Maps to `CPG_TYPE_AGREED`
      TypeAgreed,
      /// Maps to `CPG_TYPE_SAFE`
      TypeSafe,
  }
  42. // Convert internal to cpg.h values.
  43. impl Guarantee {
  44. pub fn to_c(&self) -> u32 {
  45. match self {
  46. Guarantee::TypeUnordered => ffi::CPG_TYPE_UNORDERED,
  47. Guarantee::TypeFifo => ffi::CPG_TYPE_FIFO,
  48. Guarantee::TypeAgreed => ffi::CPG_TYPE_AGREED,
  49. Guarantee::TypeSafe => ffi::CPG_TYPE_SAFE,
  50. }
  51. }
  52. }
  /// Flow control state.
  ///
  /// NOTE(review): [flow_control_state_get] in this file reports the state as a
  /// plain `bool`, so this enum is not produced by it.
  #[derive(Copy, Clone)]
  pub enum FlowControlState {
      /// Flow control is off
      Disabled,
      /// Flow control is on
      Enabled,
  }
  /// Flags for model1. No flags are currently specified, so this is always
  /// left at `None`.
  #[derive(Copy, Clone)]
  pub enum Model1Flags {
      /// No flags set
      None,
  }
  /// Reason attached to an [Address] entry in the cpg callbacks
  #[derive(Copy, Clone)]
  pub enum Reason {
      Undefined = 0,
      Join = 1,
      Leave = 2,
      NodeDown = 3,
      NodeUp = 4,
      ProcDown = 5,
  }

  // Conversion from the numeric values used by cpg.h
  impl Reason {
      /// Builds a [Reason] from its numeric code.
      ///
      /// Codes outside the known range are reported as `Reason::Undefined`.
      pub fn new(r: u32) -> Reason {
          match r {
              1 => Reason::Join,
              2 => Reason::Leave,
              3 => Reason::NodeDown,
              4 => Reason::NodeUp,
              5 => Reason::ProcDown,
              // 0 as well as every unrecognised code
              _ => Reason::Undefined,
          }
      }
  }

  impl fmt::Display for Reason {
      /// Writes the variant name as plain text.
      fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
          let label = match *self {
              Reason::Undefined => "Undefined",
              Reason::Join => "Join",
              Reason::Leave => "Leave",
              Reason::NodeDown => "NodeDown",
              Reason::NodeUp => "NodeUp",
              Reason::ProcDown => "ProcDown",
          };
          f.write_str(label)
      }
  }
  100. /// A CPG address entry returned in the callbacks
  101. pub struct Address {
  102. pub nodeid: NodeId,
  103. pub pid: u32,
  104. pub reason: Reason,
  105. }
  106. impl fmt::Debug for Address {
  107. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  108. write!(
  109. f,
  110. "[nodeid: {}, pid: {}, reason: {}]",
  111. self.nodeid, self.pid, self.reason
  112. )
  113. }
  114. }
  115. /// Data for model1 [initialize]
  116. #[derive(Copy, Clone)]
  117. pub struct Model1Data {
  118. pub flags: Model1Flags,
  119. pub deliver_fn: Option<
  120. fn(
  121. handle: &Handle,
  122. group_name: String,
  123. nodeid: NodeId,
  124. pid: u32,
  125. msg: &[u8],
  126. msg_len: usize,
  127. ),
  128. >,
  129. pub confchg_fn: Option<
  130. fn(
  131. handle: &Handle,
  132. group_name: &str,
  133. member_list: Vec<Address>,
  134. left_list: Vec<Address>,
  135. joined_list: Vec<Address>,
  136. ),
  137. >,
  138. pub totem_confchg_fn: Option<fn(handle: &Handle, ring_id: RingId, member_list: Vec<NodeId>)>,
  139. }
  140. /// Modeldata for [initialize], only v1 supported at the moment
  141. #[derive(Copy, Clone)]
  142. pub enum ModelData {
  143. ModelNone,
  144. ModelV1(Model1Data),
  145. }
  146. /// A handle into the cpg library. Returned from [initialize] and needed for all other calls
  147. pub struct Handle {
  148. cpg_handle: u64, // Corosync library handle
  149. model_data: ModelData,
  150. clone: bool,
  151. }
  152. impl Clone for Handle {
  153. fn clone(&self) -> Handle {
  154. Handle {
  155. cpg_handle: self.cpg_handle,
  156. model_data: self.model_data,
  157. clone: true,
  158. }
  159. }
  160. }
  161. impl Drop for Handle {
  162. fn drop(self: &mut Handle) {
  163. if !self.clone {
  164. let _e = finalize(self);
  165. }
  166. }
  167. }
  168. // Clones count as equivalent
  169. impl PartialEq for Handle {
  170. fn eq(&self, other: &Handle) -> bool {
  171. self.cpg_handle == other.cpg_handle
  172. }
  173. }
  174. // Used to convert a CPG handle into one of ours
  175. lazy_static! {
  176. static ref HANDLE_HASH: Mutex<HashMap<u64, Handle>> = Mutex::new(HashMap::new());
  177. }
  178. // Convert a Rust String into a cpg_name struct for libcpg
  179. fn string_to_cpg_name(group: &str) -> Result<ffi::cpg_name> {
  180. if group.len() > CPG_NAMELEN_MAX - 1 {
  181. return Err(CsError::CsErrInvalidParam);
  182. }
  183. let c_name = match CString::new(group) {
  184. Ok(n) => n,
  185. Err(_) => return Err(CsError::CsErrLibrary),
  186. };
  187. let mut c_group = ffi::cpg_name {
  188. length: group.len() as u32,
  189. value: [0; CPG_NAMELEN_MAX],
  190. };
  191. unsafe {
  192. // NOTE param order is 'wrong-way round' from C
  193. copy_nonoverlapping(c_name.as_ptr(), c_group.value.as_mut_ptr(), group.len());
  194. }
  195. Ok(c_group)
  196. }
  197. // Convert an array of cpg_addresses to a Vec<cpg::Address> - used in callbacks
  198. fn cpg_array_to_vec(list: *const ffi::cpg_address, list_entries: usize) -> Vec<Address> {
  199. let temp: &[ffi::cpg_address] = unsafe { slice::from_raw_parts(list, list_entries) };
  200. let mut r_vec = Vec::<Address>::new();
  201. for i in 0..list_entries {
  202. let a: Address = Address {
  203. nodeid: NodeId::from(temp[i].nodeid),
  204. pid: temp[i].pid,
  205. reason: Reason::new(temp[i].reason),
  206. };
  207. r_vec.push(a);
  208. }
  209. r_vec
  210. }
  211. // Called from CPG callback function - munge params back to Rust from C
  212. extern "C" fn rust_deliver_fn(
  213. handle: ffi::cpg_handle_t,
  214. group_name: *const ffi::cpg_name,
  215. nodeid: u32,
  216. pid: u32,
  217. msg: *mut ::std::os::raw::c_void,
  218. msg_len: usize,
  219. ) {
  220. if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
  221. // Convert group_name into a Rust str.
  222. let r_group_name = unsafe {
  223. CStr::from_ptr(&(*group_name).value[0])
  224. .to_string_lossy()
  225. .into_owned()
  226. };
  227. let data: &[u8] = unsafe { std::slice::from_raw_parts(msg as *const u8, msg_len) };
  228. match h.model_data {
  229. ModelData::ModelV1(md) => {
  230. if let Some(cb) = md.deliver_fn {
  231. (cb)(h, r_group_name, NodeId::from(nodeid), pid, data, msg_len);
  232. }
  233. }
  234. _ => {}
  235. }
  236. }
  237. }
  238. // Called from CPG callback function - munge params back to Rust from C
  239. extern "C" fn rust_confchg_fn(
  240. handle: ffi::cpg_handle_t,
  241. group_name: *const ffi::cpg_name,
  242. member_list: *const ffi::cpg_address,
  243. member_list_entries: usize,
  244. left_list: *const ffi::cpg_address,
  245. left_list_entries: usize,
  246. joined_list: *const ffi::cpg_address,
  247. joined_list_entries: usize,
  248. ) {
  249. if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
  250. let r_group_name = unsafe {
  251. CStr::from_ptr(&(*group_name).value[0])
  252. .to_string_lossy()
  253. .into_owned()
  254. };
  255. let r_member_list = cpg_array_to_vec(member_list, member_list_entries);
  256. let r_left_list = cpg_array_to_vec(left_list, left_list_entries);
  257. let r_joined_list = cpg_array_to_vec(joined_list, joined_list_entries);
  258. match h.model_data {
  259. ModelData::ModelV1(md) => {
  260. if let Some(cb) = md.confchg_fn {
  261. (cb)(h, &r_group_name, r_member_list, r_left_list, r_joined_list);
  262. }
  263. }
  264. _ => {}
  265. }
  266. }
  267. }
  268. // Called from CPG callback function - munge params back to Rust from C
  269. extern "C" fn rust_totem_confchg_fn(
  270. handle: ffi::cpg_handle_t,
  271. ring_id: ffi::cpg_ring_id,
  272. member_list_entries: u32,
  273. member_list: *const u32,
  274. ) {
  275. if let Some(h) = HANDLE_HASH.lock().unwrap().get(&handle) {
  276. let r_ring_id = RingId {
  277. nodeid: NodeId::from(ring_id.nodeid),
  278. seq: ring_id.seq,
  279. };
  280. let mut r_member_list = Vec::<NodeId>::new();
  281. let temp_members: &[u32] =
  282. unsafe { slice::from_raw_parts(member_list, member_list_entries as usize) };
  283. for i in 0..member_list_entries as usize {
  284. r_member_list.push(NodeId::from(temp_members[i]));
  285. }
  286. match h.model_data {
  287. ModelData::ModelV1(md) => {
  288. if let Some(cb) = md.totem_confchg_fn {
  289. (cb)(h, r_ring_id, r_member_list);
  290. }
  291. }
  292. _ => {}
  293. }
  294. }
  295. }
  296. /// Initialize a connection to the cpg library. You must call this before doing anything
  297. /// else and use the passed back [Handle].
  298. /// Remember to free the handle using [finalize] when finished.
  299. pub fn initialize(model_data: &ModelData, context: u64) -> Result<Handle> {
  300. let mut handle: ffi::cpg_handle_t = 0;
  301. let mut m = match model_data {
  302. ModelData::ModelV1(_v1) => {
  303. ffi::cpg_model_v1_data_t {
  304. model: ffi::CPG_MODEL_V1,
  305. cpg_deliver_fn: Some(rust_deliver_fn),
  306. cpg_confchg_fn: Some(rust_confchg_fn),
  307. cpg_totem_confchg_fn: Some(rust_totem_confchg_fn),
  308. flags: 0, // No supported flags (yet)
  309. }
  310. }
  311. _ => return Err(CsError::CsErrInvalidParam),
  312. };
  313. unsafe {
  314. let c_context: *mut c_void = &mut &context as *mut _ as *mut c_void;
  315. let c_model: *mut ffi::cpg_model_data_t = &mut m as *mut _ as *mut ffi::cpg_model_data_t;
  316. let res = ffi::cpg_model_initialize(&mut handle, m.model, c_model, c_context);
  317. if res == ffi::CS_OK {
  318. let rhandle = Handle {
  319. cpg_handle: handle,
  320. model_data: *model_data,
  321. clone: false,
  322. };
  323. HANDLE_HASH.lock().unwrap().insert(handle, rhandle.clone());
  324. Ok(rhandle)
  325. } else {
  326. Err(CsError::from_c(res))
  327. }
  328. }
  329. }
  330. /// Finish with a connection to corosync
  331. pub fn finalize(handle: &Handle) -> Result<()> {
  332. let res = unsafe { ffi::cpg_finalize(handle.cpg_handle) };
  333. if res == ffi::CS_OK {
  334. HANDLE_HASH.lock().unwrap().remove(&handle.cpg_handle);
  335. Ok(())
  336. } else {
  337. Err(CsError::from_c(res))
  338. }
  339. }
  340. // Not sure if an FD is the right thing to return here, but it will do for now.
  341. /// Returns a file descriptor to use for poll/select on the CPG handle
  342. pub fn fd_get(handle: &Handle) -> Result<i32> {
  343. let c_fd: *mut c_int = &mut 0 as *mut _ as *mut c_int;
  344. let res = unsafe { ffi::cpg_fd_get(handle.cpg_handle, c_fd) };
  345. if res == ffi::CS_OK {
  346. Ok(c_fd as i32)
  347. } else {
  348. Err(CsError::from_c(res))
  349. }
  350. }
  351. /// Call any/all active CPG callbacks for this [Handle] see [DispatchFlags] for details
  352. pub fn dispatch(handle: &Handle, flags: DispatchFlags) -> Result<()> {
  353. let res = unsafe { ffi::cpg_dispatch(handle.cpg_handle, flags as u32) };
  354. if res == ffi::CS_OK {
  355. Ok(())
  356. } else {
  357. Err(CsError::from_c(res))
  358. }
  359. }
  360. /// Joins a CPG group for sending and receiving messages
  361. pub fn join(handle: &Handle, group: &str) -> Result<()> {
  362. let res = unsafe {
  363. let c_group = string_to_cpg_name(group)?;
  364. ffi::cpg_join(handle.cpg_handle, &c_group)
  365. };
  366. if res == ffi::CS_OK {
  367. Ok(())
  368. } else {
  369. Err(CsError::from_c(res))
  370. }
  371. }
  372. /// Leave the currently joined CPG group, another group can now be joined on
  373. /// the same [Handle] or [finalize] can be called to finish using CPG
  374. pub fn leave(handle: &Handle, group: &str) -> Result<()> {
  375. let res = unsafe {
  376. let c_group = string_to_cpg_name(group)?;
  377. ffi::cpg_leave(handle.cpg_handle, &c_group)
  378. };
  379. if res == ffi::CS_OK {
  380. Ok(())
  381. } else {
  382. Err(CsError::from_c(res))
  383. }
  384. }
  385. /// Get the local node ID
  386. pub fn local_get(handle: &Handle) -> Result<NodeId> {
  387. let mut nodeid: u32 = 0;
  388. let res = unsafe { ffi::cpg_local_get(handle.cpg_handle, &mut nodeid) };
  389. if res == ffi::CS_OK {
  390. Ok(NodeId::from(nodeid))
  391. } else {
  392. Err(CsError::from_c(res))
  393. }
  394. }
  395. /// Get a list of members of a CPG group as a vector of [Address] structs
  396. pub fn membership_get(handle: &Handle, group: &str) -> Result<Vec<Address>> {
  397. let mut member_list_entries: i32 = 0;
  398. let member_list = [ffi::cpg_address {
  399. nodeid: 0,
  400. pid: 0,
  401. reason: 0,
  402. }; CPG_MEMBERS_MAX];
  403. let res = unsafe {
  404. let mut c_group = string_to_cpg_name(group)?;
  405. let c_memlist = member_list.as_ptr() as *mut ffi::cpg_address;
  406. ffi::cpg_membership_get(
  407. handle.cpg_handle,
  408. &mut c_group,
  409. &mut *c_memlist,
  410. &mut member_list_entries,
  411. )
  412. };
  413. if res == ffi::CS_OK {
  414. Ok(cpg_array_to_vec(
  415. member_list.as_ptr(),
  416. member_list_entries as usize,
  417. ))
  418. } else {
  419. Err(CsError::from_c(res))
  420. }
  421. }
  422. /// Get the maximum size that CPG can send in one corosync message,
  423. /// any messages sent via [mcast_joined] that are larger than this
  424. /// will be fragmented
  425. pub fn max_atomic_msgsize_get(handle: &Handle) -> Result<u32> {
  426. let mut asize: u32 = 0;
  427. let res = unsafe { ffi::cpg_max_atomic_msgsize_get(handle.cpg_handle, &mut asize) };
  428. if res == ffi::CS_OK {
  429. Ok(asize)
  430. } else {
  431. Err(CsError::from_c(res))
  432. }
  433. }
  434. /// Get the current 'context' value for this handle.
  435. /// The context value is an arbitrary value that is always passed
  436. /// back to callbacks to help identify the source
  437. pub fn context_get(handle: &Handle) -> Result<u64> {
  438. let mut c_context: *mut c_void = &mut 0u64 as *mut _ as *mut c_void;
  439. let (res, context) = unsafe {
  440. let r = ffi::cpg_context_get(handle.cpg_handle, &mut c_context);
  441. let context: u64 = c_context as u64;
  442. (r, context)
  443. };
  444. if res == ffi::CS_OK {
  445. Ok(context)
  446. } else {
  447. Err(CsError::from_c(res))
  448. }
  449. }
  450. /// Set the current 'context' value for this handle.
  451. /// The context value is an arbitrary value that is always passed
  452. /// back to callbacks to help identify the source.
  453. /// Normally this is set in [initialize], but this allows it to be changed
  454. pub fn context_set(handle: &Handle, context: u64) -> Result<()> {
  455. let res = unsafe {
  456. let c_context = context as *mut c_void;
  457. ffi::cpg_context_set(handle.cpg_handle, c_context)
  458. };
  459. if res == ffi::CS_OK {
  460. Ok(())
  461. } else {
  462. Err(CsError::from_c(res))
  463. }
  464. }
  465. /// Get the flow control state of corosync CPG
  466. pub fn flow_control_state_get(handle: &Handle) -> Result<bool> {
  467. let mut fc_state: u32 = 0;
  468. let res = unsafe { ffi::cpg_flow_control_state_get(handle.cpg_handle, &mut fc_state) };
  469. if res == ffi::CS_OK {
  470. if fc_state == 1 {
  471. Ok(true)
  472. } else {
  473. Ok(false)
  474. }
  475. } else {
  476. Err(CsError::from_c(res))
  477. }
  478. }
  479. /// Send a message to the currently joined CPG group
  480. pub fn mcast_joined(handle: &Handle, guarantee: Guarantee, msg: &[u8]) -> Result<()> {
  481. let c_iovec = ffi::iovec {
  482. iov_base: msg.as_ptr() as *mut c_void,
  483. iov_len: msg.len(),
  484. };
  485. let res = unsafe { ffi::cpg_mcast_joined(handle.cpg_handle, guarantee.to_c(), &c_iovec, 1) };
  486. if res == ffi::CS_OK {
  487. Ok(())
  488. } else {
  489. Err(CsError::from_c(res))
  490. }
  491. }
  /// Type of iteration for [CpgIterStart]
  ///
  /// The discriminant is passed to the library as the iteration type.
  #[derive(Copy, Clone)]
  pub enum CpgIterType {
      NameOnly = 1,
      OneGroup = 2,
      /// Iterate over everything; the group name is not passed to the library
      All = 3,
  }
  499. // Iterator based on information on this page. thank you!
  500. // https://stackoverflow.com/questions/30218886/how-to-implement-iterator-and-intoiterator-for-a-simple-struct
  501. // Object to iterate over
  /// An object to iterate over a list of CPG groups, create one of these and then use 'for' over it
  pub struct CpgIterStart {
      // Iteration handle obtained from the library
      iter_handle: u64,
  }
  506. /// struct returned from iterating over a [CpgIterStart]
  507. pub struct CpgIter {
  508. pub group: String,
  509. pub nodeid: NodeId,
  510. pub pid: u32,
  511. }
  /// Iterator state created by converting a [CpgIterStart] with `into_iter`
  pub struct CpgIntoIter {
      // Iteration handle taken over from the CpgIterStart
      iter_handle: u64,
  }
  515. impl fmt::Debug for CpgIter {
  516. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  517. write!(
  518. f,
  519. "[group: {}, nodeid: {}, pid: {}]",
  520. self.group, self.nodeid, self.pid
  521. )
  522. }
  523. }
  524. impl Iterator for CpgIntoIter {
  525. type Item = CpgIter;
  526. fn next(&mut self) -> Option<CpgIter> {
  527. let mut c_iter_description = ffi::cpg_iteration_description_t {
  528. nodeid: 0,
  529. pid: 0,
  530. group: ffi::cpg_name {
  531. length: 0_u32,
  532. value: [0; CPG_NAMELEN_MAX],
  533. },
  534. };
  535. let res = unsafe { ffi::cpg_iteration_next(self.iter_handle, &mut c_iter_description) };
  536. if res == ffi::CS_OK {
  537. let r_group =
  538. match string_from_bytes(c_iter_description.group.value.as_ptr(), CPG_NAMELEN_MAX) {
  539. Ok(groupname) => groupname,
  540. Err(_) => return None,
  541. };
  542. Some(CpgIter {
  543. group: r_group,
  544. nodeid: NodeId::from(c_iter_description.nodeid),
  545. pid: c_iter_description.pid,
  546. })
  547. } else if res == ffi::CS_ERR_NO_SECTIONS {
  548. // End of list
  549. unsafe {
  550. // Yeah, we don't check this return code. There's nowhere to report it.
  551. ffi::cpg_iteration_finalize(self.iter_handle)
  552. };
  553. None
  554. } else {
  555. None
  556. }
  557. }
  558. }
  559. impl CpgIterStart {
  560. /// Create a new [CpgIterStart] object for iterating over a list of active CPG groups
  561. pub fn new(cpg_handle: &Handle, group: &str, iter_type: CpgIterType) -> Result<CpgIterStart> {
  562. let mut iter_handle: u64 = 0;
  563. let res = unsafe {
  564. let mut c_group = string_to_cpg_name(group)?;
  565. let c_itertype = iter_type as u32;
  566. // IterType 'All' requires that the group pointer is passed in as NULL
  567. let c_group_ptr = {
  568. match iter_type {
  569. CpgIterType::All => std::ptr::null_mut(),
  570. _ => &mut c_group,
  571. }
  572. };
  573. ffi::cpg_iteration_initialize(
  574. cpg_handle.cpg_handle,
  575. c_itertype,
  576. c_group_ptr,
  577. &mut iter_handle,
  578. )
  579. };
  580. if res == ffi::CS_OK {
  581. Ok(CpgIterStart { iter_handle })
  582. } else {
  583. Err(CsError::from_c(res))
  584. }
  585. }
  586. }
  587. impl IntoIterator for CpgIterStart {
  588. type Item = CpgIter;
  589. type IntoIter = CpgIntoIter;
  590. fn into_iter(self) -> Self::IntoIter {
  591. CpgIntoIter {
  592. iter_handle: self.iter_handle,
  593. }
  594. }
  595. }