sync.c 15 KB


  1. /*
  2. * Copyright (c) 2005-2006 MontaVista Software, Inc.
  3. * Copyright (c) 2006 Ericsson AB.
  4. * Copyright (c) 2006-2007 Red Hat, Inc.
  5. *
  6. * Author: Steven Dake (sdake@mvista.com)
  7. * Author: Hans Feldt
  8. *
  9. * All rights reserved.
  10. *
  11. *
  12. * This software licensed under BSD license, the text of which follows:
  13. *
  14. * Redistribution and use in source and binary forms, with or without
  15. * modification, are permitted provided that the following conditions are met:
  16. *
  17. * - Redistributions of source code must retain the above copyright notice,
  18. * this list of conditions and the following disclaimer.
  19. * - Redistributions in binary form must reproduce the above copyright notice,
  20. * this list of conditions and the following disclaimer in the documentation
  21. * and/or other materials provided with the distribution.
  22. * - Neither the name of the MontaVista Software, Inc. nor the names of its
  23. * contributors may be used to endorse or promote products derived from this
  24. * software without specific prior written permission.
  25. *
  26. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  27. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  28. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  29. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  30. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  31. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  32. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  33. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  34. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  35. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  36. * THE POSSIBILITY OF SUCH DAMAGE.
  37. */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/ioctl.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "../include/saAis.h"
#include "main.h"
#include "sync.h"
#include "totempg.h"
#include "totemip.h"
#include "totem.h"
#include "vsf.h"
#include "../lcr/lcr_ifact.h"
#include "logsys.h"
#include "util.h"
  64. LOGSYS_DECLARE_SUBSYS ("SYNC", LOG_INFO);
  65. #define MESSAGE_REQ_SYNC_BARRIER 0
  66. #define MESSAGE_REQ_SYNC_REQUEST 1
  67. struct barrier_data {
  68. unsigned int nodeid;
  69. int completed;
  70. };
  71. static struct memb_ring_id *sync_ring_id;
  72. static int vsf_none = 0;
  73. static int (*sync_callbacks_retrieve) (int sync_id,
  74. struct sync_callbacks *callack);
  75. static struct sync_callbacks sync_callbacks;
  76. static int sync_processing = 0;
  77. static void (*sync_synchronization_completed) (void);
  78. static int sync_recovery_index = 0;
  79. static void *sync_callback_token_handle = 0;
  80. static void *sync_request_token_handle;
  81. static struct barrier_data barrier_data_process[PROCESSOR_COUNT_MAX];
  82. static struct openais_vsf_iface_ver0 *vsf_iface;
  83. static int sync_barrier_send (struct memb_ring_id *ring_id);
  84. static int sync_start_process (
  85. enum totem_callback_token_type type, void *data);
  86. static void sync_service_init (struct memb_ring_id *ring_id);
  87. static int sync_service_process (
  88. enum totem_callback_token_type type, void *data);
  89. static void sync_deliver_fn (
  90. unsigned int nodeid,
  91. struct iovec *iovec,
  92. int iov_len,
  93. int endian_conversion_required);
  94. static void sync_confchg_fn (
  95. enum totem_configuration_type configuration_type,
  96. unsigned int *member_list, int member_list_entries,
  97. unsigned int *left_list, int left_list_entries,
  98. unsigned int *joined_list, int joined_list_entries,
  99. struct memb_ring_id *ring_id);
  100. static void sync_primary_callback_fn (
  101. unsigned int *view_list,
  102. int view_list_entries,
  103. int primary_designated,
  104. struct memb_ring_id *ring_id);
  105. static struct totempg_group sync_group = {
  106. .group = "sync",
  107. .group_len = 4
  108. };
  109. static totempg_groups_handle sync_group_handle;
  110. static char *service_name;
  111. static struct memb_ring_id deliver_ring_id;
  112. static unsigned int current_members[PROCESSOR_COUNT_MAX];
  113. static unsigned int current_members_cnt;
  114. struct sync_barrier_start {
  115. };
  116. struct sync_request {
  117. uint32_t name_len;
  118. char name[0] __attribute__((aligned(8)));
  119. };
  120. typedef struct sync_msg {
  121. mar_req_header_t header;
  122. struct memb_ring_id ring_id;
  123. union {
  124. struct sync_barrier_start sync_barrier_start;
  125. struct sync_request sync_request;
  126. };
  127. } sync_msg_t;
  128. /*
  129. * Send a barrier data structure
  130. */
  131. static int sync_barrier_send (struct memb_ring_id *ring_id)
  132. {
  133. sync_msg_t msg;
  134. struct iovec iovec;
  135. int res;
  136. msg.header.size = sizeof (sync_msg_t);
  137. msg.header.id = MESSAGE_REQ_SYNC_BARRIER;
  138. memcpy (&msg.ring_id, ring_id, sizeof (struct memb_ring_id));
  139. iovec.iov_base = (char *)&msg;
  140. iovec.iov_len = sizeof (msg);
  141. res = totempg_groups_mcast_joined (
  142. sync_group_handle, &iovec, 1, TOTEMPG_AGREED);
  143. return (res);
  144. }
  145. static void sync_start_init (struct memb_ring_id *ring_id)
  146. {
  147. ENTER("");
  148. totempg_callback_token_create (
  149. &sync_callback_token_handle,
  150. TOTEM_CALLBACK_TOKEN_SENT,
  151. 0, /* don't delete after callback */
  152. sync_start_process,
  153. (void *)ring_id);
  154. LEAVE("");
  155. }
  156. static void sync_service_init (struct memb_ring_id *ring_id)
  157. {
  158. ENTER("");
  159. sync_callbacks.sync_init ();
  160. totempg_callback_token_destroy (&sync_callback_token_handle);
  161. /*
  162. * Create the token callback for the processing
  163. */
  164. totempg_callback_token_create (
  165. &sync_callback_token_handle,
  166. TOTEM_CALLBACK_TOKEN_SENT,
  167. 0, /* don't delete after callback */
  168. sync_service_process,
  169. (void *)ring_id);
  170. LEAVE("");
  171. }
  172. static int sync_start_process (
  173. enum totem_callback_token_type type, void *data)
  174. {
  175. int res;
  176. struct memb_ring_id *ring_id = (struct memb_ring_id *)data;
  177. ENTER("");
  178. res = sync_barrier_send (ring_id);
  179. if (res == 0) {
  180. /*
  181. * Delete the token callback for the barrier
  182. */
  183. totempg_callback_token_destroy (&sync_callback_token_handle);
  184. }
  185. LEAVE("");
  186. return (0);
  187. }
  188. static void sync_callbacks_load (void)
  189. {
  190. int res;
  191. ENTER("");
  192. // TODO rewrite this to get rid of the for (;;)
  193. for (;;) {
  194. res = sync_callbacks_retrieve (sync_recovery_index, &sync_callbacks);
  195. /*
  196. * No more service handlers have sync callbacks at this time
  197. */
  198. if (res == -1) {
  199. sync_processing = 0;
  200. break;
  201. }
  202. if ((service_name != NULL) &&
  203. strcmp (sync_callbacks.name, service_name) != 0) {
  204. sync_recovery_index += 1;
  205. continue;
  206. }
  207. sync_recovery_index += 1;
  208. if (sync_callbacks.sync_init) {
  209. break;
  210. }
  211. }
  212. LEAVE("");
  213. }
  214. static int sync_service_process (
  215. enum totem_callback_token_type type, void *data)
  216. {
  217. int res;
  218. struct memb_ring_id *ring_id = (struct memb_ring_id *)data;
  219. ENTER("");
  220. /*
  221. * If process operation not from this ring id, then ignore it and stop
  222. * processing
  223. */
  224. if (memcmp (ring_id, sync_ring_id, sizeof (struct memb_ring_id)) != 0) {
  225. goto end;
  226. }
  227. /*
  228. * If process returns 0, then its time to activate
  229. * and start the next service's synchronization
  230. */
  231. res = sync_callbacks.sync_process ();
  232. if (res != 0) {
  233. goto end;
  234. }
  235. totempg_callback_token_destroy (&sync_callback_token_handle);
  236. sync_start_init (ring_id);
  237. end:
  238. LEAVE("");
  239. return (0);
  240. }
  241. int sync_register (
  242. int (*callbacks_retrieve) (int sync_id, struct sync_callbacks *callack),
  243. void (*synchronization_completed) (void),
  244. char *vsf_type)
  245. {
  246. unsigned int res;
  247. unsigned int vsf_handle;
  248. void *vsf_iface_p;
  249. char openais_vsf_type[1024];
  250. res = totempg_groups_initialize (
  251. &sync_group_handle,
  252. sync_deliver_fn,
  253. sync_confchg_fn);
  254. if (res == -1) {
  255. log_printf (LOG_LEVEL_ERROR,
  256. "Couldn't initialize groups interface.\n");
  257. return (-1);
  258. }
  259. res = totempg_groups_join (
  260. sync_group_handle,
  261. &sync_group,
  262. 1);
  263. if (res == -1) {
  264. log_printf (LOG_LEVEL_ERROR, "Couldn't join group.\n");
  265. return (-1);
  266. }
  267. if (strcmp (vsf_type, "none") == 0) {
  268. log_printf (LOG_LEVEL_NOTICE,
  269. "Not using a virtual synchrony filter.\n");
  270. vsf_none = 1;
  271. } else {
  272. vsf_none = 0;
  273. sprintf (openais_vsf_type, "openais_vsf_%s", vsf_type);
  274. res = lcr_ifact_reference (
  275. &vsf_handle,
  276. openais_vsf_type,
  277. 0,
  278. &vsf_iface_p,
  279. 0);
  280. if (res == -1) {
  281. log_printf (LOG_LEVEL_NOTICE,
  282. "Couldn't load virtual synchrony filter %s\n",
  283. vsf_type);
  284. return (-1);
  285. }
  286. log_printf (LOG_LEVEL_NOTICE,
  287. "Using virtual synchrony filter %s\n", openais_vsf_type);
  288. vsf_iface = (struct openais_vsf_iface_ver0 *)vsf_iface_p;
  289. vsf_iface->init (sync_primary_callback_fn);
  290. }
  291. sync_callbacks_retrieve = callbacks_retrieve;
  292. sync_synchronization_completed = synchronization_completed;
  293. return (0);
  294. }
  295. static void sync_primary_callback_fn (
  296. unsigned int *view_list,
  297. int view_list_entries,
  298. int primary_designated,
  299. struct memb_ring_id *ring_id)
  300. {
  301. int i;
  302. ENTER("");
  303. if (primary_designated) {
  304. log_printf (LOG_LEVEL_NOTICE,
  305. "This node is within the primary component and will provide"
  306. " service.\n");
  307. } else {
  308. log_printf (LOG_LEVEL_NOTICE,
  309. "This node is within the non-primary component and will NOT"
  310. " provide any services.\n");
  311. return;
  312. }
  313. /*
  314. * Execute configuration change for synchronization service
  315. */
  316. sync_processing = 1;
  317. totempg_callback_token_destroy (&sync_callback_token_handle);
  318. sync_recovery_index = 0;
  319. for (i = 0; i < view_list_entries; i++) {
  320. barrier_data_process[i].nodeid = view_list[i];
  321. barrier_data_process[i].completed = 0;
  322. }
  323. sync_start_init (sync_ring_id);
  324. LEAVE("");
  325. }
  326. static void sync_deliver_fn (
  327. unsigned int nodeid,
  328. struct iovec *iovec,
  329. int iov_len,
  330. int endian_conversion_required)
  331. {
  332. int i;
  333. int barrier_completed;
  334. sync_msg_t *msg = (sync_msg_t *)iovec[0].iov_base;
  335. ENTER("type %d, len %d", msg->header.id, (int)iovec[0].iov_len);
  336. if (endian_conversion_required) {
  337. swab_mar_req_header_t (&msg->header);
  338. swab_memb_ring_id_t (&msg->ring_id);
  339. }
  340. /*
  341. * If this message is not from this configuration, ignore it
  342. */
  343. if (memcmp (&msg->ring_id, sync_ring_id,
  344. sizeof (struct memb_ring_id)) != 0) {
  345. goto end;
  346. }
  347. if (msg->header.id == MESSAGE_REQ_SYNC_REQUEST) {
  348. if (endian_conversion_required) {
  349. swab_mar_uint32_t (&msg->sync_request.name_len);
  350. }
  351. /*
  352. * If there is an ongoing sync, abort it. A requested sync is
  353. * only allowed to abort other requested synchronizations,
  354. * not full synchronizations.
  355. */
  356. if (sync_processing && sync_callbacks.sync_abort) {
  357. sync_callbacks.sync_abort();
  358. sync_callbacks.sync_activate = NULL;
  359. sync_processing = 0;
  360. assert (service_name != NULL);
  361. free (service_name);
  362. service_name = NULL;
  363. }
  364. service_name = malloc (msg->sync_request.name_len);
  365. strcpy (service_name, msg->sync_request.name);
  366. /*
  367. * Start requested synchronization
  368. */
  369. sync_primary_callback_fn (current_members, current_members_cnt, 1,
  370. sync_ring_id);
  371. goto end;
  372. }
  373. barrier_completed = 1;
  374. memcpy (&deliver_ring_id, &msg->ring_id, sizeof (struct memb_ring_id));
  375. /*
  376. * Set completion for source_addr's address
  377. */
  378. for (i = 0; i < current_members_cnt; i++) {
  379. if (nodeid == barrier_data_process[i].nodeid) {
  380. barrier_data_process[i].completed = 1;
  381. log_printf (LOG_LEVEL_DEBUG,
  382. "Barrier Start Received From %d\n",
  383. barrier_data_process[i].nodeid);
  384. break;
  385. }
  386. }
  387. /*
  388. * Test if barrier is complete
  389. */
  390. for (i = 0; i < current_members_cnt; i++) {
  391. log_printf (LOG_LEVEL_DEBUG,
  392. "Barrier completion status for nodeid %d = %d. \n",
  393. barrier_data_process[i].nodeid,
  394. barrier_data_process[i].completed);
  395. if (barrier_data_process[i].completed == 0) {
  396. barrier_completed = 0;
  397. }
  398. }
  399. if (barrier_completed) {
  400. log_printf (LOG_LEVEL_DEBUG, "Synchronization barrier completed\n");
  401. /*
  402. * This sync is complete so activate and start next service sync
  403. */
  404. if (sync_callbacks.sync_activate) {
  405. log_printf (LOG_LEVEL_DEBUG,
  406. "Committing synchronization for (%s)\n",
  407. sync_callbacks.name);
  408. sync_callbacks.sync_activate ();
  409. }
  410. /*
  411. * Start synchronization if the barrier has completed
  412. */
  413. for (i = 0; i < current_members_cnt; i++) {
  414. barrier_data_process[i].nodeid = current_members[i];
  415. barrier_data_process[i].completed = 0;
  416. }
  417. sync_callbacks_load();
  418. /*
  419. * if sync service found, execute it
  420. */
  421. if (sync_processing && sync_callbacks.sync_init) {
  422. log_printf (LOG_LEVEL_DEBUG,
  423. "Synchronization actions starting for (%s)\n",
  424. sync_callbacks.name);
  425. sync_service_init (&deliver_ring_id);
  426. } else {
  427. if (service_name != NULL) {
  428. free (service_name);
  429. service_name = NULL;
  430. }
  431. }
  432. }
  433. end:
  434. LEAVE("");
  435. }
  436. static void sync_confchg_fn (
  437. enum totem_configuration_type configuration_type,
  438. unsigned int *member_list, int member_list_entries,
  439. unsigned int *left_list, int left_list_entries,
  440. unsigned int *joined_list, int joined_list_entries,
  441. struct memb_ring_id *ring_id)
  442. {
  443. int i;
  444. ENTER("");
  445. if (configuration_type != TOTEM_CONFIGURATION_REGULAR) {
  446. LEAVE("");
  447. return;
  448. }
  449. /*
  450. * Save current members and ring ID for later use
  451. */
  452. for (i = 0; i < member_list_entries; i++) {
  453. current_members[i] = member_list[i];
  454. }
  455. current_members_cnt = member_list_entries;
  456. sync_ring_id = ring_id;
  457. /*
  458. * If no virtual synchrony filter configured.
  459. */
  460. if (vsf_none == 1) {
  461. /*
  462. * If there is an ongoing synchronization, abort it.
  463. */
  464. if (sync_processing && sync_callbacks.sync_abort) {
  465. sync_callbacks.sync_abort();
  466. sync_callbacks.sync_activate = NULL;
  467. sync_processing = 0;
  468. if (service_name != NULL) {
  469. free (service_name);
  470. service_name = NULL;
  471. }
  472. }
  473. /*
  474. * Start new synchronization process
  475. */
  476. sync_primary_callback_fn (
  477. member_list, member_list_entries, 1, ring_id);
  478. }
  479. LEAVE("");
  480. }
  481. /**
  482. * TOTEM callback function used to multicast a sync_request
  483. * message
  484. * @param type
  485. * @param _name
  486. *
  487. * @return int
  488. */
  489. static int sync_request_send (
  490. enum totem_callback_token_type type, void *_name)
  491. {
  492. int res;
  493. char *name = _name;
  494. sync_msg_t msg;
  495. struct iovec iovec[2];
  496. int name_len;
  497. ENTER("'%s'", name);
  498. name_len = strlen (name) + 1;
  499. msg.header.size = sizeof (msg) + name_len;
  500. msg.header.id = MESSAGE_REQ_SYNC_REQUEST;
  501. memcpy (&msg.ring_id, sync_ring_id, sizeof (struct memb_ring_id));
  502. msg.sync_request.name_len = name_len;
  503. iovec[0].iov_base = (char *)&msg;
  504. iovec[0].iov_len = sizeof (msg);
  505. iovec[1].iov_base = _name;
  506. iovec[1].iov_len = name_len;
  507. res = totempg_groups_mcast_joined (
  508. sync_group_handle, iovec, 2, TOTEMPG_AGREED);
  509. if (res == 0) {
  510. /*
  511. * We managed to multicast the message so delete the token callback
  512. * for the sync request.
  513. */
  514. totempg_callback_token_destroy (&sync_request_token_handle);
  515. }
  516. /*
  517. * if we failed to multicast the message, this function will be called
  518. * again.
  519. */
  520. LEAVE("");
  521. return (0);
  522. }
  523. int sync_in_process (void)
  524. {
  525. return (sync_processing);
  526. }
  527. int sync_primary_designated (void)
  528. {
  529. if (vsf_none == 1) {
  530. return (1);
  531. } else {
  532. return (vsf_iface->primary());
  533. }
  534. }
  535. /**
  536. * Execute synchronization upon request for the named service
  537. * @param name
  538. *
  539. * @return int
  540. */
  541. int sync_request (char *name)
  542. {
  543. assert (name != NULL);
  544. ENTER("'%s'", name);
  545. if (sync_processing) {
  546. return -1;
  547. }
  548. totempg_callback_token_create (&sync_request_token_handle,
  549. TOTEM_CALLBACK_TOKEN_SENT, 0, /* don't delete after callback */
  550. sync_request_send, name);
  551. LEAVE("");
  552. return 0;
  553. }