cpg.c 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206
  1. /*
  2. * Copyright (c) 2006-2009 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Christine Caulfield (ccaulfie@redhat.com)
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <config.h>
  35. #ifndef COROSYNC_BSD
  36. #include <alloca.h>
  37. #endif
  38. #include <sys/types.h>
  39. #include <sys/socket.h>
  40. #include <sys/un.h>
  41. #include <sys/ioctl.h>
  42. #include <netinet/in.h>
  43. #include <sys/uio.h>
  44. #include <unistd.h>
  45. #include <fcntl.h>
  46. #include <stdlib.h>
  47. #include <stdio.h>
  48. #include <errno.h>
  49. #include <signal.h>
  50. #include <time.h>
  51. #include <unistd.h>
  52. #include <netinet/in.h>
  53. #include <arpa/inet.h>
  54. #include <corosync/corotypes.h>
  55. #include <corosync/coroipc_types.h>
  56. #include <corosync/corodefs.h>
  57. #include <corosync/list.h>
  58. #include <corosync/queue.h>
  59. #include <corosync/jhash.h>
  60. #include <corosync/mar_gen.h>
  61. #include <corosync/mar_cpg.h>
  62. #include <corosync/ipc_cpg.h>
  63. #include <corosync/lcr/lcr_comp.h>
  64. #include <corosync/engine/logsys.h>
  65. #include <corosync/engine/coroapi.h>
  66. LOGSYS_DECLARE_SUBSYS ("CPG");
  67. #define GROUP_HASH_SIZE 32
  68. #define PI_FLAG_MEMBER 1
  69. enum cpg_message_req_types {
  70. MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
  71. MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
  72. MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
  73. MESSAGE_REQ_EXEC_CPG_MCAST = 3,
  74. MESSAGE_REQ_EXEC_CPG_DOWNLIST = 4
  75. };
  76. struct removed_group
  77. {
  78. struct group_info *gi;
  79. struct list_head list; /* on removed_list */
  80. int left_list_entries;
  81. mar_cpg_address_t left_list[PROCESSOR_COUNT_MAX];
  82. int left_list_size;
  83. };
  84. struct group_info {
  85. mar_cpg_name_t group_name;
  86. struct list_head members;
  87. struct list_head list; /* on hash list */
  88. struct removed_group *rg; /* when a node goes down */
  89. };
  90. struct process_info {
  91. unsigned int nodeid;
  92. uint32_t pid;
  93. uint32_t flags;
  94. void *conn;
  95. void *trackerconn;
  96. struct group_info *group;
  97. struct list_head list; /* on the group_info members list */
  98. };
  99. struct join_list_entry {
  100. uint32_t pid;
  101. mar_cpg_name_t group_name;
  102. };
  103. static struct list_head group_lists[GROUP_HASH_SIZE];
  104. static struct corosync_api_v1 *api = NULL;
  105. /*
  106. * Service Interfaces required by service_message_handler struct
  107. */
  108. static void cpg_confchg_fn (
  109. enum totem_configuration_type configuration_type,
  110. const unsigned int *member_list, size_t member_list_entries,
  111. const unsigned int *left_list, size_t left_list_entries,
  112. const unsigned int *joined_list, size_t joined_list_entries,
  113. const struct memb_ring_id *ring_id);
  114. static int cpg_exec_init_fn (struct corosync_api_v1 *);
  115. static int cpg_lib_init_fn (void *conn);
  116. static int cpg_lib_exit_fn (void *conn);
  117. static void message_handler_req_exec_cpg_procjoin (
  118. const void *message,
  119. unsigned int nodeid);
  120. static void message_handler_req_exec_cpg_procleave (
  121. const void *message,
  122. unsigned int nodeid);
  123. static void message_handler_req_exec_cpg_joinlist (
  124. const void *message,
  125. unsigned int nodeid);
  126. static void message_handler_req_exec_cpg_mcast (
  127. const void *message,
  128. unsigned int nodeid);
  129. static void message_handler_req_exec_cpg_downlist (
  130. const void *message,
  131. unsigned int nodeid);
  132. static void exec_cpg_procjoin_endian_convert (void *msg);
  133. static void exec_cpg_joinlist_endian_convert (void *msg);
  134. static void exec_cpg_mcast_endian_convert (void *msg);
  135. static void exec_cpg_downlist_endian_convert (void *msg);
  136. static void message_handler_req_lib_cpg_join (void *conn, const void *message);
  137. static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
  138. static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
  139. static void message_handler_req_lib_cpg_membership (void *conn,
  140. const void *message);
  141. static void message_handler_req_lib_cpg_trackstart (void *conn,
  142. const void *message);
  143. static void message_handler_req_lib_cpg_trackstop (void *conn,
  144. const void *message);
  145. static void message_handler_req_lib_cpg_local_get (void *conn,
  146. const void *message);
  147. static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason);
  148. static int cpg_exec_send_joinlist(void);
  149. static void cpg_sync_init (void);
  150. static int cpg_sync_process (void);
  151. static void cpg_sync_activate (void);
  152. static void cpg_sync_abort (void);
  153. /*
  154. * Library Handler Definition
  155. */
  156. static struct corosync_lib_handler cpg_lib_engine[] =
  157. {
  158. { /* 0 */
  159. .lib_handler_fn = message_handler_req_lib_cpg_join,
  160. .response_size = sizeof (struct res_lib_cpg_join),
  161. .response_id = MESSAGE_RES_CPG_JOIN,
  162. .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
  163. },
  164. { /* 1 */
  165. .lib_handler_fn = message_handler_req_lib_cpg_leave,
  166. .response_size = sizeof (struct res_lib_cpg_leave),
  167. .response_id = MESSAGE_RES_CPG_LEAVE,
  168. .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
  169. },
  170. { /* 2 */
  171. .lib_handler_fn = message_handler_req_lib_cpg_mcast,
  172. .response_size = sizeof (struct res_lib_cpg_mcast),
  173. .response_id = MESSAGE_RES_CPG_MCAST,
  174. .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
  175. },
  176. { /* 3 */
  177. .lib_handler_fn = message_handler_req_lib_cpg_membership,
  178. .response_size = sizeof (coroipc_response_header_t),
  179. .response_id = MESSAGE_RES_CPG_MEMBERSHIP,
  180. .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
  181. },
  182. { /* 4 */
  183. .lib_handler_fn = message_handler_req_lib_cpg_trackstart,
  184. .response_size = sizeof (struct res_lib_cpg_trackstart),
  185. .response_id = MESSAGE_RES_CPG_TRACKSTART,
  186. .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
  187. },
  188. { /* 5 */
  189. .lib_handler_fn = message_handler_req_lib_cpg_trackstop,
  190. .response_size = sizeof (struct res_lib_cpg_trackstart),
  191. .response_id = MESSAGE_RES_CPG_TRACKSTOP,
  192. .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
  193. },
  194. { /* 6 */
  195. .lib_handler_fn = message_handler_req_lib_cpg_local_get,
  196. .response_size = sizeof (struct res_lib_cpg_local_get),
  197. .response_id = MESSAGE_RES_CPG_LOCAL_GET,
  198. .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
  199. }
  200. };
  201. static struct corosync_exec_handler cpg_exec_engine[] =
  202. {
  203. { /* 0 */
  204. .exec_handler_fn = message_handler_req_exec_cpg_procjoin,
  205. .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
  206. },
  207. { /* 1 */
  208. .exec_handler_fn = message_handler_req_exec_cpg_procleave,
  209. .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
  210. },
  211. { /* 2 */
  212. .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
  213. .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
  214. },
  215. { /* 3 */
  216. .exec_handler_fn = message_handler_req_exec_cpg_mcast,
  217. .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
  218. },
  219. { /* 4 */
  220. .exec_handler_fn = message_handler_req_exec_cpg_downlist,
  221. .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
  222. },
  223. };
  224. struct corosync_service_engine cpg_service_engine = {
  225. .name = "corosync cluster closed process group service v1.01",
  226. .id = CPG_SERVICE,
  227. .private_data_size = sizeof (struct process_info),
  228. .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
  229. .allow_inquorate = CS_LIB_ALLOW_INQUORATE,
  230. .lib_init_fn = cpg_lib_init_fn,
  231. .lib_exit_fn = cpg_lib_exit_fn,
  232. .lib_engine = cpg_lib_engine,
  233. .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
  234. .exec_init_fn = cpg_exec_init_fn,
  235. .exec_dump_fn = NULL,
  236. .exec_engine = cpg_exec_engine,
  237. .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
  238. .confchg_fn = cpg_confchg_fn,
  239. .sync_init = cpg_sync_init,
  240. .sync_process = cpg_sync_process,
  241. .sync_activate = cpg_sync_activate,
  242. .sync_abort = cpg_sync_abort
  243. };
  244. /*
  245. * Dynamic loader definition
  246. */
  247. static struct corosync_service_engine *cpg_get_service_engine_ver0 (void);
  248. static struct corosync_service_engine_iface_ver0 cpg_service_engine_iface = {
  249. .corosync_get_service_engine_ver0 = cpg_get_service_engine_ver0
  250. };
  251. static struct lcr_iface corosync_cpg_ver0[1] = {
  252. {
  253. .name = "corosync_cpg",
  254. .version = 0,
  255. .versions_replace = 0,
  256. .versions_replace_count = 0,
  257. .dependencies = 0,
  258. .dependency_count = 0,
  259. .constructor = NULL,
  260. .destructor = NULL,
  261. .interfaces = NULL
  262. }
  263. };
  264. static struct lcr_comp cpg_comp_ver0 = {
  265. .iface_count = 1,
  266. .ifaces = corosync_cpg_ver0
  267. };
  268. static struct corosync_service_engine *cpg_get_service_engine_ver0 (void)
  269. {
  270. return (&cpg_service_engine);
  271. }
  272. __attribute__ ((constructor)) static void cpg_comp_register (void) {
  273. lcr_interfaces_set (&corosync_cpg_ver0[0], &cpg_service_engine_iface);
  274. lcr_component_register (&cpg_comp_ver0);
  275. }
  276. struct req_exec_cpg_procjoin {
  277. coroipc_request_header_t header __attribute__((aligned(8)));
  278. mar_cpg_name_t group_name __attribute__((aligned(8)));
  279. mar_uint32_t pid __attribute__((aligned(8)));
  280. mar_uint32_t reason __attribute__((aligned(8)));
  281. };
  282. struct req_exec_cpg_mcast {
  283. coroipc_request_header_t header __attribute__((aligned(8)));
  284. mar_cpg_name_t group_name __attribute__((aligned(8)));
  285. mar_uint32_t msglen __attribute__((aligned(8)));
  286. mar_uint32_t pid __attribute__((aligned(8)));
  287. mar_message_source_t source __attribute__((aligned(8)));
  288. mar_uint8_t message[] __attribute__((aligned(8)));
  289. };
  290. struct req_exec_cpg_downlist {
  291. coroipc_request_header_t header __attribute__((aligned(8)));
  292. mar_uint32_t left_nodes __attribute__((aligned(8)));
  293. mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)));
  294. };
  295. static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
  296. static void cpg_sync_init (void)
  297. {
  298. }
  299. static int cpg_sync_process (void)
  300. {
  301. return cpg_exec_send_joinlist();
  302. }
  303. static void cpg_sync_activate (void)
  304. {
  305. }
  306. static void cpg_sync_abort (void)
  307. {
  308. }
  309. static int notify_lib_joinlist(
  310. struct group_info *gi,
  311. void *conn,
  312. int joined_list_entries,
  313. mar_cpg_address_t *joined_list,
  314. int left_list_entries,
  315. mar_cpg_address_t *left_list,
  316. int id)
  317. {
  318. int count = 0;
  319. char *buf;
  320. struct res_lib_cpg_confchg_callback *res;
  321. struct list_head *iter;
  322. struct list_head *tmp;
  323. mar_cpg_address_t *retgi;
  324. int size;
  325. /* First, we need to know how many nodes are in the list. While we're
  326. traversing this list, look for the 'us' entry so we know which
  327. connection to send back down */
  328. for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
  329. struct process_info *pi = list_entry(iter, struct process_info, list);
  330. if (pi->pid)
  331. count++;
  332. }
  333. log_printf(LOGSYS_LEVEL_DEBUG, "Sending new joinlist (%d elements) to clients\n", count);
  334. size = sizeof(struct res_lib_cpg_confchg_callback) +
  335. sizeof(mar_cpg_address_t) * (count + left_list_entries + joined_list_entries);
  336. buf = alloca(size);
  337. if (!buf)
  338. return CS_ERR_NO_SPACE;
  339. res = (struct res_lib_cpg_confchg_callback *)buf;
  340. res->joined_list_entries = joined_list_entries;
  341. res->left_list_entries = left_list_entries;
  342. retgi = res->member_list;
  343. res->header.size = size;
  344. res->header.id = id;
  345. res->header.error = CS_OK;
  346. memcpy(&res->group_name, &gi->group_name, sizeof(mar_cpg_name_t));
  347. /* Build up the message */
  348. count = 0;
  349. for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
  350. struct process_info *pi = list_entry(iter, struct process_info, list);
  351. if (pi->pid) {
  352. /* Processes leaving will be removed AFTER this is done (so that they get their
  353. own leave notifications), so exclude them from the members list here */
  354. int i;
  355. for (i=0; i<left_list_entries; i++) {
  356. if (left_list[i].pid == pi->pid && left_list[i].nodeid == pi->nodeid)
  357. goto next_member;
  358. }
  359. retgi->nodeid = pi->nodeid;
  360. retgi->pid = pi->pid;
  361. retgi++;
  362. count++;
  363. next_member: ;
  364. }
  365. }
  366. res->member_list_entries = count;
  367. if (left_list_entries) {
  368. memcpy(retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
  369. retgi += left_list_entries;
  370. }
  371. if (joined_list_entries) {
  372. memcpy(retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
  373. retgi += joined_list_entries;
  374. }
  375. if (conn) {
  376. api->ipc_dispatch_send(conn, buf, size);
  377. }
  378. else {
  379. /* Send it to all listeners */
  380. for (iter = gi->members.next, tmp=iter->next; iter != &gi->members; iter = tmp, tmp=iter->next) {
  381. struct process_info *pi = list_entry(iter, struct process_info, list);
  382. if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
  383. if (api->ipc_dispatch_send(pi->trackerconn, buf, size) == -1) {
  384. // Error ??
  385. }
  386. }
  387. }
  388. }
  389. return CS_OK;
  390. }
  391. static void remove_group(struct group_info *gi)
  392. {
  393. list_del(&gi->list);
  394. free(gi);
  395. }
  396. static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
  397. {
  398. int i;
  399. for (i=0; i<GROUP_HASH_SIZE; i++) {
  400. list_init(&group_lists[i]);
  401. }
  402. api = corosync_api;
  403. return (0);
  404. }
  405. static int cpg_lib_exit_fn (void *conn)
  406. {
  407. struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn);
  408. struct group_info *gi = pi->group;
  409. mar_cpg_address_t notify_info;
  410. log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p\n", conn);
  411. if (gi) {
  412. notify_info.pid = pi->pid;
  413. notify_info.nodeid = api->totem_nodeid_get();
  414. notify_info.reason = CONFCHG_CPG_REASON_PROCDOWN;
  415. cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_PROCDOWN);
  416. }
  417. if (pi->pid)
  418. list_del(&pi->list);
  419. api->ipc_refcnt_dec (conn);
  420. return (0);
  421. }
  422. static struct group_info *get_group(const mar_cpg_name_t *name)
  423. {
  424. struct list_head *iter;
  425. struct group_info *gi = NULL;
  426. struct group_info *itergi;
  427. uint32_t hash = jhash(name->value, name->length, 0) % GROUP_HASH_SIZE;
  428. for (iter = group_lists[hash].next; iter != &group_lists[hash]; iter = iter->next) {
  429. itergi = list_entry(iter, struct group_info, list);
  430. if (memcmp(itergi->group_name.value, name->value, name->length) == 0) {
  431. gi = itergi;
  432. break;
  433. }
  434. }
  435. if (!gi) {
  436. gi = malloc(sizeof(struct group_info));
  437. if (!gi) {
  438. log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate group_info struct");
  439. return NULL;
  440. }
  441. memcpy(&gi->group_name, name, sizeof(mar_cpg_name_t));
  442. gi->rg = NULL;
  443. list_init(&gi->members);
  444. list_add(&gi->list, &group_lists[hash]);
  445. }
  446. return gi;
  447. }
  448. static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *pi, int fn, int reason)
  449. {
  450. struct req_exec_cpg_procjoin req_exec_cpg_procjoin;
  451. struct iovec req_exec_cpg_iovec;
  452. int result;
  453. memcpy(&req_exec_cpg_procjoin.group_name, &gi->group_name, sizeof(mar_cpg_name_t));
  454. req_exec_cpg_procjoin.pid = pi->pid;
  455. req_exec_cpg_procjoin.reason = reason;
  456. req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
  457. req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
  458. req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
  459. req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
  460. result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
  461. return (result);
  462. }
  463. static void remove_node_from_groups(
  464. unsigned int nodeid,
  465. struct list_head *remlist)
  466. {
  467. int i;
  468. struct list_head *iter, *iter2, *tmp;
  469. struct process_info *pi;
  470. struct group_info *gi;
  471. for (i=0; i < GROUP_HASH_SIZE; i++) {
  472. for (iter = group_lists[i].next; iter != &group_lists[i]; iter = iter->next) {
  473. gi = list_entry(iter, struct group_info, list);
  474. for (iter2 = gi->members.next, tmp = iter2->next; iter2 != &gi->members; iter2 = tmp, tmp = iter2->next) {
  475. pi = list_entry(iter2, struct process_info, list);
  476. if (pi->nodeid == nodeid) {
  477. /* Add it to the list of nodes to send notifications for */
  478. if (!gi->rg) {
  479. gi->rg = malloc(sizeof(struct removed_group));
  480. if (gi->rg) {
  481. list_add(&gi->rg->list, remlist);
  482. gi->rg->gi = gi;
  483. gi->rg->left_list_entries = 0;
  484. gi->rg->left_list_size = PROCESSOR_COUNT_MAX;
  485. }
  486. else {
  487. log_printf(LOGSYS_LEVEL_CRIT, "Unable to allocate removed group struct. CPG callbacks will be junk.");
  488. return;
  489. }
  490. }
  491. /* Do we need to increase the size ?
  492. * Yes, I increase this exponentially. Generally, if you've got a lot of groups,
  493. * you'll have a /lot/ of groups, and cgp_groupinfo is pretty small anyway
  494. */
  495. if (gi->rg->left_list_size == gi->rg->left_list_entries) {
  496. int newsize;
  497. struct removed_group *newrg;
  498. list_del(&gi->rg->list);
  499. newsize = gi->rg->left_list_size * 2;
  500. newrg = realloc(gi->rg, sizeof(struct removed_group) + newsize*sizeof(mar_cpg_address_t));
  501. if (!newrg) {
  502. log_printf(LOGSYS_LEVEL_CRIT, "Unable to realloc removed group struct. CPG callbacks will be junk.");
  503. return;
  504. }
  505. newrg->left_list_size = newsize+PROCESSOR_COUNT_MAX;
  506. gi->rg = newrg;
  507. list_add(&gi->rg->list, remlist);
  508. }
  509. gi->rg->left_list[gi->rg->left_list_entries].pid = pi->pid;
  510. gi->rg->left_list[gi->rg->left_list_entries].nodeid = pi->nodeid;
  511. gi->rg->left_list[gi->rg->left_list_entries].reason = CONFCHG_CPG_REASON_NODEDOWN;
  512. gi->rg->left_list_entries++;
  513. /* Remove node info for dead node */
  514. list_del(&pi->list);
  515. free(pi);
  516. }
  517. }
  518. }
  519. }
  520. }
  521. static void cpg_confchg_fn (
  522. enum totem_configuration_type configuration_type,
  523. const unsigned int *member_list, size_t member_list_entries,
  524. const unsigned int *left_list, size_t left_list_entries,
  525. const unsigned int *joined_list, size_t joined_list_entries,
  526. const struct memb_ring_id *ring_id)
  527. {
  528. int i;
  529. uint32_t lowest_nodeid = 0xffffffff;
  530. struct iovec req_exec_cpg_iovec;
  531. /* We don't send the library joinlist in here because it can end up
  532. out of order with the rest of the messages (which are totem ordered).
  533. So we get the lowest nodeid to send out a list of left nodes instead.
  534. On receipt of that message, all nodes will then notify their local clients
  535. of the new joinlist */
  536. if (left_list_entries) {
  537. for (i = 0; i < member_list_entries; i++) {
  538. if (member_list[i] < lowest_nodeid)
  539. lowest_nodeid = member_list[i];
  540. }
  541. log_printf(LOGSYS_LEVEL_DEBUG, "confchg, low nodeid=%d, us = %d\n", lowest_nodeid, api->totem_nodeid_get());
  542. if (lowest_nodeid == api->totem_nodeid_get()) {
  543. g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
  544. g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
  545. g_req_exec_cpg_downlist.left_nodes = left_list_entries;
  546. for (i = 0; i < left_list_entries; i++) {
  547. g_req_exec_cpg_downlist.nodeids[i] = left_list[i];
  548. }
  549. log_printf(LOGSYS_LEVEL_DEBUG,
  550. "confchg, build downlist: %lu nodes\n",
  551. (long unsigned int) left_list_entries);
  552. }
  553. }
  554. /* Don't send this message until we get the final configuration message */
  555. if (configuration_type == TOTEM_CONFIGURATION_REGULAR && g_req_exec_cpg_downlist.left_nodes) {
  556. req_exec_cpg_iovec.iov_base = (char *)&g_req_exec_cpg_downlist;
  557. req_exec_cpg_iovec.iov_len = g_req_exec_cpg_downlist.header.size;
  558. api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
  559. g_req_exec_cpg_downlist.left_nodes = 0;
  560. log_printf(LOGSYS_LEVEL_DEBUG, "confchg, sent downlist\n");
  561. }
  562. }
  563. /* Can byteswap join & leave messages */
  564. static void exec_cpg_procjoin_endian_convert (void *msg)
  565. {
  566. struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = msg;
  567. req_exec_cpg_procjoin->pid = swab32(req_exec_cpg_procjoin->pid);
  568. swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
  569. req_exec_cpg_procjoin->reason = swab32(req_exec_cpg_procjoin->reason);
  570. }
  571. static void exec_cpg_joinlist_endian_convert (void *msg_v)
  572. {
  573. char *msg = msg_v;
  574. coroipc_response_header_t *res = (coroipc_response_header_t *)msg;
  575. struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(coroipc_response_header_t));
  576. /* XXX shouldn't mar_res_header be swabbed? */
  577. while ((const char*)jle < msg + res->size) {
  578. jle->pid = swab32(jle->pid);
  579. swab_mar_cpg_name_t (&jle->group_name);
  580. jle++;
  581. }
  582. }
  583. static void exec_cpg_downlist_endian_convert (void *msg)
  584. {
  585. struct req_exec_cpg_downlist *req_exec_cpg_downlist = msg;
  586. unsigned int i;
  587. req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
  588. for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
  589. req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
  590. }
  591. }
  592. static void exec_cpg_mcast_endian_convert (void *msg)
  593. {
  594. struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
  595. swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
  596. swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
  597. req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
  598. req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
  599. swab_mar_message_source_t (&req_exec_cpg_mcast->source);
  600. }
  601. static void do_proc_join(
  602. const mar_cpg_name_t *name,
  603. uint32_t pid,
  604. unsigned int nodeid,
  605. int reason)
  606. {
  607. struct group_info *gi;
  608. struct process_info *pi;
  609. struct list_head *iter;
  610. mar_cpg_address_t notify_info;
  611. gi = get_group(name); /* this will always succeed ! */
  612. assert(gi);
  613. /* See if it already exists in this group */
  614. for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
  615. pi = list_entry(iter, struct process_info, list);
  616. if (pi->pid == pid && pi->nodeid == nodeid) {
  617. /* It could be a local join message */
  618. if ((nodeid == api->totem_nodeid_get()) &&
  619. (!pi->flags & PI_FLAG_MEMBER)) {
  620. goto local_join;
  621. } else {
  622. return;
  623. }
  624. }
  625. }
  626. pi = malloc(sizeof(struct process_info));
  627. if (!pi) {
  628. log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
  629. return;
  630. }
  631. pi->nodeid = nodeid;
  632. pi->pid = pid;
  633. pi->group = gi;
  634. pi->conn = NULL;
  635. pi->trackerconn = NULL;
  636. list_add_tail(&pi->list, &gi->members);
  637. local_join:
  638. pi->flags = PI_FLAG_MEMBER;
  639. notify_info.pid = pi->pid;
  640. notify_info.nodeid = nodeid;
  641. notify_info.reason = reason;
  642. notify_lib_joinlist(gi, NULL,
  643. 1, &notify_info,
  644. 0, NULL,
  645. MESSAGE_RES_CPG_CONFCHG_CALLBACK);
  646. }
  647. static void message_handler_req_exec_cpg_downlist (
  648. const void *message,
  649. unsigned int nodeid)
  650. {
  651. const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
  652. int i;
  653. struct list_head removed_list;
  654. log_printf(LOGSYS_LEVEL_DEBUG, "downlist left_list: %d\n", req_exec_cpg_downlist->left_nodes);
  655. list_init(&removed_list);
  656. /* Remove nodes from joined groups and add removed groups to the list */
  657. for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
  658. remove_node_from_groups( req_exec_cpg_downlist->nodeids[i], &removed_list);
  659. }
  660. if (!list_empty(&removed_list)) {
  661. struct list_head *iter, *tmp;
  662. for (iter = removed_list.next, tmp=iter->next; iter != &removed_list; iter = tmp, tmp = iter->next) {
  663. struct removed_group *rg = list_entry(iter, struct removed_group, list);
  664. notify_lib_joinlist(rg->gi, NULL,
  665. 0, NULL,
  666. rg->left_list_entries, rg->left_list,
  667. MESSAGE_RES_CPG_CONFCHG_CALLBACK);
  668. rg->gi->rg = NULL;
  669. free(rg);
  670. }
  671. }
  672. }
  673. static void message_handler_req_exec_cpg_procjoin (
  674. const void *message,
  675. unsigned int nodeid)
  676. {
  677. const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
  678. log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node %d\n", nodeid);
  679. do_proc_join(&req_exec_cpg_procjoin->group_name,
  680. req_exec_cpg_procjoin->pid, nodeid,
  681. CONFCHG_CPG_REASON_JOIN);
  682. }
  683. static void message_handler_req_exec_cpg_procleave (
  684. const void *message,
  685. unsigned int nodeid)
  686. {
  687. const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
  688. struct group_info *gi;
  689. struct process_info *pi;
  690. struct list_head *iter;
  691. mar_cpg_address_t notify_info;
  692. log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node %d\n", nodeid);
  693. gi = get_group(&req_exec_cpg_procjoin->group_name); /* this will always succeed ! */
  694. assert(gi);
  695. notify_info.pid = req_exec_cpg_procjoin->pid;
  696. notify_info.nodeid = nodeid;
  697. notify_info.reason = req_exec_cpg_procjoin->reason;
  698. notify_lib_joinlist(gi, NULL,
  699. 0, NULL,
  700. 1, &notify_info,
  701. MESSAGE_RES_CPG_CONFCHG_CALLBACK);
  702. /* Find the node/PID to remove */
  703. for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
  704. pi = list_entry(iter, struct process_info, list);
  705. if (pi->pid == req_exec_cpg_procjoin->pid &&
  706. pi->nodeid == nodeid) {
  707. list_del(&pi->list);
  708. if (!pi->conn)
  709. free(pi);
  710. else
  711. pi->pid = 0;
  712. if (list_empty(&gi->members)) {
  713. remove_group(gi);
  714. }
  715. break;
  716. }
  717. }
  718. }
  719. /* Got a proclist from another node */
  720. static void message_handler_req_exec_cpg_joinlist (
  721. const void *message_v,
  722. unsigned int nodeid)
  723. {
  724. const char *message = message_v;
  725. const coroipc_response_header_t *res = (const coroipc_response_header_t *)message;
  726. const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(coroipc_response_header_t));
  727. log_printf(LOGSYS_LEVEL_NOTICE, "got joinlist message from node %d\n",
  728. nodeid);
  729. /* Ignore our own messages */
  730. if (nodeid == api->totem_nodeid_get()) {
  731. return;
  732. }
  733. while ((const char*)jle < message + res->size) {
  734. do_proc_join(&jle->group_name, jle->pid, nodeid,
  735. CONFCHG_CPG_REASON_NODEUP);
  736. jle++;
  737. }
  738. }
  739. static void message_handler_req_exec_cpg_mcast (
  740. const void *message,
  741. unsigned int nodeid)
  742. {
  743. const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
  744. struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
  745. int msglen = req_exec_cpg_mcast->msglen;
  746. char buf[sizeof(*res_lib_cpg_mcast) + msglen];
  747. struct group_info *gi;
  748. struct list_head *iter;
  749. /*
  750. * Track local messages so that flow is controlled on the local node
  751. */
  752. gi = get_group(&req_exec_cpg_mcast->group_name); /* this will always succeed ! */
  753. assert(gi);
  754. res_lib_cpg_mcast = (struct res_lib_cpg_deliver_callback *)buf;
  755. res_lib_cpg_mcast->header.id = MESSAGE_RES_CPG_DELIVER_CALLBACK;
  756. res_lib_cpg_mcast->header.size = sizeof(*res_lib_cpg_mcast) + msglen;
  757. res_lib_cpg_mcast->msglen = msglen;
  758. res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid;
  759. res_lib_cpg_mcast->nodeid = nodeid;
  760. if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) {
  761. api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn);
  762. }
  763. memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
  764. sizeof(mar_cpg_name_t));
  765. memcpy(&res_lib_cpg_mcast->message,
  766. (const char*)message+sizeof(*req_exec_cpg_mcast), msglen);
  767. /* Send to all interested members */
  768. for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
  769. struct process_info *pi = list_entry(iter, struct process_info, list);
  770. if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
  771. api->ipc_dispatch_send(
  772. pi->trackerconn,
  773. buf,
  774. res_lib_cpg_mcast->header.size);
  775. }
  776. }
  777. }
  778. static int cpg_exec_send_joinlist(void)
  779. {
  780. int count = 0;
  781. char *buf;
  782. int i;
  783. struct list_head *iter;
  784. struct list_head *iter2;
  785. struct group_info *gi;
  786. coroipc_response_header_t *res;
  787. struct join_list_entry *jle;
  788. struct iovec req_exec_cpg_iovec;
  789. log_printf(LOGSYS_LEVEL_DEBUG, "sending joinlist to cluster\n");
  790. /* Count the number of groups we are a member of */
  791. for (i=0; i<GROUP_HASH_SIZE; i++) {
  792. for (iter = group_lists[i].next; iter != &group_lists[i]; iter = iter->next) {
  793. gi = list_entry(iter, struct group_info, list);
  794. for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) {
  795. struct process_info *pi = list_entry(iter2, struct process_info, list);
  796. if (pi->pid && pi->nodeid == api->totem_nodeid_get()) {
  797. count++;
  798. }
  799. }
  800. }
  801. }
  802. /* Nothing to send */
  803. if (!count)
  804. return 0;
  805. buf = alloca(sizeof(coroipc_response_header_t) + sizeof(struct join_list_entry) * count);
  806. if (!buf) {
  807. log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
  808. return -1;
  809. }
  810. jle = (struct join_list_entry *)(buf + sizeof(coroipc_response_header_t));
  811. res = (coroipc_response_header_t *)buf;
  812. for (i=0; i<GROUP_HASH_SIZE; i++) {
  813. for (iter = group_lists[i].next; iter != &group_lists[i]; iter = iter->next) {
  814. gi = list_entry(iter, struct group_info, list);
  815. for (iter2 = gi->members.next; iter2 != &gi->members; iter2 = iter2->next) {
  816. struct process_info *pi = list_entry(iter2, struct process_info, list);
  817. if (pi->pid && pi->nodeid == api->totem_nodeid_get()) {
  818. memcpy(&jle->group_name, &gi->group_name, sizeof(mar_cpg_name_t));
  819. jle->pid = pi->pid;
  820. jle++;
  821. }
  822. }
  823. }
  824. }
  825. res->id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_JOINLIST);
  826. res->size = sizeof(coroipc_response_header_t)+sizeof(struct join_list_entry) * count;
  827. req_exec_cpg_iovec.iov_base = buf;
  828. req_exec_cpg_iovec.iov_len = res->size;
  829. return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
  830. }
  831. static int cpg_lib_init_fn (void *conn)
  832. {
  833. struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn);
  834. api->ipc_refcnt_inc (conn);
  835. pi->conn = conn;
  836. log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, pi=%p\n", conn, pi);
  837. return (0);
  838. }
  839. /* Join message from the library */
  840. static void message_handler_req_lib_cpg_join (void *conn, const void *message)
  841. {
  842. const struct req_lib_cpg_join *req_lib_cpg_join = message;
  843. struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn);
  844. struct res_lib_cpg_join res_lib_cpg_join;
  845. struct group_info *gi;
  846. cs_error_t error = CS_OK;
  847. log_printf(LOGSYS_LEVEL_DEBUG, "got join request on %p, pi=%p, pi->pid=%d\n", conn, pi, pi->pid);
  848. /* Already joined on this conn */
  849. if (pi->pid) {
  850. error = CS_ERR_INVALID_PARAM;
  851. goto join_err;
  852. }
  853. gi = get_group(&req_lib_cpg_join->group_name);
  854. if (!gi) {
  855. error = CS_ERR_NO_SPACE;
  856. goto join_err;
  857. }
  858. /* Add a node entry for us */
  859. pi->nodeid = api->totem_nodeid_get();
  860. pi->pid = req_lib_cpg_join->pid;
  861. pi->group = gi;
  862. list_add(&pi->list, &gi->members);
  863. /* Tell the rest of the cluster */
  864. cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCJOIN, CONFCHG_CPG_REASON_JOIN);
  865. join_err:
  866. res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
  867. res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN;
  868. res_lib_cpg_join.header.error = error;
  869. api->ipc_response_send(conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
  870. }
  871. /* Leave message from the library */
  872. static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
  873. {
  874. struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn);
  875. struct res_lib_cpg_leave res_lib_cpg_leave;
  876. struct group_info *gi;
  877. cs_error_t error = CS_OK;
  878. log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p\n", conn);
  879. if (!pi || !pi->pid || !pi->group) {
  880. error = CS_ERR_INVALID_PARAM;
  881. goto leave_ret;
  882. }
  883. gi = pi->group;
  884. /* Tell other nodes we are leaving.
  885. When we get this message back we will leave too */
  886. cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE);
  887. pi->group = NULL;
  888. leave_ret:
  889. /* send return */
  890. res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
  891. res_lib_cpg_leave.header.id = MESSAGE_RES_CPG_LEAVE;
  892. res_lib_cpg_leave.header.error = error;
  893. api->ipc_response_send(conn, &res_lib_cpg_leave, sizeof(res_lib_cpg_leave));
  894. }
  895. /* Mcast message from the library */
  896. static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
  897. {
  898. const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
  899. struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn);
  900. struct group_info *gi = pi->group;
  901. struct iovec req_exec_cpg_iovec[2];
  902. struct req_exec_cpg_mcast req_exec_cpg_mcast;
  903. struct res_lib_cpg_mcast res_lib_cpg_mcast;
  904. int msglen = req_lib_cpg_mcast->msglen;
  905. int result;
  906. log_printf(LOGSYS_LEVEL_DEBUG, "got mcast request on %p\n", conn);
  907. /* Can't send if we're not joined */
  908. if (!gi) {
  909. res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
  910. res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
  911. res_lib_cpg_mcast.header.error = CS_ERR_ACCESS; /* TODO Better error code ?? */
  912. api->ipc_response_send(conn, &res_lib_cpg_mcast,
  913. sizeof(res_lib_cpg_mcast));
  914. return;
  915. }
  916. req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
  917. req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
  918. MESSAGE_REQ_EXEC_CPG_MCAST);
  919. req_exec_cpg_mcast.pid = pi->pid;
  920. req_exec_cpg_mcast.msglen = msglen;
  921. api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
  922. memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name,
  923. sizeof(mar_cpg_name_t));
  924. req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
  925. req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
  926. req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
  927. req_exec_cpg_iovec[1].iov_len = msglen;
  928. // TODO: guarantee type...
  929. result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
  930. api->ipc_refcnt_inc (conn);
  931. res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
  932. res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
  933. res_lib_cpg_mcast.header.error = CS_OK;
  934. api->ipc_response_send(conn, &res_lib_cpg_mcast,
  935. sizeof(res_lib_cpg_mcast));
  936. }
  937. static void message_handler_req_lib_cpg_membership (void *conn,
  938. const void *message)
  939. {
  940. struct process_info *pi = (struct process_info *)api->ipc_private_data_get (conn);
  941. log_printf(LOGSYS_LEVEL_DEBUG, "got membership request on %p\n", conn);
  942. if (!pi->group) {
  943. coroipc_response_header_t res;
  944. res.size = sizeof(res);
  945. res.id = MESSAGE_RES_CPG_MEMBERSHIP;
  946. res.error = CS_ERR_ACCESS; /* TODO Better error code */
  947. api->ipc_response_send(conn, &res, sizeof(res));
  948. return;
  949. }
  950. notify_lib_joinlist(pi->group, conn, 0, NULL, 0, NULL, MESSAGE_RES_CPG_MEMBERSHIP);
  951. }
  952. static void message_handler_req_lib_cpg_trackstart (void *conn,
  953. const void *message)
  954. {
  955. const struct req_lib_cpg_trackstart *req_lib_cpg_trackstart = message;
  956. struct res_lib_cpg_trackstart res_lib_cpg_trackstart;
  957. struct group_info *gi;
  958. struct process_info *otherpi;
  959. cs_error_t error = CS_OK;
  960. log_printf(LOGSYS_LEVEL_DEBUG, "got trackstart request on %p\n", conn);
  961. gi = get_group(&req_lib_cpg_trackstart->group_name);
  962. if (!gi) {
  963. error = CS_ERR_NO_SPACE;
  964. goto tstart_ret;
  965. }
  966. /* Find the partner connection and add us to it's process_info struct */
  967. otherpi = (struct process_info *)api->ipc_private_data_get (conn);
  968. otherpi->trackerconn = conn;
  969. tstart_ret:
  970. res_lib_cpg_trackstart.header.size = sizeof(res_lib_cpg_trackstart);
  971. res_lib_cpg_trackstart.header.id = MESSAGE_RES_CPG_TRACKSTART;
  972. res_lib_cpg_trackstart.header.error = CS_OK;
  973. api->ipc_response_send(conn, &res_lib_cpg_trackstart, sizeof(res_lib_cpg_trackstart));
  974. }
  975. static void message_handler_req_lib_cpg_trackstop (void *conn,
  976. const void *message)
  977. {
  978. const struct req_lib_cpg_trackstop *req_lib_cpg_trackstop = message;
  979. struct res_lib_cpg_trackstop res_lib_cpg_trackstop;
  980. struct process_info *otherpi;
  981. struct group_info *gi;
  982. cs_error_t error = CS_OK;
  983. log_printf(LOGSYS_LEVEL_DEBUG, "got trackstop request on %p\n", conn);
  984. gi = get_group(&req_lib_cpg_trackstop->group_name);
  985. if (!gi) {
  986. error = CS_ERR_NO_SPACE;
  987. goto tstop_ret;
  988. }
  989. /* Find the partner connection and add us to it's process_info struct */
  990. otherpi = (struct process_info *)api->ipc_private_data_get (conn);
  991. otherpi->trackerconn = NULL;
  992. tstop_ret:
  993. res_lib_cpg_trackstop.header.size = sizeof(res_lib_cpg_trackstop);
  994. res_lib_cpg_trackstop.header.id = MESSAGE_RES_CPG_TRACKSTOP;
  995. res_lib_cpg_trackstop.header.error = CS_OK;
  996. api->ipc_response_send(conn, &res_lib_cpg_trackstop.header, sizeof(res_lib_cpg_trackstop));
  997. }
  998. static void message_handler_req_lib_cpg_local_get (void *conn,
  999. const void *message)
  1000. {
  1001. struct res_lib_cpg_local_get res_lib_cpg_local_get;
  1002. res_lib_cpg_local_get.header.size = sizeof(res_lib_cpg_local_get);
  1003. res_lib_cpg_local_get.header.id = MESSAGE_RES_CPG_LOCAL_GET;
  1004. res_lib_cpg_local_get.header.error = CS_OK;
  1005. res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
  1006. api->ipc_response_send(conn, &res_lib_cpg_local_get,
  1007. sizeof(res_lib_cpg_local_get));
  1008. }