coroipcs.c 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387
  1. /*
  2. * Copyright (c) 2006-2009 Red Hat, Inc.
  3. *
  4. * All rights reserved.
  5. *
  6. * Author: Steven Dake (sdake@redhat.com)
  7. *
  8. * This software licensed under BSD license, the text of which follows:
  9. *
  10. * Redistribution and use in source and binary forms, with or without
  11. * modification, are permitted provided that the following conditions are met:
  12. *
  13. * - Redistributions of source code must retain the above copyright notice,
  14. * this list of conditions and the following disclaimer.
  15. * - Redistributions in binary form must reproduce the above copyright notice,
  16. * this list of conditions and the following disclaimer in the documentation
  17. * and/or other materials provided with the distribution.
  18. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  19. * contributors may be used to endorse or promote products derived from this
  20. * software without specific prior written permission.
  21. *
  22. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  23. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  26. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  27. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  28. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  29. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  30. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  31. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  32. * THE POSSIBILITY OF SUCH DAMAGE.
  33. */
  34. #include <config.h>
  35. #ifndef _GNU_SOURCE
  36. #define _GNU_SOURCE 1
  37. #endif
  38. #include <pthread.h>
  39. #include <assert.h>
  40. #include <pwd.h>
  41. #include <grp.h>
  42. #include <sys/types.h>
  43. #include <sys/poll.h>
  44. #include <sys/uio.h>
  45. #include <sys/mman.h>
  46. #include <sys/socket.h>
  47. #include <sys/un.h>
  48. #include <sys/time.h>
  49. #include <sys/resource.h>
  50. #include <sys/wait.h>
  51. #include <netinet/in.h>
  52. #include <arpa/inet.h>
  53. #include <unistd.h>
  54. #include <fcntl.h>
  55. #include <stdlib.h>
  56. #include <stdio.h>
  57. #include <errno.h>
  58. #include <signal.h>
  59. #include <sched.h>
  60. #include <time.h>
  61. #if defined(HAVE_GETPEERUCRED)
  62. #include <ucred.h>
  63. #endif
  64. #include <sys/shm.h>
  65. #include <sys/sem.h>
  66. #include <corosync/corotypes.h>
  67. #include <corosync/list.h>
  68. #include <corosync/coroipc_types.h>
  69. #include <corosync/coroipcs.h>
  70. #include <corosync/coroipc_ipc.h>
  /* Platforms without MSG_NOSIGNAL get a flag value that changes nothing. */
  #if !defined(MSG_NOSIGNAL)
  #define MSG_NOSIGNAL 0
  #endif

  /* Backlog handed to listen() for the server socket. */
  #define SERVER_BACKLOG 5

  /*
   * Values for the "locked" argument of msg_send().  msg_send() takes the
   * connection mutex itself only when it is passed 0 (MSG_SEND_LOCKED).
   */
  #define MSG_SEND_LOCKED 0
  #define MSG_SEND_UNLOCKED 1

  /* Callback table supplied to coroipcs_ipc_init(). */
  static struct coroipcs_init_state *api;

  /* Head of the list that conn_info_create() links every connection onto. */
  DECLARE_LIST_INIT (conn_info_list_head);
  79. struct outq_item {
  80. void *msg;
  81. size_t mlen;
  82. struct list_head list;
  83. };
  84. struct zcb_mapped {
  85. struct list_head list;
  86. void *addr;
  87. size_t size;
  88. };
  /*
   * Declared here only when the C library signals, through
   * _SEM_SEMUN_UNDEFINED, that it does not provide union semun itself.
   */
  #if defined(_SEM_SEMUN_UNDEFINED)
  union semun {
      int val;
      struct semid_ds *buf;
      unsigned short int *array;
      struct seminfo *__buf;
  };
  #endif
  /*
   * Life-cycle states of a connection.  The numeric values are spelled out
   * explicitly and must not change.
   */
  enum conn_state {
      /* set by conn_info_create() */
      CONN_STATE_THREAD_INACTIVE = 0,
      /* the consumer thread keeps looping only while this state is set */
      CONN_STATE_THREAD_ACTIVE = 1,
      /* set by ipc_disconnect(); conn_info_destroy() then joins the thread */
      CONN_STATE_THREAD_REQUEST_EXIT = 2,
      /* set by conn_info_destroy() after the thread has been joined */
      CONN_STATE_THREAD_DESTROYED = 3,
      /* set once the service's exit function no longer returns -1 */
      CONN_STATE_LIB_EXIT_CALLED = 4,
      /* ipc_disconnect() was called while still CONN_STATE_THREAD_INACTIVE */
      CONN_STATE_DISCONNECT_INACTIVE = 5
  };
  105. struct conn_info {
  106. int fd;
  107. pthread_t thread;
  108. pthread_attr_t thread_attr;
  109. unsigned int service;
  110. enum conn_state state;
  111. int notify_flow_control_enabled;
  112. int refcount;
  113. key_t shmkey;
  114. key_t semkey;
  115. int semid;
  116. unsigned int pending_semops;
  117. pthread_mutex_t mutex;
  118. struct control_buffer *control_buffer;
  119. char *request_buffer;
  120. char *response_buffer;
  121. char *dispatch_buffer;
  122. size_t control_size;
  123. size_t request_size;
  124. size_t response_size;
  125. size_t dispatch_size;
  126. struct list_head outq_head;
  127. void *private_data;
  128. struct list_head list;
  129. char setup_msg[sizeof (mar_req_setup_t)];
  130. unsigned int setup_bytes_read;
  131. struct list_head zcb_mapped_list_head;
  132. char *sending_allowed_private_data[64];
  133. };
  /*
   * Forward declarations for helpers that are used before they are defined.
   */
  static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info);

  static void outq_flush (struct conn_info *conn_info);

  static int priv_change (struct conn_info *conn_info);

  static void ipc_disconnect (struct conn_info *conn_info);

  static void msg_send (
      void *conn,
      const struct iovec *iov,
      unsigned int iov_len,
      int locked);
  /*
   * Map the file at `path` into this process, shared and read/write.
   *
   * The path is unlinked as soon as the open has been attempted (whether or
   * not it succeeded), the file is resized to `bytes`, and the descriptor is
   * closed again before returning; only the mapping remains.
   *
   * path  - file to map
   * bytes - length of the mapping
   * buf   - receives the mapping address on success; untouched on failure
   *
   * Returns 0 on success, -1 on failure.  On failure neither the descriptor
   * nor any address space stays allocated.
   */
  static int
  memory_map (
      const char *path,
      size_t bytes,
      void **buf)
  {
      int fd;
      void *addr_orig;
      void *addr;
      int res;

      fd = open (path, O_RDWR, 0600);
      /* The name is removed even when the open failed, as before. */
      unlink (path);
      if (fd == -1) {
          return (-1);
      }

      res = ftruncate (fd, bytes);
      if (res == -1) {
          close (fd);
          return (-1);
      }

      /* Reserve address space first, then place the file over it. */
      addr_orig = mmap (NULL, bytes, PROT_NONE,
          MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
      if (addr_orig == MAP_FAILED) {
          close (fd);
          return (-1);
      }

      addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
          MAP_FIXED | MAP_SHARED, fd, 0);
      if (addr != addr_orig) {
          munmap (addr_orig, bytes);
          close (fd);
          return (-1);
      }

      res = close (fd);
      if (res) {
          munmap (addr_orig, bytes);
          return (-1);
      }

      *buf = addr_orig;
      return (0);
  }
  /*
   * Map the file at `path` twice, back to back, so that the byte at offset
   * i and the byte at offset i + bytes are the same memory.
   *
   * The path is unlinked as soon as the open has been attempted (whether or
   * not it succeeded), the file is resized to `bytes`, and the descriptor is
   * closed before returning.
   *
   * path  - file to map
   * bytes - size of one copy; the resulting region is 2 * bytes long
   * buf   - receives the start of the region on success; untouched on failure
   *
   * Returns 0 on success, -1 on failure.  On failure neither the descriptor
   * nor any address space stays allocated.
   */
  static int
  circular_memory_map (
      const char *path,
      size_t bytes,
      void **buf)
  {
      int fd;
      void *addr_orig;
      void *addr;
      void *mirror;
      int res;

      fd = open (path, O_RDWR, 0600);
      /* The name is removed even when the open failed, as before. */
      unlink (path);
      if (fd == -1) {
          return (-1);
      }

      res = ftruncate (fd, bytes);
      if (res == -1) {
          close (fd);
          return (-1);
      }

      /* Reserve room for both copies in one contiguous range. */
      addr_orig = mmap (NULL, bytes << 1, PROT_NONE,
          MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
      if (addr_orig == MAP_FAILED) {
          close (fd);
          return (-1);
      }

      addr = mmap (addr_orig, bytes, PROT_READ | PROT_WRITE,
          MAP_FIXED | MAP_SHARED, fd, 0);
      if (addr != addr_orig) {
          munmap (addr_orig, bytes << 1);
          close (fd);
          return (-1);
      }

      /* Second copy of the same file directly behind the first. */
      mirror = ((char *)addr_orig) + bytes;
      addr = mmap (mirror, bytes, PROT_READ | PROT_WRITE,
          MAP_FIXED | MAP_SHARED, fd, 0);
      if (addr != mirror) {
          munmap (addr_orig, bytes << 1);
          close (fd);
          return (-1);
      }

      res = close (fd);
      if (res) {
          munmap (addr_orig, bytes << 1);
          return (-1);
      }

      *buf = addr_orig;
      return (0);
  }
  /*
   * Release a region of 2 * bytes starting at buf.
   * Returns whatever munmap() returns.
   */
  static inline int
  circular_memory_unmap (void *buf, size_t bytes)
  {
      return (munmap (buf, bytes << 1));
  }
  210. static inline int zcb_free (struct zcb_mapped *zcb_mapped)
  211. {
  212. unsigned int res;
  213. res = munmap (zcb_mapped->addr, zcb_mapped->size);
  214. list_del (&zcb_mapped->list);
  215. free (zcb_mapped);
  216. return (res);
  217. }
  218. static inline int zcb_by_addr_free (struct conn_info *conn_info, void *addr)
  219. {
  220. struct list_head *list;
  221. struct zcb_mapped *zcb_mapped;
  222. unsigned int res = 0;
  223. for (list = conn_info->zcb_mapped_list_head.next;
  224. list != &conn_info->zcb_mapped_list_head; list = list->next) {
  225. zcb_mapped = list_entry (list, struct zcb_mapped, list);
  226. if (zcb_mapped->addr == addr) {
  227. res = zcb_free (zcb_mapped);
  228. break;
  229. }
  230. }
  231. return (res);
  232. }
  233. static inline int zcb_all_free (
  234. struct conn_info *conn_info)
  235. {
  236. struct list_head *list;
  237. struct zcb_mapped *zcb_mapped;
  238. for (list = conn_info->zcb_mapped_list_head.next;
  239. list != &conn_info->zcb_mapped_list_head;) {
  240. zcb_mapped = list_entry (list, struct zcb_mapped, list);
  241. list = list->next;
  242. zcb_free (zcb_mapped);
  243. }
  244. return (0);
  245. }
  246. static inline int zcb_alloc (
  247. struct conn_info *conn_info,
  248. const char *path_to_file,
  249. size_t size,
  250. void **addr)
  251. {
  252. struct zcb_mapped *zcb_mapped;
  253. unsigned int res;
  254. zcb_mapped = malloc (sizeof (struct zcb_mapped));
  255. if (zcb_mapped == NULL) {
  256. return (-1);
  257. }
  258. res = memory_map (
  259. path_to_file,
  260. size,
  261. addr);
  262. if (res == -1) {
  263. return (-1);
  264. }
  265. list_init (&zcb_mapped->list);
  266. zcb_mapped->addr = *addr;
  267. zcb_mapped->size = size;
  268. list_add_tail (&zcb_mapped->list, &conn_info->zcb_mapped_list_head);
  269. return (0);
  270. }
  271. static int ipc_thread_active (void *conn)
  272. {
  273. struct conn_info *conn_info = (struct conn_info *)conn;
  274. int retval = 0;
  275. pthread_mutex_lock (&conn_info->mutex);
  276. if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
  277. retval = 1;
  278. }
  279. pthread_mutex_unlock (&conn_info->mutex);
  280. return (retval);
  281. }
  282. static int ipc_thread_exiting (void *conn)
  283. {
  284. struct conn_info *conn_info = (struct conn_info *)conn;
  285. int retval = 1;
  286. pthread_mutex_lock (&conn_info->mutex);
  287. if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
  288. retval = 0;
  289. } else
  290. if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
  291. retval = 0;
  292. }
  293. pthread_mutex_unlock (&conn_info->mutex);
  294. return (retval);
  295. }
  296. /*
  297. * returns 0 if should be called again, -1 if finished
  298. */
  299. static inline int conn_info_destroy (struct conn_info *conn_info)
  300. {
  301. unsigned int res;
  302. void *retval;
  303. list_del (&conn_info->list);
  304. list_init (&conn_info->list);
  305. if (conn_info->state == CONN_STATE_THREAD_REQUEST_EXIT) {
  306. res = pthread_join (conn_info->thread, &retval);
  307. conn_info->state = CONN_STATE_THREAD_DESTROYED;
  308. return (0);
  309. }
  310. if (conn_info->state == CONN_STATE_THREAD_INACTIVE ||
  311. conn_info->state == CONN_STATE_DISCONNECT_INACTIVE) {
  312. list_del (&conn_info->list);
  313. close (conn_info->fd);
  314. api->free (conn_info);
  315. return (-1);
  316. }
  317. if (conn_info->state == CONN_STATE_THREAD_ACTIVE) {
  318. pthread_kill (conn_info->thread, SIGUSR1);
  319. return (0);
  320. }
  321. api->serialize_lock ();
  322. /*
  323. * Retry library exit function if busy
  324. */
  325. if (conn_info->state == CONN_STATE_THREAD_DESTROYED) {
  326. res = api->exit_fn_get (conn_info->service) (conn_info);
  327. if (res == -1) {
  328. api->serialize_unlock ();
  329. return (0);
  330. } else {
  331. conn_info->state = CONN_STATE_LIB_EXIT_CALLED;
  332. }
  333. }
  334. pthread_mutex_lock (&conn_info->mutex);
  335. if (conn_info->refcount > 0) {
  336. pthread_mutex_unlock (&conn_info->mutex);
  337. api->serialize_unlock ();
  338. return (0);
  339. }
  340. list_del (&conn_info->list);
  341. pthread_mutex_unlock (&conn_info->mutex);
  342. /*
  343. * Destroy shared memory segment and semaphore
  344. */
  345. res = munmap (conn_info->control_buffer, conn_info->control_size);
  346. res = munmap (conn_info->request_buffer, conn_info->request_size);
  347. res = munmap (conn_info->response_buffer, conn_info->response_size);
  348. semctl (conn_info->semid, 0, IPC_RMID);
  349. /*
  350. * Free allocated data needed to retry exiting library IPC connection
  351. */
  352. if (conn_info->private_data) {
  353. api->free (conn_info->private_data);
  354. }
  355. close (conn_info->fd);
  356. res = circular_memory_unmap (conn_info->dispatch_buffer, conn_info->dispatch_size);
  357. zcb_all_free (conn_info);
  358. api->free (conn_info);
  359. api->serialize_unlock ();
  360. return (-1);
  361. }
  362. struct res_overlay {
  363. coroipc_response_header_t header __attribute__((aligned(8)));
  364. char buf[4096];
  365. };
  /*
   * Overlay used to carry a server-side pointer inside a 64-bit integer
   * field and to turn it back into a pointer.
   */
  union u {
      uint64_t server_addr;
      void *server_ptr;
  };

  /*
   * Encode server_ptr as a 64-bit value.
   *
   * The whole 64-bit member is cleared first so that, where pointers are
   * narrower than 64 bits, the bytes not covered by the pointer are zero
   * instead of indeterminate.
   */
  static uint64_t void2serveraddr (void *server_ptr)
  {
      union u u;

      u.server_addr = 0;
      u.server_ptr = server_ptr;
      return (u.server_addr);
  }

  /*
   * Decode a value produced by void2serveraddr back into the pointer.
   */
  static void *serveraddr2void (uint64_t server_addr)
  {
      union u u;

      u.server_addr = server_addr;
      return (u.server_ptr);
  }
  382. static inline void zerocopy_operations_process (
  383. struct conn_info *conn_info,
  384. coroipc_request_header_t **header_out,
  385. unsigned int *new_message)
  386. {
  387. coroipc_request_header_t *header;
  388. header = (coroipc_request_header_t *)conn_info->request_buffer;
  389. if (header->id == ZC_ALLOC_HEADER) {
  390. mar_req_coroipcc_zc_alloc_t *hdr = (mar_req_coroipcc_zc_alloc_t *)header;
  391. coroipc_response_header_t res_header;
  392. void *addr = NULL;
  393. struct coroipcs_zc_header *zc_header;
  394. unsigned int res;
  395. res = zcb_alloc (conn_info, hdr->path_to_file, hdr->map_size,
  396. &addr);
  397. zc_header = (struct coroipcs_zc_header *)addr;
  398. zc_header->server_address = void2serveraddr(addr);
  399. res_header.size = sizeof (coroipc_response_header_t);
  400. res_header.id = 0;
  401. coroipcs_response_send (
  402. conn_info, &res_header,
  403. res_header.size);
  404. *new_message = 0;
  405. return;
  406. } else
  407. if (header->id == ZC_FREE_HEADER) {
  408. mar_req_coroipcc_zc_free_t *hdr = (mar_req_coroipcc_zc_free_t *)header;
  409. coroipc_response_header_t res_header;
  410. void *addr = NULL;
  411. addr = serveraddr2void (hdr->server_address);
  412. zcb_by_addr_free (conn_info, addr);
  413. res_header.size = sizeof (coroipc_response_header_t);
  414. res_header.id = 0;
  415. coroipcs_response_send (
  416. conn_info, &res_header,
  417. res_header.size);
  418. *new_message = 0;
  419. return;
  420. } else
  421. if (header->id == ZC_EXECUTE_HEADER) {
  422. mar_req_coroipcc_zc_execute_t *hdr = (mar_req_coroipcc_zc_execute_t *)header;
  423. header = (coroipc_request_header_t *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
  424. }
  425. *header_out = header;
  426. *new_message = 1;
  427. }
  428. static void *pthread_ipc_consumer (void *conn)
  429. {
  430. struct conn_info *conn_info = (struct conn_info *)conn;
  431. struct sembuf sop;
  432. int res;
  433. coroipc_request_header_t *header;
  434. struct res_overlay res_overlay;
  435. int send_ok;
  436. unsigned int new_message;
  437. if (api->sched_priority != 0) {
  438. struct sched_param sched_param;
  439. sched_param.sched_priority = api->sched_priority;
  440. res = pthread_setschedparam (conn_info->thread, SCHED_RR, &sched_param);
  441. }
  442. for (;;) {
  443. sop.sem_num = 0;
  444. sop.sem_op = -1;
  445. sop.sem_flg = 0;
  446. retry_semop:
  447. if (ipc_thread_active (conn_info) == 0) {
  448. coroipcs_refcount_dec (conn_info);
  449. pthread_exit (0);
  450. }
  451. res = semop (conn_info->semid, &sop, 1);
  452. if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
  453. goto retry_semop;
  454. } else
  455. if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
  456. coroipcs_refcount_dec (conn_info);
  457. pthread_exit (0);
  458. }
  459. zerocopy_operations_process (conn_info, &header, &new_message);
  460. /*
  461. * There is no new message to process, continue for loop
  462. */
  463. if (new_message == 0) {
  464. continue;
  465. }
  466. coroipcs_refcount_inc (conn);
  467. send_ok = api->sending_allowed (conn_info->service,
  468. header->id,
  469. header,
  470. conn_info->sending_allowed_private_data);
  471. if (send_ok) {
  472. api->serialize_lock();
  473. api->handler_fn_get (conn_info->service, header->id) (conn_info, header);
  474. api->serialize_unlock();
  475. } else {
  476. /*
  477. * Overload, tell library to retry
  478. */
  479. res_overlay.header.size =
  480. api->response_size_get (conn_info->service, header->id);
  481. res_overlay.header.id =
  482. api->response_id_get (conn_info->service, header->id);
  483. res_overlay.header.error = CS_ERR_TRY_AGAIN;
  484. coroipcs_response_send (conn_info, &res_overlay,
  485. res_overlay.header.size);
  486. }
  487. api->sending_allowed_release (conn_info->sending_allowed_private_data);
  488. coroipcs_refcount_dec (conn);
  489. }
  490. pthread_exit (0);
  491. }
  492. static int
  493. req_setup_send (
  494. struct conn_info *conn_info,
  495. int error)
  496. {
  497. mar_res_setup_t res_setup;
  498. unsigned int res;
  499. res_setup.error = error;
  500. retry_send:
  501. res = send (conn_info->fd, &res_setup, sizeof (mar_res_setup_t), MSG_WAITALL);
  502. if (res == -1 && errno == EINTR) {
  503. goto retry_send;
  504. } else
  505. if (res == -1 && errno == EAGAIN) {
  506. goto retry_send;
  507. }
  508. return (0);
  509. }
  510. static int
  511. req_setup_recv (
  512. struct conn_info *conn_info)
  513. {
  514. int res;
  515. struct msghdr msg_recv;
  516. struct iovec iov_recv;
  517. #ifdef COROSYNC_LINUX
  518. struct cmsghdr *cmsg;
  519. char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
  520. struct ucred *cred;
  521. int off = 0;
  522. int on = 1;
  523. #endif
  524. msg_recv.msg_iov = &iov_recv;
  525. msg_recv.msg_iovlen = 1;
  526. msg_recv.msg_name = 0;
  527. msg_recv.msg_namelen = 0;
  528. #ifdef COROSYNC_LINUX
  529. msg_recv.msg_control = (void *)cmsg_cred;
  530. msg_recv.msg_controllen = sizeof (cmsg_cred);
  531. #endif
  532. #ifdef PORTABILITY_WORK_TODO
  533. #ifdef COROSYNC_SOLARIS
  534. msg_recv.msg_flags = 0;
  535. uid_t euid;
  536. gid_t egid;
  537. euid = -1;
  538. egid = -1;
  539. if (getpeereid(conn_info->fd, &euid, &egid) != -1 &&
  540. (api->security_valid (euid, egid)) {
  541. if (conn_info->state == CONN_IO_STATE_INITIALIZING) {
  542. api->log_printf ("Invalid security authentication\n");
  543. return (-1);
  544. }
  545. }
  546. msg_recv.msg_accrights = 0;
  547. msg_recv.msg_accrightslen = 0;
  548. #else /* COROSYNC_SOLARIS */
  549. #ifdef HAVE_GETPEERUCRED
  550. ucred_t *uc;
  551. uid_t euid = -1;
  552. gid_t egid = -1;
  553. if (getpeerucred (conn_info->fd, &uc) == 0) {
  554. euid = ucred_geteuid (uc);
  555. egid = ucred_getegid (uc);
  556. if (api->security_valid (euid, egid) {
  557. conn_info->authenticated = 1;
  558. }
  559. ucred_free(uc);
  560. }
  561. if (conn_info->authenticated == 0) {
  562. api->log_printf ("Invalid security authentication\n");
  563. }
  564. #else /* HAVE_GETPEERUCRED */
  565. api->log_printf (LOGSYS_LEVEL_SECURITY, "Connection not authenticated "
  566. "because platform does not support "
  567. "authentication with sockets, continuing "
  568. "with a fake authentication\n");
  569. #endif /* HAVE_GETPEERUCRED */
  570. #endif /* COROSYNC_SOLARIS */
  571. #endif
  572. iov_recv.iov_base = &conn_info->setup_msg[conn_info->setup_bytes_read];
  573. iov_recv.iov_len = sizeof (mar_req_setup_t) - conn_info->setup_bytes_read;
  574. #ifdef COROSYNC_LINUX
  575. setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
  576. #endif
  577. retry_recv:
  578. res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
  579. if (res == -1 && errno == EINTR) {
  580. goto retry_recv;
  581. } else
  582. if (res == -1 && errno != EAGAIN) {
  583. return (0);
  584. } else
  585. if (res == 0) {
  586. #if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
  587. /* On many OS poll never return POLLHUP or POLLERR.
  588. * EOF is detected when recvmsg return 0.
  589. */
  590. ipc_disconnect (conn_info);
  591. #endif
  592. return (-1);
  593. }
  594. conn_info->setup_bytes_read += res;
  595. #ifdef COROSYNC_LINUX
  596. cmsg = CMSG_FIRSTHDR (&msg_recv);
  597. assert (cmsg);
  598. cred = (struct ucred *)CMSG_DATA (cmsg);
  599. if (cred) {
  600. if (api->security_valid (cred->uid, cred->gid)) {
  601. } else {
  602. ipc_disconnect (conn_info);
  603. api->log_printf ("Invalid security authentication\n");
  604. return (-1);
  605. }
  606. }
  607. #endif
  608. if (conn_info->setup_bytes_read == sizeof (mar_req_setup_t)) {
  609. #ifdef COROSYNC_LINUX
  610. setsockopt(conn_info->fd, SOL_SOCKET, SO_PASSCRED,
  611. &off, sizeof (off));
  612. #endif
  613. return (1);
  614. }
  615. return (0);
  616. }
  617. static void ipc_disconnect (struct conn_info *conn_info)
  618. {
  619. if (conn_info->state == CONN_STATE_THREAD_INACTIVE) {
  620. conn_info->state = CONN_STATE_DISCONNECT_INACTIVE;
  621. return;
  622. }
  623. if (conn_info->state != CONN_STATE_THREAD_ACTIVE) {
  624. return;
  625. }
  626. pthread_mutex_lock (&conn_info->mutex);
  627. conn_info->state = CONN_STATE_THREAD_REQUEST_EXIT;
  628. pthread_mutex_unlock (&conn_info->mutex);
  629. pthread_kill (conn_info->thread, SIGUSR1);
  630. }
  631. static int conn_info_create (int fd)
  632. {
  633. struct conn_info *conn_info;
  634. conn_info = api->malloc (sizeof (struct conn_info));
  635. if (conn_info == NULL) {
  636. return (-1);
  637. }
  638. memset (conn_info, 0, sizeof (struct conn_info));
  639. conn_info->fd = fd;
  640. conn_info->service = SOCKET_SERVICE_INIT;
  641. conn_info->state = CONN_STATE_THREAD_INACTIVE;
  642. list_init (&conn_info->outq_head);
  643. list_init (&conn_info->list);
  644. list_init (&conn_info->zcb_mapped_list_head);
  645. list_add (&conn_info->list, &conn_info_list_head);
  646. api->poll_dispatch_add (fd, conn_info);
  647. return (0);
  648. }
  /*
   * Address length to pass to bind().  On Linux and Solaris the size of the
   * whole structure is used because SUN_LEN is broken for the abstract
   * namespace; elsewhere SUN_LEN is used.
   */
  #if defined(COROSYNC_LINUX) || defined(COROSYNC_SOLARIS)
  #define COROSYNC_SUN_LEN(a) sizeof(*(a))
  #else
  #define COROSYNC_SUN_LEN(a) SUN_LEN(a)
  #endif
  656. /*
  657. * Exported functions
  658. */
  659. extern void coroipcs_ipc_init (
  660. struct coroipcs_init_state *init_state)
  661. {
  662. int server_fd;
  663. struct sockaddr_un un_addr;
  664. int res;
  665. api = init_state;
  666. /*
  667. * Create socket for IPC clients, name socket, listen for connections
  668. */
  669. server_fd = socket (PF_UNIX, SOCK_STREAM, 0);
  670. if (server_fd == -1) {
  671. api->log_printf ("Cannot create client connections socket.\n");
  672. api->fatal_error ("Can't create library listen socket");
  673. };
  674. res = fcntl (server_fd, F_SETFL, O_NONBLOCK);
  675. if (res == -1) {
  676. api->log_printf ("Could not set non-blocking operation on server socket: %s\n", strerror (errno));
  677. api->fatal_error ("Could not set non-blocking operation on server socket");
  678. }
  679. memset (&un_addr, 0, sizeof (struct sockaddr_un));
  680. un_addr.sun_family = AF_UNIX;
  681. #if defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
  682. un_addr.sun_len = sizeof(struct sockaddr_un);
  683. #endif
  684. #if defined(COROSYNC_LINUX)
  685. sprintf (un_addr.sun_path + 1, "%s", api->socket_name);
  686. #else
  687. sprintf (un_addr.sun_path, "%s/%s", SOCKETDIR, api->socket_name);
  688. unlink (un_addr.sun_path);
  689. #endif
  690. res = bind (server_fd, (struct sockaddr *)&un_addr, COROSYNC_SUN_LEN(&un_addr));
  691. if (res) {
  692. api->log_printf ("Could not bind AF_UNIX: %s.\n", strerror (errno));
  693. api->fatal_error ("Could not bind to AF_UNIX socket\n");
  694. }
  695. listen (server_fd, SERVER_BACKLOG);
  696. /*
  697. * Setup connection dispatch routine
  698. */
  699. api->poll_accept_add (server_fd);
  700. }
  701. void coroipcs_ipc_exit (void)
  702. {
  703. struct list_head *list;
  704. struct conn_info *conn_info;
  705. unsigned int res;
  706. for (list = conn_info_list_head.next; list != &conn_info_list_head;
  707. list = list->next) {
  708. conn_info = list_entry (list, struct conn_info, list);
  709. /*
  710. * Unmap memory segments
  711. */
  712. res = munmap (conn_info->control_buffer,
  713. conn_info->control_size);
  714. res = munmap (conn_info->request_buffer,
  715. conn_info->request_size);
  716. res = munmap (conn_info->response_buffer,
  717. conn_info->response_size);
  718. res = circular_memory_unmap (conn_info->dispatch_buffer,
  719. conn_info->dispatch_size);
  720. semctl (conn_info->semid, 0, IPC_RMID);
  721. pthread_kill (conn_info->thread, SIGUSR1);
  722. }
  723. }
  724. /*
  725. * Get the conn info private data
  726. */
  727. void *coroipcs_private_data_get (void *conn)
  728. {
  729. struct conn_info *conn_info = (struct conn_info *)conn;
  730. return (conn_info->private_data);
  731. }
  732. int coroipcs_response_send (void *conn, const void *msg, size_t mlen)
  733. {
  734. struct conn_info *conn_info = (struct conn_info *)conn;
  735. struct sembuf sop;
  736. int res;
  737. memcpy (conn_info->response_buffer, msg, mlen);
  738. sop.sem_num = 1;
  739. sop.sem_op = 1;
  740. sop.sem_flg = 0;
  741. retry_semop:
  742. res = semop (conn_info->semid, &sop, 1);
  743. if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
  744. goto retry_semop;
  745. } else
  746. if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
  747. return (0);
  748. }
  749. return (0);
  750. }
  751. int coroipcs_response_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
  752. {
  753. struct conn_info *conn_info = (struct conn_info *)conn;
  754. struct sembuf sop;
  755. int res;
  756. int write_idx = 0;
  757. int i;
  758. for (i = 0; i < iov_len; i++) {
  759. memcpy (&conn_info->response_buffer[write_idx],
  760. iov[i].iov_base, iov[i].iov_len);
  761. write_idx += iov[i].iov_len;
  762. }
  763. sop.sem_num = 1;
  764. sop.sem_op = 1;
  765. sop.sem_flg = 0;
  766. retry_semop:
  767. res = semop (conn_info->semid, &sop, 1);
  768. if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
  769. goto retry_semop;
  770. } else
  771. if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
  772. return (0);
  773. }
  774. return (0);
  775. }
  776. static int shared_mem_dispatch_bytes_left (const struct conn_info *conn_info)
  777. {
  778. unsigned int n_read;
  779. unsigned int n_write;
  780. unsigned int bytes_left;
  781. n_read = conn_info->control_buffer->read;
  782. n_write = conn_info->control_buffer->write;
  783. if (n_read <= n_write) {
  784. bytes_left = conn_info->dispatch_size - n_write + n_read;
  785. } else {
  786. bytes_left = n_read - n_write;
  787. }
  788. return (bytes_left);
  789. }
  790. static void memcpy_dwrap (struct conn_info *conn_info, void *msg, unsigned int len)
  791. {
  792. unsigned int write_idx;
  793. write_idx = conn_info->control_buffer->write;
  794. memcpy (&conn_info->dispatch_buffer[write_idx], msg, len);
  795. conn_info->control_buffer->write = (write_idx + len) % conn_info->dispatch_size;
  796. }
  797. static void msg_send (void *conn, const struct iovec *iov, unsigned int iov_len,
  798. int locked)
  799. {
  800. struct conn_info *conn_info = (struct conn_info *)conn;
  801. struct sembuf sop;
  802. int res;
  803. int i;
  804. char buf;
  805. for (i = 0; i < iov_len; i++) {
  806. memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
  807. }
  808. buf = !list_empty (&conn_info->outq_head);
  809. res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
  810. if (res == -1 && errno == EAGAIN) {
  811. if (locked == 0) {
  812. pthread_mutex_lock (&conn_info->mutex);
  813. }
  814. conn_info->pending_semops += 1;
  815. if (locked == 0) {
  816. pthread_mutex_unlock (&conn_info->mutex);
  817. }
  818. api->poll_dispatch_modify (conn_info->fd,
  819. POLLIN|POLLOUT|POLLNVAL);
  820. } else
  821. if (res == -1) {
  822. ipc_disconnect (conn_info);
  823. }
  824. sop.sem_num = 2;
  825. sop.sem_op = 1;
  826. sop.sem_flg = 0;
  827. retry_semop:
  828. res = semop (conn_info->semid, &sop, 1);
  829. if ((res == -1) && (errno == EINTR || errno == EAGAIN)) {
  830. goto retry_semop;
  831. } else
  832. if ((res == -1) && (errno == EINVAL || errno == EIDRM)) {
  833. return;
  834. }
  835. }
  836. static void outq_flush (struct conn_info *conn_info) {
  837. struct list_head *list, *list_next;
  838. struct outq_item *outq_item;
  839. unsigned int bytes_left;
  840. struct iovec iov;
  841. char buf;
  842. int res;
  843. pthread_mutex_lock (&conn_info->mutex);
  844. if (list_empty (&conn_info->outq_head)) {
  845. buf = 3;
  846. res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
  847. pthread_mutex_unlock (&conn_info->mutex);
  848. return;
  849. }
  850. for (list = conn_info->outq_head.next;
  851. list != &conn_info->outq_head; list = list_next) {
  852. list_next = list->next;
  853. outq_item = list_entry (list, struct outq_item, list);
  854. bytes_left = shared_mem_dispatch_bytes_left (conn_info);
  855. if (bytes_left > outq_item->mlen) {
  856. iov.iov_base = outq_item->msg;
  857. iov.iov_len = outq_item->mlen;
  858. msg_send (conn_info, &iov, 1, MSG_SEND_UNLOCKED);
  859. list_del (list);
  860. api->free (iov.iov_base);
  861. api->free (outq_item);
  862. } else {
  863. break;
  864. }
  865. }
  866. pthread_mutex_unlock (&conn_info->mutex);
  867. }
  868. static int priv_change (struct conn_info *conn_info)
  869. {
  870. mar_req_priv_change req_priv_change;
  871. unsigned int res;
  872. union semun semun;
  873. struct semid_ds ipc_set;
  874. int i;
  875. retry_recv:
  876. res = recv (conn_info->fd, &req_priv_change,
  877. sizeof (mar_req_priv_change),
  878. MSG_NOSIGNAL);
  879. if (res == -1 && errno == EINTR) {
  880. goto retry_recv;
  881. }
  882. if (res == -1 && errno == EAGAIN) {
  883. goto retry_recv;
  884. }
  885. if (res == -1 && errno != EAGAIN) {
  886. return (-1);
  887. }
  888. #if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
  889. /* Error on socket, EOF is detected when recv return 0
  890. */
  891. if (res == 0) {
  892. return (-1);
  893. }
  894. #endif
  895. ipc_set.sem_perm.uid = req_priv_change.euid;
  896. ipc_set.sem_perm.gid = req_priv_change.egid;
  897. ipc_set.sem_perm.mode = 0600;
  898. semun.buf = &ipc_set;
  899. for (i = 0; i < 3; i++) {
  900. res = semctl (conn_info->semid, 0, IPC_SET, semun);
  901. if (res == -1) {
  902. return (-1);
  903. }
  904. }
  905. return (0);
  906. }
  907. static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int iov_len)
  908. {
  909. struct conn_info *conn_info = (struct conn_info *)conn;
  910. unsigned int bytes_left;
  911. unsigned int bytes_msg = 0;
  912. int i;
  913. struct outq_item *outq_item;
  914. char *write_buf = 0;
  915. /*
  916. * Exit transmission if the connection is dead
  917. */
  918. if (ipc_thread_active (conn) == 0) {
  919. return;
  920. }
  921. bytes_left = shared_mem_dispatch_bytes_left (conn_info);
  922. for (i = 0; i < iov_len; i++) {
  923. bytes_msg += iov[i].iov_len;
  924. }
  925. if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
  926. outq_item = api->malloc (sizeof (struct outq_item));
  927. if (outq_item == NULL) {
  928. ipc_disconnect (conn);
  929. return;
  930. }
  931. outq_item->msg = api->malloc (bytes_msg);
  932. if (outq_item->msg == 0) {
  933. api->free (outq_item);
  934. ipc_disconnect (conn);
  935. return;
  936. }
  937. write_buf = outq_item->msg;
  938. for (i = 0; i < iov_len; i++) {
  939. memcpy (write_buf, iov[i].iov_base, iov[i].iov_len);
  940. write_buf += iov[i].iov_len;
  941. }
  942. outq_item->mlen = bytes_msg;
  943. list_init (&outq_item->list);
  944. pthread_mutex_lock (&conn_info->mutex);
  945. if (list_empty (&conn_info->outq_head)) {
  946. conn_info->notify_flow_control_enabled = 1;
  947. api->poll_dispatch_modify (conn_info->fd,
  948. POLLIN|POLLOUT|POLLNVAL);
  949. }
  950. list_add_tail (&outq_item->list, &conn_info->outq_head);
  951. pthread_mutex_unlock (&conn_info->mutex);
  952. return;
  953. }
  954. msg_send (conn, iov, iov_len, MSG_SEND_LOCKED);
  955. }
  956. void coroipcs_refcount_inc (void *conn)
  957. {
  958. struct conn_info *conn_info = (struct conn_info *)conn;
  959. pthread_mutex_lock (&conn_info->mutex);
  960. conn_info->refcount++;
  961. pthread_mutex_unlock (&conn_info->mutex);
  962. }
  963. void coroipcs_refcount_dec (void *conn)
  964. {
  965. struct conn_info *conn_info = (struct conn_info *)conn;
  966. pthread_mutex_lock (&conn_info->mutex);
  967. conn_info->refcount--;
  968. pthread_mutex_unlock (&conn_info->mutex);
  969. }
  /*
   * Send one contiguous message of mlen bytes to the client by wrapping it
   * in a single-element iovec and passing it to msg_send_or_queue.
   *
   * Always returns 0.
   */
  int coroipcs_dispatch_send (void *conn, const void *msg, size_t mlen)
  {
      struct iovec single;

      single.iov_len = mlen;
      /*
       * iov_base is not const-qualified, hence the cast
       */
      single.iov_base = (void *)msg;

      msg_send_or_queue (conn, &single, 1);

      return (0);
  }
  /*
   * Send a message made up of iov_len segments to the client; the work is
   * done by msg_send_or_queue.
   *
   * Always returns 0.
   */
  int coroipcs_dispatch_iov_send (void *conn, const struct iovec *iov, unsigned int iov_len)
  {
      msg_send_or_queue (conn, iov, iov_len);

      return 0;
  }
  983. int coroipcs_handler_accept (
  984. int fd,
  985. int revent,
  986. void *data)
  987. {
  988. socklen_t addrlen;
  989. struct sockaddr_un un_addr;
  990. int new_fd;
  991. #ifdef COROSYNC_LINUX
  992. int on = 1;
  993. #endif
  994. int res;
  995. addrlen = sizeof (struct sockaddr_un);
  996. retry_accept:
  997. new_fd = accept (fd, (struct sockaddr *)&un_addr, &addrlen);
  998. if (new_fd == -1 && errno == EINTR) {
  999. goto retry_accept;
  1000. }
  1001. if (new_fd == -1) {
  1002. api->log_printf ("Could not accept Library connection: %s\n", strerror (errno));
  1003. return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
  1004. }
  1005. res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
  1006. if (res == -1) {
  1007. api->log_printf ("Could not set non-blocking operation on library connection: %s\n", strerror (errno));
  1008. close (new_fd);
  1009. return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
  1010. }
  1011. /*
  1012. * Valid accept
  1013. */
  1014. /*
  1015. * Request credentials of sender provided by kernel
  1016. */
  1017. #ifdef COROSYNC_LINUX
  1018. setsockopt(new_fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
  1019. #endif
  1020. res = conn_info_create (new_fd);
  1021. if (res != 0) {
  1022. close (new_fd);
  1023. }
  1024. return (0);
  1025. }
  1026. int coroipcs_handler_dispatch (
  1027. int fd,
  1028. int revent,
  1029. void *context)
  1030. {
  1031. mar_req_setup_t *req_setup;
  1032. struct conn_info *conn_info = (struct conn_info *)context;
  1033. int res;
  1034. char buf;
  1035. if (ipc_thread_exiting (conn_info)) {
  1036. return conn_info_destroy (conn_info);
  1037. }
  1038. /*
  1039. * If an error occurs, request exit
  1040. */
  1041. if (revent & (POLLERR|POLLHUP)) {
  1042. ipc_disconnect (conn_info);
  1043. return (0);
  1044. }
  1045. /*
  1046. * Read the header and process it
  1047. */
  1048. if (conn_info->service == SOCKET_SERVICE_INIT && (revent & POLLIN)) {
  1049. /*
  1050. * Receive in a nonblocking fashion the request
  1051. * IF security invalid, send TRY_AGAIN, otherwise
  1052. * send OK
  1053. */
  1054. res = req_setup_recv (conn_info);
  1055. if (res == -1) {
  1056. req_setup_send (conn_info, CS_ERR_TRY_AGAIN);
  1057. }
  1058. if (res != 1) {
  1059. return (0);
  1060. }
  1061. req_setup_send (conn_info, CS_OK);
  1062. pthread_mutex_init (&conn_info->mutex, NULL);
  1063. req_setup = (mar_req_setup_t *)conn_info->setup_msg;
  1064. /*
  1065. * Is the service registered ?
  1066. */
  1067. if (api->service_available (req_setup->service) == 0) {
  1068. ipc_disconnect (conn_info);
  1069. return (0);
  1070. }
  1071. conn_info->semkey = req_setup->semkey;
  1072. res = memory_map (
  1073. req_setup->control_file,
  1074. req_setup->control_size,
  1075. (void *)&conn_info->control_buffer);
  1076. conn_info->control_size = req_setup->control_size;
  1077. res = memory_map (
  1078. req_setup->request_file,
  1079. req_setup->request_size,
  1080. (void *)&conn_info->request_buffer);
  1081. conn_info->request_size = req_setup->request_size;
  1082. res = memory_map (
  1083. req_setup->response_file,
  1084. req_setup->response_size,
  1085. (void *)&conn_info->response_buffer);
  1086. conn_info->response_size = req_setup->response_size;
  1087. res = circular_memory_map (
  1088. req_setup->dispatch_file,
  1089. req_setup->dispatch_size,
  1090. (void *)&conn_info->dispatch_buffer);
  1091. conn_info->dispatch_size = req_setup->dispatch_size;
  1092. conn_info->service = req_setup->service;
  1093. conn_info->refcount = 0;
  1094. conn_info->notify_flow_control_enabled = 0;
  1095. conn_info->setup_bytes_read = 0;
  1096. conn_info->semid = semget (conn_info->semkey, 3, 0600);
  1097. conn_info->pending_semops = 0;
  1098. /*
  1099. * ipc thread is the only reference at startup
  1100. */
  1101. conn_info->refcount = 1;
  1102. conn_info->state = CONN_STATE_THREAD_ACTIVE;
  1103. conn_info->private_data = api->malloc (api->private_data_size_get (conn_info->service));
  1104. memset (conn_info->private_data, 0,
  1105. api->private_data_size_get (conn_info->service));
  1106. api->init_fn_get (conn_info->service) (conn_info);
  1107. pthread_attr_init (&conn_info->thread_attr);
  1108. /*
  1109. * IA64 needs more stack space then other arches
  1110. */
  1111. #if defined(__ia64__)
  1112. pthread_attr_setstacksize (&conn_info->thread_attr, 400000);
  1113. #else
  1114. pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
  1115. #endif
  1116. pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_JOINABLE);
  1117. res = pthread_create (&conn_info->thread,
  1118. &conn_info->thread_attr,
  1119. pthread_ipc_consumer,
  1120. conn_info);
  1121. /*
  1122. * Security check - disallow multiple configurations of
  1123. * the ipc connection
  1124. */
  1125. if (conn_info->service == SOCKET_SERVICE_INIT) {
  1126. conn_info->service = -1;
  1127. }
  1128. } else
  1129. if (revent & POLLIN) {
  1130. coroipcs_refcount_inc (conn_info);
  1131. res = recv (fd, &buf, 1, MSG_NOSIGNAL);
  1132. if (res == 1) {
  1133. switch (buf) {
  1134. case MESSAGE_REQ_OUTQ_FLUSH:
  1135. outq_flush (conn_info);
  1136. break;
  1137. case MESSAGE_REQ_CHANGE_EUID:
  1138. if (priv_change (conn_info) == -1) {
  1139. ipc_disconnect (conn_info);
  1140. }
  1141. break;
  1142. default:
  1143. res = 0;
  1144. break;
  1145. }
  1146. coroipcs_refcount_dec (conn_info);
  1147. }
  1148. #if defined(COROSYNC_SOLARIS) || defined(COROSYNC_BSD) || defined(COROSYNC_DARWIN)
  1149. /* On many OS poll never return POLLHUP or POLLERR.
  1150. * EOF is detected when recvmsg return 0.
  1151. */
  1152. if (res == 0) {
  1153. ipc_disconnect (conn_info);
  1154. return (0);
  1155. }
  1156. #endif
  1157. }
  1158. coroipcs_refcount_inc (conn_info);
  1159. pthread_mutex_lock (&conn_info->mutex);
  1160. if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & POLLOUT)) {
  1161. buf = !list_empty (&conn_info->outq_head);
  1162. for (; conn_info->pending_semops;) {
  1163. res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
  1164. if (res == 1) {
  1165. conn_info->pending_semops--;
  1166. } else {
  1167. break;
  1168. }
  1169. }
  1170. if (conn_info->notify_flow_control_enabled) {
  1171. buf = 2;
  1172. res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
  1173. if (res == 1) {
  1174. conn_info->notify_flow_control_enabled = 0;
  1175. }
  1176. }
  1177. if (conn_info->notify_flow_control_enabled == 0 &&
  1178. conn_info->pending_semops == 0) {
  1179. api->poll_dispatch_modify (conn_info->fd,
  1180. POLLIN|POLLNVAL);
  1181. }
  1182. }
  1183. pthread_mutex_unlock (&conn_info->mutex);
  1184. coroipcs_refcount_dec (conn_info);
  1185. return (0);
  1186. }