ipc_glue.c 19 KB


  1. /*
  2. * Copyright (c) 2010 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Angus Salkeld <asalkeld@redhat.com>
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of Red Hat, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <config.h>
  35. #include <stdlib.h>
  36. #include <stdio.h>
  37. #include <errno.h>
  38. #include <assert.h>
  39. #include <sys/uio.h>
  40. #include <string.h>
  41. #include <qb/qbdefs.h>
  42. #include <qb/qblist.h>
  43. #include <qb/qbutil.h>
  44. #include <qb/qbloop.h>
  45. #include <qb/qbipcs.h>
  46. #include <corosync/swab.h>
  47. #include <corosync/corotypes.h>
  48. #include <corosync/corodefs.h>
  49. #include <corosync/totem/totempg.h>
  50. #include <corosync/engine/objdb.h>
  51. #include <corosync/engine/config.h>
  52. #include <corosync/engine/logsys.h>
  53. #include "mainconfig.h"
  54. #include "sync.h"
  55. #include "syncv2.h"
  56. #include "timer.h"
  57. #include "main.h"
  58. #include "util.h"
  59. #include "apidef.h"
  60. #include "service.h"
  61. LOGSYS_DECLARE_SUBSYS ("MAIN");
  62. static struct corosync_api_v1 *api = NULL;
  63. static int ipc_subsys_id = -1;
  64. static int32_t ipc_not_enough_fds_left = 0;
  65. static int32_t ipc_fc_is_quorate; /* boolean */
  66. static int32_t ipc_fc_totem_queue_level; /* percentage used */
  67. static int32_t ipc_fc_sync_in_process; /* boolean */
  68. static qb_handle_t object_connection_handle;
  69. struct cs_ipcs_mapper {
  70. int32_t id;
  71. qb_ipcs_service_t *inst;
  72. char name[256];
  73. };
  74. static struct cs_ipcs_mapper ipcs_mapper[SERVICE_HANDLER_MAXIMUM_COUNT];
  75. static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn);
  76. static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
  77. void *data, qb_ipcs_dispatch_fn_t fn);
  78. static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
  79. void *data, qb_ipcs_dispatch_fn_t fn);
  80. static int32_t cs_ipcs_dispatch_del(int32_t fd);
  81. static struct qb_ipcs_poll_handlers corosync_poll_funcs = {
  82. .job_add = cs_ipcs_job_add,
  83. .dispatch_add = cs_ipcs_dispatch_add,
  84. .dispatch_mod = cs_ipcs_dispatch_mod,
  85. .dispatch_del = cs_ipcs_dispatch_del,
  86. };
  87. static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid);
  88. static void cs_ipcs_connection_created(qb_ipcs_connection_t *c);
  89. static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
  90. void *data, size_t size);
  91. static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c);
  92. static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c);
  93. static struct qb_ipcs_service_handlers corosync_service_funcs = {
  94. .connection_accept = cs_ipcs_connection_accept,
  95. .connection_created = cs_ipcs_connection_created,
  96. .msg_process = cs_ipcs_msg_process,
  97. .connection_closed = cs_ipcs_connection_closed,
  98. .connection_destroyed = cs_ipcs_connection_destroyed,
  99. };
  100. static const char* cs_ipcs_serv_short_name(int32_t service_id)
  101. {
  102. const char *name;
  103. switch (service_id) {
  104. case EVS_SERVICE:
  105. name = "evs";
  106. break;
  107. case CLM_SERVICE:
  108. name = "saClm";
  109. break;
  110. case AMF_SERVICE:
  111. name = "saAmf";
  112. break;
  113. case CKPT_SERVICE:
  114. name = "saCkpt";
  115. break;
  116. case EVT_SERVICE:
  117. name = "saEvt";
  118. break;
  119. case LCK_SERVICE:
  120. name = "saLck";
  121. break;
  122. case MSG_SERVICE:
  123. name = "saMsg";
  124. break;
  125. case CFG_SERVICE:
  126. name = "cfg";
  127. break;
  128. case CPG_SERVICE:
  129. name = "cpg";
  130. break;
  131. case CMAN_SERVICE:
  132. name = "cman";
  133. break;
  134. case PCMK_SERVICE:
  135. name = "pacemaker.engine";
  136. break;
  137. case CONFDB_SERVICE:
  138. name = "confdb";
  139. break;
  140. case QUORUM_SERVICE:
  141. name = "quorum";
  142. break;
  143. case PLOAD_SERVICE:
  144. name = "pload";
  145. break;
  146. case TMR_SERVICE:
  147. name = "saTmr";
  148. break;
  149. case VOTEQUORUM_SERVICE:
  150. name = "votequorum";
  151. break;
  152. case NTF_SERVICE:
  153. name = "saNtf";
  154. break;
  155. case AMF_V2_SERVICE:
  156. name = "saAmfV2";
  157. break;
  158. case TST_SV1_SERVICE:
  159. name = "tst";
  160. break;
  161. case TST_SV2_SERVICE:
  162. name = "tst2";
  163. break;
  164. case MON_SERVICE:
  165. name = "mon";
  166. break;
  167. case WD_SERVICE:
  168. name = "wd";
  169. break;
  170. default:
  171. name = NULL;
  172. break;
  173. }
  174. return name;
  175. }
  176. int32_t cs_ipcs_service_destroy(int32_t service_id)
  177. {
  178. if (ipcs_mapper[service_id].inst) {
  179. qb_ipcs_destroy(ipcs_mapper[service_id].inst);
  180. ipcs_mapper[service_id].inst = NULL;
  181. }
  182. return 0;
  183. }
  184. static int32_t cs_ipcs_connection_accept (qb_ipcs_connection_t *c, uid_t euid, gid_t egid)
  185. {
  186. struct list_head *iter;
  187. int32_t service = qb_ipcs_service_id_get(c);
  188. if (ais_service[service] == NULL ||
  189. ais_service_exiting[service] ||
  190. ipcs_mapper[service].inst == NULL) {
  191. return -ENOSYS;
  192. }
  193. if (ipc_not_enough_fds_left) {
  194. return -EMFILE;
  195. }
  196. if (euid == 0 || egid == 0) {
  197. return 0;
  198. }
  199. for (iter = uidgid_list_head.next; iter != &uidgid_list_head;
  200. iter = iter->next) {
  201. struct uidgid_item *ugi = qb_list_entry (iter, struct uidgid_item,
  202. list);
  203. if (euid == ugi->uid || egid == ugi->gid)
  204. return 0;
  205. }
  206. log_printf(LOGSYS_LEVEL_ERROR, "Denied connection attempt from %d:%d", euid, egid);
  207. return -EACCES;
  208. }
  /*
   * Look up the command name of a process from /proc/<pid>/stat.
   *
   * pid       process to look up
   * out_name  buffer receiving the NUL-terminated name (truncated to fit)
   * name_len  size of out_name in bytes
   *
   * Returns out_name on success. Returns NULL when the buffer is unusable,
   * the stat file cannot be opened or read, or the line does not contain a
   * "(name) " field.
   */
  static char * pid_to_name (pid_t pid, char *out_name, size_t name_len)
  {
      char stat_path[32];
      char line[256];
      char *open_paren;
      char *close_paren;
      FILE *fp;

      /* A zero-length buffer cannot even hold the terminator. */
      if (out_name == NULL || name_len == 0) {
          return NULL;
      }

      snprintf (stat_path, sizeof (stat_path), "/proc/%d/stat", (int)pid);
      fp = fopen (stat_path, "r");
      if (fp == NULL) {
          return NULL;
      }
      if (fgets (line, sizeof (line), fp) == NULL) {
          fclose (fp);
          return NULL;
      }
      fclose (fp);

      /*
       * The name is wrapped in parentheses and may itself contain
       * parentheses, so it runs from the FIRST '(' to the LAST ')'.
       */
      open_paren = strchr (line, '(');
      close_paren = strrchr (line, ')');
      if (open_paren == NULL || close_paren == NULL ||
          close_paren < open_paren || close_paren[1] != ' ') {
          return NULL;
      }
      *close_paren = '\0';

      /* Bounded copy that always terminates the destination. */
      snprintf (out_name, name_len, "%s", open_paren + 1);
      return out_name;
  }
  244. struct cs_ipcs_conn_context {
  245. qb_handle_t stats_handle;
  246. char data[1];
  247. };
  248. static void cs_ipcs_connection_created(qb_ipcs_connection_t *c)
  249. {
  250. int32_t service = 0;
  251. uint32_t zero_32 = 0;
  252. uint64_t zero_64 = 0;
  253. unsigned int key_incr_dummy;
  254. qb_handle_t object_handle;
  255. struct cs_ipcs_conn_context *context;
  256. char conn_name[42];
  257. char proc_name[32];
  258. struct qb_ipcs_connection_stats stats;
  259. int32_t size = sizeof(struct cs_ipcs_conn_context);
  260. log_printf(LOG_INFO, "%s() new connection", __func__);
  261. service = qb_ipcs_service_id_get(c);
  262. size += ais_service[service]->private_data_size;
  263. context = calloc(1, size);
  264. qb_ipcs_context_set(c, context);
  265. ais_service[service]->lib_init_fn(c);
  266. api->object_key_increment (object_connection_handle,
  267. "active", strlen("active"),
  268. &key_incr_dummy);
  269. qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
  270. if (stats.client_pid > 0) {
  271. if (pid_to_name (stats.client_pid, proc_name, sizeof(proc_name))) {
  272. snprintf (conn_name,
  273. sizeof(conn_name),
  274. "%s:%d:%p", proc_name,
  275. stats.client_pid, c);
  276. } else {
  277. snprintf (conn_name,
  278. sizeof(conn_name),
  279. "%d:%p",
  280. stats.client_pid, c);
  281. }
  282. } else {
  283. snprintf (conn_name,
  284. sizeof(conn_name),
  285. "%p", c);
  286. }
  287. api->object_create (object_connection_handle,
  288. &object_handle,
  289. conn_name,
  290. strlen (conn_name));
  291. context->stats_handle = object_handle;
  292. api->object_key_create_typed (object_handle,
  293. "service_id",
  294. &zero_32, sizeof (zero_32),
  295. OBJDB_VALUETYPE_UINT32);
  296. api->object_key_create_typed (object_handle,
  297. "client_pid",
  298. &zero_32, sizeof (zero_32),
  299. OBJDB_VALUETYPE_INT32);
  300. api->object_key_create_typed (object_handle,
  301. "responses",
  302. &zero_64, sizeof (zero_64),
  303. OBJDB_VALUETYPE_UINT64);
  304. api->object_key_create_typed (object_handle,
  305. "dispatched",
  306. &zero_64, sizeof (zero_64),
  307. OBJDB_VALUETYPE_UINT64);
  308. api->object_key_create_typed (object_handle,
  309. "requests",
  310. &zero_64, sizeof (zero_64),
  311. OBJDB_VALUETYPE_INT64);
  312. api->object_key_create_typed (object_handle,
  313. "send_retries",
  314. &zero_64, sizeof (zero_64),
  315. OBJDB_VALUETYPE_UINT64);
  316. api->object_key_create_typed (object_handle,
  317. "recv_retries",
  318. &zero_64, sizeof (zero_64),
  319. OBJDB_VALUETYPE_UINT64);
  320. api->object_key_create_typed (object_handle,
  321. "flow_control",
  322. &zero_32, sizeof (zero_32),
  323. OBJDB_VALUETYPE_UINT32);
  324. api->object_key_create_typed (object_handle,
  325. "flow_control_count",
  326. &zero_64, sizeof (zero_64),
  327. OBJDB_VALUETYPE_UINT64);
  328. }
  329. void cs_ipc_refcnt_inc(void *conn)
  330. {
  331. qb_ipcs_connection_ref(conn);
  332. }
  333. void cs_ipc_refcnt_dec(void *conn)
  334. {
  335. qb_ipcs_connection_unref(conn);
  336. }
  337. void *cs_ipcs_private_data_get(void *conn)
  338. {
  339. struct cs_ipcs_conn_context *cnx;
  340. cnx = qb_ipcs_context_get(conn);
  341. return &cnx->data[0];
  342. }
  343. static void cs_ipcs_connection_destroyed (qb_ipcs_connection_t *c)
  344. {
  345. struct cs_ipcs_conn_context *cnx;
  346. log_printf(LOG_INFO, "%s() ", __func__);
  347. cnx = qb_ipcs_context_get(c);
  348. if (cnx) {
  349. free(cnx);
  350. }
  351. }
  352. static int32_t cs_ipcs_connection_closed (qb_ipcs_connection_t *c)
  353. {
  354. struct cs_ipcs_conn_context *cnx;
  355. unsigned int key_incr_dummy;
  356. int32_t res = 0;
  357. int32_t service = qb_ipcs_service_id_get(c);
  358. log_printf(LOG_INFO, "%s() ", __func__);
  359. res = ais_service[service]->lib_exit_fn(c);
  360. if (res != 0) {
  361. return res;
  362. }
  363. cnx = qb_ipcs_context_get(c);
  364. api->object_destroy (cnx->stats_handle);
  365. api->object_key_increment (object_connection_handle,
  366. "closed", strlen("closed"),
  367. &key_incr_dummy);
  368. api->object_key_decrement (object_connection_handle,
  369. "active", strlen("active"),
  370. &key_incr_dummy);
  371. return 0;
  372. }
  /*
   * Send a scatter/gather response on a connection.
   *
   * Returns 0 when qb_ipcs_response_sendv() reports a non-negative result,
   * otherwise the negative value it returned.
   */
  int cs_ipcs_response_iov_send (void *conn,
      const struct iovec *iov,
      unsigned int iov_len)
  {
      int32_t sent = qb_ipcs_response_sendv(conn, iov, iov_len);

      return (sent < 0) ? sent : 0;
  }
  /*
   * Send a single-buffer response on a connection.
   *
   * Returns 0 when qb_ipcs_response_send() reports a non-negative result,
   * otherwise the negative value it returned.
   */
  int cs_ipcs_response_send(void *conn, const void *msg, size_t mlen)
  {
      int32_t sent = qb_ipcs_response_send(conn, msg, mlen);

      return (sent < 0) ? sent : 0;
  }
  /*
   * Send a single-buffer event (dispatch message) on a connection.
   *
   * Returns 0 when qb_ipcs_event_send() reports a non-negative result,
   * otherwise the negative value it returned.
   */
  int cs_ipcs_dispatch_send(void *conn, const void *msg, size_t mlen)
  {
      int32_t sent = qb_ipcs_event_send(conn, msg, mlen);

      return (sent < 0) ? sent : 0;
  }
  /*
   * Send a scatter/gather event (dispatch message) on a connection.
   *
   * Returns 0 when qb_ipcs_event_sendv() reports a non-negative result,
   * otherwise the negative value it returned.
   */
  int cs_ipcs_dispatch_iov_send (void *conn,
      const struct iovec *iov,
      unsigned int iov_len)
  {
      int32_t sent = qb_ipcs_event_sendv(conn, iov, iov_len);

      return (sent < 0) ? sent : 0;
  }
  409. static int32_t cs_ipcs_msg_process(qb_ipcs_connection_t *c,
  410. void *data, size_t size)
  411. {
  412. struct qb_ipc_response_header response;
  413. struct qb_ipc_request_header *request_pt = (struct qb_ipc_request_header *)data;
  414. int32_t service = qb_ipcs_service_id_get(c);
  415. int32_t send_ok;
  416. ssize_t res = -1;
  417. int sending_allowed_private_data;
  418. send_ok = corosync_sending_allowed (service,
  419. request_pt->id,
  420. request_pt,
  421. &sending_allowed_private_data);
  422. /*
  423. * This happens when the message contains some kind of invalid
  424. * parameter, such as an invalid size
  425. */
  426. if (send_ok == -1) {
  427. response.size = sizeof (response);
  428. response.id = 0;
  429. response.error = CS_ERR_INVALID_PARAM;
  430. log_printf(LOG_INFO, "%s() invalid message! size:%d error:%d",
  431. __func__, response.size, response.error);
  432. qb_ipcs_response_send (c,
  433. &response,
  434. sizeof (response));
  435. res = -EINVAL;
  436. } else {
  437. if (send_ok) {
  438. ais_service[service]->lib_engine[request_pt->id].lib_handler_fn(c, request_pt);
  439. res = 0;
  440. } else {
  441. /*
  442. * Overload, tell library to retry
  443. */
  444. response.size = sizeof (response);
  445. response.id = 0;
  446. response.error = CS_ERR_TRY_AGAIN;
  447. qb_ipcs_response_send (c,
  448. &response,
  449. sizeof (response));
  450. res = -ENOBUFS;
  451. }
  452. }
  453. corosync_sending_allowed_release (&sending_allowed_private_data);
  454. return res;
  455. }
  456. static int32_t cs_ipcs_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn)
  457. {
  458. return qb_loop_job_add(cs_poll_handle_get(), p, data, fn);
  459. }
  460. static int32_t cs_ipcs_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
  461. void *data, qb_ipcs_dispatch_fn_t fn)
  462. {
  463. return qb_loop_poll_add(cs_poll_handle_get(), p, fd, events, data, fn);
  464. }
  465. static int32_t cs_ipcs_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
  466. void *data, qb_ipcs_dispatch_fn_t fn)
  467. {
  468. return qb_loop_poll_mod(cs_poll_handle_get(), p, fd, events, data, fn);
  469. }
  /*
   * Poll-handler glue: stop watching fd on the corosync main loop.
   * Returns whatever qb_loop_poll_del() returns.
   */
  static int32_t cs_ipcs_dispatch_del(int32_t fd)
  {
      int32_t rc;

      rc = qb_loop_poll_del(cs_poll_handle_get(), fd);
      return rc;
  }
  474. static void cs_ipcs_low_fds_event(int32_t not_enough, int32_t fds_available)
  475. {
  476. ipc_not_enough_fds_left = not_enough;
  477. if (not_enough) {
  478. log_printf(LOGSYS_LEVEL_WARNING, "refusing new connections (fds_available:%d)\n",
  479. fds_available);
  480. } else {
  481. log_printf(LOGSYS_LEVEL_NOTICE, "allowing new connections (fds_available:%d)\n",
  482. fds_available);
  483. }
  484. }
  485. int32_t cs_ipcs_q_level_get(void)
  486. {
  487. return ipc_fc_totem_queue_level;
  488. }
  489. static qb_loop_timer_handle ipcs_check_for_flow_control_timer;
  490. static void cs_ipcs_check_for_flow_control(void)
  491. {
  492. int32_t i;
  493. int32_t fc_enabled;
  494. for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) {
  495. if (ais_service[i] == NULL || ipcs_mapper[i].inst == NULL) {
  496. continue;
  497. }
  498. fc_enabled = QB_TRUE;
  499. if (ipc_fc_is_quorate == 1 ||
  500. ais_service[i]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) {
  501. /*
  502. * we are quorate
  503. * now check flow control
  504. */
  505. if (ipc_fc_totem_queue_level != TOTEM_Q_LEVEL_CRITICAL &&
  506. ipc_fc_sync_in_process == 0) {
  507. fc_enabled = QB_FALSE;
  508. }
  509. }
  510. if (fc_enabled) {
  511. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_OFF);
  512. qb_loop_timer_add(cs_poll_handle_get(), QB_LOOP_MED, 1*QB_TIME_NS_IN_MSEC,
  513. NULL, corosync_recheck_the_q_level, &ipcs_check_for_flow_control_timer);
  514. } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_LOW) {
  515. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_FAST);
  516. } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_GOOD) {
  517. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_NORMAL);
  518. } else if (ipc_fc_totem_queue_level == TOTEM_Q_LEVEL_HIGH) {
  519. qb_ipcs_request_rate_limit(ipcs_mapper[i].inst, QB_IPCS_RATE_SLOW);
  520. }
  521. }
  522. }
  523. static void cs_ipcs_fc_quorum_changed(int quorate, void *context)
  524. {
  525. ipc_fc_is_quorate = quorate;
  526. cs_ipcs_check_for_flow_control();
  527. }
  528. static void cs_ipcs_totem_queue_level_changed(enum totem_q_level level)
  529. {
  530. ipc_fc_totem_queue_level = level;
  531. cs_ipcs_check_for_flow_control();
  532. }
  533. void cs_ipcs_sync_state_changed(int32_t sync_in_process)
  534. {
  535. ipc_fc_sync_in_process = sync_in_process;
  536. cs_ipcs_check_for_flow_control();
  537. }
  538. static void cs_ipcs_libqb_log_fn(const char *file_name,
  539. int32_t file_line,
  540. int32_t severity,
  541. const char *msg)
  542. {
  543. int32_t level = severity;
  544. if (severity > LOG_DEBUG) {
  545. level = LOGSYS_LEVEL_DEBUG;
  546. }
  547. _logsys_log_printf (LOGSYS_ENCODE_RECID(level,
  548. ipc_subsys_id,
  549. LOGSYS_RECID_LOG),
  550. __func__, file_name, file_line, "%s", msg);
  551. }
  552. void cs_ipcs_stats_update(void)
  553. {
  554. int32_t i;
  555. struct qb_ipcs_stats srv_stats;
  556. struct qb_ipcs_connection_stats stats;
  557. qb_ipcs_connection_t *c;
  558. struct cs_ipcs_conn_context *cnx;
  559. for (i = 0; i < SERVICE_HANDLER_MAXIMUM_COUNT; i++) {
  560. if (ais_service[i] == NULL || ipcs_mapper[i].inst == NULL) {
  561. continue;
  562. }
  563. qb_ipcs_stats_get(ipcs_mapper[i].inst, &srv_stats, QB_FALSE);
  564. for (c = qb_ipcs_connection_first_get(ipcs_mapper[i].inst); c;
  565. c = qb_ipcs_connection_next_get(ipcs_mapper[i].inst, c)) {
  566. cnx = qb_ipcs_context_get(c);
  567. if (cnx == NULL) continue;
  568. qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
  569. api->object_key_replace(cnx->stats_handle,
  570. "client_pid", strlen("client_pid"),
  571. &stats.client_pid, sizeof(uint32_t));
  572. api->object_key_replace(cnx->stats_handle,
  573. "requests", strlen("requests"),
  574. &stats.requests, sizeof(uint64_t));
  575. api->object_key_replace(cnx->stats_handle,
  576. "responses", strlen("responses"),
  577. &stats.responses, sizeof(uint64_t));
  578. api->object_key_replace(cnx->stats_handle,
  579. "dispatched", strlen("dispatched"),
  580. &stats.events, sizeof(uint64_t));
  581. api->object_key_replace(cnx->stats_handle,
  582. "send_retries", strlen("send_retries"),
  583. &stats.send_retries, sizeof(uint64_t));
  584. api->object_key_replace(cnx->stats_handle,
  585. "recv_retries", strlen("recv_retries"),
  586. &stats.recv_retries, sizeof(uint64_t));
  587. api->object_key_replace(cnx->stats_handle,
  588. "flow_control_count", strlen("flow_control_count"),
  589. &stats.flow_control_count, sizeof(uint64_t));
  590. api->object_key_replace(cnx->stats_handle,
  591. "flow_control_state", strlen("flow_control_state"),
  592. &stats.flow_control_state, sizeof(uint32_t));
  593. qb_ipcs_connection_unref(c);
  594. }
  595. }
  596. }
  597. void cs_ipcs_service_init(struct corosync_service_engine *service)
  598. {
  599. if (service->lib_engine_count == 0) {
  600. log_printf (LOGSYS_LEVEL_DEBUG,
  601. "NOT Initializing IPC on %s [%d]",
  602. cs_ipcs_serv_short_name(service->id),
  603. service->id);
  604. return;
  605. }
  606. ipcs_mapper[service->id].id = service->id;
  607. strcpy(ipcs_mapper[service->id].name, cs_ipcs_serv_short_name(service->id));
  608. log_printf (LOGSYS_LEVEL_DEBUG,
  609. "Initializing IPC on %s [%d]",
  610. ipcs_mapper[service->id].name,
  611. ipcs_mapper[service->id].id);
  612. ipcs_mapper[service->id].inst = qb_ipcs_create(ipcs_mapper[service->id].name,
  613. ipcs_mapper[service->id].id,
  614. QB_IPC_SHM,
  615. &corosync_service_funcs);
  616. assert(ipcs_mapper[service->id].inst);
  617. qb_ipcs_poll_handlers_set(ipcs_mapper[service->id].inst,
  618. &corosync_poll_funcs);
  619. qb_ipcs_run(ipcs_mapper[service->id].inst);
  620. }
  621. void cs_ipcs_init(void)
  622. {
  623. qb_handle_t object_find_handle;
  624. qb_handle_t object_runtime_handle;
  625. uint64_t zero_64 = 0;
  626. api = apidef_get ();
  627. qb_loop_poll_low_fds_event_set(cs_poll_handle_get(), cs_ipcs_low_fds_event);
  628. ipc_subsys_id = _logsys_subsys_create ("IPC");
  629. if (ipc_subsys_id < 0) {
  630. log_printf (LOGSYS_LEVEL_ERROR,
  631. "Could not initialize IPC logging subsystem\n");
  632. corosync_exit_error (AIS_DONE_INIT_SERVICES);
  633. }
  634. qb_util_set_log_function (cs_ipcs_libqb_log_fn);
  635. api->quorum_register_callback (cs_ipcs_fc_quorum_changed, NULL);
  636. totempg_queue_level_register_callback (cs_ipcs_totem_queue_level_changed);
  637. api->object_find_create (OBJECT_PARENT_HANDLE,
  638. "runtime", strlen ("runtime"),
  639. &object_find_handle);
  640. if (api->object_find_next (object_find_handle,
  641. &object_runtime_handle) != 0) {
  642. log_printf (LOGSYS_LEVEL_ERROR,"arrg no runtime");
  643. return;
  644. }
  645. /* Connection objects */
  646. api->object_create (object_runtime_handle,
  647. &object_connection_handle,
  648. "connections", strlen ("connections"));
  649. api->object_key_create_typed (object_connection_handle,
  650. "active", &zero_64, sizeof (zero_64),
  651. OBJDB_VALUETYPE_UINT64);
  652. api->object_key_create_typed (object_connection_handle,
  653. "closed", &zero_64, sizeof (zero_64),
  654. OBJDB_VALUETYPE_UINT64);
  655. }