evs.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. /*
  2. * vi: set autoindent tabstop=4 shiftwidth=4 :
  3. * Copyright (c) 2004-2005 MontaVista Software, Inc.
  4. *
  5. * All rights reserved.
  6. *
  7. * Author: Steven Dake (sdake@mvista.com)
  8. *
  9. * This software licensed under BSD license, the text of which follows:
  10. *
  11. * Redistribution and use in source and binary forms, with or without
  12. * modification, are permitted provided that the following conditions are met:
  13. *
  14. * - Redistributions of source code must retain the above copyright notice,
  15. * this list of conditions and the following disclaimer.
  16. * - Redistributions in binary form must reproduce the above copyright notice,
  17. * this list of conditions and the following disclaimer in the documentation
  18. * and/or other materials provided with the distribution.
  19. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  20. * contributors may be used to endorse or promote products derived from this
  21. * software without specific prior written permission.
  22. *
  23. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  24. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  25. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  26. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  27. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  28. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  29. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  30. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  31. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  32. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  33. * THE POSSIBILITY OF SUCH DAMAGE.
  34. */
  35. /*
  36. * Provides an extended virtual synchrony API using the openais executive
  37. */
  38. #include <stdlib.h>
  39. #include <string.h>
  40. #include <unistd.h>
  41. #include <pthread.h>
  42. #include <sys/types.h>
  43. #include <sys/socket.h>
  44. #include <errno.h>
  45. #include "../include/saAis.h"
  46. #include "../include/evs.h"
  47. #include "../include/ipc_evs.h"
  48. #include "util.h"
  49. struct evs_inst {
  50. int response_fd;
  51. int dispatch_fd;
  52. int finalize;
  53. evs_callbacks_t callbacks;
  54. pthread_mutex_t response_mutex;
  55. pthread_mutex_t dispatch_mutex;
  56. };
  57. static void evs_instance_destructor (void *instance);
  58. static struct saHandleDatabase evs_handle_t_db = {
  59. .handleCount = 0,
  60. .handles = 0,
  61. .mutex = PTHREAD_MUTEX_INITIALIZER,
  62. .handleInstanceDestructor = evs_instance_destructor
  63. };
  64. /*
  65. * Clean up function for an evt instance (saEvtInitialize) handle
  66. */
  67. static void evs_instance_destructor (void *instance)
  68. {
  69. }
  70. evs_error_t evs_initialize (
  71. evs_handle_t *handle,
  72. evs_callbacks_t *callbacks)
  73. {
  74. SaAisErrorT error;
  75. struct evs_inst *evs_inst;
  76. error = saHandleCreate (&evs_handle_t_db, sizeof (struct evs_inst), handle);
  77. if (error != SA_AIS_OK) {
  78. goto error_no_destroy;
  79. }
  80. error = saHandleInstanceGet (&evs_handle_t_db, *handle, (void *)&evs_inst);
  81. if (error != SA_AIS_OK) {
  82. goto error_destroy;
  83. }
  84. error = saServiceConnectTwo (&evs_inst->response_fd,
  85. &evs_inst->dispatch_fd,
  86. EVS_SERVICE);
  87. if (error != SA_AIS_OK) {
  88. goto error_put_destroy;
  89. }
  90. memcpy (&evs_inst->callbacks, callbacks, sizeof (evs_callbacks_t));
  91. pthread_mutex_init (&evs_inst->response_mutex, NULL);
  92. pthread_mutex_init (&evs_inst->dispatch_mutex, NULL);
  93. saHandleInstancePut (&evs_handle_t_db, *handle);
  94. return (SA_AIS_OK);
  95. error_put_destroy:
  96. saHandleInstancePut (&evs_handle_t_db, *handle);
  97. error_destroy:
  98. saHandleDestroy (&evs_handle_t_db, *handle);
  99. error_no_destroy:
  100. return (error);
  101. }
  102. evs_error_t evs_finalize (
  103. evs_handle_t handle)
  104. {
  105. struct evs_inst *evs_inst;
  106. SaAisErrorT error;
  107. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  108. if (error != SA_AIS_OK) {
  109. return (error);
  110. }
  111. // TODO is the locking right here
  112. pthread_mutex_lock (&evs_inst->response_mutex);
  113. /*
  114. * Another thread has already started finalizing
  115. */
  116. if (evs_inst->finalize) {
  117. pthread_mutex_unlock (&evs_inst->response_mutex);
  118. saHandleInstancePut (&evs_handle_t_db, handle);
  119. return (EVS_ERR_BAD_HANDLE);
  120. }
  121. evs_inst->finalize = 1;
  122. pthread_mutex_unlock (&evs_inst->response_mutex);
  123. saHandleDestroy (&evs_handle_t_db, handle);
  124. /*
  125. * Disconnect from the server
  126. */
  127. if (evs_inst->response_fd != -1) {
  128. shutdown(evs_inst->response_fd, 0);
  129. close(evs_inst->response_fd);
  130. }
  131. if (evs_inst->dispatch_fd != -1) {
  132. shutdown(evs_inst->dispatch_fd, 0);
  133. close(evs_inst->dispatch_fd);
  134. }
  135. saHandleInstancePut (&evs_handle_t_db, handle);
  136. return (EVS_OK);
  137. }
  138. evs_error_t evs_fd_get (
  139. evs_handle_t handle,
  140. int *fd)
  141. {
  142. SaAisErrorT error;
  143. struct evs_inst *evs_inst;
  144. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  145. if (error != SA_AIS_OK) {
  146. return (error);
  147. }
  148. *fd = evs_inst->dispatch_fd;
  149. saHandleInstancePut (&evs_handle_t_db, handle);
  150. return (SA_AIS_OK);
  151. }
  152. struct res_overlay {
  153. struct res_header header;
  154. char data[512000];
  155. };
  156. evs_error_t evs_dispatch (
  157. evs_handle_t handle,
  158. evs_dispatch_t dispatch_types)
  159. {
  160. struct pollfd ufds;
  161. int timeout = -1;
  162. SaAisErrorT error;
  163. int cont = 1; /* always continue do loop except when set to 0 */
  164. int dispatch_avail;
  165. struct evs_inst *evs_inst;
  166. struct res_evs_confchg_callback *res_evs_confchg_callback;
  167. struct res_evs_deliver_callback *res_evs_deliver_callback;
  168. evs_callbacks_t callbacks;
  169. struct res_overlay dispatch_data;
  170. int ignore_dispatch = 0;
  171. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  172. if (error != SA_AIS_OK) {
  173. return (error);
  174. }
  175. /*
  176. * Timeout instantly for SA_DISPATCH_ONE or SA_DISPATCH_ALL and
  177. * wait indefinately for SA_DISPATCH_BLOCKING
  178. */
  179. if (dispatch_types == EVS_DISPATCH_ALL) {
  180. timeout = 0;
  181. }
  182. do {
  183. ufds.fd = evs_inst->dispatch_fd;
  184. ufds.events = POLLIN;
  185. ufds.revents = 0;
  186. error = saPollRetry (&ufds, 1, timeout);
  187. if (error != SA_AIS_OK) {
  188. goto error_nounlock;
  189. }
  190. pthread_mutex_lock (&evs_inst->dispatch_mutex);
  191. /*
  192. * Regather poll data in case ufds has changed since taking lock
  193. */
  194. error = saPollRetry (&ufds, 1, 0);
  195. if (error != SA_AIS_OK) {
  196. goto error_nounlock;
  197. }
  198. /*
  199. * Handle has been finalized in another thread
  200. */
  201. if (evs_inst->finalize == 1) {
  202. error = EVS_OK;
  203. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  204. goto error_unlock;
  205. }
  206. dispatch_avail = ufds.revents & POLLIN;
  207. if (dispatch_avail == 0 && dispatch_types == EVS_DISPATCH_ALL) {
  208. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  209. break; /* exit do while cont is 1 loop */
  210. } else
  211. if (dispatch_avail == 0) {
  212. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  213. continue; /* next poll */
  214. }
  215. if (ufds.revents & POLLIN) {
  216. /*
  217. * Queue empty, read response from socket
  218. */
  219. error = saRecvRetry (evs_inst->dispatch_fd, &dispatch_data.header,
  220. sizeof (struct res_header));
  221. if (error != SA_AIS_OK) {
  222. goto error_unlock;
  223. }
  224. if (dispatch_data.header.size > sizeof (struct res_header)) {
  225. error = saRecvRetry (evs_inst->dispatch_fd, &dispatch_data.data,
  226. dispatch_data.header.size - sizeof (struct res_header));
  227. if (error != SA_AIS_OK) {
  228. goto error_unlock;
  229. }
  230. }
  231. } else {
  232. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  233. continue;
  234. }
  235. /*
  236. * Make copy of callbacks, message data, unlock instance, and call callback
  237. * A risk of this dispatch method is that the callback routines may
  238. * operate at the same time that evsFinalize has been called.
  239. */
  240. memcpy (&callbacks, &evs_inst->callbacks, sizeof (evs_callbacks_t));
  241. pthread_mutex_unlock (&evs_inst->dispatch_mutex);
  242. /*
  243. * Dispatch incoming message
  244. */
  245. switch (dispatch_data.header.id) {
  246. case MESSAGE_RES_EVS_DELIVER_CALLBACK:
  247. res_evs_deliver_callback = (struct res_evs_deliver_callback *)&dispatch_data;
  248. callbacks.evs_deliver_fn (
  249. &res_evs_deliver_callback->evs_address,
  250. &res_evs_deliver_callback->msg,
  251. res_evs_deliver_callback->msglen);
  252. break;
  253. case MESSAGE_RES_EVS_CONFCHG_CALLBACK:
  254. res_evs_confchg_callback = (struct res_evs_confchg_callback *)&dispatch_data;
  255. callbacks.evs_confchg_fn (
  256. res_evs_confchg_callback->member_list,
  257. res_evs_confchg_callback->member_list_entries,
  258. res_evs_confchg_callback->left_list,
  259. res_evs_confchg_callback->left_list_entries,
  260. res_evs_confchg_callback->joined_list,
  261. res_evs_confchg_callback->joined_list_entries);
  262. break;
  263. default:
  264. error = SA_AIS_ERR_LIBRARY;
  265. goto error_nounlock;
  266. break;
  267. }
  268. /*
  269. * Determine if more messages should be processed
  270. * */
  271. switch (dispatch_types) {
  272. case EVS_DISPATCH_ONE:
  273. if (ignore_dispatch) {
  274. ignore_dispatch = 0;
  275. } else {
  276. cont = 0;
  277. }
  278. break;
  279. case EVS_DISPATCH_ALL:
  280. if (ignore_dispatch) {
  281. ignore_dispatch = 0;
  282. }
  283. break;
  284. case EVS_DISPATCH_BLOCKING:
  285. break;
  286. }
  287. } while (cont);
  288. error_unlock:
  289. saHandleInstancePut (&evs_handle_t_db, handle);
  290. error_nounlock:
  291. return (error);
  292. }
  293. evs_error_t evs_join (
  294. evs_handle_t handle,
  295. struct evs_group *groups,
  296. int group_entries)
  297. {
  298. evs_error_t error;
  299. struct evs_inst *evs_inst;
  300. struct iovec iov[2];
  301. struct req_lib_evs_join req_lib_evs_join;
  302. struct res_lib_evs_join res_lib_evs_join;
  303. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  304. if (error != SA_AIS_OK) {
  305. return (error);
  306. }
  307. req_lib_evs_join.header.size = sizeof (struct req_lib_evs_join) +
  308. (group_entries * sizeof (struct evs_group));
  309. req_lib_evs_join.header.id = MESSAGE_REQ_EVS_JOIN;
  310. req_lib_evs_join.group_entries = group_entries;
  311. iov[0].iov_base = &req_lib_evs_join;
  312. iov[0].iov_len = sizeof (struct req_lib_evs_join);
  313. iov[1].iov_base = groups;
  314. iov[1].iov_len = (group_entries * sizeof (struct evs_group));
  315. pthread_mutex_lock (&evs_inst->response_mutex);
  316. error = saSendMsgReceiveReply (evs_inst->response_fd, iov, 2,
  317. &res_lib_evs_join, sizeof (struct res_lib_evs_join));
  318. pthread_mutex_unlock (&evs_inst->response_mutex);
  319. if (error != SA_AIS_OK) {
  320. goto error_exit;
  321. }
  322. error = res_lib_evs_join.header.error;
  323. error_exit:
  324. saHandleInstancePut (&evs_handle_t_db, handle);
  325. return (error);
  326. }
  327. evs_error_t evs_leave (
  328. evs_handle_t handle,
  329. struct evs_group *groups,
  330. int group_entries)
  331. {
  332. evs_error_t error;
  333. struct evs_inst *evs_inst;
  334. struct iovec iov[2];
  335. struct req_lib_evs_leave req_lib_evs_leave;
  336. struct res_lib_evs_leave res_lib_evs_leave;
  337. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  338. if (error != SA_AIS_OK) {
  339. return (error);
  340. }
  341. req_lib_evs_leave.header.size = sizeof (struct req_lib_evs_leave) +
  342. (group_entries * sizeof (struct evs_group));
  343. req_lib_evs_leave.header.id = MESSAGE_REQ_EVS_LEAVE;
  344. req_lib_evs_leave.group_entries = group_entries;
  345. iov[0].iov_base = &req_lib_evs_leave;
  346. iov[0].iov_len = sizeof (struct req_lib_evs_leave);
  347. iov[1].iov_base = groups;
  348. iov[1].iov_len = (group_entries * sizeof (struct evs_group));
  349. pthread_mutex_lock (&evs_inst->response_mutex);
  350. error = saSendMsgReceiveReply (evs_inst->response_fd, iov, 2,
  351. &res_lib_evs_leave, sizeof (struct res_lib_evs_leave));
  352. pthread_mutex_unlock (&evs_inst->response_mutex);
  353. if (error != SA_AIS_OK) {
  354. goto error_exit;
  355. }
  356. error = res_lib_evs_leave.header.error;
  357. error_exit:
  358. saHandleInstancePut (&evs_handle_t_db, handle);
  359. return (error);
  360. }
  361. evs_error_t evs_mcast_joined (
  362. evs_handle_t handle,
  363. evs_guarantee_t guarantee,
  364. struct iovec *iovec,
  365. int iov_len)
  366. {
  367. int i;
  368. evs_error_t error;
  369. struct evs_inst *evs_inst;
  370. struct iovec iov[64];
  371. struct req_lib_evs_mcast_joined req_lib_evs_mcast_joined;
  372. struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined;
  373. int msg_len = 0;
  374. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  375. if (error != SA_AIS_OK) {
  376. return (error);
  377. }
  378. for (i = 0; i < iov_len; i++ ) {
  379. msg_len += iovec[i].iov_len;
  380. }
  381. req_lib_evs_mcast_joined.header.size = sizeof (struct req_lib_evs_mcast_joined) +
  382. msg_len;
  383. req_lib_evs_mcast_joined.header.id = MESSAGE_REQ_EVS_MCAST_JOINED;
  384. req_lib_evs_mcast_joined.guarantee = guarantee;
  385. req_lib_evs_mcast_joined.msg_len = msg_len;
  386. iov[0].iov_base = &req_lib_evs_mcast_joined;
  387. iov[0].iov_len = sizeof (struct req_lib_evs_mcast_joined);
  388. memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
  389. pthread_mutex_lock (&evs_inst->response_mutex);
  390. error = saSendMsgReceiveReply (evs_inst->response_fd, iov, iov_len + 1,
  391. &res_lib_evs_mcast_joined, sizeof (struct res_lib_evs_mcast_joined));
  392. pthread_mutex_unlock (&evs_inst->response_mutex);
  393. if (error != SA_AIS_OK) {
  394. goto error_exit;
  395. }
  396. error = res_lib_evs_mcast_joined.header.error;
  397. error_exit:
  398. saHandleInstancePut (&evs_handle_t_db, handle);
  399. return (error);
  400. }
  401. evs_error_t evs_mcast_groups (
  402. evs_handle_t handle,
  403. evs_guarantee_t guarantee,
  404. struct evs_group *groups,
  405. int group_entries,
  406. struct iovec *iovec,
  407. int iov_len)
  408. {
  409. int i;
  410. evs_error_t error;
  411. struct evs_inst *evs_inst;
  412. struct iovec iov[64];
  413. struct req_lib_evs_mcast_groups req_lib_evs_mcast_groups;
  414. struct res_lib_evs_mcast_groups res_lib_evs_mcast_groups;
  415. int msg_len = 0;
  416. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  417. if (error != SA_AIS_OK) {
  418. return (error);
  419. }
  420. for (i = 0; i < iov_len; i++) {
  421. msg_len += iovec[i].iov_len;
  422. }
  423. req_lib_evs_mcast_groups.header.size = sizeof (struct req_lib_evs_mcast_groups) +
  424. (group_entries * sizeof (struct evs_group)) + msg_len;
  425. req_lib_evs_mcast_groups.header.id = MESSAGE_REQ_EVS_MCAST_GROUPS;
  426. req_lib_evs_mcast_groups.guarantee = guarantee;
  427. req_lib_evs_mcast_groups.msg_len = msg_len;
  428. req_lib_evs_mcast_groups.group_entries = group_entries;
  429. iov[0].iov_base = &req_lib_evs_mcast_groups;
  430. iov[0].iov_len = sizeof (struct req_lib_evs_mcast_groups);
  431. iov[1].iov_base = groups;
  432. iov[1].iov_len = (group_entries * sizeof (struct evs_group));
  433. memcpy (&iov[2], iovec, iov_len * sizeof (struct iovec));
  434. pthread_mutex_lock (&evs_inst->response_mutex);
  435. error = saSendMsgReceiveReply (evs_inst->response_fd, iov, iov_len + 2,
  436. &res_lib_evs_mcast_groups, sizeof (struct res_lib_evs_mcast_groups));
  437. pthread_mutex_unlock (&evs_inst->response_mutex);
  438. if (error != SA_AIS_OK) {
  439. goto error_exit;
  440. }
  441. error = res_lib_evs_mcast_groups.header.error;
  442. error_exit:
  443. saHandleInstancePut (&evs_handle_t_db, handle);
  444. return (error);
  445. }
  446. evs_error_t evs_membership_get (
  447. evs_handle_t handle,
  448. struct evs_address *local_addr,
  449. struct evs_address *member_list,
  450. int *member_list_entries)
  451. {
  452. evs_error_t error;
  453. struct evs_inst *evs_inst;
  454. struct iovec iov;
  455. struct req_lib_evs_membership_get req_lib_evs_membership_get;
  456. struct res_lib_evs_membership_get res_lib_evs_membership_get;
  457. error = saHandleInstanceGet (&evs_handle_t_db, handle, (void *)&evs_inst);
  458. if (error != SA_AIS_OK) {
  459. return (error);
  460. }
  461. req_lib_evs_membership_get.header.size = sizeof (struct req_lib_evs_membership_get);
  462. req_lib_evs_membership_get.header.id = MESSAGE_REQ_EVS_MEMBERSHIP_GET;
  463. iov.iov_base = &req_lib_evs_membership_get;
  464. iov.iov_len = sizeof (struct req_lib_evs_membership_get);
  465. pthread_mutex_lock (&evs_inst->response_mutex);
  466. error = saSendMsgReceiveReply (evs_inst->response_fd, &iov, 1,
  467. &res_lib_evs_membership_get, sizeof (struct res_lib_evs_membership_get));
  468. pthread_mutex_unlock (&evs_inst->response_mutex);
  469. if (error != SA_AIS_OK) {
  470. goto error_exit;
  471. }
  472. error = res_lib_evs_membership_get.header.error;
  473. /*
  474. * Copy results to caller
  475. */
  476. if (local_addr) {
  477. memcpy (local_addr, &res_lib_evs_membership_get.local_addr, sizeof (struct in_addr));
  478. }
  479. *member_list_entries = *member_list_entries < res_lib_evs_membership_get.member_list_entries ?
  480. *member_list_entries : res_lib_evs_membership_get.member_list_entries;
  481. if (member_list) {
  482. memcpy (member_list, &res_lib_evs_membership_get.member_list,
  483. *member_list_entries * sizeof (struct in_addr));
  484. }
  485. error_exit:
  486. saHandleInstancePut (&evs_handle_t_db, handle);
  487. return (error);
  488. }