evs.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. /*
  2. * vi: set autoindent tabstop=4 shiftwidth=4 :
  3. * Copyright (c) 2004-2005 MontaVista Software, Inc.
  4. *
  5. * All rights reserved.
  6. *
  7. * Author: Steven Dake (sdake@mvista.com)
  8. *
  9. * This software licensed under BSD license, the text of which follows:
  10. *
  11. * Redistribution and use in source and binary forms, with or without
  12. * modification, are permitted provided that the following conditions are met:
  13. *
  14. * - Redistributions of source code must retain the above copyright notice,
  15. * this list of conditions and the following disclaimer.
  16. * - Redistributions in binary form must reproduce the above copyright notice,
  17. * this list of conditions and the following disclaimer in the documentation
  18. * and/or other materials provided with the distribution.
  19. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  20. * contributors may be used to endorse or promote products derived from this
  21. * software without specific prior written permission.
  22. *
  23. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  24. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  25. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  26. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  27. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  28. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  29. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  30. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  31. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  32. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  33. * THE POSSIBILITY OF SUCH DAMAGE.
  34. */
  35. /*
  36. * Provides an extended virtual synchrony API using the openais executive
  37. */
  38. #include <stdlib.h>
  39. #include <string.h>
  40. #include <unistd.h>
  41. #include <pthread.h>
  42. #include <sys/types.h>
  43. #include <sys/socket.h>
  44. #include <errno.h>
  45. #include "../include/ais_types.h"
  46. #include "../include/evs.h"
  47. #include "../include/ipc_evs.h"
  48. #include "util.h"
  49. struct evs_inst {
  50. int response_fd;
  51. int dispatch_fd;
  52. int finalize;
  53. evs_callbacks_t callbacks;
  54. pthread_mutex_t response_mutex;
  55. pthread_mutex_t dispatch_mutex;
  56. };
  57. static void evs_instance_destructor (void *instance);
  58. static struct saHandleDatabase evs_handle_t_db = {
  59. .handleCount = 0,
  60. .handles = 0,
  61. .mutex = PTHREAD_MUTEX_INITIALIZER,
  62. .handleInstanceDestructor = evs_instance_destructor
  63. };
  /*
   * Destructor hook registered in the EVS handle database for
   * struct evs_inst instances. Currently a no-op: nothing is released here.
   */
  static void evs_instance_destructor (void *instance)
  {
      (void)instance; /* unused */
  }
  70. evs_error_t evs_initialize (
  71. evs_handle_t *handle,
  72. evs_callbacks_t *callbacks)
  73. {
  74. SaErrorT error;
  75. struct evs_inst *evs_inst;
  76. error = saHandleCreate (&evs_handle_t_db, sizeof (struct evs_inst), handle);
  77. if (error != SA_OK) {
  78. goto error_no_destroy;
  79. }
  80. error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst);
  81. if (error != SA_OK) {
  82. goto error_destroy;
  83. }
  84. error = saServiceConnectTwo (&evs_inst->response_fd,
  85. &evs_inst->dispatch_fd,
  86. EVS_SERVICE);
  87. if (error != SA_OK) {
  88. goto error_put_destroy;
  89. }
  90. memcpy (&evs_inst->callbacks, callbacks, sizeof (evs_callbacks_t));
  91. pthread_mutex_init (&evs_inst->response_mutex, NULL);
  92. pthread_mutex_init (&evs_inst->dispatch_mutex, NULL);
  93. saHandleInstancePut (&evs_handle_t_db, *handle);
  94. return (SA_OK);
  95. error_put_destroy:
  96. saHandleInstancePut (&evs_handle_t_db, *handle);
  97. error_destroy:
  98. saHandleDestroy (&evs_handle_t_db, *handle);
  99. error_no_destroy:
  100. return (error);
  101. }
  102. evs_error_t evs_finalize (
  103. evs_handle_t handle)
  104. {
  105. struct evs_inst *evs_inst;
  106. SaErrorT error;
  107. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  108. if (error != SA_OK) {
  109. return (error);
  110. }
  111. // TODO is the locking right here
  112. pthread_mutex_lock (&evs_inst->response_mutex);
  113. /*
  114. * Another thread has already started finalizing
  115. */
  116. if (evs_inst->finalize) {
  117. pthread_mutex_unlock (&evs_inst->response_mutex);
  118. saHandleInstancePut (&evs_handle_t_db, handle);
  119. return (EVS_ERR_BAD_HANDLE);
  120. }
  121. evs_inst->finalize = 1;
  122. pthread_mutex_unlock (&evs_inst->response_mutex);
  123. saHandleDestroy (&evs_handle_t_db, handle);
  124. /*
  125. * Disconnect from the server
  126. */
  127. if (evs_inst->response_fd != -1) {
  128. shutdown(evs_inst->response_fd, 0);
  129. close(evs_inst->response_fd);
  130. }
  131. if (evs_inst->dispatch_fd != -1) {
  132. shutdown(evs_inst->dispatch_fd, 0);
  133. close(evs_inst->dispatch_fd);
  134. }
  135. saHandleInstancePut (&evs_handle_t_db, handle);
  136. return (EVS_OK);
  137. }
  138. evs_error_t evs_fd_get (
  139. evs_handle_t handle,
  140. int *fd)
  141. {
  142. SaErrorT error;
  143. struct evs_inst *evs_inst;
  144. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  145. if (error != SA_OK) {
  146. return (error);
  147. }
  148. *fd = evs_inst->dispatch_fd;
  149. saHandleInstancePut (&evs_handle_t_db, handle);
  150. return (SA_OK);
  151. }
  152. struct res_overlay {
  153. struct res_header header;
  154. char data[512000];
  155. };
  156. evs_error_t evs_dispatch (
  157. evs_handle_t handle,
  158. evs_dispatch_t dispatch_types)
  159. {
  160. struct pollfd ufds;
  161. int timeout = -1;
  162. SaErrorT error;
  163. int cont = 1; /* always continue do loop except when set to 0 */
  164. int dispatch_avail;
  165. struct evs_inst *evs_inst;
  166. struct res_evs_confchg_callback *res_evs_confchg_callback;
  167. struct res_evs_deliver_callback *res_evs_deliver_callback;
  168. evs_callbacks_t callbacks;
  169. struct res_overlay dispatch_data;
  170. int ignore_dispatch = 0;
  171. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  172. if (error != SA_OK) {
  173. return (error);
  174. }
  175. /*
  176. * Timeout instantly for SA_DISPATCH_ONE or SA_DISPATCH_ALL and
  177. * wait indefinately for SA_DISPATCH_BLOCKING
  178. */
  179. if (dispatch_types == EVS_DISPATCH_ALL) {
  180. timeout = 0;
  181. }
  182. do {
  183. ufds.fd = evs_inst->dispatch_fd;
  184. ufds.events = POLLIN;
  185. ufds.revents = 0;
  186. error = saPollRetry (&ufds, 1, timeout);
  187. if (error != SA_OK) {
  188. goto error_nounlock;
  189. }
  190. pthread_mutex_lock (&evs_inst->dispatch_mutex);
  191. /*
  192. * Regather poll data in case ufds has changed since taking lock
  193. */
  194. error = saPollRetry (&ufds, 1, 0);
  195. if (error != SA_OK) {
  196. goto error_nounlock;
  197. }
  198. /*
  199. * Handle has been finalized in another thread
  200. */
  201. if (evs_inst->finalize == 1) {
  202. error = EVS_OK;
  203. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  204. goto error_unlock;
  205. }
  206. dispatch_avail = ufds.revents & POLLIN;
  207. if (dispatch_avail == 0 && dispatch_types == EVS_DISPATCH_ALL) {
  208. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  209. break; /* exit do while cont is 1 loop */
  210. } else
  211. if (dispatch_avail == 0) {
  212. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  213. continue; /* next poll */
  214. }
  215. if (ufds.revents & POLLIN) {
  216. /*
  217. * Queue empty, read response from socket
  218. */
  219. error = saRecvRetry (evs_inst->dispatch_fd, &dispatch_data.header,
  220. sizeof (struct res_header), MSG_WAITALL | MSG_NOSIGNAL);
  221. if (error != SA_OK) {
  222. goto error_unlock;
  223. }
  224. if (dispatch_data.header.size > sizeof (struct res_header)) {
  225. error = saRecvRetry (evs_inst->dispatch_fd, &dispatch_data.data,
  226. dispatch_data.header.size - sizeof (struct res_header),
  227. MSG_WAITALL | MSG_NOSIGNAL);
  228. if (error != SA_OK) {
  229. goto error_unlock;
  230. }
  231. }
  232. } else {
  233. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  234. continue;
  235. }
  236. /*
  237. * Make copy of callbacks, message data, unlock instance, and call callback
  238. * A risk of this dispatch method is that the callback routines may
  239. * operate at the same time that evsFinalize has been called.
  240. */
  241. memcpy (&callbacks, &evs_inst->callbacks, sizeof (evs_callbacks_t));
  242. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  243. /*
  244. * Dispatch incoming message
  245. */
  246. switch (dispatch_data.header.id) {
  247. case MESSAGE_RES_EVS_DELIVER_CALLBACK:
  248. res_evs_deliver_callback = (struct res_evs_deliver_callback *)&dispatch_data;
  249. callbacks.evs_deliver_fn (
  250. res_evs_deliver_callback->source_addr,
  251. &res_evs_deliver_callback->msg,
  252. res_evs_deliver_callback->msglen);
  253. break;
  254. case MESSAGE_RES_EVS_CONFCHG_CALLBACK:
  255. res_evs_confchg_callback = (struct res_evs_confchg_callback *)&dispatch_data;
  256. callbacks.evs_confchg_fn (
  257. res_evs_confchg_callback->member_list,
  258. res_evs_confchg_callback->member_list_entries,
  259. res_evs_confchg_callback->left_list,
  260. res_evs_confchg_callback->left_list_entries,
  261. res_evs_confchg_callback->joined_list,
  262. res_evs_confchg_callback->joined_list_entries);
  263. break;
  264. default:
  265. error = SA_ERR_LIBRARY;
  266. goto error_nounlock;
  267. break;
  268. }
  269. /*
  270. * Determine if more messages should be processed
  271. * */
  272. switch (dispatch_types) {
  273. case EVS_DISPATCH_ONE:
  274. if (ignore_dispatch) {
  275. ignore_dispatch = 0;
  276. } else {
  277. cont = 0;
  278. }
  279. break;
  280. case EVS_DISPATCH_ALL:
  281. if (ignore_dispatch) {
  282. ignore_dispatch = 0;
  283. }
  284. break;
  285. case EVS_DISPATCH_BLOCKING:
  286. break;
  287. }
  288. } while (cont);
  289. error_unlock:
  290. saHandleInstancePut (&evs_handle_t_db, handle);
  291. error_nounlock:
  292. return (error);
  293. }
  294. evs_error_t evs_join (
  295. evs_handle_t handle,
  296. struct evs_group *groups,
  297. int group_entries)
  298. {
  299. evs_error_t error;
  300. struct evs_inst *evs_inst;
  301. struct iovec iov[2];
  302. struct req_lib_evs_join req_lib_evs_join;
  303. struct res_lib_evs_join res_lib_evs_join;
  304. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  305. if (error != SA_OK) {
  306. return (error);
  307. }
  308. req_lib_evs_join.header.size = sizeof (struct req_lib_evs_join) +
  309. (group_entries * sizeof (struct evs_group));
  310. req_lib_evs_join.header.id = MESSAGE_REQ_EVS_JOIN;
  311. req_lib_evs_join.group_entries = group_entries;
  312. iov[0].iov_base = &req_lib_evs_join;
  313. iov[0].iov_len = sizeof (struct req_lib_evs_join);
  314. iov[1].iov_base = groups;
  315. iov[1].iov_len = (group_entries * sizeof (struct evs_group));
  316. pthread_mutex_lock (&evs_inst->response_mutex);
  317. error = saSendMsgReceiveReply (evs_inst->response_fd, iov, 2,
  318. &res_lib_evs_join, sizeof (struct res_lib_evs_join));
  319. pthread_mutex_unlock (&evs_inst->response_mutex);
  320. if (error != SA_OK) {
  321. goto error_exit;
  322. }
  323. error = res_lib_evs_join.header.error;
  324. error_exit:
  325. saHandleInstancePut (&evs_handle_t_db, handle);
  326. return (error);
  327. }
  328. evs_error_t evs_leave (
  329. evs_handle_t handle,
  330. struct evs_group *groups,
  331. int group_entries)
  332. {
  333. evs_error_t error;
  334. struct evs_inst *evs_inst;
  335. struct iovec iov[2];
  336. struct req_lib_evs_leave req_lib_evs_leave;
  337. struct res_lib_evs_leave res_lib_evs_leave;
  338. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  339. if (error != SA_OK) {
  340. return (error);
  341. }
  342. req_lib_evs_leave.header.size = sizeof (struct req_lib_evs_leave) +
  343. (group_entries * sizeof (struct evs_group));
  344. req_lib_evs_leave.header.id = MESSAGE_REQ_EVS_LEAVE;
  345. req_lib_evs_leave.group_entries = group_entries;
  346. iov[0].iov_base = &req_lib_evs_leave;
  347. iov[0].iov_len = sizeof (struct req_lib_evs_leave);
  348. iov[1].iov_base = groups;
  349. iov[1].iov_len = (group_entries * sizeof (struct evs_group));
  350. pthread_mutex_lock (&evs_inst->response_mutex);
  351. error = saSendMsgReceiveReply (evs_inst->response_fd, iov, 2,
  352. &res_lib_evs_leave, sizeof (struct res_lib_evs_leave));
  353. pthread_mutex_unlock (&evs_inst->response_mutex);
  354. if (error != SA_OK) {
  355. goto error_exit;
  356. }
  357. error = res_lib_evs_leave.header.error;
  358. error_exit:
  359. saHandleInstancePut (&evs_handle_t_db, handle);
  360. return (error);
  361. }
  362. evs_error_t evs_mcast_joined (
  363. evs_handle_t handle,
  364. evs_guarantee_t guarantee,
  365. struct iovec *iovec,
  366. int iov_len)
  367. {
  368. int i;
  369. evs_error_t error;
  370. struct evs_inst *evs_inst;
  371. struct iovec iov[64];
  372. struct req_lib_evs_mcast_joined req_lib_evs_mcast_joined;
  373. struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined;
  374. int msg_len = 0;
  375. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  376. if (error != SA_OK) {
  377. return (error);
  378. }
  379. for (i = 0; i < iov_len; i++ ) {
  380. msg_len += iovec[i].iov_len;
  381. }
  382. req_lib_evs_mcast_joined.header.size = sizeof (struct req_lib_evs_mcast_joined) +
  383. msg_len;
  384. req_lib_evs_mcast_joined.header.id = MESSAGE_REQ_EVS_MCAST_JOINED;
  385. req_lib_evs_mcast_joined.guarantee = guarantee;
  386. req_lib_evs_mcast_joined.msg_len = msg_len;
  387. iov[0].iov_base = &req_lib_evs_mcast_joined;
  388. iov[0].iov_len = sizeof (struct req_lib_evs_mcast_joined);
  389. memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
  390. pthread_mutex_lock (&evs_inst->response_mutex);
  391. error = saSendMsgReceiveReply (evs_inst->response_fd, iov, iov_len + 1,
  392. &res_lib_evs_mcast_joined, sizeof (struct res_lib_evs_mcast_joined));
  393. pthread_mutex_unlock (&evs_inst->response_mutex);
  394. if (error != SA_OK) {
  395. goto error_exit;
  396. }
  397. error = res_lib_evs_mcast_joined.header.error;
  398. error_exit:
  399. saHandleInstancePut (&evs_handle_t_db, handle);
  400. return (error);
  401. }
  402. evs_error_t evs_mcast_groups (
  403. evs_handle_t handle,
  404. evs_guarantee_t guarantee,
  405. struct evs_group *groups,
  406. int group_entries,
  407. struct iovec *iovec,
  408. int iov_len)
  409. {
  410. int i;
  411. evs_error_t error;
  412. struct evs_inst *evs_inst;
  413. struct iovec iov[64];
  414. struct req_lib_evs_mcast_groups req_lib_evs_mcast_groups;
  415. struct res_lib_evs_mcast_groups res_lib_evs_mcast_groups;
  416. int msg_len = 0;
  417. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  418. if (error != SA_OK) {
  419. return (error);
  420. }
  421. for (i = 0; i < iov_len; i++) {
  422. msg_len += iovec[i].iov_len;
  423. }
  424. req_lib_evs_mcast_groups.header.size = sizeof (struct req_lib_evs_mcast_groups) +
  425. (group_entries * sizeof (struct evs_group)) + msg_len;
  426. req_lib_evs_mcast_groups.header.id = MESSAGE_REQ_EVS_MCAST_GROUPS;
  427. req_lib_evs_mcast_groups.guarantee = guarantee;
  428. req_lib_evs_mcast_groups.msg_len = msg_len;
  429. req_lib_evs_mcast_groups.group_entries = group_entries;
  430. iov[0].iov_base = &req_lib_evs_mcast_groups;
  431. iov[0].iov_len = sizeof (struct req_lib_evs_mcast_groups);
  432. iov[1].iov_base = groups;
  433. iov[1].iov_len = (group_entries * sizeof (struct evs_group));
  434. memcpy (&iov[2], iovec, iov_len * sizeof (struct iovec));
  435. pthread_mutex_lock (&evs_inst->response_mutex);
  436. error = saSendMsgReceiveReply (evs_inst->response_fd, iov, iov_len + 2,
  437. &res_lib_evs_mcast_groups, sizeof (struct res_lib_evs_mcast_groups));
  438. pthread_mutex_unlock (&evs_inst->response_mutex);
  439. if (error != SA_OK) {
  440. goto error_exit;
  441. }
  442. error = res_lib_evs_mcast_groups.header.error;
  443. error_exit:
  444. saHandleInstancePut (&evs_handle_t_db, handle);
  445. return (error);
  446. }
  447. evs_error_t evs_membership_get (
  448. evs_handle_t handle,
  449. struct in_addr *local_addr,
  450. struct in_addr *member_list,
  451. int *member_list_entries)
  452. {
  453. evs_error_t error;
  454. struct evs_inst *evs_inst;
  455. struct iovec iov;
  456. struct req_lib_evs_membership_get req_lib_evs_membership_get;
  457. struct res_lib_evs_membership_get res_lib_evs_membership_get;
  458. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  459. if (error != SA_OK) {
  460. return (error);
  461. }
  462. req_lib_evs_membership_get.header.size = sizeof (struct req_lib_evs_membership_get);
  463. req_lib_evs_membership_get.header.id = MESSAGE_REQ_EVS_MEMBERSHIP_GET;
  464. iov.iov_base = &req_lib_evs_membership_get;
  465. iov.iov_len = sizeof (struct req_lib_evs_membership_get);
  466. pthread_mutex_lock (&evs_inst->response_mutex);
  467. error = saSendMsgReceiveReply (evs_inst->response_fd, &iov, 1,
  468. &res_lib_evs_membership_get, sizeof (struct res_lib_evs_membership_get));
  469. pthread_mutex_unlock (&evs_inst->response_mutex);
  470. if (error != SA_OK) {
  471. goto error_exit;
  472. }
  473. error = res_lib_evs_membership_get.header.error;
  474. /*
  475. * Copy results to caller
  476. */
  477. if (local_addr) {
  478. memcpy (local_addr, &res_lib_evs_membership_get.local_addr, sizeof (struct in_addr));
  479. }
  480. *member_list_entries = *member_list_entries < res_lib_evs_membership_get.member_list_entries ?
  481. *member_list_entries : res_lib_evs_membership_get.member_list_entries;
  482. if (member_list) {
  483. memcpy (member_list, &res_lib_evs_membership_get.member_list,
  484. *member_list_entries * sizeof (struct in_addr));
  485. }
  486. error_exit:
  487. saHandleInstancePut (&evs_handle_t_db, handle);
  488. return (error);
  489. }