ipc_glue.c 19 KB


  1. /*
  2. * Copyright (c) 2010 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Angus Salkeld <asalkeld@redhat.com>
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of Red Hat, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <config.h>
  35. #include <stdlib.h>
  36. #include <stdio.h>
  37. #include <errno.h>
  38. #include <assert.h>
  39. #include <sys/uio.h>
  40. #include <string.h>
  41. #include <qb/qbdefs.h>
  42. #include <qb/qblist.h>
  43. #include <qb/qbutil.h>
  44. #include <qb/qbloop.h>
  45. #include <qb/qbipcs.h>
  46. #include <corosync/swab.h>
  47. #include <corosync/corotypes.h>
  48. #include <corosync/corodefs.h>
  49. #include <corosync/totem/totempg.h>
  50. #include <corosync/engine/objdb.h>
  51. #include <corosync/engine/config.h>
  52. #include <corosync/engine/logsys.h>
  53. #include "mainconfig.h"
  54. #include "sync.h"
  55. #include "syncv2.h"
  56. #include "timer.h"
  57. #include "main.h"
  58. #include "util.h"
  59. #include "apidef.h"
  60. #include "service.h"
  61. LOGSYS_DECLARE_SUBSYS ("MAIN");
  62. static struct corosync_api_v1 *api = NULL;
  63. static int ipc_subsys_id = -1;
  64. static int32_t ipc_not_enough_fds_left = 0;
  65. static int32_t ipc_fc_is_quorate; /* boolean */
  66. static int32_t ipc_fc_totem_queue_level; /* percentage used */
  67. static int32_t ipc_fc_sync_in_process; /* boolean */
  68. static qb_handle_t object_connection_handle;
  69. struct cs_ipcs_mapper {
  70. int32_t id;
  71. qb_ipcs_service_t *inst;
  72. char name[256];
  73. };
  74. static struct cs_ipcs_mapper ipcs_mapper[SERVICE_HANDLER_MAXIMUM_COUNT];
  75. static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn);
  76. static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
  77. void *data, qb_ipcs_dispatch_fn_t fn);
  78. static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
  79. void *data, qb_ipcs_dispatch_fn_t fn);
  80. static int32_t cs_ipcs_dispatch_del(int32_t fd);
  81. static struct qb_ipcs_poll_handlers corosync_poll_funcs = {
  82. .job_add = cs_ipcs_job_add,
  83. .dispatch_add = cs_ipcs_dispatch_add,
  84. .dispatch_mod = cs_ipcs_dispatch_mod,
  85. .dispatch_del = cs_ipcs_dispatch_del,
  86. };
  87. static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid);
  88. static void cs_ipcs_connection_created(qb_ipcs_connection_t *c);
  89. static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
  90. void *data, size_t size);
  91. static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c);
  92. static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c);
  93. static struct qb_ipcs_service_handlers corosync_service_funcs = {
  94. .connection_accept = cs_ipcs_connection_accept,
  95. .connection_created = cs_ipcs_connection_created,
  96. .msg_process = cs_ipcs_msg_process,
  97. .connection_closed = cs_ipcs_connection_closed,
  98. .connection_destroyed = cs_ipcs_connection_destroyed,
  99. };
  100. static const char* cs_ipcs_serv_short_name(int32_t service_id)
  101. {
  102. const char *name;
  103. switch (service_id) {
  104. case EVS_SERVICE:
  105. name = "evs";
  106. break;
  107. case CLM_SERVICE:
  108. name = "saClm";
  109. break;
  110. case AMF_SERVICE:
  111. name = "saAmf";
  112. break;
  113. case CKPT_SERVICE:
  114. name = "saCkpt";
  115. break;
  116. case EVT_SERVICE:
  117. name = "saEvt";
  118. break;
  119. case LCK_SERVICE:
  120. name = "saLck";
  121. break;
  122. case MSG_SERVICE:
  123. name = "saMsg";
  124. break;
  125. case CFG_SERVICE:
  126. name = "cfg";
  127. break;
  128. case CPG_SERVICE:
  129. name = "cpg";
  130. break;
  131. case CMAN_SERVICE:
  132. name = "cman";
  133. break;
  134. case PCMK_SERVICE:
  135. name = "pacemaker.engine";
  136. break;
  137. case CONFDB_SERVICE:
  138. name = "confdb";
  139. break;
  140. case QUORUM_SERVICE:
  141. name = "quorum";
  142. break;
  143. case PLOAD_SERVICE:
  144. name = "pload";
  145. break;
  146. case TMR_SERVICE:
  147. name = "saTmr";
  148. break;
  149. case VOTEQUORUM_SERVICE:
  150. name = "votequorum";
  151. break;
  152. case NTF_SERVICE:
  153. name = "saNtf";
  154. break;
  155. case AMF_V2_SERVICE:
  156. name = "saAmfV2";
  157. break;
  158. case TST_SV1_SERVICE:
  159. name = "tst";
  160. break;
  161. case TST_SV2_SERVICE:
  162. name = "tst2";
  163. break;
  164. case MON_SERVICE:
  165. name = "mon";
  166. break;
  167. case WD_SERVICE:
  168. name = "wd";
  169. break;
  170. default:
  171. name = NULL;
  172. break;
  173. }
  174. return name;
  175. }
  176. int32_t cs_ipcs_service_destroy(int32_t service_id)
  177. {
  178. if (ipcs_mapper[service_id].inst) {
  179. qb_ipcs_destroy(ipcs_mapper[service_id].inst);
  180. ipcs_mapper[service_id].inst = NULL;
  181. }
  182. return 0;
  183. }
  184. static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid)
  185. {
  186. struct list_head *iter;
  187. int32_t service = qb_ipcs_service_id_get(c);
  188. if (ais_service[service] == NULL || ais_service_exiting[service]) {
  189. return -ENOSYS;
  190. }
  191. if (ipc_not_enough_fds_left) {
  192. return -EMFILE;
  193. }
  194. if (euid == 0 || egid == 0) {
  195. return 0;
  196. }
  197. for (iter = uidgid_list_head.next; iter != &uidgid_list_head;
  198. iter = iter->next) {
  199. struct uidgid_item *ugi = qb_list_entry (iter, struct uidgid_item,
  200. list);
  201. if (euid == ugi->uid || egid == ugi->gid)
  202. return 0;
  203. }
  204. log_printf(LOGSYS_LEVEL_ERROR, "Denied connection attempt from %d:%d", euid, egid);
  205. return -EACCES;
  206. }
  /*
   * Look up the command name of a process via /proc/<pid>/stat.
   *
   * pid:      process to look up
   * out_name: buffer that receives the NUL-terminated name (truncated to fit)
   * name_len: size of out_name in bytes
   *
   * Returns out_name on success. Returns NULL when out_name is NULL,
   * name_len is 0, the stat file cannot be read, or its first line does not
   * contain a "(name) " field.
   */
  static char * pid_to_name (pid_t pid, char *out_name, size_t name_len)
  {
      char stat_path[32];
      char stat_line[256];
      char *name_start;
      char *name_end;
      FILE *fp;

      if (out_name == NULL || name_len == 0) {
          return NULL;
      }

      snprintf (stat_path, sizeof (stat_path), "/proc/%d/stat", (int)pid);
      fp = fopen (stat_path, "r");
      if (fp == NULL) {
          return NULL;
      }
      if (fgets (stat_line, sizeof (stat_line), fp) == NULL) {
          fclose (fp);
          return NULL;
      }
      fclose (fp);

      /*
       * The name sits between the FIRST '(' and the LAST ')' of the line;
       * the name itself may contain either bracket.
       */
      name_start = strchr (stat_line, '(');
      name_end = strrchr (stat_line, ')');
      if (name_start == NULL || name_end == NULL ||
          name_end < name_start || name_end[1] != ' ') {
          return NULL;
      }

      /* move past the bracket and cut the line at the closing one */
      name_start++;
      *name_end = '\0';

      /* bounded copy, always NUL-terminated */
      snprintf (out_name, name_len, "%s", name_start);
      return out_name;
  }
  242. struct cs_ipcs_conn_context {
  243. qb_handle_t stats_handle;
  244. char data[1];
  245. };
  246. static void cs_ipcs_connection_created(qb_ipcs_connection_t *c)
  247. {
  248. int32_t service = 0;
  249. uint32_t zero_32 = 0;
  250. uint64_t zero_64 = 0;
  251. unsigned int key_incr_dummy;
  252. qb_handle_t object_handle;
  253. struct cs_ipcs_conn_context *context;
  254. char conn_name[42];
  255. char proc_name[32];
  256. struct qb_ipcs_connection_stats stats;
  257. int32_t size = sizeof(struct cs_ipcs_conn_context);
  258. log_printf(LOG_INFO, "%s() new connection", __func__);
  259. service = qb_ipcs_service_id_get(c);
  260. size += ais_service[service]->private_data_size;
  261. context = calloc(1, size);
  262. qb_ipcs_context_set(c, context);
  263. ais_service[service]->lib_init_fn(c);
  264. api->object_key_increment (object_connection_handle,
  265. "active", strlen("active"),
  266. &key_incr_dummy);
  267. qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
  268. if (stats.client_pid > 0) {
  269. if (pid_to_name (stats.client_pid, proc_name, sizeof(proc_name))) {
  270. snprintf (conn_name,
  271. sizeof(conn_name),
  272. "%s:%d:%p", proc_name,
  273. stats.client_pid, c);
  274. } else {
  275. snprintf (conn_name,
  276. sizeof(conn_name),
  277. "%d:%p",
  278. stats.client_pid, c);
  279. }
  280. } else {
  281. snprintf (conn_name,
  282. sizeof(conn_name),
  283. "%p", c);
  284. }
  285. api->object_create (object_connection_handle,
  286. &object_handle,
  287. conn_name,
  288. strlen (conn_name));
  289. context->stats_handle = object_handle;
  290. api->object_key_create_typed (object_handle,
  291. "service_id",
  292. &zero_32, sizeof (zero_32),
  293. OBJDB_VALUETYPE_UINT32);
  294. api->object_key_create_typed (object_handle,
  295. "client_pid",
  296. &zero_32, sizeof (zero_32),
  297. OBJDB_VALUETYPE_INT32);
  298. api->object_key_create_typed (object_handle,
  299. "responses",
  300. &zero_64, sizeof (zero_64),
  301. OBJDB_VALUETYPE_UINT64);
  302. api->object_key_create_typed (object_handle,
  303. "dispatched",
  304. &zero_64, sizeof (zero_64),
  305. OBJDB_VALUETYPE_UINT64);
  306. api->object_key_create_typed (object_handle,
  307. "requests",
  308. &zero_64, sizeof (zero_64),
  309. OBJDB_VALUETYPE_INT64);
  310. api->object_key_create_typed (object_handle,
  311. "send_retries",
  312. &zero_64, sizeof (zero_64),
  313. OBJDB_VALUETYPE_UINT64);
  314. api->object_key_create_typed (object_handle,
  315. "recv_retries",
  316. &zero_64, sizeof (zero_64),
  317. OBJDB_VALUETYPE_UINT64);
  318. api->object_key_create_typed (object_handle,
  319. "flow_control",
  320. &zero_32, sizeof (zero_32),
  321. OBJDB_VALUETYPE_UINT32);
  322. api->object_key_create_typed (object_handle,
  323. "flow_control_count",
  324. &zero_64, sizeof (zero_64),
  325. OBJDB_VALUETYPE_UINT64);
  326. }
  327. void cs_ipc_refcnt_inc(void *conn)
  328. {
  329. qb_ipcs_connection_ref(conn);
  330. }
  331. void cs_ipc_refcnt_dec(void *conn)
  332. {
  333. qb_ipcs_connection_unref(conn);
  334. }
  335. void *cs_ipcs_private_data_get(void *conn)
  336. {
  337. struct cs_ipcs_conn_context *cnx;
  338. cnx = qb_ipcs_context_get(conn);
  339. return &cnx->data[0];
  340. }
  341. static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c)
  342. {
  343. struct cs_ipcs_conn_context *cnx;
  344. log_printf(LOG_INFO, "%s() ", __func__);
  345. cnx = qb_ipcs_context_get(c);
  346. if (cnx) {
  347. free(cnx);
  348. }
  349. }
  350. static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c)
  351. {
  352. struct cs_ipcs_conn_context *cnx;
  353. unsigned int key_incr_dummy;
  354. int32_t res = 0;
  355. int32_t service = qb_ipcs_service_id_get(c);
  356. log_printf(LOG_INFO, "%s() ", __func__);
  357. res = ais_service[service]->lib_exit_fn(c);
  358. if (res != 0) {
  359. return res;
  360. }
  361. cnx = qb_ipcs_context_get(c);
  362. api->object_destroy (cnx->stats_handle);
  363. api->object_key_increment (object_connection_handle,
  364. "closed", strlen("closed"),
  365. &key_incr_dummy);
  366. api->object_key_decrement (object_connection_handle,
  367. "active", strlen("active"),
  368. &key_incr_dummy);
  369. return 0;
  370. }
  /*
   * Send a scatter/gather response on a connection.
   * Returns 0 when qb_ipcs_response_sendv() reports a non-negative result,
   * otherwise its negative return value.
   */
  int cs_ipcs_response_iov_send (void *conn,
      const struct iovec *iov,
      unsigned int iov_len)
  {
      int32_t sent = qb_ipcs_response_sendv(conn, iov, iov_len);

      return (sent < 0) ? sent : 0;
  }
  /*
   * Send a single-buffer response on a connection.
   * Returns 0 when qb_ipcs_response_send() reports a non-negative result,
   * otherwise its negative return value.
   */
  int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen)
  {
      int32_t sent = qb_ipcs_response_send(conn, msg, mlen);

      return (sent < 0) ? sent : 0;
  }
  /*
   * Send a single-buffer event on a connection.
   * Returns 0 when qb_ipcs_event_send() reports a non-negative result,
   * otherwise its negative return value.
   */
  int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
  {
      int32_t sent = qb_ipcs_event_send(conn, msg, mlen);

      return (sent < 0) ? sent : 0;
  }
  /*
   * Send a scatter/gather event on a connection.
   * Returns 0 when qb_ipcs_event_sendv() reports a non-negative result,
   * otherwise its negative return value.
   */
  int cs_ipcs_dispatch_iov_send (void *conn,
      const struct iovec *iov,
      unsigned int iov_len)
  {
      int32_t sent = qb_ipcs_event_sendv(conn, iov, iov_len);

      return (sent < 0) ? sent : 0;
  }
  407. static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
  408. void *data, size_t size)
  409. {
  410. struct qb_ipc_response_header response;
  411. struct qb_ipc_request_header *request_pt = (struct qb_ipc_request_header *)data;
  412. int32_t service = qb_ipcs_service_id_get(c);
  413. int32_t send_ok;
  414. ssize_t res = -1;
  415. int sending_allowed_private_data;
  416. send_ok = corosync_sending_allowed (service,
  417. request_pt->id,
  418. request_pt,
  419. &sending_allowed_private_data);
  420. /*
  421. * This happens when the message contains some kind of invalid
  422. * parameter, such as an invalid size
  423. */
  424. if (send_ok == -1) {
  425. response.size = sizeof (response);
  426. response.id = 0;
  427. response.error = CS_ERR_INVALID_PARAM;
  428. log_printf(LOG_INFO, "%s() invalid message! size:%d error:%d",
  429. __func__, response.size, response.error);
  430. qb_ipcs_response_send (c,
  431. &response,
  432. sizeof (response));
  433. res = -EINVAL;
  434. } else {
  435. if (send_ok) {
  436. ais_service[service]->lib_engine[request_pt->id].lib_handler_fn(c, request_pt);
  437. res = 0;
  438. } else {
  439. /*
  440. * Overload, tell library to retry
  441. */
  442. response.size = sizeof (response);
  443. response.id = 0;
  444. response.error = CS_ERR_TRY_AGAIN;
  445. qb_ipcs_response_send (c,
  446. &response,
  447. sizeof (response));
  448. res = -ENOBUFS;
  449. }
  450. }
  451. corosync_sending_allowed_release (&sending_allowed_private_data);
  452. return res;
  453. }
  454. static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn)
  455. {
  456. return qb_loop_job_add(cs_poll_handle_get(), p, data, fn);
  457. }
  458. static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
  459. void *data, qb_ipcs_dispatch_fn_t fn)
  460. {
  461. return qb_loop_poll_add(cs_poll_handle_get(), p, fd, events, data, fn);
  462. }
  463. static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
  464. void *data, qb_ipcs_dispatch_fn_t fn)
  465. {
  466. return qb_loop_poll_mod(cs_poll_handle_get(), p, fd, events, data, fn);
  467. }
  /* Poll hook: stop watching fd on the loop returned by cs_poll_handle_get(). */
  static int32_t cs_ipcs_dispatch_del(int32_t fd)
  {
      int32_t rc = qb_loop_poll_del(cs_poll_handle_get(), fd);

      return rc;
  }
  472. static void cs_ipcs_low_fds_event(int32_t not_enough, int32_t fds_available)
  473. {
  474. ipc_not_enough_fds_left = not_enough;
  475. if (not_enough) {
  476. log_printf(LOGSYS_LEVEL_WARNING, "refusing new connections (fds_available:%d)\n",
  477. fds_available);
  478. } else {
  479. log_printf(LOGSYS_LEVEL_NOTICE, "allowing new connections (fds_available:%d)\n",
  480. fds_available);
  481. }
  482. }
  483. int32_t cs_ipcs_q_level_get(void)
  484. {
  485. return ipc_fc_totem_queue_level;
  486. }
  487. static qb_loop_timer_handle ipcs_check_for_flow_control_timer;
  488. static void cs_ipcs_check_for_flow_control(void)
  489. {
  490. int32_t i;
  491. int32_t fc_enabled;
  492. for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) {
  493. if (ais_service[i] == NULL) {
  494. continue;
  495. }
  496. fc_enabled = QB_TRUE;
  497. if (ipc_fc_is_quorate == 1 ||
  498. ais_service[i]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) {
  499. /*
  500. * we are quorate
  501. * now check flow control
  502. */
  503. if (ipc_fc_totem_queue_level != TOTEM_Q_LEVEL_CRITICAL &&
  504. ipc_fc_sync_in_process == 0) {
  505. fc_enabled = QB_FALSE;
  506. }
  507. }
  508. if (fc_enabled) {
  509. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_OFF);
  510. qb_loop_timer_add(cs_poll_handle_get(), QB_LOOP_MED, 1*QB_TIME_NS_IN_MSEC,
  511. NULL, corosync_recheck_the_q_level, &ipcs_check_for_flow_control_timer);
  512. } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_LOW) {
  513. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_FAST);
  514. } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_GOOD) {
  515. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_NORMAL);
  516. } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_HIGH) {
  517. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_SLOW);
  518. }
  519. }
  520. }
  521. static void cs_ipcs_fc_quorum_changed(int quorate, void *context)
  522. {
  523. ipc_fc_is_quorate = quorate;
  524. cs_ipcs_check_for_flow_control();
  525. }
  526. static void cs_ipcs_totem_queue_level_changed(enum totem_q_level level)
  527. {
  528. ipc_fc_totem_queue_level = level;
  529. cs_ipcs_check_for_flow_control();
  530. }
  531. void cs_ipcs_sync_state_changed(int32_t sync_in_process)
  532. {
  533. ipc_fc_sync_in_process = sync_in_process;
  534. cs_ipcs_check_for_flow_control();
  535. }
  536. static void cs_ipcs_libqb_log_fn(const char *file_name,
  537. int32_t file_line,
  538. int32_t severity,
  539. const char *msg)
  540. {
  541. int32_t level = severity;
  542. if (severity > LOG_DEBUG) {
  543. level = LOGSYS_LEVEL_DEBUG;
  544. }
  545. _logsys_log_printf (LOGSYS_ENCODE_RECID(level,
  546. ipc_subsys_id,
  547. LOGSYS_RECID_LOG),
  548. __func__, file_name, file_line, "%s", msg);
  549. }
  550. void cs_ipcs_stats_update(void)
  551. {
  552. int32_t i;
  553. struct qb_ipcs_stats srv_stats;
  554. struct qb_ipcs_connection_stats stats;
  555. qb_ipcs_connection_t *c;
  556. struct cs_ipcs_conn_context *cnx;
  557. for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) {
  558. if (ais_service[i] == NULL) {
  559. continue;
  560. }
  561. qb_ipcs_stats_get(ipcs_mapper[i].inst, &srv_stats, QB_FALSE);
  562. for (c = qb_ipcs_connection_first_get(ipcs_mapper[i].inst); c;
  563. c = qb_ipcs_connection_next_get(ipcs_mapper[i].inst, c)) {
  564. cnx = qb_ipcs_context_get(c);
  565. if (cnx == NULL) continue;
  566. qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
  567. api->object_key_replace(cnx->stats_handle,
  568. "client_pid", strlen("client_pid"),
  569. &stats.client_pid, sizeof(uint32_t));
  570. api->object_key_replace(cnx->stats_handle,
  571. "requests", strlen("requests"),
  572. &stats.requests, sizeof(uint64_t));
  573. api->object_key_replace(cnx->stats_handle,
  574. "responses", strlen("responses"),
  575. &stats.responses, sizeof(uint64_t));
  576. api->object_key_replace(cnx->stats_handle,
  577. "dispatched", strlen("dispatched"),
  578. &stats.events, sizeof(uint64_t));
  579. api->object_key_replace(cnx->stats_handle,
  580. "send_retries", strlen("send_retries"),
  581. &stats.send_retries, sizeof(uint64_t));
  582. api->object_key_replace(cnx->stats_handle,
  583. "recv_retries", strlen("recv_retries"),
  584. &stats.recv_retries, sizeof(uint64_t));
  585. api->object_key_replace(cnx->stats_handle,
  586. "flow_control_count", strlen("flow_control_count"),
  587. &stats.flow_control_count, sizeof(uint64_t));
  588. api->object_key_replace(cnx->stats_handle,
  589. "flow_control_state", strlen("flow_control_state"),
  590. &stats.flow_control_state, sizeof(uint32_t));
  591. qb_ipcs_connection_unref(c);
  592. }
  593. }
  594. }
  595. void cs_ipcs_service_init(struct corosync_service_engine *service)
  596. {
  597. ipcs_mapper[service->id].id = service->id;
  598. strcpy(ipcs_mapper[service->id].name, cs_ipcs_serv_short_name(service->id));
  599. log_printf (LOGSYS_LEVEL_INFO,
  600. "Initializing IPC on %s [%d]",
  601. ipcs_mapper[service->id].name,
  602. ipcs_mapper[service->id].id);
  603. ipcs_mapper[service->id].inst = qb_ipcs_create(ipcs_mapper[service->id].name,
  604. ipcs_mapper[service->id].id,
  605. QB_IPC_SHM,
  606. &corosync_service_funcs);
  607. assert(ipcs_mapper[service->id].inst);
  608. qb_ipcs_poll_handlers_set(ipcs_mapper[service->id].inst,
  609. &corosync_poll_funcs);
  610. qb_ipcs_run(ipcs_mapper[service->id].inst);
  611. }
  612. void cs_ipcs_init(void)
  613. {
  614. qb_handle_t object_find_handle;
  615. qb_handle_t object_runtime_handle;
  616. uint64_t zero_64 = 0;
  617. api = apidef_get ();
  618. qb_loop_poll_low_fds_event_set(cs_poll_handle_get(), cs_ipcs_low_fds_event);
  619. ipc_subsys_id = _logsys_subsys_create ("IPC");
  620. if (ipc_subsys_id < 0) {
  621. log_printf (LOGSYS_LEVEL_ERROR,
  622. "Could not initialize IPC logging subsystem\n");
  623. corosync_exit_error (AIS_DONE_INIT_SERVICES);
  624. }
  625. qb_util_set_log_function (cs_ipcs_libqb_log_fn);
  626. api->quorum_register_callback (cs_ipcs_fc_quorum_changed, NULL);
  627. totempg_queue_level_register_callback (cs_ipcs_totem_queue_level_changed);
  628. api->object_find_create (OBJECT_PARENT_HANDLE,
  629. "runtime", strlen ("runtime"),
  630. &object_find_handle);
  631. if (api->object_find_next (object_find_handle,
  632. &object_runtime_handle) != 0) {
  633. log_printf (LOGSYS_LEVEL_ERROR,"arrg no runtime");
  634. return;
  635. }
  636. /* Connection objects */
  637. api->object_create (object_runtime_handle,
  638. &object_connection_handle,
  639. "connections", strlen ("connections"));
  640. api->object_key_create_typed (object_connection_handle,
  641. "active", &zero_64, sizeof (zero_64),
  642. OBJDB_VALUETYPE_UINT64);
  643. api->object_key_create_typed (object_connection_handle,
  644. "closed", &zero_64, sizeof (zero_64),
  645. OBJDB_VALUETYPE_UINT64);
  646. }