cpg.c 31 KB


  1. /*
  2. * Copyright (c) 2006-2009 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Christine Caulfield (ccaulfie@redhat.com)
  7. * Author: Jan Friesse (jfriesse@redhat.com)
  8. *
  9. * This software licensed under BSD license, the text of which follows:
  10. *
  11. * Redistribution and use in source and binary forms, with or without
  12. * modification, are permitted provided that the following conditions are met:
  13. *
  14. * - Redistributions of source code must retain the above copyright notice,
  15. * this list of conditions and the following disclaimer.
  16. * - Redistributions in binary form must reproduce the above copyright notice,
  17. * this list of conditions and the following disclaimer in the documentation
  18. * and/or other materials provided with the distribution.
  19. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  20. * contributors may be used to endorse or promote products derived from this
  21. * software without specific prior written permission.
  22. *
  23. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
  24. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  25. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  26. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  27. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  28. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  29. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  30. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  31. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  32. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  33. * THE POSSIBILITY OF SUCH DAMAGE.
  34. */
  35. #include <config.h>
  36. #ifndef COROSYNC_BSD
  37. #include <alloca.h>
  38. #endif
  39. #include <sys/types.h>
  40. #include <sys/socket.h>
  41. #include <sys/un.h>
  42. #include <sys/ioctl.h>
  43. #include <netinet/in.h>
  44. #include <sys/uio.h>
  45. #include <unistd.h>
  46. #include <fcntl.h>
  47. #include <stdlib.h>
  48. #include <stdio.h>
  49. #include <errno.h>
  50. #include <signal.h>
  51. #include <time.h>
  52. #include <unistd.h>
  53. #include <netinet/in.h>
  54. #include <arpa/inet.h>
  55. #include <corosync/corotypes.h>
  56. #include <corosync/coroipc_types.h>
  57. #include <corosync/corodefs.h>
  58. #include <corosync/list.h>
  59. #include <corosync/queue.h>
  60. #include <corosync/jhash.h>
  61. #include <corosync/lcr/lcr_comp.h>
  62. #include <corosync/engine/logsys.h>
  63. #include <corosync/engine/coroapi.h>
  64. #include <corosync/cpg.h>
  65. #include <corosync/ipc_cpg.h>
  66. LOGSYS_DECLARE_SUBSYS ("CPG");
  67. #define GROUP_HASH_SIZE 32
  68. enum cpg_message_req_types {
  69. MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
  70. MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
  71. MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
  72. MESSAGE_REQ_EXEC_CPG_MCAST = 3,
  73. MESSAGE_REQ_EXEC_CPG_DOWNLIST = 4
  74. };
/*
 * state                exec deliver
 * match group name, pid -> if matched deliver for YES:
 * XXX indicates impossible state
 *
 *                      join                      leave                 mcast
 * UNJOINED             XXX                       XXX                   NO
 * LEAVE_STARTED        XXX                       YES(unjoined_enter)   YES
 * JOIN_STARTED         YES(join_started_enter)   XXX                   NO
 * JOIN_COMPLETED       XXX                       NO                    YES
 *
 * join_started_enter
 *      set JOIN_COMPLETED
 *      add entry to process_info list
 * unjoined_enter
 *      set UNJOINED
 *      delete entry from process_info list
 *
 *
 *                      library accept join       error codes
 * UNJOINED             YES(CPG_OK)               set JOIN_STARTED
 * LEAVE_STARTED        NO(CPG_ERR_BUSY)
 * JOIN_STARTED         NO(CPG_ERR_EXIST)
 * JOIN_COMPLETED       NO(CPG_ERR_EXIST)
 *
 *                      library accept leave      error codes
 * UNJOINED             NO(CPG_ERR_NOT_EXIST)
 * LEAVE_STARTED        NO(CPG_ERR_NOT_EXIST)
 * JOIN_STARTED         NO(CPG_ERR_BUSY)
 * JOIN_COMPLETED       YES(CPG_OK)               set LEAVE_STARTED
 *
 *                      library accept mcast
 * UNJOINED             NO(CPG_ERR_NOT_EXIST)
 * LEAVE_STARTED        NO(CPG_ERR_NOT_EXIST)
 * JOIN_STARTED         YES(CPG_OK)
 * JOIN_COMPLETED       YES(CPG_OK)
 */
  112. enum cpd_state {
  113. CPD_STATE_UNJOINED,
  114. CPD_STATE_LEAVE_STARTED,
  115. CPD_STATE_JOIN_STARTED,
  116. CPD_STATE_JOIN_COMPLETED
  117. };
  118. struct cpg_pd {
  119. void *conn;
  120. mar_cpg_name_t group_name;
  121. uint32_t pid;
  122. enum cpd_state cpd_state;
  123. struct list_head list;
  124. };
  125. DECLARE_LIST_INIT(cpg_pd_list_head);
  126. struct process_info {
  127. unsigned int nodeid;
  128. uint32_t pid;
  129. mar_cpg_name_t group;
  130. struct list_head list; /* on the group_info members list */
  131. };
  132. DECLARE_LIST_INIT(process_info_list_head);
  133. struct join_list_entry {
  134. uint32_t pid;
  135. mar_cpg_name_t group_name;
  136. };
  137. static struct corosync_api_v1 *api = NULL;
  138. /*
  139. * Service Interfaces required by service_message_handler struct
  140. */
  141. static void cpg_confchg_fn (
  142. enum totem_configuration_type configuration_type,
  143. const unsigned int *member_list, size_t member_list_entries,
  144. const unsigned int *left_list, size_t left_list_entries,
  145. const unsigned int *joined_list, size_t joined_list_entries,
  146. const struct memb_ring_id *ring_id);
  147. static int cpg_exec_init_fn (struct corosync_api_v1 *);
  148. static int cpg_lib_init_fn (void *conn);
  149. static int cpg_lib_exit_fn (void *conn);
  150. static void message_handler_req_exec_cpg_procjoin (
  151. const void *message,
  152. unsigned int nodeid);
  153. static void message_handler_req_exec_cpg_procleave (
  154. const void *message,
  155. unsigned int nodeid);
  156. static void message_handler_req_exec_cpg_joinlist (
  157. const void *message,
  158. unsigned int nodeid);
  159. static void message_handler_req_exec_cpg_mcast (
  160. const void *message,
  161. unsigned int nodeid);
  162. static void message_handler_req_exec_cpg_downlist (
  163. const void *message,
  164. unsigned int nodeid);
  165. static void exec_cpg_procjoin_endian_convert (void *msg);
  166. static void exec_cpg_joinlist_endian_convert (void *msg);
  167. static void exec_cpg_mcast_endian_convert (void *msg);
  168. static void exec_cpg_downlist_endian_convert (void *msg);
  169. static void message_handler_req_lib_cpg_join (void *conn, const void *message);
  170. static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
  171. static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
  172. static void message_handler_req_lib_cpg_membership (void *conn,
  173. const void *message);
  174. static void message_handler_req_lib_cpg_local_get (void *conn,
  175. const void *message);
  176. static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
  177. static int cpg_exec_send_joinlist(void);
  178. static void cpg_sync_init (void);
  179. static int cpg_sync_process (void);
  180. static void cpg_sync_activate (void);
  181. static void cpg_sync_abort (void);
  182. /*
  183. * Library Handler Definition
  184. */
  185. static struct corosync_lib_handler cpg_lib_engine[] =
  186. {
  187. { /* 0 */
  188. .lib_handler_fn = message_handler_req_lib_cpg_join,
  189. .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
  190. },
  191. { /* 1 */
  192. .lib_handler_fn = message_handler_req_lib_cpg_leave,
  193. .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
  194. },
  195. { /* 2 */
  196. .lib_handler_fn = message_handler_req_lib_cpg_mcast,
  197. .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
  198. },
  199. { /* 3 */
  200. .lib_handler_fn = message_handler_req_lib_cpg_membership,
  201. .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
  202. },
  203. { /* 4 */
  204. .lib_handler_fn = message_handler_req_lib_cpg_local_get,
  205. .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
  206. }
  207. };
  208. static struct corosync_exec_handler cpg_exec_engine[] =
  209. {
  210. { /* 0 */
  211. .exec_handler_fn = message_handler_req_exec_cpg_procjoin,
  212. .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
  213. },
  214. { /* 1 */
  215. .exec_handler_fn = message_handler_req_exec_cpg_procleave,
  216. .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
  217. },
  218. { /* 2 */
  219. .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
  220. .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
  221. },
  222. { /* 3 */
  223. .exec_handler_fn = message_handler_req_exec_cpg_mcast,
  224. .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
  225. },
  226. { /* 4 */
  227. .exec_handler_fn = message_handler_req_exec_cpg_downlist,
  228. .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
  229. },
  230. };
  231. struct corosync_service_engine cpg_service_engine = {
  232. .name = "corosync cluster closed process group service v1.01",
  233. .id = CPG_SERVICE,
  234. .private_data_size = sizeof (struct cpg_pd),
  235. .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
  236. .allow_inquorate = CS_LIB_ALLOW_INQUORATE,
  237. .lib_init_fn = cpg_lib_init_fn,
  238. .lib_exit_fn = cpg_lib_exit_fn,
  239. .lib_engine = cpg_lib_engine,
  240. .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
  241. .exec_init_fn = cpg_exec_init_fn,
  242. .exec_dump_fn = NULL,
  243. .exec_engine = cpg_exec_engine,
  244. .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
  245. .confchg_fn = cpg_confchg_fn,
  246. .sync_init = cpg_sync_init,
  247. .sync_process = cpg_sync_process,
  248. .sync_activate = cpg_sync_activate,
  249. .sync_abort = cpg_sync_abort
  250. };
  251. /*
  252. * Dynamic loader definition
  253. */
  254. static struct corosync_service_engine *cpg_get_service_engine_ver0 (void);
  255. static struct corosync_service_engine_iface_ver0 cpg_service_engine_iface = {
  256. .corosync_get_service_engine_ver0 = cpg_get_service_engine_ver0
  257. };
  258. static struct lcr_iface corosync_cpg_ver0[1] = {
  259. {
  260. .name = "corosync_cpg",
  261. .version = 0,
  262. .versions_replace = 0,
  263. .versions_replace_count = 0,
  264. .dependencies = 0,
  265. .dependency_count = 0,
  266. .constructor = NULL,
  267. .destructor = NULL,
  268. .interfaces = NULL
  269. }
  270. };
  271. static struct lcr_comp cpg_comp_ver0 = {
  272. .iface_count = 1,
  273. .ifaces = corosync_cpg_ver0
  274. };
  275. static struct corosync_service_engine *cpg_get_service_engine_ver0 (void)
  276. {
  277. return (&cpg_service_engine);
  278. }
  279. __attribute__ ((constructor)) static void cpg_comp_register (void) {
  280. lcr_interfaces_set (&corosync_cpg_ver0[0], &cpg_service_engine_iface);
  281. lcr_component_register (&cpg_comp_ver0);
  282. }
  283. struct req_exec_cpg_procjoin {
  284. coroipc_request_header_t header __attribute__((aligned(8)));
  285. mar_cpg_name_t group_name __attribute__((aligned(8)));
  286. mar_uint32_t pid __attribute__((aligned(8)));
  287. mar_uint32_t reason __attribute__((aligned(8)));
  288. };
  289. struct req_exec_cpg_mcast {
  290. coroipc_request_header_t header __attribute__((aligned(8)));
  291. mar_cpg_name_t group_name __attribute__((aligned(8)));
  292. mar_uint32_t msglen __attribute__((aligned(8)));
  293. mar_uint32_t pid __attribute__((aligned(8)));
  294. mar_message_source_t source __attribute__((aligned(8)));
  295. mar_uint8_t message[] __attribute__((aligned(8)));
  296. };
  297. struct req_exec_cpg_downlist {
  298. coroipc_request_header_t header __attribute__((aligned(8)));
  299. mar_uint32_t left_nodes __attribute__((aligned(8)));
  300. mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
  301. };
  302. static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
  303. static void cpg_sync_init (void)
  304. {
  305. }
  306. static int cpg_sync_process (void)
  307. {
  308. return cpg_exec_send_joinlist();
  309. }
  310. static void cpg_sync_activate (void)
  311. {
  312. }
  313. static void cpg_sync_abort (void)
  314. {
  315. }
  316. static int notify_lib_joinlist(
  317. const mar_cpg_name_t *group_name,
  318. void *conn,
  319. int joined_list_entries,
  320. mar_cpg_address_t *joined_list,
  321. int left_list_entries,
  322. mar_cpg_address_t *left_list,
  323. int id)
  324. {
  325. int size;
  326. char *buf;
  327. struct list_head *iter;
  328. int count;
  329. struct res_lib_cpg_confchg_callback *res;
  330. mar_cpg_address_t *retgi;
  331. count = 0;
  332. for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
  333. struct process_info *pi = list_entry (iter, struct process_info, list);
  334. if (mar_name_compare (&pi->group, group_name) == 0) {
  335. int i;
  336. int founded = 0;
  337. for (i = 0; i < left_list_entries; i++) {
  338. if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
  339. founded++;
  340. }
  341. }
  342. if (!founded)
  343. count++;
  344. }
  345. }
  346. size = sizeof(struct res_lib_cpg_confchg_callback) +
  347. sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
  348. buf = alloca(size);
  349. if (!buf)
  350. return CPG_ERR_LIBRARY;
  351. res = (struct res_lib_cpg_confchg_callback *)buf;
  352. res->joined_list_entries = joined_list_entries;
  353. res->left_list_entries = left_list_entries;
  354. res->member_list_entries = count;
  355. retgi = res->member_list;
  356. res->header.size = size;
  357. res->header.id = id;
  358. res->header.error = CS_OK;
  359. memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));
  360. for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
  361. struct process_info *pi=list_entry (iter, struct process_info, list);
  362. if (mar_name_compare (&pi->group, group_name) == 0) {
  363. int i;
  364. int founded = 0;
  365. for (i = 0;i < left_list_entries; i++) {
  366. if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
  367. founded++;
  368. }
  369. }
  370. if (!founded) {
  371. retgi->nodeid = pi->nodeid;
  372. retgi->pid = pi->pid;
  373. retgi++;
  374. }
  375. }
  376. }
  377. if (left_list_entries) {
  378. memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
  379. retgi += left_list_entries;
  380. }
  381. if (joined_list_entries) {
  382. memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
  383. retgi += joined_list_entries;
  384. }
  385. if (conn) {
  386. api->ipc_dispatch_send (conn, buf, size);
  387. } else {
  388. for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
  389. struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
  390. if (mar_name_compare (&cpd->group_name, group_name) == 0) {
  391. api->ipc_dispatch_send (cpd->conn, buf, size);
  392. assert (left_list_entries <= 1);
  393. assert (joined_list_entries <= 1);
  394. if (left_list_entries) {
  395. if (left_list[0].pid == cpd->pid &&
  396. left_list[0].nodeid == api->totem_nodeid_get()) {
  397. cpd->pid = 0;
  398. memset (&cpd->group_name, 0, sizeof(cpd->group_name));
  399. cpd->cpd_state = CPD_STATE_UNJOINED;
  400. }
  401. }
  402. if (joined_list_entries) {
  403. if (joined_list[0].pid == cpd->pid &&
  404. joined_list[0].nodeid == api->totem_nodeid_get()) {
  405. cpd->cpd_state = CPD_STATE_JOIN_COMPLETED;
  406. }
  407. }
  408. }
  409. }
  410. }
  411. return CPG_OK;
  412. }
  413. static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
  414. {
  415. api = corosync_api;
  416. return (0);
  417. }
  418. static int cpg_lib_exit_fn (void *conn)
  419. {
  420. struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
  421. log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p\n", conn);
  422. if (cpd->group_name.length > 0) {
  423. cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
  424. MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE);
  425. }
  426. list_del (&cpd->list);
  427. api->ipc_refcnt_dec (conn);
  428. return (0);
  429. }
  430. static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
  431. {
  432. struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
  433. struct iovec req_exec_cpg_iovec;
  434. int result;
  435. memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
  436. req_exec_cpg_procjoin.pid = pid;
  437. req_exec_cpg_procjoin.reason = reason;
  438. req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
  439. req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
  440. req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
  441. req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
  442. result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
  443. return (result);
  444. }
  445. static void cpg_confchg_fn (
  446. enum totem_configuration_type configuration_type,
  447. const unsigned int *member_list, size_t member_list_entries,
  448. const unsigned int *left_list, size_t left_list_entries,
  449. const unsigned int *joined_list, size_t joined_list_entries,
  450. const struct memb_ring_id *ring_id)
  451. {
  452. int i;
  453. uint32_t lowest_nodeid = 0xffffffff;
  454. struct iovec req_exec_cpg_iovec;
  455. /* We don't send the library joinlist in here because it can end up
  456. out of order with the rest of the messages (which are totem ordered).
  457. So we get the lowest nodeid to send out a list of left nodes instead.
  458. On receipt of that message, all nodes will then notify their local clients
  459. of the new joinlist */
  460. if (left_list_entries) {
  461. for (i = 0; i < member_list_entries; i++) {
  462. if (member_list[i] < lowest_nodeid)
  463. lowest_nodeid = member_list[i];
  464. }
  465. log_printf(LOGSYS_LEVEL_DEBUG, "confchg, low nodeid=%d, us = %d\n", lowest_nodeid, api->totem_nodeid_get());
  466. if (lowest_nodeid == api->totem_nodeid_get()) {
  467. g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
  468. g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
  469. g_req_exec_cpg_downlist.left_nodes = left_list_entries;
  470. for (i = 0; i < left_list_entries; i++) {
  471. g_req_exec_cpg_downlist.nodeids[i] = left_list[i];
  472. }
  473. log_printf(LOGSYS_LEVEL_DEBUG,
  474. "confchg, build downlist: %lu nodes\n",
  475. (long unsigned int) left_list_entries);
  476. }
  477. }
  478. /* Don't send this message until we get the final configuration message */
  479. if (configuration_type == TOTEM_CONFIGURATION_REGULAR && g_req_exec_cpg_downlist.left_nodes) {
  480. req_exec_cpg_iovec.iov_base = (char *)&g_req_exec_cpg_downlist;
  481. req_exec_cpg_iovec.iov_len = g_req_exec_cpg_downlist.header.size;
  482. api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
  483. g_req_exec_cpg_downlist.left_nodes = 0;
  484. log_printf(LOGSYS_LEVEL_DEBUG, "confchg, sent downlist\n");
  485. }
  486. }
  487. /* Can byteswap join & leave messages */
  488. static void exec_cpg_procjoin_endian_convert (void *msg)
  489. {
  490. struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = msg;
  491. req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
  492. swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
  493. req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
  494. }
  495. static void exec_cpg_joinlist_endian_convert (void *msg_v)
  496. {
  497. char *msg = msg_v;
  498. coroipc_response_header_t *res = (coroipc_response_header_t *)msg;
  499. struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(coroipc_response_header_t));
  500. /* XXX shouldn't mar_res_header be swabbed? */
  501. while ((const char*)jle < msg + res->size) {
  502. jle->pid = swab32(jle->pid);
  503. swab_mar_cpg_name_t (&jle->group_name);
  504. jle++;
  505. }
  506. }
  507. static void exec_cpg_downlist_endian_convert (void *msg)
  508. {
  509. struct req_exec_cpg_downlist *req_exec_cpg_downlist = msg;
  510. unsigned int i;
  511. req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
  512. for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
  513. req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
  514. }
  515. }
  516. static void exec_cpg_mcast_endian_convert (void *msg)
  517. {
  518. struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
  519. swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
  520. swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
  521. req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
  522. req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
  523. swab_mar_message_source_t (&req_exec_cpg_mcast->source);
  524. }
  525. static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
  526. struct list_head *iter;
  527. for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
  528. struct process_info *pi = list_entry (iter, struct process_info, list);
  529. iter = iter->next;
  530. if (pi->pid == pid && pi->nodeid == nodeid &&
  531. mar_name_compare (&pi->group, group_name) == 0) {
  532. return pi;
  533. }
  534. }
  535. return NULL;
  536. }
  537. static void do_proc_join(
  538. const mar_cpg_name_t *name,
  539. uint32_t pid,
  540. unsigned int nodeid,
  541. int reason)
  542. {
  543. struct process_info *pi;
  544. mar_cpg_address_t notify_info;
  545. if (process_info_find (name, pid, nodeid) != NULL) {
  546. return ;
  547. }
  548. pi = malloc (sizeof (struct process_info));
  549. if (!pi) {
  550. log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
  551. return;
  552. }
  553. pi->nodeid = nodeid;
  554. pi->pid = pid;
  555. memcpy(&pi->group, name, sizeof(*name));
  556. list_init(&pi->list);
  557. list_add(&pi->list, &process_info_list_head);
  558. notify_info.pid = pi->pid;
  559. notify_info.nodeid = nodeid;
  560. notify_info.reason = reason;
  561. notify_lib_joinlist(&pi->group, NULL,
  562. 1, &notify_info,
  563. 0, NULL,
  564. MESSAGE_RES_CPG_CONFCHG_CALLBACK);
  565. }
  566. static void message_handler_req_exec_cpg_downlist (
  567. const void *message,
  568. unsigned int nodeid)
  569. {
  570. const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
  571. int i;
  572. mar_cpg_address_t left_list[1];
  573. struct list_head *iter;
  574. /*
  575. FOR OPTIMALIZATION - Make list of lists
  576. */
  577. log_printf (LOGSYS_LEVEL_DEBUG, "downlist left_list: %d\n", req_exec_cpg_downlist->left_nodes);
  578. for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
  579. struct process_info *pi = list_entry(iter, struct process_info, list);
  580. iter = iter->next;
  581. for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
  582. if (pi->nodeid == req_exec_cpg_downlist->nodeids[i]) {
  583. left_list[0].nodeid = pi->nodeid;
  584. left_list[0].pid = pi->pid;
  585. left_list[0].reason = CONFCHG_CPG_REASON_NODEDOWN;
  586. notify_lib_joinlist(&pi->group, NULL,
  587. 0, NULL,
  588. 1, left_list,
  589. MESSAGE_RES_CPG_CONFCHG_CALLBACK);
  590. list_del (&pi->list);
  591. free (pi);
  592. }
  593. }
  594. }
  595. }
  596. static void message_handler_req_exec_cpg_procjoin (
  597. const void *message,
  598. unsigned int nodeid)
  599. {
  600. const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
  601. log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node %d\n", nodeid);
  602. do_proc_join (&req_exec_cpg_procjoin->group_name,
  603. req_exec_cpg_procjoin->pid, nodeid,
  604. CONFCHG_CPG_REASON_JOIN);
  605. }
  606. static void message_handler_req_exec_cpg_procleave (
  607. const void *message,
  608. unsigned int nodeid)
  609. {
  610. const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
  611. struct process_info *pi;
  612. struct list_head *iter;
  613. mar_cpg_address_t notify_info;
  614. log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node %d\n", nodeid);
  615. notify_info.pid = req_exec_cpg_procjoin->pid;
  616. notify_info.nodeid = nodeid;
  617. notify_info.reason = req_exec_cpg_procjoin->reason;
  618. notify_lib_joinlist(&req_exec_cpg_procjoin->group_name, NULL,
  619. 0, NULL,
  620. 1, &notify_info,
  621. MESSAGE_RES_CPG_CONFCHG_CALLBACK);
  622. for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
  623. pi = list_entry(iter, struct process_info, list);
  624. iter = iter->next;
  625. if (pi->pid == req_exec_cpg_procjoin->pid && pi->nodeid == nodeid &&
  626. mar_name_compare (&pi->group, &req_exec_cpg_procjoin->group_name)==0) {
  627. list_del (&pi->list);
  628. free (pi);
  629. }
  630. }
  631. }
  632. /* Got a proclist from another node */
  633. static void message_handler_req_exec_cpg_joinlist (
  634. const void *message_v,
  635. unsigned int nodeid)
  636. {
  637. const char *message = message_v;
  638. const coroipc_response_header_t *res = (const coroipc_response_header_t *)message;
  639. const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t));
  640. log_printf(LOGSYS_LEVEL_NOTICE, "got joinlist message from node %d\n",
  641. nodeid);
  642. /* Ignore our own messages */
  643. if (nodeid == api->totem_nodeid_get()) {
  644. return;
  645. }
  646. while ((const char*)jle < message + res->size) {
  647. do_proc_join (&jle->group_name, jle->pid, nodeid,
  648. CONFCHG_CPG_REASON_NODEUP);
  649. jle++;
  650. }
  651. }
  652. static void message_handler_req_exec_cpg_mcast (
  653. const void *message,
  654. unsigned int nodeid)
  655. {
  656. const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
  657. struct res_lib_cpg_deliver_callback res_lib_cpg_mcast;
  658. int msglen = req_exec_cpg_mcast->msglen;
  659. struct list_head *iter;
  660. struct cpg_pd *cpd;
  661. struct iovec iovec[2];
  662. res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK;
  663. res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
  664. res_lib_cpg_mcast.msglen = msglen;
  665. res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
  666. res_lib_cpg_mcast.nodeid = nodeid;
  667. memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
  668. sizeof(mar_cpg_name_t));
  669. iovec[0].iov_base = &res_lib_cpg_mcast;
  670. iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
  671. iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
  672. iovec[1].iov_len = msglen;
  673. for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
  674. cpd = list_entry(iter, struct cpg_pd, list);
  675. iter = iter->next;
  676. if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
  677. && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
  678. api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
  679. }
  680. }
  681. }
  682. static int cpg_exec_send_joinlist(void)
  683. {
  684. int count = 0;
  685. struct list_head *iter;
  686. coroipc_response_header_t *res;
  687. char *buf;
  688. struct join_list_entry *jle;
  689. struct iovec req_exec_cpg_iovec;
  690. for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
  691. struct process_info *pi = list_entry (iter, struct process_info, list);
  692. if (pi->nodeid == api->totem_nodeid_get ()) {
  693. count++;
  694. }
  695. }
  696. /* Nothing to send */
  697. if (!count)
  698. return 0;
  699. buf = alloca(sizeof(coroipc_response_header_t) + sizeof(struct join_list_entry) * count);
  700. if (!buf) {
  701. log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
  702. return -1;
  703. }
  704. jle = (struct join_list_entry *)(buf + sizeof(coroipc_response_header_t));
  705. res = (coroipc_response_header_t *)buf;
  706. for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
  707. struct process_info *pi = list_entry (iter, struct process_info, list);
  708. if (pi->nodeid == api->totem_nodeid_get ()) {
  709. memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
  710. jle->pid = pi->pid;
  711. jle++;
  712. }
  713. }
  714. res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST);
  715. res->size = sizeof(coroipc_response_header_t)+sizeof(struct join_list_entry) * count;
  716. req_exec_cpg_iovec.iov_base = buf;
  717. req_exec_cpg_iovec.iov_len = res->size;
  718. return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
  719. }
  720. static int cpg_lib_init_fn (void *conn)
  721. {
  722. struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
  723. memset (cpd, 0, sizeof(struct cpg_pd));
  724. cpd->conn = conn;
  725. list_add (&cpd->list, &cpg_pd_list_head);
  726. api->ipc_refcnt_inc (conn);
  727. log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p\n", conn, cpd);
  728. return (0);
  729. }
  730. /* Join message from the library */
  731. static void message_handler_req_lib_cpg_join (void *conn, const void *message)
  732. {
  733. const struct req_lib_cpg_join *req_lib_cpg_join = message;
  734. struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
  735. struct res_lib_cpg_join res_lib_cpg_join;
  736. cs_error_t error = CPG_OK;
  737. switch (cpd->cpd_state) {
  738. case CPD_STATE_UNJOINED:
  739. error = CPG_OK;
  740. cpd->cpd_state = CPD_STATE_JOIN_STARTED;
  741. cpd->pid = req_lib_cpg_join->pid;
  742. memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
  743. sizeof (cpd->group_name));
  744. cpg_node_joinleave_send (req_lib_cpg_join->pid,
  745. &req_lib_cpg_join->group_name,
  746. MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN);
  747. break;
  748. case CPD_STATE_LEAVE_STARTED:
  749. error = CPG_ERR_BUSY;
  750. break;
  751. case CPD_STATE_JOIN_STARTED:
  752. error = CPG_ERR_EXIST;
  753. break;
  754. case CPD_STATE_JOIN_COMPLETED:
  755. error = CPG_ERR_EXIST;
  756. break;
  757. }
  758. res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
  759. res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN;
  760. res_lib_cpg_join.header.error = error;
  761. api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
  762. }
  763. /* Leave message from the library */
  764. static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
  765. {
  766. struct res_lib_cpg_leave res_lib_cpg_leave;
  767. cs_error_t error = CPG_OK;
  768. struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
  769. struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
  770. log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p\n", conn);
  771. switch (cpd->cpd_state) {
  772. case CPD_STATE_UNJOINED:
  773. error = CPG_ERR_NOT_EXIST;
  774. break;
  775. case CPD_STATE_LEAVE_STARTED:
  776. error = CPG_ERR_NOT_EXIST;
  777. break;
  778. case CPD_STATE_JOIN_STARTED:
  779. error = CPG_ERR_BUSY;
  780. break;
  781. case CPD_STATE_JOIN_COMPLETED:
  782. error = CPG_OK;
  783. cpd->cpd_state = CPD_STATE_LEAVE_STARTED;
  784. cpg_node_joinleave_send (req_lib_cpg_leave->pid,
  785. &req_lib_cpg_leave->group_name,
  786. MESSAGE_REQ_EXEC_CPG_PROCLEAVE,
  787. CONFCHG_CPG_REASON_LEAVE);
  788. break;
  789. }
  790. /* send return */
  791. res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
  792. res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE;
  793. res_lib_cpg_leave.header.error = error;
  794. api->ipc_response_send(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave));
  795. }
  796. /* Mcast message from the library */
  797. static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
  798. {
  799. const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
  800. struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
  801. mar_cpg_name_t group_name = cpd->group_name;
  802. struct iovec req_exec_cpg_iovec[2];
  803. struct req_exec_cpg_mcast req_exec_cpg_mcast;
  804. struct res_lib_cpg_mcast res_lib_cpg_mcast;
  805. int msglen = req_lib_cpg_mcast->msglen;
  806. int result;
  807. cs_error_t error = CPG_ERR_NOT_EXIST;
  808. log_printf(LOGSYS_LEVEL_DEBUG, "got mcast request on %p\n", conn);
  809. switch (cpd->cpd_state) {
  810. case CPD_STATE_UNJOINED:
  811. error = CPG_ERR_NOT_EXIST;
  812. break;
  813. case CPD_STATE_LEAVE_STARTED:
  814. error = CPG_ERR_NOT_EXIST;
  815. break;
  816. case CPD_STATE_JOIN_STARTED:
  817. error = CPG_OK;
  818. break;
  819. case CPD_STATE_JOIN_COMPLETED:
  820. error = CPG_OK;
  821. break;
  822. }
  823. if (error == CPG_OK) {
  824. req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
  825. req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
  826. MESSAGE_REQ_EXEC_CPG_MCAST);
  827. req_exec_cpg_mcast.pid = cpd->pid;
  828. req_exec_cpg_mcast.msglen = msglen;
  829. api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
  830. memcpy(&req_exec_cpg_mcast.group_name, &group_name,
  831. sizeof(mar_cpg_name_t));
  832. req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
  833. req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
  834. req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
  835. req_exec_cpg_iovec[1].iov_len = msglen;
  836. result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
  837. assert(result == 0);
  838. }
  839. res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
  840. res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
  841. res_lib_cpg_mcast.header.error = error;
  842. api->ipc_response_send (conn, &res_lib_cpg_mcast,
  843. sizeof (res_lib_cpg_mcast));
  844. }
  845. static void message_handler_req_lib_cpg_membership (void *conn,
  846. const void *message)
  847. {
  848. struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
  849. cs_error_t error = CPG_ERR_NOT_EXIST;
  850. coroipc_response_header_t res;
  851. switch (cpd->cpd_state) {
  852. case CPD_STATE_UNJOINED:
  853. error = CPG_ERR_NOT_EXIST;
  854. break;
  855. case CPD_STATE_LEAVE_STARTED:
  856. error = CPG_ERR_NOT_EXIST;
  857. break;
  858. case CPD_STATE_JOIN_STARTED:
  859. error = CPG_ERR_BUSY;
  860. break;
  861. case CPD_STATE_JOIN_COMPLETED:
  862. error = CPG_OK;
  863. break;
  864. }
  865. res.size = sizeof (res);
  866. res.id = MESSAGE_RES_CPG_MEMBERSHIP;
  867. res.error = error;
  868. api->ipc_response_send (conn, &res, sizeof(res));
  869. return;
  870. if (error == CPG_OK) {
  871. notify_lib_joinlist (&cpd->group_name, conn, 0, NULL, 0, NULL,
  872. MESSAGE_RES_CPG_MEMBERSHIP);
  873. }
  874. }
  875. static void message_handler_req_lib_cpg_local_get (void *conn,
  876. const void *message)
  877. {
  878. struct res_lib_cpg_local_get res_lib_cpg_local_get;
  879. res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
  880. res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
  881. res_lib_cpg_local_get.header.error = CPG_OK;
  882. res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
  883. api->ipc_response_send (conn, &res_lib_cpg_local_get,
  884. sizeof (res_lib_cpg_local_get));
  885. }