|
@@ -143,8 +143,9 @@ int fcc_remcast_current = 0;
|
|
|
enum message_type {
|
|
enum message_type {
|
|
|
MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
|
|
MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
|
|
|
MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
|
|
MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
|
|
|
- MESSAGE_TYPE_MEMB_JOIN = 2, /* membership join message */
|
|
|
|
|
- MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 3, /* membership commit token */
|
|
|
|
|
|
|
+ MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
|
|
|
|
|
+ MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
|
|
|
|
|
+ MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
/*
|
|
/*
|
|
@@ -345,6 +346,10 @@ struct memb_join {
|
|
|
unsigned long long ring_seq;
|
|
unsigned long long ring_seq;
|
|
|
} __attribute__((packed));
|
|
} __attribute__((packed));
|
|
|
|
|
|
|
|
|
|
+struct memb_merge_detect {
|
|
|
|
|
+ struct message_header header;
|
|
|
|
|
+} __attribute__((packed));
|
|
|
|
|
+
|
|
|
struct memb_commit_token_memb_entry {
|
|
struct memb_commit_token_memb_entry {
|
|
|
struct memb_ring_id ring_id;
|
|
struct memb_ring_id ring_id;
|
|
|
int aru;
|
|
int aru;
|
|
@@ -405,7 +410,7 @@ static struct iovec iov_encrypted = {
|
|
|
|
|
|
|
|
struct message_handlers {
|
|
struct message_handlers {
|
|
|
int count;
|
|
int count;
|
|
|
- int (*handler_functions[4]) (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
|
|
|
|
|
+ int (*handler_functions[5]) (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
poll_handle *totemsrp_poll_handle;
|
|
poll_handle *totemsrp_poll_handle;
|
|
@@ -430,8 +435,11 @@ void (*totemsrp_confchg_fn) (
|
|
|
* forward decls
|
|
* forward decls
|
|
|
*/
|
|
*/
|
|
|
static int message_handler_orf_token (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
static int message_handler_orf_token (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
|
|
|
+
|
|
|
static int message_handler_mcast (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
static int message_handler_mcast (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
|
|
|
|
|
|
|
|
+static int message_handler_memb_merge_detect (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
|
|
|
+
|
|
|
static int message_handler_memb_join (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
static int message_handler_memb_join (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
|
|
|
|
|
|
static int message_handler_memb_commit_token (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
static int message_handler_memb_commit_token (struct sockaddr_in *, struct iovec *, int, int, int);
|
|
@@ -466,6 +474,7 @@ struct message_handlers totemsrp_message_handlers = {
|
|
|
{
|
|
{
|
|
|
message_handler_orf_token,
|
|
message_handler_orf_token,
|
|
|
message_handler_mcast,
|
|
message_handler_mcast,
|
|
|
|
|
+ message_handler_memb_merge_detect,
|
|
|
message_handler_memb_join,
|
|
message_handler_memb_join,
|
|
|
message_handler_memb_commit_token
|
|
message_handler_memb_commit_token
|
|
|
}
|
|
}
|
|
@@ -899,6 +908,8 @@ static void memb_state_consensus_timeout_expired (void)
|
|
|
|
|
|
|
|
static int memb_join_message_send (void);
|
|
static int memb_join_message_send (void);
|
|
|
|
|
|
|
|
|
|
+static int memb_merge_detect_send (void);
|
|
|
|
|
+
|
|
|
/*
|
|
/*
|
|
|
* Timers used for various states of the membership algorithm
|
|
* Timers used for various states of the membership algorithm
|
|
|
*/
|
|
*/
|
|
@@ -2388,6 +2399,35 @@ int memb_join_message_send (void)
|
|
|
return (res);
|
|
return (res);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+static int memb_merge_detect_send (void)
|
|
|
|
|
+{
|
|
|
|
|
+ struct msghdr msghdr;
|
|
|
|
|
+ struct iovec iovec;
|
|
|
|
|
+ struct memb_merge_detect memb_merge_detect;
|
|
|
|
|
+ int res;
|
|
|
|
|
+
|
|
|
|
|
+ memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
|
|
|
|
|
+ memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
|
|
|
|
|
+ memb_merge_detect.header.encapsulated = 0;
|
|
|
|
|
+
|
|
|
|
|
+ iovec.iov_base = &memb_merge_detect;
|
|
|
|
|
+ iovec.iov_len = sizeof (struct memb_merge_detect);
|
|
|
|
|
+
|
|
|
|
|
+ encrypt_and_sign (&iovec, 1);
|
|
|
|
|
+
|
|
|
|
|
+ msghdr.msg_name = &sockaddr_in_mcast;
|
|
|
|
|
+ msghdr.msg_namelen = sizeof (struct sockaddr_in);
|
|
|
|
|
+ msghdr.msg_iov = &iov_encrypted;
|
|
|
|
|
+ msghdr.msg_iovlen = 1;
|
|
|
|
|
+ msghdr.msg_control = 0;
|
|
|
|
|
+ msghdr.msg_controllen = 0;
|
|
|
|
|
+ msghdr.msg_flags = 0;
|
|
|
|
|
+
|
|
|
|
|
+ res = sendmsg (totemsrp_sockets[0].mcast, &msghdr, MSG_NOSIGNAL | MSG_DONTWAIT);
|
|
|
|
|
+
|
|
|
|
|
+ return (res);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
static void memb_ring_id_create_or_load (
|
|
static void memb_ring_id_create_or_load (
|
|
|
struct memb_ring_id *memb_ring_id)
|
|
struct memb_ring_id *memb_ring_id)
|
|
|
{
|
|
{
|
|
@@ -2818,6 +2858,9 @@ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
|
|
|
reset_token_timeout (); // REVIEWED
|
|
reset_token_timeout (); // REVIEWED
|
|
|
if (forward_token == 0) {
|
|
if (forward_token == 0) {
|
|
|
reset_token_retransmit_timeout (); // REVIEWED
|
|
reset_token_retransmit_timeout (); // REVIEWED
|
|
|
|
|
+ if (memb_state == MEMB_STATE_OPERATIONAL) {
|
|
|
|
|
+ memb_merge_detect_send ();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
token_callbacks_execute (TOTEM_CALLBACK_TOKEN_SENT);
|
|
token_callbacks_execute (TOTEM_CALLBACK_TOKEN_SENT);
|
|
@@ -3013,6 +3056,61 @@ printf ("got foreign message\n");
|
|
|
return (0);
|
|
return (0);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+static int message_handler_memb_merge_detect (
|
|
|
|
|
+ struct sockaddr_in *system_from,
|
|
|
|
|
+ struct iovec *iovec,
|
|
|
|
|
+ int iov_len,
|
|
|
|
|
+ int bytes_received,
|
|
|
|
|
+ int endian_conversion_needed)
|
|
|
|
|
+{
|
|
|
|
|
+
|
|
|
|
|
+printf ("merge detect\n");
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Return if we are already aware of this configuration
|
|
|
|
|
+ */
|
|
|
|
|
+ if (memb_set_subset (&system_from->sin_addr,
|
|
|
|
|
+ 1,
|
|
|
|
|
+ my_new_memb_list,
|
|
|
|
|
+ my_new_memb_entries)) {
|
|
|
|
|
+
|
|
|
|
|
+ return (0);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ printf ("Merging configuration with rep %s\n", inet_ntoa (system_from->sin_addr));
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Execute merge operation
|
|
|
|
|
+ */
|
|
|
|
|
+ switch (memb_state) {
|
|
|
|
|
+ case MEMB_STATE_OPERATIONAL:
|
|
|
|
|
+ memb_set_merge (&system_from->sin_addr, 1,
|
|
|
|
|
+ my_proc_list, &my_proc_list_entries);
|
|
|
|
|
+ memb_state_gather_enter ();
|
|
|
|
|
+ break;
|
|
|
|
|
+
|
|
|
|
|
+ case MEMB_STATE_GATHER:
|
|
|
|
|
+ if (!memb_set_subset (&system_from->sin_addr,
|
|
|
|
|
+ 1,
|
|
|
|
|
+ my_proc_list,
|
|
|
|
|
+ my_proc_list_entries)) {
|
|
|
|
|
+
|
|
|
|
|
+ memb_set_merge (&system_from->sin_addr, 1,
|
|
|
|
|
+ my_proc_list, &my_proc_list_entries);
|
|
|
|
|
+ memb_state_gather_enter ();
|
|
|
|
|
+ return (0);
|
|
|
|
|
+ }
|
|
|
|
|
+ break;
|
|
|
|
|
+
|
|
|
|
|
+ case MEMB_STATE_COMMIT:
|
|
|
|
|
+ /* discard message */
|
|
|
|
|
+ break;
|
|
|
|
|
+
|
|
|
|
|
+ case MEMB_STATE_RECOVERY:
|
|
|
|
|
+ /* discard message */
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ return (0);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
int memb_join_process (struct memb_join *memb_join, struct sockaddr_in *system_from)
|
|
int memb_join_process (struct memb_join *memb_join, struct sockaddr_in *system_from)
|
|
|
{
|
|
{
|
|
|
struct memb_commit_token my_commit_token;
|
|
struct memb_commit_token my_commit_token;
|