ipc_glue.c 19 KB


  1. /*
  2. * Copyright (c) 2010 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Angus Salkeld <asalkeld@redhat.com>
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of Red Hat, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <config.h>
  35. #include <stdlib.h>
  36. #include <stdio.h>
  37. #include <errno.h>
  38. #include <assert.h>
  39. #include <sys/uio.h>
  40. #include <string.h>
  41. #include <qb/qbdefs.h>
  42. #include <qb/qblist.h>
  43. #include <qb/qbutil.h>
  44. #include <qb/qbloop.h>
  45. #include <qb/qbipcs.h>
  46. #include <corosync/swab.h>
  47. #include <corosync/corotypes.h>
  48. #include <corosync/corodefs.h>
  49. #include <corosync/totem/totempg.h>
  50. #include <corosync/engine/objdb.h>
  51. #include <corosync/engine/config.h>
  52. #include <corosync/engine/logsys.h>
  53. #include "mainconfig.h"
  54. #include "sync.h"
  55. #include "syncv2.h"
  56. #include "timer.h"
  57. #include "main.h"
  58. #include "util.h"
  59. #include "apidef.h"
  60. #include "service.h"
  61. LOGSYS_DECLARE_SUBSYS ("MAIN");
  62. static struct corosync_api_v1 *api = NULL;
  63. static int ipc_subsys_id = -1;
  64. static int32_t ipc_not_enough_fds_left = 0;
  65. static int32_t ipc_fc_is_quorate; /* boolean */
  66. static int32_t ipc_fc_totem_queue_level; /* percentage used */
  67. static int32_t ipc_fc_sync_in_process; /* boolean */
  68. static qb_handle_t object_connection_handle;
  69. struct cs_ipcs_mapper {
  70. int32_t id;
  71. qb_ipcs_service_t *inst;
  72. char name[256];
  73. };
  74. static struct cs_ipcs_mapper ipcs_mapper[SERVICE_HANDLER_MAXIMUM_COUNT];
  75. static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn);
  76. static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
  77. void *data, qb_ipcs_dispatch_fn_t fn);
  78. static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
  79. void *data, qb_ipcs_dispatch_fn_t fn);
  80. static int32_t cs_ipcs_dispatch_del(int32_t fd);
  81. static struct qb_ipcs_poll_handlers corosync_poll_funcs = {
  82. .job_add = cs_ipcs_job_add,
  83. .dispatch_add = cs_ipcs_dispatch_add,
  84. .dispatch_mod = cs_ipcs_dispatch_mod,
  85. .dispatch_del = cs_ipcs_dispatch_del,
  86. };
  87. static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid);
  88. static void cs_ipcs_connection_created(qb_ipcs_connection_t *c);
  89. static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
  90. void *data, size_t size);
  91. static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c);
  92. static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c);
  93. static struct qb_ipcs_service_handlers corosync_service_funcs = {
  94. .connection_accept = cs_ipcs_connection_accept,
  95. .connection_created = cs_ipcs_connection_created,
  96. .msg_process = cs_ipcs_msg_process,
  97. .connection_closed = cs_ipcs_connection_closed,
  98. .connection_destroyed = cs_ipcs_connection_destroyed,
  99. };
  100. static const char* cs_ipcs_serv_short_name(int32_t service_id)
  101. {
  102. const char *name;
  103. switch (service_id) {
  104. case EVS_SERVICE:
  105. name = "evs";
  106. break;
  107. case CLM_SERVICE:
  108. name = "saClm";
  109. break;
  110. case AMF_SERVICE:
  111. name = "saAmf";
  112. break;
  113. case CKPT_SERVICE:
  114. name = "saCkpt";
  115. break;
  116. case EVT_SERVICE:
  117. name = "saEvt";
  118. break;
  119. case LCK_SERVICE:
  120. name = "saLck";
  121. break;
  122. case MSG_SERVICE:
  123. name = "saMsg";
  124. break;
  125. case CFG_SERVICE:
  126. name = "cfg";
  127. break;
  128. case CPG_SERVICE:
  129. name = "cpg";
  130. break;
  131. case CMAN_SERVICE:
  132. name = "cman";
  133. break;
  134. case PCMK_SERVICE:
  135. name = "pacemaker.engine";
  136. break;
  137. case CONFDB_SERVICE:
  138. name = "confdb";
  139. break;
  140. case QUORUM_SERVICE:
  141. name = "quorum";
  142. break;
  143. case PLOAD_SERVICE:
  144. name = "pload";
  145. break;
  146. case TMR_SERVICE:
  147. name = "saTmr";
  148. break;
  149. case VOTEQUORUM_SERVICE:
  150. name = "votequorum";
  151. break;
  152. case NTF_SERVICE:
  153. name = "saNtf";
  154. break;
  155. case AMF_V2_SERVICE:
  156. name = "saAmfV2";
  157. break;
  158. case TST_SV1_SERVICE:
  159. name = "tst";
  160. break;
  161. case TST_SV2_SERVICE:
  162. name = "tst2";
  163. break;
  164. case MON_SERVICE:
  165. name = "mon";
  166. break;
  167. case WD_SERVICE:
  168. name = "wd";
  169. break;
  170. default:
  171. name = NULL;
  172. break;
  173. }
  174. return name;
  175. }
  176. int32_t cs_ipcs_service_destroy(int32_t service_id)
  177. {
  178. qb_ipcs_destroy(ipcs_mapper[service_id].inst);
  179. return 0;
  180. }
  181. static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid)
  182. {
  183. struct list_head *iter;
  184. int32_t service = qb_ipcs_service_id_get(c);
  185. if (ais_service[service] == NULL || ais_service_exiting[service]) {
  186. return -ENOSYS;
  187. }
  188. if (ipc_not_enough_fds_left) {
  189. return -EMFILE;
  190. }
  191. if (euid == 0 || egid == 0) {
  192. return 0;
  193. }
  194. for (iter = uidgid_list_head.next; iter != &uidgid_list_head;
  195. iter = iter->next) {
  196. struct uidgid_item *ugi = qb_list_entry (iter, struct uidgid_item,
  197. list);
  198. if (euid == ugi->uid || egid == ugi->gid)
  199. return 0;
  200. }
  201. log_printf(LOGSYS_LEVEL_ERROR, "Denied connection attempt from %d:%d", euid, egid);
  202. return -EACCES;
  203. }
  204. static char * pid_to_name (pid_t pid, char *out_name, size_t name_len)
  205. {
  206. char *name;
  207. char *rest;
  208. FILE *fp;
  209. char fname[32];
  210. char buf[256];
  211. snprintf (fname, 32, "/proc/%d/stat", pid);
  212. fp = fopen (fname, "r");
  213. if (!fp) {
  214. return NULL;
  215. }
  216. if (fgets (buf, sizeof (buf), fp) == NULL) {
  217. fclose (fp);
  218. return NULL;
  219. }
  220. fclose (fp);
  221. name = strrchr (buf, '(');
  222. if (!name) {
  223. return NULL;
  224. }
  225. /* move past the bracket */
  226. name++;
  227. rest = strrchr (buf, ')');
  228. if (rest == NULL || rest[1] != ' ') {
  229. return NULL;
  230. }
  231. *rest = '\0';
  232. /* move past the NULL and space */
  233. rest += 2;
  234. /* copy the name */
  235. strncpy (out_name, name, name_len);
  236. out_name[name_len - 1] = '\0';
  237. return out_name;
  238. }
  239. struct cs_ipcs_conn_context {
  240. qb_handle_t stats_handle;
  241. char data[1];
  242. };
  243. static void cs_ipcs_connection_created(qb_ipcs_connection_t *c)
  244. {
  245. int32_t service = 0;
  246. uint32_t zero_32 = 0;
  247. uint64_t zero_64 = 0;
  248. unsigned int key_incr_dummy;
  249. qb_handle_t object_handle;
  250. struct cs_ipcs_conn_context *context;
  251. char conn_name[42];
  252. char proc_name[32];
  253. struct qb_ipcs_connection_stats stats;
  254. int32_t size = sizeof(struct cs_ipcs_conn_context);
  255. log_printf(LOG_INFO, "%s() new connection", __func__);
  256. service = qb_ipcs_service_id_get(c);
  257. size += ais_service[service]->private_data_size;
  258. context = calloc(1, size);
  259. qb_ipcs_context_set(c, context);
  260. ais_service[service]->lib_init_fn(c);
  261. api->object_key_increment (object_connection_handle,
  262. "active", strlen("active"),
  263. &key_incr_dummy);
  264. qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
  265. if (stats.client_pid > 0) {
  266. if (pid_to_name (stats.client_pid, proc_name, sizeof(proc_name))) {
  267. snprintf (conn_name,
  268. sizeof(conn_name),
  269. "%s:%d:%p", proc_name,
  270. stats.client_pid, c);
  271. } else {
  272. snprintf (conn_name,
  273. sizeof(conn_name),
  274. "%d:%p",
  275. stats.client_pid, c);
  276. }
  277. } else {
  278. snprintf (conn_name,
  279. sizeof(conn_name),
  280. "%p", c);
  281. }
  282. api->object_create (object_connection_handle,
  283. &object_handle,
  284. conn_name,
  285. strlen (conn_name));
  286. context->stats_handle = object_handle;
  287. api->object_key_create_typed (object_handle,
  288. "service_id",
  289. &zero_32, sizeof (zero_32),
  290. OBJDB_VALUETYPE_UINT32);
  291. api->object_key_create_typed (object_handle,
  292. "client_pid",
  293. &zero_32, sizeof (zero_32),
  294. OBJDB_VALUETYPE_INT32);
  295. api->object_key_create_typed (object_handle,
  296. "responses",
  297. &zero_64, sizeof (zero_64),
  298. OBJDB_VALUETYPE_UINT64);
  299. api->object_key_create_typed (object_handle,
  300. "dispatched",
  301. &zero_64, sizeof (zero_64),
  302. OBJDB_VALUETYPE_UINT64);
  303. api->object_key_create_typed (object_handle,
  304. "requests",
  305. &zero_64, sizeof (zero_64),
  306. OBJDB_VALUETYPE_INT64);
  307. api->object_key_create_typed (object_handle,
  308. "send_retries",
  309. &zero_64, sizeof (zero_64),
  310. OBJDB_VALUETYPE_UINT64);
  311. api->object_key_create_typed (object_handle,
  312. "recv_retries",
  313. &zero_64, sizeof (zero_64),
  314. OBJDB_VALUETYPE_UINT64);
  315. api->object_key_create_typed (object_handle,
  316. "flow_control",
  317. &zero_32, sizeof (zero_32),
  318. OBJDB_VALUETYPE_UINT32);
  319. api->object_key_create_typed (object_handle,
  320. "flow_control_count",
  321. &zero_64, sizeof (zero_64),
  322. OBJDB_VALUETYPE_UINT64);
  323. }
  324. void cs_ipc_refcnt_inc(void *conn)
  325. {
  326. qb_ipcs_connection_ref(conn);
  327. }
  328. void cs_ipc_refcnt_dec(void *conn)
  329. {
  330. qb_ipcs_connection_unref(conn);
  331. }
  332. void *cs_ipcs_private_data_get(void *conn)
  333. {
  334. struct cs_ipcs_conn_context *cnx;
  335. cnx = qb_ipcs_context_get(conn);
  336. return &cnx->data[0];
  337. }
  338. static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c)
  339. {
  340. struct cs_ipcs_conn_context *cnx;
  341. log_printf(LOG_INFO, "%s() ", __func__);
  342. cnx = qb_ipcs_context_get(c);
  343. if (cnx) {
  344. free(cnx);
  345. }
  346. }
  347. static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c)
  348. {
  349. struct cs_ipcs_conn_context *cnx;
  350. unsigned int key_incr_dummy;
  351. int32_t res = 0;
  352. int32_t service = qb_ipcs_service_id_get(c);
  353. log_printf(LOG_INFO, "%s() ", __func__);
  354. res = ais_service[service]->lib_exit_fn(c);
  355. if (res != 0) {
  356. return res;
  357. }
  358. cnx = qb_ipcs_context_get(c);
  359. api->object_destroy (cnx->stats_handle);
  360. api->object_key_increment (object_connection_handle,
  361. "closed", strlen("closed"),
  362. &key_incr_dummy);
  363. api->object_key_decrement (object_connection_handle,
  364. "active", strlen("active"),
  365. &key_incr_dummy);
  366. return 0;
  367. }
  368. int cs_ipcs_response_iov_send (void *conn,
  369. const struct iovec *iov,
  370. unsigned int iov_len)
  371. {
  372. int32_t rc = qb_ipcs_response_sendv(conn, iov, iov_len);
  373. if (rc >= 0) {
  374. return 0;
  375. }
  376. return rc;
  377. }
  378. int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen)
  379. {
  380. int32_t rc = qb_ipcs_response_send(conn, msg, mlen);
  381. if (rc >= 0) {
  382. return 0;
  383. }
  384. return rc;
  385. }
  386. int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
  387. {
  388. int32_t rc = qb_ipcs_event_send(conn, msg, mlen);
  389. if (rc >= 0) {
  390. return 0;
  391. }
  392. return rc;
  393. }
  394. int cs_ipcs_dispatch_iov_send (void *conn,
  395. const struct iovec *iov,
  396. unsigned int iov_len)
  397. {
  398. int32_t rc = qb_ipcs_event_sendv(conn, iov, iov_len);
  399. if (rc >= 0) {
  400. return 0;
  401. }
  402. return rc;
  403. }
  404. static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
  405. void *data, size_t size)
  406. {
  407. struct qb_ipc_response_header response;
  408. struct qb_ipc_request_header *request_pt = (struct qb_ipc_request_header *)data;
  409. int32_t service = qb_ipcs_service_id_get(c);
  410. int32_t send_ok;
  411. ssize_t res = -1;
  412. int sending_allowed_private_data;
  413. send_ok = corosync_sending_allowed (service,
  414. request_pt->id,
  415. request_pt,
  416. &sending_allowed_private_data);
  417. /*
  418. * This happens when the message contains some kind of invalid
  419. * parameter, such as an invalid size
  420. */
  421. if (send_ok == -1) {
  422. response.size = sizeof (response);
  423. response.id = 0;
  424. response.error = CS_ERR_INVALID_PARAM;
  425. log_printf(LOG_INFO, "%s() invalid message! size:%d error:%d",
  426. __func__, response.size, response.error);
  427. qb_ipcs_response_send (c,
  428. &response,
  429. sizeof (response));
  430. res = -EINVAL;
  431. } else {
  432. if (send_ok) {
  433. ais_service[service]->lib_engine[request_pt->id].lib_handler_fn(c, request_pt);
  434. res = 0;
  435. } else {
  436. /*
  437. * Overload, tell library to retry
  438. */
  439. response.size = sizeof (response);
  440. response.id = 0;
  441. response.error = CS_ERR_TRY_AGAIN;
  442. qb_ipcs_response_send (c,
  443. &response,
  444. sizeof (response));
  445. res = -ENOBUFS;
  446. }
  447. }
  448. corosync_sending_allowed_release (&sending_allowed_private_data);
  449. return res;
  450. }
  451. static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn)
  452. {
  453. return qb_loop_job_add(cs_poll_handle_get(), p, data, fn);
  454. }
  455. static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
  456. void *data, qb_ipcs_dispatch_fn_t fn)
  457. {
  458. return qb_loop_poll_add(cs_poll_handle_get(), p, fd, events, data, fn);
  459. }
  460. static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
  461. void *data, qb_ipcs_dispatch_fn_t fn)
  462. {
  463. return qb_loop_poll_mod(cs_poll_handle_get(), p, fd, events, data, fn);
  464. }
  465. static int32_t cs_ipcs_dispatch_del(int32_t fd)
  466. {
  467. return qb_loop_poll_del(cs_poll_handle_get(), fd);
  468. }
  469. static void cs_ipcs_low_fds_event(int32_t not_enough, int32_t fds_available)
  470. {
  471. ipc_not_enough_fds_left = not_enough;
  472. if (not_enough) {
  473. log_printf(LOGSYS_LEVEL_WARNING, "refusing new connections (fds_available:%d)\n",
  474. fds_available);
  475. } else {
  476. log_printf(LOGSYS_LEVEL_NOTICE, "allowing new connections (fds_available:%d)\n",
  477. fds_available);
  478. }
  479. }
  480. int32_t cs_ipcs_q_level_get(void)
  481. {
  482. return ipc_fc_totem_queue_level;
  483. }
  484. static qb_loop_timer_handle ipcs_check_for_flow_control_timer;
  485. static void cs_ipcs_check_for_flow_control(void)
  486. {
  487. int32_t i;
  488. int32_t fc_enabled;
  489. for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) {
  490. if (ais_service[i] == NULL) {
  491. continue;
  492. }
  493. fc_enabled = QB_TRUE;
  494. if (ipc_fc_is_quorate == 1 ||
  495. ais_service[i]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) {
  496. /*
  497. * we are quorate
  498. * now check flow control
  499. */
  500. if (ipc_fc_totem_queue_level != TOTEM_Q_LEVEL_CRITICAL &&
  501. ipc_fc_sync_in_process == 0) {
  502. fc_enabled = QB_FALSE;
  503. }
  504. }
  505. if (fc_enabled) {
  506. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_OFF);
  507. qb_loop_timer_add(cs_poll_handle_get(), QB_LOOP_MED, 1*QB_TIME_NS_IN_MSEC,
  508. NULL, corosync_recheck_the_q_level, &ipcs_check_for_flow_control_timer);
  509. } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_LOW) {
  510. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_FAST);
  511. } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_GOOD) {
  512. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_NORMAL);
  513. } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_HIGH) {
  514. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_SLOW);
  515. }
  516. }
  517. }
  518. static void cs_ipcs_fc_quorum_changed(int quorate, void *context)
  519. {
  520. ipc_fc_is_quorate = quorate;
  521. cs_ipcs_check_for_flow_control();
  522. }
  523. static void cs_ipcs_totem_queue_level_changed(enum totem_q_level level)
  524. {
  525. ipc_fc_totem_queue_level = level;
  526. cs_ipcs_check_for_flow_control();
  527. }
  528. void cs_ipcs_sync_state_changed(int32_t sync_in_process)
  529. {
  530. ipc_fc_sync_in_process = sync_in_process;
  531. cs_ipcs_check_for_flow_control();
  532. }
  533. static void cs_ipcs_libqb_log_fn(const char *file_name,
  534. int32_t file_line,
  535. int32_t severity,
  536. const char *msg)
  537. {
  538. int32_t level = severity;
  539. if (severity > LOG_DEBUG) {
  540. level = LOGSYS_LEVEL_DEBUG;
  541. }
  542. _logsys_log_printf (LOGSYS_ENCODE_RECID(level,
  543. ipc_subsys_id,
  544. LOGSYS_RECID_LOG),
  545. __func__, file_name, file_line, "%s", msg);
  546. }
  547. void cs_ipcs_stats_update(void)
  548. {
  549. int32_t i;
  550. struct qb_ipcs_stats srv_stats;
  551. struct qb_ipcs_connection_stats stats;
  552. qb_ipcs_connection_t *c;
  553. struct cs_ipcs_conn_context *cnx;
  554. for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) {
  555. if (ais_service[i] == NULL) {
  556. continue;
  557. }
  558. qb_ipcs_stats_get(ipcs_mapper[i].inst, &srv_stats, QB_FALSE);
  559. for (c = qb_ipcs_connection_first_get(ipcs_mapper[i].inst); c;
  560. c = qb_ipcs_connection_next_get(ipcs_mapper[i].inst, c)) {
  561. cnx = qb_ipcs_context_get(c);
  562. if (cnx == NULL) continue;
  563. qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
  564. api->object_key_replace(cnx->stats_handle,
  565. "client_pid", strlen("client_pid"),
  566. &stats.client_pid, sizeof(uint32_t));
  567. api->object_key_replace(cnx->stats_handle,
  568. "requests", strlen("requests"),
  569. &stats.requests, sizeof(uint64_t));
  570. api->object_key_replace(cnx->stats_handle,
  571. "responses", strlen("responses"),
  572. &stats.responses, sizeof(uint64_t));
  573. api->object_key_replace(cnx->stats_handle,
  574. "dispatched", strlen("dispatched"),
  575. &stats.events, sizeof(uint64_t));
  576. api->object_key_replace(cnx->stats_handle,
  577. "send_retries", strlen("send_retries"),
  578. &stats.send_retries, sizeof(uint64_t));
  579. api->object_key_replace(cnx->stats_handle,
  580. "recv_retries", strlen("recv_retries"),
  581. &stats.recv_retries, sizeof(uint64_t));
  582. api->object_key_replace(cnx->stats_handle,
  583. "flow_control_count", strlen("flow_control_count"),
  584. &stats.flow_control_count, sizeof(uint64_t));
  585. api->object_key_replace(cnx->stats_handle,
  586. "flow_control_state", strlen("flow_control_state"),
  587. &stats.flow_control_state, sizeof(uint32_t));
  588. qb_ipcs_connection_unref(c);
  589. }
  590. }
  591. }
  592. void cs_ipcs_service_init(struct corosync_service_engine *service)
  593. {
  594. ipcs_mapper[service->id].id = service->id;
  595. strcpy(ipcs_mapper[service->id].name, cs_ipcs_serv_short_name(service->id));
  596. log_printf (LOGSYS_LEVEL_INFO,
  597. "Initializing IPC on %s [%d]",
  598. ipcs_mapper[service->id].name,
  599. ipcs_mapper[service->id].id);
  600. ipcs_mapper[service->id].inst = qb_ipcs_create(ipcs_mapper[service->id].name,
  601. ipcs_mapper[service->id].id,
  602. QB_IPC_SHM,
  603. &corosync_service_funcs);
  604. assert(ipcs_mapper[service->id].inst);
  605. qb_ipcs_poll_handlers_set(ipcs_mapper[service->id].inst,
  606. &corosync_poll_funcs);
  607. qb_ipcs_run(ipcs_mapper[service->id].inst);
  608. }
  609. void cs_ipcs_init(void)
  610. {
  611. qb_handle_t object_find_handle;
  612. qb_handle_t object_runtime_handle;
  613. uint64_t zero_64 = 0;
  614. api = apidef_get ();
  615. qb_loop_poll_low_fds_event_set(cs_poll_handle_get(), cs_ipcs_low_fds_event);
  616. ipc_subsys_id = _logsys_subsys_create ("IPC");
  617. if (ipc_subsys_id < 0) {
  618. log_printf (LOGSYS_LEVEL_ERROR,
  619. "Could not initialize IPC logging subsystem\n");
  620. corosync_exit_error (AIS_DONE_INIT_SERVICES);
  621. }
  622. qb_util_set_log_function (cs_ipcs_libqb_log_fn);
  623. api->quorum_register_callback (cs_ipcs_fc_quorum_changed, NULL);
  624. totempg_queue_level_register_callback (cs_ipcs_totem_queue_level_changed);
  625. api->object_find_create (OBJECT_PARENT_HANDLE,
  626. "runtime", strlen ("runtime"),
  627. &object_find_handle);
  628. if (api->object_find_next (object_find_handle,
  629. &object_runtime_handle) != 0) {
  630. log_printf (LOGSYS_LEVEL_ERROR,"arrg no runtime");
  631. return;
  632. }
  633. /* Connection objects */
  634. api->object_create (object_runtime_handle,
  635. &object_connection_handle,
  636. "connections", strlen ("connections"));
  637. api->object_key_create_typed (object_connection_handle,
  638. "active", &zero_64, sizeof (zero_64),
  639. OBJDB_VALUETYPE_UINT64);
  640. api->object_key_create_typed (object_connection_handle,
  641. "closed", &zero_64, sizeof (zero_64),
  642. OBJDB_VALUETYPE_UINT64);
  643. }