msg.c 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302
  1. /*
  2. * Copyright (c) 2005 MontaVista Software, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Steven Dake (sdake@mvista.com)
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <stdio.h>
  35. #include <string.h>
  36. #include <stdlib.h>
  37. #include <assert.h>
  38. #include <unistd.h>
  39. #include <errno.h>
  40. #include <pthread.h>
  41. #include <sys/types.h>
  42. #include <sys/uio.h>
  43. #include <sys/socket.h>
  44. #include <sys/select.h>
  45. #include <sys/un.h>
  46. #include <saAis.h>
  47. #include <list.h>
  48. #include <saMsg.h>
  49. #include <ipc_gen.h>
  50. #include <ipc_msg.h>
  51. #include <ais_util.h>
  52. struct message_overlay {
  53. mar_res_header_t header __attribute__((aligned(8)));
  54. char data[4096];
  55. };
  56. /*
  57. * Data structure for instance data
  58. */
  59. struct msgInstance {
  60. int response_fd;
  61. int dispatch_fd;
  62. SaMsgCallbacksT callbacks;
  63. int finalize;
  64. SaMsgHandleT msgHandle;
  65. pthread_mutex_t response_mutex;
  66. pthread_mutex_t dispatch_mutex;
  67. struct list_head queue_list;
  68. };
  69. struct msgQueueInstance {
  70. int response_fd;
  71. SaMsgHandleT msgHandle;
  72. SaMsgQueueHandleT queueHandle;
  73. SaMsgQueueOpenFlagsT openFlags;
  74. SaNameT queueName;
  75. struct list_head list;
  76. struct list_head section_iteration_list_head;
  77. pthread_mutex_t *response_mutex;
  78. };
  /*
   * Destructors registered with the two handle databases below.
   */
  void msgHandleInstanceDestructor (void *instance);

  void queueHandleInstanceDestructor (void *instance);
  81. /*
  82. * All MSG instances in this database
  83. */
  84. static struct saHandleDatabase msgHandleDatabase = {
  85. .handleCount = 0,
  86. .handles = 0,
  87. .mutex = PTHREAD_MUTEX_INITIALIZER,
  88. .handleInstanceDestructor = msgHandleInstanceDestructor
  89. };
  90. /*
  91. * All Queue instances in this database
  92. */
  93. static struct saHandleDatabase queueHandleDatabase = {
  94. .handleCount = 0,
  95. .handles = 0,
  96. .mutex = PTHREAD_MUTEX_INITIALIZER,
  97. .handleInstanceDestructor = queueHandleInstanceDestructor
  98. };
  99. /*
  100. * Versions supported
  101. */
  102. static SaVersionT msgVersionsSupported[] = {
  103. { 'B', 1, 1 }
  104. };
  105. static struct saVersionDatabase msgVersionDatabase = {
  106. sizeof (msgVersionsSupported) / sizeof (SaVersionT),
  107. msgVersionsSupported
  108. };
  109. struct iteratorSectionIdListEntry {
  110. struct list_head list;
  111. unsigned char data[0];
  112. };
  113. /*
  114. * Implementation
  115. */
  116. void msgHandleInstanceDestructor (void *instance)
  117. {
  118. struct msgInstance *msgInstance = instance;
  119. pthread_mutex_destroy (&msgInstance->response_mutex);
  120. pthread_mutex_destroy (&msgInstance->dispatch_mutex);
  121. }
  /*
   * Instance destructor registered with queueHandleDatabase.
   * A queue instance owns nothing that needs releasing, so this is a no-op.
   */
  void queueHandleInstanceDestructor (void *instance)
  {
      (void)instance;
  }
  #ifdef COMPILE_OUT
  /*
   * Finalize every section iteration attached to a queue instance, unlink
   * the queue from its owner's list and destroy its handle.
   */
  static void msgQueueInstanceFinalize (struct msgQueueInstance *msgQueueInstance)
  {
      struct list_head *head = &msgQueueInstance->section_iteration_list_head;
      struct list_head *cur = head->next;
      struct list_head *following;
      struct msgSectionIterationInstance *iteration;

      while (cur != head) {
          /* Remember the successor first: finalizing may unlink cur */
          following = cur->next;
          iteration = list_entry (cur,
              struct msgSectionIterationInstance, list);
          msgSectionIterationInstanceFinalize (iteration);
          cur = following;
      }

      list_del (&msgQueueInstance->list);
      saHandleDestroy (&queueHandleDatabase, msgQueueInstance->queueHandle);
  }

  /*
   * Finalize every queue attached to a library instance, then destroy the
   * instance's own handle.
   */
  static void msgInstanceFinalize (struct msgInstance *msgInstance)
  {
      struct list_head *head = &msgInstance->queue_list;
      struct list_head *cur = head->next;
      struct list_head *following;
      struct msgQueueInstance *queue;

      while (cur != head) {
          /* Remember the successor first: finalizing unlinks cur */
          following = cur->next;
          queue = list_entry (cur, struct msgQueueInstance, list);
          msgQueueInstanceFinalize (queue);
          cur = following;
      }

      saHandleDestroy (&msgHandleDatabase, msgInstance->msgHandle);
  }
  #endif
  160. SaAisErrorT
  161. saMsgInitialize (
  162. SaMsgHandleT *msgHandle,
  163. const SaMsgCallbacksT *callbacks,
  164. SaVersionT *version)
  165. {
  166. struct msgInstance *msgInstance;
  167. SaAisErrorT error = SA_AIS_OK;
  168. if (msgHandle == NULL) {
  169. return (SA_AIS_ERR_INVALID_PARAM);
  170. }
  171. error = saVersionVerify (&msgVersionDatabase, version);
  172. if (error != SA_AIS_OK) {
  173. goto error_no_destroy;
  174. }
  175. error = saHandleCreate (&msgHandleDatabase, sizeof (struct msgInstance),
  176. msgHandle);
  177. if (error != SA_AIS_OK) {
  178. goto error_no_destroy;
  179. }
  180. error = saHandleInstanceGet (&msgHandleDatabase, *msgHandle,
  181. (void *)&msgInstance);
  182. if (error != SA_AIS_OK) {
  183. goto error_destroy;
  184. }
  185. msgInstance->response_fd = -1;
  186. error = saServiceConnect (&msgInstance->response_fd,
  187. &msgInstance->dispatch_fd, MSG_SERVICE);
  188. if (error != SA_AIS_OK) {
  189. goto error_put_destroy;
  190. }
  191. if (callbacks) {
  192. memcpy (&msgInstance->callbacks, callbacks, sizeof (SaMsgCallbacksT));
  193. } else {
  194. memset (&msgInstance->callbacks, 0, sizeof (SaMsgCallbacksT));
  195. }
  196. list_init (&msgInstance->queue_list);
  197. msgInstance->msgHandle = *msgHandle;
  198. pthread_mutex_init (&msgInstance->response_mutex, NULL);
  199. saHandleInstancePut (&msgHandleDatabase, *msgHandle);
  200. return (SA_AIS_OK);
  201. error_put_destroy:
  202. saHandleInstancePut (&msgHandleDatabase, *msgHandle);
  203. error_destroy:
  204. saHandleDestroy (&msgHandleDatabase, *msgHandle);
  205. error_no_destroy:
  206. return (error);
  207. }
  208. SaAisErrorT
  209. saMsgSelectionObjectGet (
  210. const SaMsgHandleT msgHandle,
  211. SaSelectionObjectT *selectionObject)
  212. {
  213. struct msgInstance *msgInstance;
  214. SaAisErrorT error;
  215. if (selectionObject == NULL) {
  216. return (SA_AIS_ERR_INVALID_PARAM);
  217. }
  218. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  219. if (error != SA_AIS_OK) {
  220. return (error);
  221. }
  222. *selectionObject = msgInstance->dispatch_fd;
  223. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  224. return (SA_AIS_OK);
  225. }
  226. SaAisErrorT
  227. saMsgDispatch (
  228. const SaMsgHandleT msgHandle,
  229. SaDispatchFlagsT dispatchFlags)
  230. {
  231. struct pollfd ufds;
  232. int poll_fd;
  233. int timeout = 1;
  234. SaMsgCallbacksT callbacks;
  235. SaAisErrorT error;
  236. int dispatch_avail;
  237. struct msgInstance *msgInstance;
  238. int cont = 1; /* always continue do loop except when set to 0 */
  239. struct message_overlay dispatch_data;
  240. /*
  241. struct res_lib_msg_queueopenasync *res_lib_msg_queueopenasync;
  242. struct res_lib_msg_queuesynchronizeasync *res_lib_msg_queuesynchronizeasync;
  243. struct msgQueueInstance *msgQueueInstance;
  244. */
  245. if (dispatchFlags != SA_DISPATCH_ONE &&
  246. dispatchFlags != SA_DISPATCH_ALL &&
  247. dispatchFlags != SA_DISPATCH_BLOCKING) {
  248. return (SA_AIS_ERR_INVALID_PARAM);
  249. }
  250. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle,
  251. (void *)&msgInstance);
  252. if (error != SA_AIS_OK) {
  253. goto error_exit;
  254. }
  255. /*
  256. * Timeout instantly for SA_DISPATCH_ALL
  257. */
  258. if (dispatchFlags == SA_DISPATCH_ALL) {
  259. timeout = 0;
  260. }
  261. do {
  262. /*
  263. * Read data directly from socket
  264. */
  265. poll_fd = msgInstance->dispatch_fd;
  266. ufds.fd = poll_fd;
  267. ufds.events = POLLIN;
  268. ufds.revents = 0;
  269. error = saPollRetry(&ufds, 1, timeout);
  270. if (error != SA_AIS_OK) {
  271. goto error_put;
  272. }
  273. pthread_mutex_lock(&msgInstance->dispatch_mutex);
  274. if (msgInstance->finalize == 1) {
  275. error = SA_AIS_OK;
  276. goto error_unlock;
  277. }
  278. if ((ufds.revents & (POLLERR|POLLHUP|POLLNVAL)) != 0) {
  279. error = SA_AIS_ERR_BAD_HANDLE;
  280. goto error_unlock;
  281. }
  282. dispatch_avail = (ufds.revents & POLLIN);
  283. if (dispatch_avail == 0 && dispatchFlags == SA_DISPATCH_ALL) {
  284. pthread_mutex_unlock(&msgInstance->dispatch_mutex);
  285. break; /* exit do while cont is 1 loop */
  286. } else
  287. if (dispatch_avail == 0) {
  288. pthread_mutex_unlock(&msgInstance->dispatch_mutex);
  289. continue;
  290. }
  291. memset(&dispatch_data,0, sizeof(struct message_overlay));
  292. error = saRecvRetry (msgInstance->dispatch_fd, &dispatch_data.header, sizeof (mar_res_header_t));
  293. if (error != SA_AIS_OK) {
  294. goto error_unlock;
  295. }
  296. if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
  297. error = saRecvRetry (msgInstance->dispatch_fd, &dispatch_data.data,
  298. dispatch_data.header.size - sizeof (mar_res_header_t));
  299. if (error != SA_AIS_OK) {
  300. goto error_unlock;
  301. }
  302. }
  303. /*
  304. * Make copy of callbacks, message data, unlock instance,
  305. * and call callback. A risk of this dispatch method is that
  306. * the callback routines may operate at the same time that
  307. * MsgFinalize has been called in another thread.
  308. */
  309. memcpy(&callbacks,&msgInstance->callbacks, sizeof(msgInstance->callbacks));
  310. pthread_mutex_unlock(&msgInstance->dispatch_mutex);
  311. /*
  312. * Dispatch incoming response
  313. */
  314. switch (dispatch_data.header.id) {
  315. #ifdef COMPILE_OUT
  316. case MESSAGE_RES_MSG_QUEUE_QUEUEOPENASYNC:
  317. if (callbacks.saMsgQueueOpenCallback == NULL) {
  318. continue;
  319. }
  320. res_lib_msg_queueopenasync = (struct res_lib_msg_queueopenasync *) &dispatch_data;
  321. /*
  322. * This instance get/listadd/put required so that close
  323. * later has the proper list of queues
  324. */
  325. if (res_lib_msg_queueopenasync->header.error == SA_AIS_OK) {
  326. error = saHandleInstanceGet (&queueHandleDatabase,
  327. res_lib_msg_queueopenasync->queueHandle,
  328. (void *)&msgQueueInstance);
  329. assert (error == SA_AIS_OK); /* should only be valid handles here */
  330. /*
  331. * open succeeded without error
  332. */
  333. list_init (&msgQueueInstance->list);
  334. list_init (&msgQueueInstance->section_iteration_list_head);
  335. list_add (&msgQueueInstance->list,
  336. &msgInstance->queue_list);
  337. callbacks.saMsgQueueOpenCallback(
  338. res_lib_msg_queueopenasync->invocation,
  339. res_lib_msg_queueopenasync->queueHandle,
  340. res_lib_msg_queueopenasync->header.error);
  341. saHandleInstancePut (&queueHandleDatabase,
  342. res_lib_msg_queueopenasync->queueHandle);
  343. } else {
  344. /*
  345. * open failed with error
  346. */
  347. callbacks.saMsgQueueOpenCallback(
  348. res_lib_msg_queueopenasync->invocation,
  349. -1,
  350. res_lib_msg_queueopenasync->header.error);
  351. }
  352. break;
  353. case MESSAGE_RES_MSG_QUEUE_QUEUESYNCHRONIZEASYNC:
  354. if (callbacks.saMsgQueueSynchronizeCallback == NULL) {
  355. continue;
  356. }
  357. res_lib_msg_queuesynchronizeasync = (struct res_lib_msg_queuesynchronizeasync *) &dispatch_data;
  358. callbacks.saMsgQueueSynchronizeCallback(
  359. res_lib_msg_queuesynchronizeasync->invocation,
  360. res_lib_msg_queuesynchronizeasync->header.error);
  361. break;
  362. #endif
  363. default:
  364. /* TODO */
  365. break;
  366. }
  367. /*
  368. * Determine if more messages should be processed
  369. */
  370. switch (dispatchFlags) {
  371. case SA_DISPATCH_ONE:
  372. cont = 0;
  373. break;
  374. case SA_DISPATCH_ALL:
  375. break;
  376. case SA_DISPATCH_BLOCKING:
  377. break;
  378. }
  379. } while (cont);
  380. error_unlock:
  381. pthread_mutex_unlock(&msgInstance->dispatch_mutex);
  382. error_put:
  383. saHandleInstancePut(&msgHandleDatabase, msgHandle);
  384. error_exit:
  385. return (error);
  386. }
  387. SaAisErrorT
  388. saMsgFinalize (
  389. const SaMsgHandleT msgHandle)
  390. {
  391. struct msgInstance *msgInstance;
  392. SaAisErrorT error;
  393. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle,
  394. (void *)&msgInstance);
  395. if (error != SA_AIS_OK) {
  396. return (error);
  397. }
  398. pthread_mutex_lock (&msgInstance->response_mutex);
  399. /*
  400. * Another thread has already started finalizing
  401. */
  402. if (msgInstance->finalize) {
  403. pthread_mutex_unlock (&msgInstance->response_mutex);
  404. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  405. return (SA_AIS_ERR_BAD_HANDLE);
  406. }
  407. msgInstance->finalize = 1;
  408. pthread_mutex_unlock (&msgInstance->response_mutex);
  409. // TODO msgInstanceFinalize (msgInstance);
  410. if (msgInstance->response_fd != -1) {
  411. shutdown (msgInstance->response_fd, 0);
  412. close (msgInstance->response_fd);
  413. }
  414. if (msgInstance->dispatch_fd != -1) {
  415. shutdown (msgInstance->dispatch_fd, 0);
  416. close (msgInstance->dispatch_fd);
  417. }
  418. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  419. return (SA_AIS_OK);
  420. }
  421. SaAisErrorT
  422. saMsgQueueOpen (
  423. SaMsgHandleT msgHandle,
  424. const SaNameT *queueName,
  425. const SaMsgQueueCreationAttributesT *creationAttributes,
  426. SaMsgQueueOpenFlagsT openFlags,
  427. SaTimeT timeout,
  428. SaMsgQueueHandleT *queueHandle)
  429. {
  430. SaAisErrorT error;
  431. struct msgQueueInstance *msgQueueInstance;
  432. struct msgInstance *msgInstance;
  433. struct req_lib_msg_queueopen req_lib_msg_queueopen;
  434. struct res_lib_msg_queueopen res_lib_msg_queueopen;
  435. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle,
  436. (void *)&msgInstance);
  437. if (error != SA_AIS_OK) {
  438. goto error_exit;
  439. }
  440. error = saHandleCreate (&queueHandleDatabase,
  441. sizeof (struct msgQueueInstance), queueHandle);
  442. if (error != SA_AIS_OK) {
  443. goto error_put_msg;
  444. }
  445. error = saHandleInstanceGet (&queueHandleDatabase,
  446. *queueHandle, (void *)&msgQueueInstance);
  447. if (error != SA_AIS_OK) {
  448. goto error_destroy;
  449. }
  450. msgQueueInstance->response_fd = msgInstance->response_fd;
  451. msgQueueInstance->response_mutex = &msgInstance->response_mutex;
  452. msgQueueInstance->msgHandle = msgHandle;
  453. msgQueueInstance->queueHandle = *queueHandle;
  454. msgQueueInstance->openFlags = openFlags;
  455. req_lib_msg_queueopen.header.size = sizeof (struct req_lib_msg_queueopen);
  456. req_lib_msg_queueopen.header.id = MESSAGE_REQ_MSG_QUEUEOPEN;
  457. memcpy (&req_lib_msg_queueopen.queueName, queueName, sizeof (SaNameT));
  458. memcpy (&msgQueueInstance->queueName, queueName, sizeof (SaNameT));
  459. req_lib_msg_queueopen.creationAttributesSet = 0;
  460. if (creationAttributes) {
  461. memcpy (&req_lib_msg_queueopen.creationAttributes,
  462. creationAttributes,
  463. sizeof (SaMsgQueueCreationAttributesT));
  464. req_lib_msg_queueopen.creationAttributesSet = 1;
  465. }
  466. req_lib_msg_queueopen.openFlags = openFlags;
  467. req_lib_msg_queueopen.timeout = timeout;
  468. pthread_mutex_lock (msgQueueInstance->response_mutex);
  469. error = saSendReceiveReply (msgQueueInstance->response_fd,
  470. &req_lib_msg_queueopen,
  471. sizeof (struct req_lib_msg_queueopen),
  472. &res_lib_msg_queueopen,
  473. sizeof (struct res_lib_msg_queueopen));
  474. pthread_mutex_unlock (msgQueueInstance->response_mutex);
  475. if (res_lib_msg_queueopen.header.error != SA_AIS_OK) {
  476. error = res_lib_msg_queueopen.header.error;
  477. goto error_put_destroy;
  478. }
  479. saHandleInstancePut (&queueHandleDatabase, *queueHandle);
  480. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  481. return (error);
  482. error_put_destroy:
  483. saHandleInstancePut (&queueHandleDatabase, *queueHandle);
  484. error_destroy:
  485. saHandleDestroy (&queueHandleDatabase, *queueHandle);
  486. error_put_msg:
  487. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  488. error_exit:
  489. return (error);
  490. }
  491. SaAisErrorT
  492. saMsgQueueOpenAsync (
  493. SaMsgHandleT msgHandle,
  494. SaInvocationT invocation,
  495. const SaNameT *queueName,
  496. const SaMsgQueueCreationAttributesT *creationAttributes,
  497. SaMsgQueueOpenFlagsT openFlags)
  498. {
  499. struct msgQueueInstance *msgQueueInstance;
  500. struct msgInstance *msgInstance;
  501. SaMsgQueueHandleT queueHandle;
  502. SaAisErrorT error;
  503. struct req_lib_msg_queueopen req_lib_msg_queueopen;
  504. struct res_lib_msg_queueopenasync res_lib_msg_queueopenasync;
  505. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle,
  506. (void *)&msgInstance);
  507. if (error != SA_AIS_OK) {
  508. goto error_exit;
  509. }
  510. if (msgInstance->callbacks.saMsgQueueOpenCallback == NULL) {
  511. error = SA_AIS_ERR_INIT;
  512. goto error_put_msg;
  513. }
  514. error = saHandleCreate (&queueHandleDatabase,
  515. sizeof (struct msgQueueInstance), &queueHandle);
  516. if (error != SA_AIS_OK) {
  517. goto error_put_msg;
  518. }
  519. error = saHandleInstanceGet (&queueHandleDatabase, queueHandle,
  520. (void *)&msgQueueInstance);
  521. if (error != SA_AIS_OK) {
  522. goto error_destroy;
  523. }
  524. msgQueueInstance->response_fd = msgInstance->response_fd;
  525. msgQueueInstance->response_mutex = &msgInstance->response_mutex;
  526. msgQueueInstance->msgHandle = msgHandle;
  527. msgQueueInstance->queueHandle = queueHandle;
  528. msgQueueInstance->openFlags = openFlags;
  529. req_lib_msg_queueopen.header.size = sizeof (struct req_lib_msg_queueopen);
  530. req_lib_msg_queueopen.header.id = MESSAGE_REQ_MSG_QUEUEOPEN;
  531. req_lib_msg_queueopen.invocation = invocation;
  532. req_lib_msg_queueopen.creationAttributesSet = 0;
  533. if (creationAttributes) {
  534. memcpy (&req_lib_msg_queueopen.creationAttributes,
  535. creationAttributes,
  536. sizeof (SaMsgQueueCreationAttributesT));
  537. req_lib_msg_queueopen.creationAttributesSet = 1;
  538. }
  539. req_lib_msg_queueopen.openFlags = openFlags;
  540. req_lib_msg_queueopen.queueHandle = queueHandle;
  541. pthread_mutex_unlock (msgQueueInstance->response_mutex);
  542. error = saSendReceiveReply (msgQueueInstance->response_fd,
  543. &req_lib_msg_queueopen,
  544. sizeof (struct req_lib_msg_queueopen),
  545. &res_lib_msg_queueopenasync,
  546. sizeof (struct res_lib_msg_queueopenasync));
  547. pthread_mutex_unlock (msgQueueInstance->response_mutex);
  548. if (res_lib_msg_queueopenasync.header.error != SA_AIS_OK) {
  549. error = res_lib_msg_queueopenasync.header.error;
  550. goto error_put_destroy;
  551. }
  552. saHandleInstancePut (&queueHandleDatabase, queueHandle);
  553. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  554. return (error);
  555. error_put_destroy:
  556. saHandleInstancePut (&queueHandleDatabase, queueHandle);
  557. error_destroy:
  558. saHandleDestroy (&queueHandleDatabase, queueHandle);
  559. error_put_msg:
  560. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  561. error_exit:
  562. return (error);
  563. }
  564. SaAisErrorT
  565. saMsgQueueClose (
  566. SaMsgQueueHandleT queueHandle)
  567. {
  568. struct req_lib_msg_queueclose req_lib_msg_queueclose;
  569. struct res_lib_msg_queueclose res_lib_msg_queueclose;
  570. SaAisErrorT error;
  571. struct msgQueueInstance *msgQueueInstance;
  572. error = saHandleInstanceGet (&queueHandleDatabase, queueHandle,
  573. (void *)&msgQueueInstance);
  574. if (error != SA_AIS_OK) {
  575. return (error);
  576. }
  577. req_lib_msg_queueclose.header.size = sizeof (struct req_lib_msg_queueclose);
  578. req_lib_msg_queueclose.header.id = MESSAGE_REQ_MSG_QUEUECLOSE;
  579. memcpy (&req_lib_msg_queueclose.queueName,
  580. &msgQueueInstance->queueName, sizeof (SaNameT));
  581. pthread_mutex_lock (msgQueueInstance->response_mutex);
  582. error = saSendReceiveReply (msgQueueInstance->response_fd,
  583. &req_lib_msg_queueclose,
  584. sizeof (struct req_lib_msg_queueclose),
  585. &res_lib_msg_queueclose,
  586. sizeof (struct res_lib_msg_queueclose));
  587. pthread_mutex_unlock (msgQueueInstance->response_mutex);
  588. if (error == SA_AIS_OK) {
  589. error = res_lib_msg_queueclose.header.error;
  590. }
  591. if (error == SA_AIS_OK) {
  592. // TODO msgQueueInstanceFinalize (msgQueueInstance);
  593. }
  594. saHandleInstancePut (&queueHandleDatabase, queueHandle);
  595. return (error);
  596. }
  597. SaAisErrorT
  598. saMsgQueueStatusGet (
  599. SaMsgHandleT msgHandle,
  600. const SaNameT *queueName,
  601. SaMsgQueueStatusT *queueStatus)
  602. {
  603. struct msgInstance *msgInstance;
  604. struct req_lib_msg_queuestatusget req_lib_msg_queuestatusget;
  605. struct res_lib_msg_queuestatusget res_lib_msg_queuestatusget;
  606. SaAisErrorT error;
  607. if (queueName == NULL) {
  608. return (SA_AIS_ERR_INVALID_PARAM);
  609. }
  610. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  611. if (error != SA_AIS_OK) {
  612. return (error);
  613. }
  614. req_lib_msg_queuestatusget.header.size = sizeof (struct req_lib_msg_queuestatusget);
  615. req_lib_msg_queuestatusget.header.id = MESSAGE_REQ_MSG_QUEUESTATUSGET;
  616. memcpy (&req_lib_msg_queuestatusget.queueName, queueName, sizeof (SaNameT));
  617. pthread_mutex_lock (&msgInstance->response_mutex);
  618. error = saSendReceiveReply (msgInstance->response_fd,
  619. &req_lib_msg_queuestatusget,
  620. sizeof (struct req_lib_msg_queuestatusget),
  621. &res_lib_msg_queuestatusget,
  622. sizeof (struct res_lib_msg_queuestatusget));
  623. pthread_mutex_unlock (&msgInstance->response_mutex);
  624. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  625. if (error == SA_AIS_OK)
  626. error = res_lib_msg_queuestatusget.header.error;
  627. if (error == SA_AIS_OK) {
  628. memcpy (queueStatus, &res_lib_msg_queuestatusget.queueStatus,
  629. sizeof (SaMsgQueueStatusT));
  630. }
  631. return (error);
  632. }
  633. SaAisErrorT
  634. saMsgQueueUnlink (
  635. SaMsgHandleT msgHandle,
  636. const SaNameT *queueName)
  637. {
  638. SaAisErrorT error;
  639. struct msgInstance *msgInstance;
  640. struct req_lib_msg_queueunlink req_lib_msg_queueunlink;
  641. struct res_lib_msg_queueunlink res_lib_msg_queueunlink;
  642. if (queueName == NULL) {
  643. return (SA_AIS_ERR_INVALID_PARAM);
  644. }
  645. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  646. if (error != SA_AIS_OK) {
  647. return (error);
  648. }
  649. req_lib_msg_queueunlink.header.size = sizeof (struct req_lib_msg_queueunlink);
  650. req_lib_msg_queueunlink.header.id = MESSAGE_REQ_MSG_QUEUEUNLINK;
  651. memcpy (&req_lib_msg_queueunlink.queueName, queueName, sizeof (SaNameT));
  652. pthread_mutex_lock (&msgInstance->response_mutex);
  653. error = saSendReceiveReply (msgInstance->response_fd,
  654. &req_lib_msg_queueunlink,
  655. sizeof (struct req_lib_msg_queueunlink),
  656. &res_lib_msg_queueunlink,
  657. sizeof (struct res_lib_msg_queueunlink));
  658. pthread_mutex_unlock (&msgInstance->response_mutex);
  659. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  660. return (error == SA_AIS_OK ? res_lib_msg_queueunlink.header.error : error);
  661. }
  662. SaAisErrorT
  663. saMsgQueueGroupCreate (
  664. SaMsgHandleT msgHandle,
  665. const SaNameT *queueGroupName,
  666. SaMsgQueueGroupPolicyT queueGroupPolicy)
  667. {
  668. SaAisErrorT error;
  669. struct msgInstance *msgInstance;
  670. struct req_lib_msg_queuegroupcreate req_lib_msg_queuegroupcreate;
  671. struct res_lib_msg_queuegroupcreate res_lib_msg_queuegroupcreate;
  672. if (queueGroupName == NULL) {
  673. return (SA_AIS_ERR_INVALID_PARAM);
  674. }
  675. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  676. if (error != SA_AIS_OK) {
  677. return (error);
  678. }
  679. req_lib_msg_queuegroupcreate.header.size = sizeof (struct req_lib_msg_queuegroupcreate);
  680. req_lib_msg_queuegroupcreate.header.id = MESSAGE_REQ_MSG_QUEUEGROUPCREATE;
  681. memcpy (&req_lib_msg_queuegroupcreate.queueGroupName, queueGroupName,
  682. sizeof (SaNameT));
  683. req_lib_msg_queuegroupcreate.queueGroupPolicy = queueGroupPolicy;
  684. pthread_mutex_lock (&msgInstance->response_mutex);
  685. error = saSendReceiveReply (msgInstance->response_fd,
  686. &req_lib_msg_queuegroupcreate,
  687. sizeof (struct req_lib_msg_queuegroupcreate),
  688. &res_lib_msg_queuegroupcreate,
  689. sizeof (struct res_lib_msg_queuegroupcreate));
  690. pthread_mutex_unlock (&msgInstance->response_mutex);
  691. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  692. return (error == SA_AIS_OK ? res_lib_msg_queuegroupcreate.header.error : error);
  693. }
  694. SaAisErrorT
  695. saMsgQueueGroupInsert (
  696. SaMsgHandleT msgHandle,
  697. const SaNameT *queueGroupName,
  698. const SaNameT *queueName)
  699. {
  700. SaAisErrorT error;
  701. struct msgInstance *msgInstance;
  702. struct req_lib_msg_queuegroupinsert req_lib_msg_queuegroupinsert;
  703. struct res_lib_msg_queuegroupinsert res_lib_msg_queuegroupinsert;
  704. if (queueName == NULL) {
  705. return (SA_AIS_ERR_INVALID_PARAM);
  706. }
  707. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  708. if (error != SA_AIS_OK) {
  709. return (error);
  710. }
  711. req_lib_msg_queuegroupinsert.header.size = sizeof (struct req_lib_msg_queuegroupinsert);
  712. req_lib_msg_queuegroupinsert.header.id = MESSAGE_REQ_MSG_QUEUEGROUPINSERT;
  713. memcpy (&req_lib_msg_queuegroupinsert.queueName, queueName, sizeof (SaNameT));
  714. memcpy (&req_lib_msg_queuegroupinsert.queueGroupName, queueGroupName,
  715. sizeof (SaNameT));
  716. pthread_mutex_lock (&msgInstance->response_mutex);
  717. error = saSendReceiveReply (msgInstance->response_fd,
  718. &req_lib_msg_queuegroupinsert,
  719. sizeof (struct req_lib_msg_queuegroupinsert),
  720. &res_lib_msg_queuegroupinsert,
  721. sizeof (struct res_lib_msg_queuegroupinsert));
  722. pthread_mutex_unlock (&msgInstance->response_mutex);
  723. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  724. return (error == SA_AIS_OK ? res_lib_msg_queuegroupinsert.header.error : error);
  725. }
  726. SaAisErrorT
  727. saMsgQueueGroupRemove (
  728. SaMsgHandleT msgHandle,
  729. const SaNameT *queueGroupName,
  730. const SaNameT *queueName)
  731. {
  732. SaAisErrorT error;
  733. struct msgInstance *msgInstance;
  734. struct req_lib_msg_queuegroupremove req_lib_msg_queuegroupremove;
  735. struct res_lib_msg_queuegroupremove res_lib_msg_queuegroupremove;
  736. if (queueName == NULL) {
  737. return (SA_AIS_ERR_INVALID_PARAM);
  738. }
  739. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  740. if (error != SA_AIS_OK) {
  741. return (error);
  742. }
  743. req_lib_msg_queuegroupremove.header.size = sizeof (struct req_lib_msg_queuegroupremove);
  744. req_lib_msg_queuegroupremove.header.id = MESSAGE_REQ_MSG_QUEUEGROUPREMOVE;
  745. memcpy (&req_lib_msg_queuegroupremove.queueName, queueName, sizeof (SaNameT));
  746. memcpy (&req_lib_msg_queuegroupremove.queueGroupName, queueGroupName,
  747. sizeof (SaNameT));
  748. pthread_mutex_lock (&msgInstance->response_mutex);
  749. error = saSendReceiveReply (msgInstance->response_fd,
  750. &req_lib_msg_queuegroupremove,
  751. sizeof (struct req_lib_msg_queuegroupremove),
  752. &res_lib_msg_queuegroupremove,
  753. sizeof (struct res_lib_msg_queuegroupremove));
  754. pthread_mutex_unlock (&msgInstance->response_mutex);
  755. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  756. return (error == SA_AIS_OK ? res_lib_msg_queuegroupremove.header.error : error);
  757. }
  758. SaAisErrorT
  759. saMsgQueueGroupDelete (
  760. SaMsgHandleT msgHandle,
  761. const SaNameT *queueGroupName)
  762. {
  763. SaAisErrorT error;
  764. struct msgInstance *msgInstance;
  765. struct req_lib_msg_queuegroupdelete req_lib_msg_queuegroupdelete;
  766. struct res_lib_msg_queuegroupdelete res_lib_msg_queuegroupdelete;
  767. if (queueGroupName == NULL) {
  768. return (SA_AIS_ERR_INVALID_PARAM);
  769. }
  770. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  771. if (error != SA_AIS_OK) {
  772. return (error);
  773. }
  774. req_lib_msg_queuegroupdelete.header.size = sizeof (struct req_lib_msg_queuegroupdelete);
  775. req_lib_msg_queuegroupdelete.header.id = MESSAGE_REQ_MSG_QUEUEGROUPDELETE;
  776. memcpy (&req_lib_msg_queuegroupdelete.queueGroupName, queueGroupName,
  777. sizeof (SaNameT));
  778. pthread_mutex_lock (&msgInstance->response_mutex);
  779. error = saSendReceiveReply (msgInstance->response_fd,
  780. &req_lib_msg_queuegroupdelete,
  781. sizeof (struct req_lib_msg_queuegroupdelete),
  782. &res_lib_msg_queuegroupdelete,
  783. sizeof (struct res_lib_msg_queuegroupdelete));
  784. pthread_mutex_unlock (&msgInstance->response_mutex);
  785. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  786. return (error == SA_AIS_OK ? res_lib_msg_queuegroupdelete.header.error : error);
  787. }
  788. SaAisErrorT
  789. saMsgQueueGroupTrack (
  790. SaMsgHandleT msgHandle,
  791. const SaNameT *queueGroupName,
  792. SaUint8T trackFlags,
  793. SaMsgQueueGroupNotificationBufferT *notificationBuffer)
  794. {
  795. SaAisErrorT error;
  796. struct msgInstance *msgInstance;
  797. struct req_lib_msg_queuegrouptrack req_lib_msg_queuegrouptrack;
  798. struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
  799. if (queueGroupName == NULL) {
  800. return (SA_AIS_ERR_INVALID_PARAM);
  801. }
  802. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  803. if (error != SA_AIS_OK) {
  804. return (error);
  805. }
  806. req_lib_msg_queuegrouptrack.header.size = sizeof (struct req_lib_msg_queuegrouptrack);
  807. req_lib_msg_queuegrouptrack.header.id = MESSAGE_REQ_MSG_QUEUEGROUPTRACK;
  808. req_lib_msg_queuegrouptrack.trackFlags = trackFlags;
  809. memcpy (&req_lib_msg_queuegrouptrack.queueGroupName, queueGroupName,
  810. sizeof (SaNameT));
  811. pthread_mutex_lock (&msgInstance->response_mutex);
  812. error = saSendReceiveReply (msgInstance->response_fd,
  813. &req_lib_msg_queuegrouptrack,
  814. sizeof (struct req_lib_msg_queuegrouptrack),
  815. &res_lib_msg_queuegrouptrack,
  816. sizeof (struct res_lib_msg_queuegrouptrack));
  817. pthread_mutex_unlock (&msgInstance->response_mutex);
  818. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  819. return (error == SA_AIS_OK ? res_lib_msg_queuegrouptrack.header.error : error);
  820. }
  821. SaAisErrorT
  822. saMsgQueueGroupTrackStop (
  823. SaMsgHandleT msgHandle,
  824. const SaNameT *queueGroupName)
  825. {
  826. SaAisErrorT error;
  827. struct msgInstance *msgInstance;
  828. struct req_lib_msg_queuegrouptrackstop req_lib_msg_queuegrouptrackstop;
  829. struct res_lib_msg_queuegrouptrackstop res_lib_msg_queuegrouptrackstop;
  830. if (queueGroupName == NULL) {
  831. return (SA_AIS_ERR_INVALID_PARAM);
  832. }
  833. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  834. if (error != SA_AIS_OK) {
  835. return (error);
  836. }
  837. req_lib_msg_queuegrouptrackstop.header.size = sizeof (struct req_lib_msg_queuegrouptrackstop);
  838. req_lib_msg_queuegrouptrackstop.header.id = MESSAGE_REQ_MSG_QUEUEGROUPTRACKSTOP;
  839. memcpy (&req_lib_msg_queuegrouptrackstop.queueGroupName, queueGroupName,
  840. sizeof (SaNameT));
  841. pthread_mutex_lock (&msgInstance->response_mutex);
  842. error = saSendReceiveReply (msgInstance->response_fd,
  843. &req_lib_msg_queuegrouptrackstop,
  844. sizeof (struct req_lib_msg_queuegrouptrackstop),
  845. &res_lib_msg_queuegrouptrackstop,
  846. sizeof (struct res_lib_msg_queuegrouptrackstop));
  847. pthread_mutex_unlock (&msgInstance->response_mutex);
  848. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  849. return (error == SA_AIS_OK ? res_lib_msg_queuegrouptrackstop.header.error : error);
  850. }
  851. SaAisErrorT
  852. saMsgMessageSend (
  853. SaMsgHandleT msgHandle,
  854. const SaNameT *destination,
  855. const SaMsgMessageT *message,
  856. SaTimeT timeout)
  857. {
  858. SaAisErrorT error;
  859. struct msgInstance *msgInstance;
  860. struct req_lib_msg_messagesend req_lib_msg_messagesend;
  861. struct res_lib_msg_messagesend res_lib_msg_messagesend;
  862. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  863. if (error != SA_AIS_OK) {
  864. return (error);
  865. }
  866. req_lib_msg_messagesend.header.size = sizeof (struct req_lib_msg_messagesend);
  867. req_lib_msg_messagesend.header.id = MESSAGE_REQ_MSG_MESSAGESEND;
  868. memcpy (&req_lib_msg_messagesend.destination, destination, sizeof (SaNameT));
  869. pthread_mutex_lock (&msgInstance->response_mutex);
  870. error = saSendReceiveReply (msgInstance->response_fd,
  871. &req_lib_msg_messagesend,
  872. sizeof (struct req_lib_msg_messagesend),
  873. &res_lib_msg_messagesend,
  874. sizeof (struct res_lib_msg_messagesend));
  875. pthread_mutex_unlock (&msgInstance->response_mutex);
  876. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  877. return (error == SA_AIS_OK ? res_lib_msg_messagesend.header.error : error);
  878. }
  879. SaAisErrorT
  880. saMsgMessageSendAsync (
  881. SaMsgHandleT msgHandle,
  882. SaInvocationT invocation,
  883. const SaNameT *destination,
  884. const SaMsgMessageT *message,
  885. SaMsgAckFlagsT ackFlags)
  886. {
  887. SaAisErrorT error;
  888. struct msgInstance *msgInstance;
  889. struct req_lib_msg_messagesend req_lib_msg_messagesend;
  890. struct res_lib_msg_messagesendasync res_lib_msg_messagesendasync;
  891. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  892. if (error != SA_AIS_OK) {
  893. return (error);
  894. }
  895. req_lib_msg_messagesend.header.size = sizeof (struct req_lib_msg_messagesend);
  896. req_lib_msg_messagesend.header.id = MESSAGE_REQ_MSG_MESSAGESEND;
  897. memcpy (&req_lib_msg_messagesend.destination, destination, sizeof (SaNameT));
  898. pthread_mutex_lock (&msgInstance->response_mutex);
  899. error = saSendReceiveReply (msgInstance->response_fd,
  900. &req_lib_msg_messagesend,
  901. sizeof (struct req_lib_msg_messagesend),
  902. &res_lib_msg_messagesendasync,
  903. sizeof (struct res_lib_msg_messagesendasync));
  904. pthread_mutex_unlock (&msgInstance->response_mutex);
  905. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  906. return (error == SA_AIS_OK ? res_lib_msg_messagesendasync.header.error : error);
  907. }
  908. SaAisErrorT
  909. saMsgMessageGet (
  910. SaMsgQueueHandleT queueHandle,
  911. SaMsgMessageT *message,
  912. SaTimeT *sendTime,
  913. SaMsgSenderIdT *senderId,
  914. SaTimeT timeout)
  915. {
  916. SaAisErrorT error;
  917. struct msgQueueInstance *msgQueueInstance;
  918. struct req_lib_msg_messageget req_lib_msg_messageget;
  919. struct res_lib_msg_messageget res_lib_msg_messageget;
  920. error = saHandleInstanceGet (&queueHandleDatabase, queueHandle, (void *)&msgQueueInstance);
  921. if (error != SA_AIS_OK) {
  922. return (error);
  923. }
  924. req_lib_msg_messageget.header.size = sizeof (struct req_lib_msg_messageget);
  925. req_lib_msg_messageget.header.id = MESSAGE_REQ_MSG_MESSAGEGET;
  926. req_lib_msg_messageget.timeout = timeout;
  927. pthread_mutex_lock (msgQueueInstance->response_mutex);
  928. error = saSendReceiveReply (msgQueueInstance->response_fd,
  929. &req_lib_msg_messageget,
  930. sizeof (struct req_lib_msg_messageget),
  931. &res_lib_msg_messageget,
  932. sizeof (struct res_lib_msg_messageget));
  933. pthread_mutex_unlock (msgQueueInstance->response_mutex);
  934. saHandleInstancePut (&queueHandleDatabase, queueHandle);
  935. if (error == SA_AIS_OK)
  936. error = res_lib_msg_messageget.header.error;
  937. if (error == SA_AIS_OK) {
  938. *sendTime = res_lib_msg_messageget.sendTime;
  939. memcpy (senderId, &res_lib_msg_messageget.senderId,
  940. sizeof (SaMsgSenderIdT));
  941. }
  942. return (error);
  943. }
  944. SaAisErrorT
  945. saMsgMessageCancel (
  946. SaMsgQueueHandleT queueHandle)
  947. {
  948. SaAisErrorT error;
  949. struct msgQueueInstance *msgQueueInstance;
  950. struct req_lib_msg_messagecancel req_lib_msg_messagecancel;
  951. struct res_lib_msg_messagecancel res_lib_msg_messagecancel;
  952. error = saHandleInstanceGet (&msgHandleDatabase, queueHandle, (void *)&msgQueueInstance);
  953. if (error != SA_AIS_OK) {
  954. return (error);
  955. }
  956. req_lib_msg_messagecancel.header.size = sizeof (struct req_lib_msg_messagecancel);
  957. req_lib_msg_messagecancel.header.id = MESSAGE_REQ_MSG_MESSAGECANCEL;
  958. pthread_mutex_lock (msgQueueInstance->response_mutex);
  959. error = saSendReceiveReply (msgQueueInstance->response_fd,
  960. &req_lib_msg_messagecancel,
  961. sizeof (struct req_lib_msg_messagecancel),
  962. &res_lib_msg_messagecancel,
  963. sizeof (struct res_lib_msg_messagecancel));
  964. pthread_mutex_unlock (msgQueueInstance->response_mutex);
  965. saHandleInstancePut (&queueHandleDatabase, queueHandle);
  966. return (error == SA_AIS_OK ? res_lib_msg_messagecancel.header.error : error);
  967. }
  968. SaAisErrorT
  969. saMsgMessageSendReceive (
  970. SaMsgHandleT msgHandle,
  971. const SaNameT *destination,
  972. const SaMsgMessageT *sendMessage,
  973. SaMsgMessageT *receiveMessage,
  974. SaTimeT *replySendTime,
  975. SaTimeT timeout)
  976. {
  977. SaAisErrorT error;
  978. struct msgInstance *msgInstance;
  979. struct req_lib_msg_messagesendreceive req_lib_msg_messagesendreceive;
  980. struct res_lib_msg_messagesendreceive res_lib_msg_messagesendreceive;
  981. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  982. if (error != SA_AIS_OK) {
  983. return (error);
  984. }
  985. req_lib_msg_messagesendreceive.header.size = sizeof (struct req_lib_msg_messagesendreceive);
  986. req_lib_msg_messagesendreceive.header.id = MESSAGE_REQ_MSG_MESSAGEREPLY;
  987. memcpy (&req_lib_msg_messagesendreceive.destination, destination,
  988. sizeof (SaNameT));
  989. req_lib_msg_messagesendreceive.timeout = timeout;
  990. pthread_mutex_lock (&msgInstance->response_mutex);
  991. error = saSendReceiveReply (msgInstance->response_fd,
  992. &req_lib_msg_messagesendreceive,
  993. sizeof (struct req_lib_msg_messagesendreceive),
  994. &res_lib_msg_messagesendreceive,
  995. sizeof (struct res_lib_msg_messagesendreceive));
  996. pthread_mutex_unlock (&msgInstance->response_mutex);
  997. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  998. if (error == SA_AIS_OK)
  999. error = res_lib_msg_messagesendreceive.header.error;
  1000. if (error == SA_AIS_OK) {
  1001. *replySendTime = res_lib_msg_messagesendreceive.replySendTime;
  1002. }
  1003. return (error);
  1004. }
  1005. SaAisErrorT
  1006. saMsgMessageReply (
  1007. SaMsgHandleT msgHandle,
  1008. const SaMsgMessageT *replyMessage,
  1009. const SaMsgSenderIdT *senderId,
  1010. SaTimeT timeout)
  1011. {
  1012. SaAisErrorT error;
  1013. struct msgInstance *msgInstance;
  1014. struct req_lib_msg_messagereply req_lib_msg_messagereply;
  1015. struct res_lib_msg_messagereply res_lib_msg_messagereply;
  1016. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  1017. if (error != SA_AIS_OK) {
  1018. return (error);
  1019. }
  1020. req_lib_msg_messagereply.header.size = sizeof (struct req_lib_msg_messagereply);
  1021. req_lib_msg_messagereply.header.id = MESSAGE_REQ_MSG_MESSAGEREPLY;
  1022. memcpy (&req_lib_msg_messagereply.senderId, senderId, sizeof (SaMsgSenderIdT));
  1023. pthread_mutex_lock (&msgInstance->response_mutex);
  1024. error = saSendReceiveReply (msgInstance->response_fd,
  1025. &req_lib_msg_messagereply,
  1026. sizeof (struct req_lib_msg_messagereply),
  1027. &res_lib_msg_messagereply,
  1028. sizeof (struct res_lib_msg_messagereply));
  1029. pthread_mutex_unlock (&msgInstance->response_mutex);
  1030. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  1031. return (error == SA_AIS_OK ? res_lib_msg_messagereply.header.error : error);
  1032. }
  1033. SaAisErrorT saMsgMessageReplyAsync (
  1034. SaMsgHandleT msgHandle,
  1035. SaInvocationT invocation,
  1036. const SaMsgMessageT *replyMessage,
  1037. const SaMsgSenderIdT *senderId,
  1038. SaMsgAckFlagsT ackFlags)
  1039. {
  1040. SaAisErrorT error;
  1041. struct msgInstance *msgInstance;
  1042. struct req_lib_msg_messagereply req_lib_msg_messagereply;
  1043. struct res_lib_msg_messagereplyasync res_lib_msg_messagereplyasync;
  1044. error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
  1045. if (error != SA_AIS_OK) {
  1046. return (error);
  1047. }
  1048. req_lib_msg_messagereply.header.size = sizeof (struct req_lib_msg_messagereply);
  1049. req_lib_msg_messagereply.header.id = MESSAGE_REQ_MSG_MESSAGEREPLY;
  1050. memcpy (&req_lib_msg_messagereply.senderId, senderId, sizeof (SaMsgSenderIdT));
  1051. pthread_mutex_lock (&msgInstance->response_mutex);
  1052. error = saSendReceiveReply (msgInstance->response_fd,
  1053. &req_lib_msg_messagereply,
  1054. sizeof (struct req_lib_msg_messagereply),
  1055. &res_lib_msg_messagereplyasync,
  1056. sizeof (struct res_lib_msg_messagereplyasync));
  1057. pthread_mutex_unlock (&msgInstance->response_mutex);
  1058. saHandleInstancePut (&msgHandleDatabase, msgHandle);
  1059. return (error == SA_AIS_OK ? res_lib_msg_messagereplyasync.header.error : error);
  1060. }