totempg.c 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330
  1. /*
  2. * Copyright (c) 2003-2005 MontaVista Software, Inc.
  3. * Copyright (c) 2005 OSDL.
  4. * Copyright (c) 2006-2009 Red Hat, Inc.
  5. *
  6. * All rights reserved.
  7. *
  8. * Author: Steven Dake (sdake@redhat.com)
  9. * Author: Mark Haverkamp (markh@osdl.org)
  10. *
  11. * This software licensed under BSD license, the text of which follows:
  12. *
  13. * Redistribution and use in source and binary forms, with or without
  14. * modification, are permitted provided that the following conditions are met:
  15. *
  16. * - Redistributions of source code must retain the above copyright notice,
  17. * this list of conditions and the following disclaimer.
  18. * - Redistributions in binary form must reproduce the above copyright notice,
  19. * this list of conditions and the following disclaimer in the documentation
  20. * and/or other materials provided with the distribution.
  21. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  22. * contributors may be used to endorse or promote products derived from this
  23. * software without specific prior written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  26. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  27. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  28. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  29. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  30. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  31. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  32. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  33. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  34. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  35. * THE POSSIBILITY OF SUCH DAMAGE.
  36. */
  37. /*
  38. * FRAGMENTATION AND PACKING ALGORITHM:
  39. *
  40. * Assemble the entire message into one buffer
  41. * if full fragment
  42. * store fragment into lengths list
  43. * for each full fragment
  44. * multicast fragment
  45. * set length and fragment fields of pg message
  46. * store remaining multicast into head of fragmentation data and set lens field
  47. *
  48. * If a message exceeds the maximum packet size allowed by the totem
  49. * single ring protocol, the protocol could lose forward progress.
  50. * Statically calculating the allowed data amount doesn't work because
  51. * the amount of data allowed depends on the number of fragments in
  52. * each message. In this implementation, the maximum fragment size
  53. * is dynamically calculated for each fragment added to the message.
  54. * It is possible for a message to be two bytes short of the maximum
  55. * packet size. This occurs when a message or collection of
  56. * messages + the mcast header + the lens are two bytes short of the
  57. * end of the packet. Since another len field consumes two bytes, the
  58. * len field would consume the rest of the packet without room for data.
  59. *
  60. * One optimization would be to forgo the final len field and determine
  61. * it from the size of the udp datagram. Then this condition would no
  62. * longer occur.
  63. */
  64. /*
  65. * ASSEMBLY AND UNPACKING ALGORITHM:
  66. *
  67. * copy incoming packet into assembly data buffer indexed by current
  68. * location of end of fragment
  69. *
  70. * if not fragmented
  71. * deliver all messages in assembly data buffer
  72. * else
  73. * if msg_count > 1 and fragmented
  74. * deliver all messages except last message in assembly data buffer
  75. * copy last fragmented section to start of assembly data buffer
  76. * else
  77. * if msg_count = 1 and fragmented
  78. * do nothing
  79. *
  80. */
  81. #include <config.h>
  82. #include <alloca.h>
  83. #include <netinet/in.h>
  84. #include <sys/uio.h>
  85. #include <stdio.h>
  86. #include <stdlib.h>
  87. #include <string.h>
  88. #include <assert.h>
  89. #include <pthread.h>
  90. #include <errno.h>
  91. #include <limits.h>
  92. #include <corosync/swab.h>
  93. #include <corosync/hdb.h>
  94. #include <corosync/list.h>
  95. #include <corosync/totem/coropoll.h>
  96. #include <corosync/totem/totempg.h>
  97. #define LOGSYS_UTILS_ONLY 1
  98. #include <corosync/engine/logsys.h>
  99. #include "totemmrp.h"
  100. #include "totemsrp.h"
  101. #define min(a,b) ((a) < (b)) ? a : b
/*
 * Fixed header at the front of every totempg multicast frame.
 * version and type identify the on-wire layout so receivers can
 * recognize the frame format.
 */
struct totempg_mcast_header {
	short version;
	short type;
};
  106. /*
  107. * totempg_mcast structure
  108. *
  109. * header: Identify the mcast.
  110. * fragmented: Set if this message continues into next message
  111. * continuation: Set if this message is a continuation from last message
  112. * msg_count Indicates how many packed messages are contained
  113. * in the mcast.
  114. * Also, the size of each packed message and the messages themselves are
  115. * appended to the end of this structure when sent.
  116. */
struct totempg_mcast {
	struct totempg_mcast_header header;
	unsigned char fragmented;	/* nonzero: last packed msg continues in the next frame */
	unsigned char continuation;	/* nonzero: first packed msg continues a previous frame */
	unsigned short msg_count;	/* number of packed messages in this frame */
	/*
	 * short msg_len[msg_count];  -- one length per packed message
	 */
	/*
	 * data for messages follows the length array
	 */
};
/*
 * Maximum payload per totempg frame: the transport MTU minus the
 * fixed totempg_mcast header that precedes the packed data.
 */
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
	sizeof (struct totempg_mcast))
  134. /*
  135. * Local variables used for packing small messages
  136. */
static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];	/* length of each currently staged message */
static int mcast_packed_msg_count = 0;	/* number of messages currently staged */
static int totempg_reserved = 0;	/* frames reserved via send_reserve(), not yet released */

/*
 * Function and data used to log messages
 */
static int totempg_log_level_security;
static int totempg_log_level_error;
static int totempg_log_level_warning;
static int totempg_log_level_notice;
static int totempg_log_level_debug;
static int totempg_subsys_id;
static void (*totempg_log_printf) (
	unsigned int rec_ident,
	const char *function,
	const char *file,
	int line,
	const char *format, ...) __attribute__((format(printf, 5, 6)));

/* Totem configuration captured at totempg_initialize() time. */
struct totem_config *totempg_totem_config;
/*
 * Reassembly state: when a fragment sequence error is detected the
 * assembly switches to THROW_AWAY_ACTIVE and incoming data for the
 * node is discarded until the stream resynchronizes.
 */
enum throw_away_mode {
	THROW_AWAY_INACTIVE,
	THROW_AWAY_ACTIVE
};

/*
 * Per-node buffer used to reassemble fragmented messages.
 */
struct assembly {
	unsigned int nodeid;		/* node the buffered data came from */
	unsigned char data[MESSAGE_SIZE_MAX];	/* accumulated message bytes */
	int index;			/* current write offset into data[] */
	unsigned char last_frag_num;	/* fragment number of the last frame seen */
	enum throw_away_mode throw_away_mode;
	struct list_head list;		/* linkage on the inuse/free lists */
};
static void assembly_deref (struct assembly *assembly);
static int callback_token_received_fn (enum totem_callback_token_type type,
	const void *data);

/*
 * Assembly buffers currently in use (one per sending node) and a
 * cache of free buffers available for reuse.
 */
DECLARE_LIST_INIT(assembly_list_inuse);
DECLARE_LIST_INIT(assembly_list_free);
  173. /*
  174. * Staging buffer for packed messages. Messages are staged in this buffer
  175. * before sending. Multiple messages may fit which cuts down on the
  176. * number of mcasts sent. If a message doesn't completely fit, then
  177. * the mcast header has a fragment bit set that says that there are more
  178. * data to follow. fragment_size is an index into the buffer. It indicates
  179. * the size of message data and where to place new message data.
  180. * fragment_continuation indicates whether the first packed message in
  181. * the buffer is a continuation of a previously packed fragment.
  182. */
static unsigned char *fragmentation_data;	/* staging buffer, TOTEMPG_PACKET_SIZE bytes (see totempg_initialize) */
static int fragment_size = 0;		/* bytes currently staged in fragmentation_data */
static int fragment_continuation = 0;	/* first staged msg continues a prior fragment */
static struct iovec iov_delv;		/* scratch iovec used while delivering unpacked messages */
/* Upper bound for instance-handle iteration -- presumably the highest
 * handle allocated; TODO confirm where this is updated (not in view). */
static unsigned int totempg_max_handle = 0;
/*
 * One registered consumer of the process-group layer: its delivery
 * and configuration-change callbacks plus the set of groups it
 * subscribed to.
 */
struct totempg_group_instance {
	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required);

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id);

	struct totempg_group *groups;	/* groups this instance listens on */
	int groups_cnt;			/* number of entries in groups */
};

/*
 * Handle database mapping instance handles to totempg_group_instance.
 */
DECLARE_HDB_DATABASE (totempg_groups_instance_database,NULL);

/*
 * Sequence number stamped on outgoing fragments; mcast_msg bumps it
 * past zero so an in-flight fragment number is never 0.
 */
static unsigned char next_fragment = 1;

static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t callback_token_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
  208. #define log_printf(level, format, args...) \
  209. do { \
  210. totempg_log_printf ( \
  211. LOGSYS_ENCODE_RECID(level, \
  212. totempg_subsys_id, \
  213. LOGSYS_RECID_LOG), \
  214. __FUNCTION__, __FILE__, __LINE__, \
  215. format, ##args); \
  216. } while (0);
  217. static int msg_count_send_ok (int msg_count);
  218. static int byte_count_send_ok (int byte_count);
  219. static struct assembly *assembly_ref (unsigned int nodeid)
  220. {
  221. struct assembly *assembly;
  222. struct list_head *list;
  223. /*
  224. * Search inuse list for node id and return assembly buffer if found
  225. */
  226. for (list = assembly_list_inuse.next;
  227. list != &assembly_list_inuse;
  228. list = list->next) {
  229. assembly = list_entry (list, struct assembly, list);
  230. if (nodeid == assembly->nodeid) {
  231. return (assembly);
  232. }
  233. }
  234. /*
  235. * Nothing found in inuse list get one from free list if available
  236. */
  237. if (list_empty (&assembly_list_free) == 0) {
  238. assembly = list_entry (assembly_list_free.next, struct assembly, list);
  239. list_del (&assembly->list);
  240. list_add (&assembly->list, &assembly_list_inuse);
  241. assembly->nodeid = nodeid;
  242. assembly->index = 0;
  243. assembly->last_frag_num = 0;
  244. assembly->throw_away_mode = THROW_AWAY_INACTIVE;
  245. return (assembly);
  246. }
  247. /*
  248. * Nothing available in inuse or free list, so allocate a new one
  249. */
  250. assembly = malloc (sizeof (struct assembly));
  251. /*
  252. * TODO handle memory allocation failure here
  253. */
  254. assert (assembly);
  255. assembly->nodeid = nodeid;
  256. assembly->data[0] = 0;
  257. assembly->index = 0;
  258. assembly->last_frag_num = 0;
  259. assembly->throw_away_mode = THROW_AWAY_INACTIVE;
  260. list_init (&assembly->list);
  261. list_add (&assembly->list, &assembly_list_inuse);
  262. return (assembly);
  263. }
/*
 * Return an assembly buffer to the free list for later reuse.  The
 * buffer's contents are not cleared here; assembly_ref reinitializes
 * the bookkeeping fields when the buffer is handed out again.
 */
static void assembly_deref (struct assembly *assembly)
{
	list_del (&assembly->list);
	list_add (&assembly->list, &assembly_list_free);
}
  269. static inline void app_confchg_fn (
  270. enum totem_configuration_type configuration_type,
  271. const unsigned int *member_list, size_t member_list_entries,
  272. const unsigned int *left_list, size_t left_list_entries,
  273. const unsigned int *joined_list, size_t joined_list_entries,
  274. const struct memb_ring_id *ring_id)
  275. {
  276. int i;
  277. struct totempg_group_instance *instance;
  278. unsigned int res;
  279. for (i = 0; i <= totempg_max_handle; i++) {
  280. res = hdb_handle_get (&totempg_groups_instance_database,
  281. hdb_nocheck_convert (i), (void *)&instance);
  282. if (res == 0) {
  283. if (instance->confchg_fn) {
  284. instance->confchg_fn (
  285. configuration_type,
  286. member_list,
  287. member_list_entries,
  288. left_list,
  289. left_list_entries,
  290. joined_list,
  291. joined_list_entries,
  292. ring_id);
  293. }
  294. hdb_handle_put (&totempg_groups_instance_database,
  295. hdb_nocheck_convert (i));
  296. }
  297. }
  298. }
  299. static inline void group_endian_convert (
  300. void *msg,
  301. int msg_len)
  302. {
  303. unsigned short *group_len;
  304. int i;
  305. char *aligned_msg;
  306. /*
  307. * Align data structure for sparc and ia64
  308. */
  309. if ((size_t)msg % 4 != 0) {
  310. aligned_msg = alloca(msg_len);
  311. memcpy(aligned_msg, msg, msg_len);
  312. } else {
  313. aligned_msg = msg;
  314. }
  315. group_len = (unsigned short *)aligned_msg;
  316. group_len[0] = swab16(group_len[0]);
  317. for (i = 1; i < group_len[0] + 1; i++) {
  318. group_len[i] = swab16(group_len[i]);
  319. }
  320. if (aligned_msg != msg) {
  321. memcpy(msg, aligned_msg, msg_len);
  322. }
  323. }
/*
 * Test whether a packed message (carried in a single iovec) is
 * addressed to any of the groups in groups_b.
 *
 * On-wire group header layout: group_len[0] holds the number of
 * group names, group_len[1..n] the length of each name, and the
 * names themselves follow the length array back to back.
 *
 * *adjust_iovec receives the total size of the group header, i.e.
 * the offset of the application payload within the iovec.
 * Returns 1 when any group matches, 0 otherwise.
 */
static inline int group_matches (
	struct iovec *iovec,
	unsigned int iov_len,
	struct totempg_group *groups_b,
	unsigned int group_b_cnt,
	unsigned int *adjust_iovec)
{
	unsigned short *group_len;
	char *group_name;
	int i;
	int j;
	struct iovec iovec_aligned = { NULL, 0 };

	assert (iov_len == 1);

	/*
	 * Align data structure for sparc and ia64
	 */
	if ((size_t)iovec->iov_base % 4 != 0) {
		iovec_aligned.iov_base = alloca(iovec->iov_len);
		memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
		iovec_aligned.iov_len = iovec->iov_len;
		iovec = &iovec_aligned;
	}

	group_len = (unsigned short *)iovec->iov_base;
	/* Names start right after the (count + per-name-length) shorts. */
	group_name = ((char *)iovec->iov_base) +
		sizeof (unsigned short) * (group_len[0] + 1);

	/*
	 * Calculate amount to adjust the iovec by before delivering to app
	 */
	*adjust_iovec = sizeof (unsigned short) * (group_len[0] + 1);
	for (i = 1; i < group_len[0] + 1; i++) {
		*adjust_iovec += group_len[i];
	}

	/*
	 * Determine if this message should be delivered to this instance
	 */
	for (i = 1; i < group_len[0] + 1; i++) {
		for (j = 0; j < group_b_cnt; j++) {
			if ((group_len[i] == groups_b[j].group_len) &&
				(memcmp (groups_b[j].group, group_name, group_len[i]) == 0)) {
				return (1);
			}
		}
		group_name += group_len[i];
	}
	return (0);
}
  370. static inline void app_deliver_fn (
  371. unsigned int nodeid,
  372. void *msg,
  373. unsigned int msg_len,
  374. int endian_conversion_required)
  375. {
  376. int i;
  377. struct totempg_group_instance *instance;
  378. struct iovec stripped_iovec;
  379. unsigned int adjust_iovec;
  380. unsigned int res;
  381. struct iovec *iovec;
  382. struct iovec aligned_iovec = { NULL, 0 };
  383. if (endian_conversion_required) {
  384. group_endian_convert (msg, msg_len);
  385. }
  386. /*
  387. * TODO This function needs to be rewritten for proper alignment to avoid 3+ memory copies
  388. */
  389. /*
  390. * Align data structure for sparc and ia64
  391. */
  392. aligned_iovec.iov_base = alloca(msg_len);
  393. aligned_iovec.iov_len = msg_len;
  394. memcpy(aligned_iovec.iov_base, msg, msg_len);
  395. iovec = &aligned_iovec;
  396. for (i = 0; i <= totempg_max_handle; i++) {
  397. res = hdb_handle_get (&totempg_groups_instance_database,
  398. hdb_nocheck_convert (i), (void *)&instance);
  399. if (res == 0) {
  400. if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
  401. stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
  402. stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
  403. /*
  404. * Align data structure for sparc and ia64
  405. */
  406. if ((char *)iovec->iov_base + adjust_iovec % 4 != 0) {
  407. /*
  408. * Deal with misalignment
  409. */
  410. stripped_iovec.iov_base =
  411. alloca (stripped_iovec.iov_len);
  412. memcpy (stripped_iovec.iov_base,
  413. (char *)iovec->iov_base + adjust_iovec,
  414. stripped_iovec.iov_len);
  415. }
  416. instance->deliver_fn (
  417. nodeid,
  418. stripped_iovec.iov_base,
  419. stripped_iovec.iov_len,
  420. endian_conversion_required);
  421. }
  422. hdb_handle_put (&totempg_groups_instance_database, hdb_nocheck_convert(i));
  423. }
  424. }
  425. }
/*
 * Configuration-change callback handed to totemmrp; fans the event
 * out to all registered group instances via app_confchg_fn.
 */
static void totempg_confchg_fn (
	enum totem_configuration_type configuration_type,
	const unsigned int *member_list, size_t member_list_entries,
	const unsigned int *left_list, size_t left_list_entries,
	const unsigned int *joined_list, size_t joined_list_entries,
	const struct memb_ring_id *ring_id)
{
	// TODO optimize this
	app_confchg_fn (configuration_type,
		member_list, member_list_entries,
		left_list, left_list_entries,
		joined_list, joined_list_entries,
		ring_id);
}
/*
 * Delivery callback handed to totemmrp: unpack one received frame,
 * reassemble fragments per sending node, and deliver each complete
 * packed message through app_deliver_fn.
 */
static void totempg_deliver_fn (
	unsigned int nodeid,
	const void *msg,
	unsigned int msg_len,
	int endian_conversion_required)
{
	struct totempg_mcast *mcast;
	unsigned short *msg_lens;
	int i;
	struct assembly *assembly;
	char header[FRAME_SIZE_MAX];
	int msg_count;
	int continuation;
	int start;
	const char *data;
	int datasize;

	assembly = assembly_ref (nodeid);
	assert (assembly);

	/*
	 * Assemble the header into one block of data and
	 * assemble the packet contents into one block of data to simplify delivery
	 */
	/*
	 * NOTE(review): the cast below discards const and, when endian
	 * conversion is required, swab16 writes back through the caller's
	 * buffer -- confirm callers tolerate this in-place modification.
	 */
	mcast = (struct totempg_mcast *)msg;
	if (endian_conversion_required) {
		mcast->msg_count = swab16 (mcast->msg_count);
	}
	msg_count = mcast->msg_count;
	datasize = sizeof (struct totempg_mcast) +
		msg_count * sizeof (unsigned short);

	/*
	 * Copy the fixed header plus the per-message length array into
	 * an aligned local buffer before the shorts are touched.
	 */
	memcpy (header, msg, datasize);
	data = msg;

	msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
	if (endian_conversion_required) {
		for (i = 0; i < mcast->msg_count; i++) {
			msg_lens[i] = swab16 (msg_lens[i]);
		}
	}

	/*
	 * Append the payload (everything past the length array) to this
	 * node's assembly buffer at the current write offset.
	 */
	memcpy (&assembly->data[assembly->index], &data[datasize],
		msg_len - datasize);

	/*
	 * If the last message in the buffer is a fragment, then we
	 * can't deliver it. We'll first deliver the full messages
	 * then adjust the assembly buffer so we can add the rest of the
	 * fragment when it arrives.
	 */
	msg_count = mcast->fragmented ? mcast->msg_count - 1 : mcast->msg_count;
	continuation = mcast->continuation;
	iov_delv.iov_base = (void *)&assembly->data[0];
	iov_delv.iov_len = assembly->index + msg_lens[0];

	/*
	 * Make sure that if this message is a continuation, that it
	 * matches the sequence number of the previous fragment.
	 * Also, if the first packed message is a continuation
	 * of a previous message, but the assembly buffer
	 * is empty, then we need to discard it since we can't
	 * assemble a complete message. Likewise, if this message isn't a
	 * continuation and the assembly buffer is empty, we have to discard
	 * the continued message.
	 */
	start = 0;
	if (assembly->throw_away_mode == THROW_AWAY_ACTIVE) {
		/* Throw away the first msg block */
		if (mcast->fragmented == 0 || mcast->fragmented == 1) {
			assembly->throw_away_mode = THROW_AWAY_INACTIVE;
			/* Skip past the discarded first message and deliver
			 * from the second one onward. */
			assembly->index += msg_lens[0];
			iov_delv.iov_base = (void *)&assembly->data[assembly->index];
			iov_delv.iov_len = msg_lens[1];
			start = 1;
		}
	} else
	if (assembly->throw_away_mode == THROW_AWAY_INACTIVE) {
		if (continuation == assembly->last_frag_num) {
			assembly->last_frag_num = mcast->fragmented;
			for (i = start; i < msg_count; i++) {
				app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
					endian_conversion_required);
				assembly->index += msg_lens[i];
				iov_delv.iov_base = (void *)&assembly->data[assembly->index];
				if (i < (msg_count - 1)) {
					iov_delv.iov_len = msg_lens[i + 1];
				}
			}
		} else {
			/* Fragment sequence mismatch: discard until the
			 * stream resynchronizes. */
			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
		}
	}

	if (mcast->fragmented == 0) {
		/*
		 * End of messages, dereference assembly struct
		 */
		assembly->last_frag_num = 0;
		assembly->index = 0;
		assembly_deref (assembly);
	} else {
		/*
		 * Message is fragmented, keep around assembly list
		 */
		if (mcast->msg_count > 1) {
			/* Move the trailing partial fragment to the front of
			 * the buffer so the continuation can be appended. */
			memmove (&assembly->data[0],
				&assembly->data[assembly->index],
				msg_lens[msg_count]);
			assembly->index = 0;
		}
		assembly->index += msg_lens[msg_count];
	}
}
  546. /*
  547. * Totem Process Group Abstraction
  548. * depends on poll abstraction, POSIX, IPV4
  549. */
  550. void *callback_token_received_handle;
  551. int callback_token_received_fn (enum totem_callback_token_type type,
  552. const void *data)
  553. {
  554. struct totempg_mcast mcast;
  555. struct iovec iovecs[3];
  556. int res;
  557. pthread_mutex_lock (&mcast_msg_mutex);
  558. if (mcast_packed_msg_count == 0) {
  559. pthread_mutex_unlock (&mcast_msg_mutex);
  560. return (0);
  561. }
  562. if (totemmrp_avail() == 0) {
  563. pthread_mutex_unlock (&mcast_msg_mutex);
  564. return (0);
  565. }
  566. mcast.fragmented = 0;
  567. /*
  568. * Was the first message in this buffer a continuation of a
  569. * fragmented message?
  570. */
  571. mcast.continuation = fragment_continuation;
  572. fragment_continuation = 0;
  573. mcast.msg_count = mcast_packed_msg_count;
  574. iovecs[0].iov_base = (void *)&mcast;
  575. iovecs[0].iov_len = sizeof (struct totempg_mcast);
  576. iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
  577. iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
  578. iovecs[2].iov_base = (void *)&fragmentation_data[0];
  579. iovecs[2].iov_len = fragment_size;
  580. res = totemmrp_mcast (iovecs, 3, 0);
  581. mcast_packed_msg_count = 0;
  582. fragment_size = 0;
  583. pthread_mutex_unlock (&mcast_msg_mutex);
  584. return (0);
  585. }
/*
 * Initialize the totem process group abstraction: capture the logging
 * configuration, allocate the fragmentation staging buffer, bring up
 * the underlying totemmrp layer, and register the token-received
 * callback that flushes staged messages.
 *
 * Returns the totemmrp_initialize result, or -1 if the staging buffer
 * cannot be allocated.
 */
int totempg_initialize (
	hdb_handle_t poll_handle,
	struct totem_config *totem_config)
{
	int res;

	totempg_totem_config = totem_config;
	totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
	totempg_log_level_error = totem_config->totem_logging_configuration.log_level_error;
	totempg_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
	totempg_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
	totempg_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
	totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
	totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;

	fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
	if (fragmentation_data == 0) {
		return (-1);
	}

	res = totemmrp_initialize (
		poll_handle,
		totem_config,
		totempg_deliver_fn,
		totempg_confchg_fn);

	/* NOTE(review): the callback is registered and the MTU adjusted
	 * even when totemmrp_initialize failed -- confirm intended. */
	totemmrp_callback_token_create (
		&callback_token_received_handle,
		TOTEM_CALLBACK_TOKEN_RECEIVED,
		0,
		callback_token_received_fn,
		0);

	totemsrp_net_mtu_adjust (totem_config);

	return (res);
}
/*
 * Shut down the underlying totemmrp layer.  Serialized against other
 * totempg entry points by totempg_mutex.
 */
void totempg_finalize (void)
{
	pthread_mutex_lock (&totempg_mutex);
	totemmrp_finalize ();
	pthread_mutex_unlock (&totempg_mutex);
}
  626. /*
  627. * Multicast a message
  628. */
  629. static int mcast_msg (
  630. struct iovec *iovec_in,
  631. unsigned int iov_len,
  632. int guarantee)
  633. {
  634. int res = 0;
  635. struct totempg_mcast mcast;
  636. struct iovec iovecs[3];
  637. struct iovec iovec[64];
  638. int i;
  639. int dest, src;
  640. int max_packet_size = 0;
  641. int copy_len = 0;
  642. int copy_base = 0;
  643. int total_size = 0;
  644. pthread_mutex_lock (&mcast_msg_mutex);
  645. totemmrp_new_msg_signal ();
  646. /*
  647. * Remove zero length iovectors from the list
  648. */
  649. assert (iov_len < 64);
  650. for (dest = 0, src = 0; src < iov_len; src++) {
  651. if (iovec_in[src].iov_len) {
  652. memcpy (&iovec[dest++], &iovec_in[src],
  653. sizeof (struct iovec));
  654. }
  655. }
  656. iov_len = dest;
  657. max_packet_size = TOTEMPG_PACKET_SIZE -
  658. (sizeof (unsigned short) * (mcast_packed_msg_count + 1));
  659. mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
  660. /*
  661. * Check if we would overwrite new message queue
  662. */
  663. for (i = 0; i < iov_len; i++) {
  664. total_size += iovec[i].iov_len;
  665. }
  666. if (byte_count_send_ok (total_size + sizeof(unsigned short) *
  667. (mcast_packed_msg_count+1)) == 0) {
  668. pthread_mutex_unlock (&mcast_msg_mutex);
  669. return(-1);
  670. }
  671. for (i = 0; i < iov_len; ) {
  672. mcast.fragmented = 0;
  673. mcast.continuation = fragment_continuation;
  674. copy_len = iovec[i].iov_len - copy_base;
  675. /*
  676. * If it all fits with room left over, copy it in.
  677. * We need to leave at least sizeof(short) + 1 bytes in the
  678. * fragment_buffer on exit so that max_packet_size + fragment_size
  679. * doesn't exceed the size of the fragment_buffer on the next call.
  680. */
  681. if ((copy_len + fragment_size) <
  682. (max_packet_size - sizeof (unsigned short))) {
  683. memcpy (&fragmentation_data[fragment_size],
  684. (char *)iovec[i].iov_base + copy_base, copy_len);
  685. fragment_size += copy_len;
  686. mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
  687. next_fragment = 1;
  688. copy_len = 0;
  689. copy_base = 0;
  690. i++;
  691. continue;
  692. /*
  693. * If it just fits or is too big, then send out what fits.
  694. */
  695. } else {
  696. unsigned char *data_ptr;
  697. copy_len = min(copy_len, max_packet_size - fragment_size);
  698. if( copy_len == max_packet_size )
  699. data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
  700. else {
  701. data_ptr = fragmentation_data;
  702. memcpy (&fragmentation_data[fragment_size],
  703. (unsigned char *)iovec[i].iov_base + copy_base, copy_len);
  704. }
  705. memcpy (&fragmentation_data[fragment_size],
  706. (unsigned char *)iovec[i].iov_base + copy_base, copy_len);
  707. mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
  708. /*
  709. * if we're not on the last iovec or the iovec is too large to
  710. * fit, then indicate a fragment. This also means that the next
  711. * message will have the continuation of this one.
  712. */
  713. if ((i < (iov_len - 1)) ||
  714. ((copy_base + copy_len) < iovec[i].iov_len)) {
  715. if (!next_fragment) {
  716. next_fragment++;
  717. }
  718. fragment_continuation = next_fragment;
  719. mcast.fragmented = next_fragment++;
  720. assert(fragment_continuation != 0);
  721. assert(mcast.fragmented != 0);
  722. } else {
  723. fragment_continuation = 0;
  724. }
  725. /*
  726. * assemble the message and send it
  727. */
  728. mcast.msg_count = ++mcast_packed_msg_count;
  729. iovecs[0].iov_base = (void *)&mcast;
  730. iovecs[0].iov_len = sizeof(struct totempg_mcast);
  731. iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
  732. iovecs[1].iov_len = mcast_packed_msg_count *
  733. sizeof(unsigned short);
  734. iovecs[2].iov_base = (void *)data_ptr;
  735. iovecs[2].iov_len = max_packet_size;
  736. assert (totemmrp_avail() > 0);
  737. res = totemmrp_mcast (iovecs, 3, guarantee);
  738. /*
  739. * Recalculate counts and indexes for the next.
  740. */
  741. mcast_packed_msg_lens[0] = 0;
  742. mcast_packed_msg_count = 0;
  743. fragment_size = 0;
  744. max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));
  745. /*
  746. * If the iovec all fit, go to the next iovec
  747. */
  748. if ((copy_base + copy_len) == iovec[i].iov_len) {
  749. copy_len = 0;
  750. copy_base = 0;
  751. i++;
  752. /*
  753. * Continue with the rest of the current iovec.
  754. */
  755. } else {
  756. copy_base += copy_len;
  757. }
  758. }
  759. }
  760. /*
  761. * Bump only if we added message data. This may be zero if
  762. * the last buffer just fit into the fragmentation_data buffer
  763. * and we were at the last iovec.
  764. */
  765. if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
  766. mcast_packed_msg_count++;
  767. }
  768. pthread_mutex_unlock (&mcast_msg_mutex);
  769. return (res);
  770. }
  771. /*
  772. * Determine if a message of msg_size could be queued
  773. */
  774. static int msg_count_send_ok (
  775. int msg_count)
  776. {
  777. int avail = 0;
  778. avail = totemmrp_avail () - totempg_reserved - 1;
  779. return (avail > msg_count);
  780. }
  781. static int byte_count_send_ok (
  782. int byte_count)
  783. {
  784. unsigned int msg_count = 0;
  785. int avail = 0;
  786. avail = totemmrp_avail () - 1;
  787. msg_count = (byte_count / (totempg_totem_config->net_mtu - 25)) + 1;
  788. return (avail > msg_count);
  789. }
  790. static int send_reserve (
  791. int msg_size)
  792. {
  793. unsigned int msg_count = 0;
  794. msg_count = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1;
  795. totempg_reserved += msg_count;
  796. return (msg_count);
  797. }
/*
 * Return message slots previously set aside by send_reserve().
 * msg_count must be a value obtained from send_reserve().
 */
static void send_release (
int msg_count)
{
totempg_reserved -= msg_count;
}
  803. int totempg_callback_token_create (
  804. void **handle_out,
  805. enum totem_callback_token_type type,
  806. int delete,
  807. int (*callback_fn) (enum totem_callback_token_type type, const void *),
  808. const void *data)
  809. {
  810. unsigned int res;
  811. pthread_mutex_lock (&callback_token_mutex);
  812. res = totemmrp_callback_token_create (handle_out, type, delete,
  813. callback_fn, data);
  814. pthread_mutex_unlock (&callback_token_mutex);
  815. return (res);
  816. }
  817. void totempg_callback_token_destroy (
  818. void *handle_out)
  819. {
  820. pthread_mutex_lock (&callback_token_mutex);
  821. totemmrp_callback_token_destroy (handle_out);
  822. pthread_mutex_unlock (&callback_token_mutex);
  823. }
  824. /*
  825. * vi: set autoindent tabstop=4 shiftwidth=4 :
  826. */
/*
 * Create a totempg group instance and register its delivery and
 * configuration-change callbacks.
 *
 * On success, *handle identifies the new instance and 0 is returned.
 * On failure, -1 is returned and any partially created instance is
 * destroyed.
 */
int totempg_groups_initialize (
	hdb_handle_t *handle,

	void (*deliver_fn) (
		unsigned int nodeid,
		const void *msg,
		unsigned int msg_len,
		int endian_conversion_required),

	void (*confchg_fn) (
		enum totem_configuration_type configuration_type,
		const unsigned int *member_list, size_t member_list_entries,
		const unsigned int *left_list, size_t left_list_entries,
		const unsigned int *joined_list, size_t joined_list_entries,
		const struct memb_ring_id *ring_id))
{
	struct totempg_group_instance *instance;
	unsigned int res;

	pthread_mutex_lock (&totempg_mutex);
	res = hdb_handle_create (&totempg_groups_instance_database,
		sizeof (struct totempg_group_instance), handle);
	if (res != 0) {
		goto error_exit;
	}

	/*
	 * Track the highest handle ever handed out.
	 * NOTE(review): presumably used elsewhere to iterate all live
	 * instances — confirm against the rest of the file.
	 */
	if (*handle > totempg_max_handle) {
		totempg_max_handle = *handle;
	}

	res = hdb_handle_get (&totempg_groups_instance_database, *handle,
		(void *)&instance);
	if (res != 0) {
		goto error_destroy;
	}

	/* New instance starts with an empty group list. */
	instance->deliver_fn = deliver_fn;
	instance->confchg_fn = confchg_fn;
	instance->groups = 0;
	instance->groups_cnt = 0;

	hdb_handle_put (&totempg_groups_instance_database, *handle);

	pthread_mutex_unlock (&totempg_mutex);
	return (0);

error_destroy:
	hdb_handle_destroy (&totempg_groups_instance_database, *handle);

error_exit:
	pthread_mutex_unlock (&totempg_mutex);
	return (-1);
}
  870. int totempg_groups_join (
  871. hdb_handle_t handle,
  872. const struct totempg_group *groups,
  873. size_t group_cnt)
  874. {
  875. struct totempg_group_instance *instance;
  876. struct totempg_group *new_groups;
  877. unsigned int res;
  878. pthread_mutex_lock (&totempg_mutex);
  879. res = hdb_handle_get (&totempg_groups_instance_database, handle,
  880. (void *)&instance);
  881. if (res != 0) {
  882. goto error_exit;
  883. }
  884. new_groups = realloc (instance->groups,
  885. sizeof (struct totempg_group) *
  886. (instance->groups_cnt + group_cnt));
  887. if (new_groups == 0) {
  888. res = ENOMEM;
  889. goto error_exit;
  890. }
  891. memcpy (&new_groups[instance->groups_cnt],
  892. groups, group_cnt * sizeof (struct totempg_group));
  893. instance->groups = new_groups;
  894. instance->groups_cnt += group_cnt;
  895. hdb_handle_put (&totempg_groups_instance_database, handle);
  896. error_exit:
  897. pthread_mutex_unlock (&totempg_mutex);
  898. return (res);
  899. }
  900. int totempg_groups_leave (
  901. hdb_handle_t handle,
  902. const struct totempg_group *groups,
  903. size_t group_cnt)
  904. {
  905. struct totempg_group_instance *instance;
  906. unsigned int res;
  907. pthread_mutex_lock (&totempg_mutex);
  908. res = hdb_handle_get (&totempg_groups_instance_database, handle,
  909. (void *)&instance);
  910. if (res != 0) {
  911. goto error_exit;
  912. }
  913. hdb_handle_put (&totempg_groups_instance_database, handle);
  914. error_exit:
  915. pthread_mutex_unlock (&totempg_mutex);
  916. return (res);
  917. }
  918. #define MAX_IOVECS_FROM_APP 32
  919. #define MAX_GROUPS_PER_MSG 32
  920. int totempg_groups_mcast_joined (
  921. hdb_handle_t handle,
  922. const struct iovec *iovec,
  923. unsigned int iov_len,
  924. int guarantee)
  925. {
  926. struct totempg_group_instance *instance;
  927. unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
  928. struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
  929. int i;
  930. unsigned int res;
  931. pthread_mutex_lock (&totempg_mutex);
  932. res = hdb_handle_get (&totempg_groups_instance_database, handle,
  933. (void *)&instance);
  934. if (res != 0) {
  935. goto error_exit;
  936. }
  937. /*
  938. * Build group_len structure and the iovec_mcast structure
  939. */
  940. group_len[0] = instance->groups_cnt;
  941. for (i = 0; i < instance->groups_cnt; i++) {
  942. group_len[i + 1] = instance->groups[i].group_len;
  943. iovec_mcast[i + 1].iov_len = instance->groups[i].group_len;
  944. iovec_mcast[i + 1].iov_base = (void *) instance->groups[i].group;
  945. }
  946. iovec_mcast[0].iov_len = (instance->groups_cnt + 1) * sizeof (unsigned short);
  947. iovec_mcast[0].iov_base = group_len;
  948. for (i = 0; i < iov_len; i++) {
  949. iovec_mcast[i + instance->groups_cnt + 1].iov_len = iovec[i].iov_len;
  950. iovec_mcast[i + instance->groups_cnt + 1].iov_base = iovec[i].iov_base;
  951. }
  952. res = mcast_msg (iovec_mcast, iov_len + instance->groups_cnt + 1, guarantee);
  953. hdb_handle_put (&totempg_groups_instance_database, handle);
  954. error_exit:
  955. pthread_mutex_unlock (&totempg_mutex);
  956. return (res);
  957. }
  958. int totempg_groups_joined_reserve (
  959. hdb_handle_t handle,
  960. const struct iovec *iovec,
  961. unsigned int iov_len)
  962. {
  963. struct totempg_group_instance *instance;
  964. unsigned int size = 0;
  965. unsigned int i;
  966. unsigned int res;
  967. unsigned int reserved = 0;
  968. pthread_mutex_lock (&totempg_mutex);
  969. pthread_mutex_lock (&mcast_msg_mutex);
  970. res = hdb_handle_get (&totempg_groups_instance_database, handle,
  971. (void *)&instance);
  972. if (res != 0) {
  973. goto error_exit;
  974. }
  975. for (i = 0; i < instance->groups_cnt; i++) {
  976. size += instance->groups[i].group_len;
  977. }
  978. for (i = 0; i < iov_len; i++) {
  979. size += iovec[i].iov_len;
  980. }
  981. reserved = send_reserve (size);
  982. if (msg_count_send_ok (reserved) == 0) {
  983. send_release (reserved);
  984. reserved = 0;
  985. }
  986. hdb_handle_put (&totempg_groups_instance_database, handle);
  987. error_exit:
  988. pthread_mutex_unlock (&mcast_msg_mutex);
  989. pthread_mutex_unlock (&totempg_mutex);
  990. return (reserved);
  991. }
/*
 * Release msg_count slots previously obtained with
 * totempg_groups_joined_reserve().  Always returns 0.
 */
int totempg_groups_joined_release (int msg_count)
{
pthread_mutex_lock (&totempg_mutex);
pthread_mutex_lock (&mcast_msg_mutex);
send_release (msg_count);
pthread_mutex_unlock (&mcast_msg_mutex);
pthread_mutex_unlock (&totempg_mutex);
return 0;
}
  1001. int totempg_groups_mcast_groups (
  1002. hdb_handle_t handle,
  1003. int guarantee,
  1004. const struct totempg_group *groups,
  1005. size_t groups_cnt,
  1006. const struct iovec *iovec,
  1007. unsigned int iov_len)
  1008. {
  1009. struct totempg_group_instance *instance;
  1010. unsigned short group_len[MAX_GROUPS_PER_MSG + 1];
  1011. struct iovec iovec_mcast[MAX_GROUPS_PER_MSG + 1 + MAX_IOVECS_FROM_APP];
  1012. int i;
  1013. unsigned int res;
  1014. pthread_mutex_lock (&totempg_mutex);
  1015. res = hdb_handle_get (&totempg_groups_instance_database, handle,
  1016. (void *)&instance);
  1017. if (res != 0) {
  1018. goto error_exit;
  1019. }
  1020. /*
  1021. * Build group_len structure and the iovec_mcast structure
  1022. */
  1023. group_len[0] = groups_cnt;
  1024. for (i = 0; i < groups_cnt; i++) {
  1025. group_len[i + 1] = groups[i].group_len;
  1026. iovec_mcast[i + 1].iov_len = groups[i].group_len;
  1027. iovec_mcast[i + 1].iov_base = (void *) groups[i].group;
  1028. }
  1029. iovec_mcast[0].iov_len = (groups_cnt + 1) * sizeof (unsigned short);
  1030. iovec_mcast[0].iov_base = group_len;
  1031. for (i = 0; i < iov_len; i++) {
  1032. iovec_mcast[i + groups_cnt + 1].iov_len = iovec[i].iov_len;
  1033. iovec_mcast[i + groups_cnt + 1].iov_base = iovec[i].iov_base;
  1034. }
  1035. res = mcast_msg (iovec_mcast, iov_len + groups_cnt + 1, guarantee);
  1036. hdb_handle_put (&totempg_groups_instance_database, handle);
  1037. error_exit:
  1038. pthread_mutex_unlock (&totempg_mutex);
  1039. return (res);
  1040. }
  1041. /*
  1042. * Returns -1 if error, 0 if can't send, 1 if can send the message
  1043. */
  1044. int totempg_groups_send_ok_groups (
  1045. hdb_handle_t handle,
  1046. const struct totempg_group *groups,
  1047. size_t groups_cnt,
  1048. const struct iovec *iovec,
  1049. unsigned int iov_len)
  1050. {
  1051. struct totempg_group_instance *instance;
  1052. unsigned int size = 0;
  1053. unsigned int i;
  1054. unsigned int res;
  1055. pthread_mutex_lock (&totempg_mutex);
  1056. res = hdb_handle_get (&totempg_groups_instance_database, handle,
  1057. (void *)&instance);
  1058. if (res != 0) {
  1059. goto error_exit;
  1060. }
  1061. for (i = 0; i < groups_cnt; i++) {
  1062. size += groups[i].group_len;
  1063. }
  1064. for (i = 0; i < iov_len; i++) {
  1065. size += iovec[i].iov_len;
  1066. }
  1067. res = msg_count_send_ok (size);
  1068. hdb_handle_put (&totempg_groups_instance_database, handle);
  1069. error_exit:
  1070. pthread_mutex_unlock (&totempg_mutex);
  1071. return (res);
  1072. }
  1073. int totempg_ifaces_get (
  1074. unsigned int nodeid,
  1075. struct totem_ip_address *interfaces,
  1076. char ***status,
  1077. unsigned int *iface_count)
  1078. {
  1079. int res;
  1080. res = totemmrp_ifaces_get (
  1081. nodeid,
  1082. interfaces,
  1083. status,
  1084. iface_count);
  1085. return (res);
  1086. }
  1087. int totempg_crypto_set (
  1088. unsigned int type)
  1089. {
  1090. int res;
  1091. res = totemmrp_crypto_set (
  1092. type);
  1093. return (res);
  1094. }
  1095. int totempg_ring_reenable (void)
  1096. {
  1097. int res;
  1098. res = totemmrp_ring_reenable ();
  1099. return (res);
  1100. }
  1101. const char *totempg_ifaces_print (unsigned int nodeid)
  1102. {
  1103. static char iface_string[256 * INTERFACE_MAX];
  1104. char one_iface[64];
  1105. struct totem_ip_address interfaces[INTERFACE_MAX];
  1106. char **status;
  1107. unsigned int iface_count;
  1108. unsigned int i;
  1109. int res;
  1110. iface_string[0] = '\0';
  1111. res = totempg_ifaces_get (nodeid, interfaces, &status, &iface_count);
  1112. if (res == -1) {
  1113. return ("no interface found for nodeid");
  1114. }
  1115. for (i = 0; i < iface_count; i++) {
  1116. sprintf (one_iface, "r(%d) ip(%s) ",
  1117. i, totemip_print (&interfaces[i]));
  1118. strcat (iface_string, one_iface);
  1119. }
  1120. return (iface_string);
  1121. }
  1122. unsigned int totempg_my_nodeid_get (void)
  1123. {
  1124. return (totemmrp_my_nodeid_get());
  1125. }
  1126. int totempg_my_family_get (void)
  1127. {
  1128. return (totemmrp_my_family_get());
  1129. }