|
@@ -526,6 +526,9 @@ static void orf_token_endian_convert (struct orf_token *in, struct orf_token *ou
|
|
|
static void memb_commit_token_endian_convert (struct memb_commit_token *in, struct memb_commit_token *out);
|
|
static void memb_commit_token_endian_convert (struct memb_commit_token *in, struct memb_commit_token *out);
|
|
|
static void memb_join_endian_convert (struct memb_join *in, struct memb_join *out);
|
|
static void memb_join_endian_convert (struct memb_join *in, struct memb_join *out);
|
|
|
static void mcast_endian_convert (struct mcast *in, struct mcast *out);
|
|
static void mcast_endian_convert (struct mcast *in, struct mcast *out);
|
|
|
|
|
+static void memb_merge_detect_endian_convert (
|
|
|
|
|
+ struct memb_merge_detect *in,
|
|
|
|
|
+ struct memb_merge_detect *out);
|
|
|
static void timer_function_orf_token_timeout (void *data);
|
|
static void timer_function_orf_token_timeout (void *data);
|
|
|
static void timer_function_heartbeat_timeout (void *data);
|
|
static void timer_function_heartbeat_timeout (void *data);
|
|
|
static void timer_function_token_retransmit_timeout (void *data);
|
|
static void timer_function_token_retransmit_timeout (void *data);
|
|
@@ -2686,8 +2689,7 @@ static int message_handler_orf_token (
|
|
|
{
|
|
{
|
|
|
char token_storage[1500];
|
|
char token_storage[1500];
|
|
|
char token_convert[1500];
|
|
char token_convert[1500];
|
|
|
- struct orf_token *token;
|
|
|
|
|
- struct orf_token *token_ref = (struct orf_token *)msg;
|
|
|
|
|
|
|
+ struct orf_token *token = NULL;
|
|
|
int transmits_allowed;
|
|
int transmits_allowed;
|
|
|
int forward_token;
|
|
int forward_token;
|
|
|
int mcasted;
|
|
int mcasted;
|
|
@@ -2713,20 +2715,12 @@ if (random()%100 < 10) {
|
|
|
}
|
|
}
|
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
- /*
|
|
|
|
|
- * Handle merge detection timeout
|
|
|
|
|
- */
|
|
|
|
|
- if (token_ref->seq == instance->my_last_seq) {
|
|
|
|
|
- start_merge_detect_timeout (instance);
|
|
|
|
|
- instance->my_seq_unchanged += 1;
|
|
|
|
|
- } else {
|
|
|
|
|
- cancel_merge_detect_timeout (instance);
|
|
|
|
|
- cancel_token_hold_retransmit_timeout (instance);
|
|
|
|
|
- instance->my_seq_unchanged = 0;
|
|
|
|
|
|
|
+ if (endian_conversion_needed) {
|
|
|
|
|
+ orf_token_endian_convert ((struct orf_token *)msg,
|
|
|
|
|
+ (struct orf_token *)token_convert);
|
|
|
|
|
+ msg = (struct orf_token *)token_convert;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- instance->my_last_seq = token_ref->seq;
|
|
|
|
|
-
|
|
|
|
|
/*
|
|
/*
|
|
|
* Make copy of token and retransmit list in case we have
|
|
* Make copy of token and retransmit list in case we have
|
|
|
* to flush incoming messages from the kernel queue
|
|
* to flush incoming messages from the kernel queue
|
|
@@ -2736,11 +2730,21 @@ if (random()%100 < 10) {
|
|
|
memcpy (&token->rtr_list[0], msg + sizeof (struct orf_token),
|
|
memcpy (&token->rtr_list[0], msg + sizeof (struct orf_token),
|
|
|
sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
|
|
sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
|
|
|
|
|
|
|
|
- if (endian_conversion_needed) {
|
|
|
|
|
- orf_token_endian_convert (token, (struct orf_token *)token_convert);
|
|
|
|
|
- token = (struct orf_token *)token_convert;
|
|
|
|
|
|
|
+
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Handle merge detection timeout
|
|
|
|
|
+ */
|
|
|
|
|
+ if (token->seq == instance->my_last_seq) {
|
|
|
|
|
+ start_merge_detect_timeout (instance);
|
|
|
|
|
+ instance->my_seq_unchanged += 1;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ cancel_merge_detect_timeout (instance);
|
|
|
|
|
+ cancel_token_hold_retransmit_timeout (instance);
|
|
|
|
|
+ instance->my_seq_unchanged = 0;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ instance->my_last_seq = token->seq;
|
|
|
|
|
+
|
|
|
totemrrp_recv_flush (instance->totemrrp_handle);
|
|
totemrrp_recv_flush (instance->totemrrp_handle);
|
|
|
|
|
|
|
|
/*
|
|
/*
|
|
@@ -2830,7 +2834,6 @@ if (random()%100 < 10) {
|
|
|
|
|
|
|
|
if (sq_lt_compare (instance->last_released + MISSING_MCAST_WINDOW, token->seq + TRANSMITS_ALLOWED)) {
|
|
if (sq_lt_compare (instance->last_released + MISSING_MCAST_WINDOW, token->seq + TRANSMITS_ALLOWED)) {
|
|
|
transmits_allowed = 0;
|
|
transmits_allowed = 0;
|
|
|
-printf ("zero \n");
|
|
|
|
|
}
|
|
}
|
|
|
mcasted = orf_token_mcast (instance, token, transmits_allowed, system_from);
|
|
mcasted = orf_token_mcast (instance, token, transmits_allowed, system_from);
|
|
|
if (sq_lt_compare (instance->my_aru, token->aru) ||
|
|
if (sq_lt_compare (instance->my_aru, token->aru) ||
|
|
@@ -2983,6 +2986,7 @@ static void messages_deliver_to_app (
|
|
|
struct mcast *mcast;
|
|
struct mcast *mcast;
|
|
|
unsigned int range = 0;
|
|
unsigned int range = 0;
|
|
|
unsigned int my_high_delivered_stored = 0;
|
|
unsigned int my_high_delivered_stored = 0;
|
|
|
|
|
+ struct totem_ip_address msg_source;
|
|
|
|
|
|
|
|
instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
|
|
instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
|
|
|
"Delivering %x to %x\n", instance->my_high_delivered,
|
|
"Delivering %x to %x\n", instance->my_high_delivered,
|
|
@@ -3050,13 +3054,19 @@ static void messages_deliver_to_app (
|
|
|
"Delivering MCAST message with seq %x to pending delivery queue\n",
|
|
"Delivering MCAST message with seq %x to pending delivery queue\n",
|
|
|
mcast->seq);
|
|
mcast->seq);
|
|
|
|
|
|
|
|
|
|
+ if (mcast->header.endian_detector == ENDIAN_LOCAL) {
|
|
|
|
|
+ totemip_copy (&msg_source, &mcast->source);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ totemip_copy_endian_convert (&msg_source, &mcast->source);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/*
|
|
/*
|
|
|
* Message is locally originated multicast
|
|
* Message is locally originated multicast
|
|
|
*/
|
|
*/
|
|
|
if (sort_queue_item_p->iov_len > 1 &&
|
|
if (sort_queue_item_p->iov_len > 1 &&
|
|
|
sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) {
|
|
sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) {
|
|
|
instance->totemsrp_deliver_fn (
|
|
instance->totemsrp_deliver_fn (
|
|
|
- &mcast->source,
|
|
|
|
|
|
|
+ &msg_source,
|
|
|
&sort_queue_item_p->iovec[1],
|
|
&sort_queue_item_p->iovec[1],
|
|
|
sort_queue_item_p->iov_len - 1,
|
|
sort_queue_item_p->iov_len - 1,
|
|
|
mcast->header.endian_detector != ENDIAN_LOCAL);
|
|
mcast->header.endian_detector != ENDIAN_LOCAL);
|
|
@@ -3065,7 +3075,7 @@ static void messages_deliver_to_app (
|
|
|
sort_queue_item_p->iovec[0].iov_base += sizeof (struct mcast);
|
|
sort_queue_item_p->iovec[0].iov_base += sizeof (struct mcast);
|
|
|
|
|
|
|
|
instance->totemsrp_deliver_fn (
|
|
instance->totemsrp_deliver_fn (
|
|
|
- &mcast->source,
|
|
|
|
|
|
|
+ &msg_source,
|
|
|
sort_queue_item_p->iovec,
|
|
sort_queue_item_p->iovec,
|
|
|
sort_queue_item_p->iov_len,
|
|
sort_queue_item_p->iov_len,
|
|
|
mcast->header.endian_detector != ENDIAN_LOCAL);
|
|
mcast->header.endian_detector != ENDIAN_LOCAL);
|
|
@@ -3220,6 +3230,10 @@ static int message_handler_memb_merge_detect (
|
|
|
{
|
|
{
|
|
|
struct memb_merge_detect *memb_merge_detect = (struct memb_merge_detect *)msg;
|
|
struct memb_merge_detect *memb_merge_detect = (struct memb_merge_detect *)msg;
|
|
|
|
|
|
|
|
|
|
+ if (endian_conversion_needed) {
|
|
|
|
|
+ memb_merge_detect_endian_convert (msg, msg);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/*
|
|
/*
|
|
|
* do nothing if this is a merge detect from this configuration
|
|
* do nothing if this is a merge detect from this configuration
|
|
|
*/
|
|
*/
|
|
@@ -3341,12 +3355,10 @@ static void memb_join_endian_convert (struct memb_join *in, struct memb_join *ou
|
|
|
out->failed_list_entries = swab32 (in->failed_list_entries);
|
|
out->failed_list_entries = swab32 (in->failed_list_entries);
|
|
|
out->ring_seq = swab64 (in->ring_seq);
|
|
out->ring_seq = swab64 (in->ring_seq);
|
|
|
for (i = 0; i < out->proc_list_entries; i++) {
|
|
for (i = 0; i < out->proc_list_entries; i++) {
|
|
|
- totemip_copy(&out->proc_list[i], &in->proc_list[i]);
|
|
|
|
|
- out->proc_list[i].family = swab16(out->proc_list[i].family);
|
|
|
|
|
|
|
+ totemip_copy_endian_convert(&out->proc_list[i], &in->proc_list[i]);
|
|
|
}
|
|
}
|
|
|
for (i = 0; i < out->failed_list_entries; i++) {
|
|
for (i = 0; i < out->failed_list_entries; i++) {
|
|
|
- totemip_copy(&out->failed_list[i], &in->failed_list[i]);
|
|
|
|
|
- out->failed_list[i].family = swab16(out->failed_list[i].family);
|
|
|
|
|
|
|
+ totemip_copy_endian_convert(&out->failed_list[i], &in->failed_list[i]);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -3358,24 +3370,27 @@ static void memb_commit_token_endian_convert (struct memb_commit_token *in, stru
|
|
|
out->header.endian_detector = ENDIAN_LOCAL;
|
|
out->header.endian_detector = ENDIAN_LOCAL;
|
|
|
out->header.nodeid = swab32 (in->header.nodeid);
|
|
out->header.nodeid = swab32 (in->header.nodeid);
|
|
|
out->token_seq = swab32 (in->token_seq);
|
|
out->token_seq = swab32 (in->token_seq);
|
|
|
- totemip_copy(&out->ring_id.rep, &in->ring_id.rep);
|
|
|
|
|
|
|
+ totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
|
|
|
out->ring_id.seq = swab64 (in->ring_id.seq);
|
|
out->ring_id.seq = swab64 (in->ring_id.seq);
|
|
|
out->retrans_flg = swab32 (in->retrans_flg);
|
|
out->retrans_flg = swab32 (in->retrans_flg);
|
|
|
out->memb_index = swab32 (in->memb_index);
|
|
out->memb_index = swab32 (in->memb_index);
|
|
|
out->addr_entries = swab32 (in->addr_entries);
|
|
out->addr_entries = swab32 (in->addr_entries);
|
|
|
for (i = 0; i < out->addr_entries; i++) {
|
|
for (i = 0; i < out->addr_entries; i++) {
|
|
|
- totemip_copy(&out->addr[i], &in->addr[i]);
|
|
|
|
|
- out->addr[i].family = swab16(in->addr[i].family);
|
|
|
|
|
-
|
|
|
|
|
- totemip_copy(&out->memb_list[i].ring_id.rep,
|
|
|
|
|
- &in->memb_list[i].ring_id.rep);
|
|
|
|
|
- out->memb_list[i].ring_id.rep.family = swab16(in->memb_list[i].ring_id.rep.family);
|
|
|
|
|
|
|
+ totemip_copy_endian_convert(&out->addr[i], &in->addr[i]);
|
|
|
|
|
|
|
|
- out->memb_list[i].ring_id.seq =
|
|
|
|
|
- swab64 (in->memb_list[i].ring_id.seq);
|
|
|
|
|
- out->memb_list[i].aru = swab32 (in->memb_list[i].aru);
|
|
|
|
|
- out->memb_list[i].high_delivered = swab32 (in->memb_list[i].high_delivered);
|
|
|
|
|
- out->memb_list[i].received_flg = swab32 (in->memb_list[i].received_flg);
|
|
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Only convert the memb entry if it has been set
|
|
|
|
|
+ */
|
|
|
|
|
+ if (in->memb_list[i].ring_id.rep.family != 0) {
|
|
|
|
|
+ totemip_copy_endian_convert(&out->memb_list[i].ring_id.rep,
|
|
|
|
|
+ &in->memb_list[i].ring_id.rep);
|
|
|
|
|
+
|
|
|
|
|
+ out->memb_list[i].ring_id.seq =
|
|
|
|
|
+ swab64 (in->memb_list[i].ring_id.seq);
|
|
|
|
|
+ out->memb_list[i].aru = swab32 (in->memb_list[i].aru);
|
|
|
|
|
+ out->memb_list[i].high_delivered = swab32 (in->memb_list[i].high_delivered);
|
|
|
|
|
+ out->memb_list[i].received_flg = swab32 (in->memb_list[i].received_flg);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -3389,16 +3404,14 @@ static void orf_token_endian_convert (struct orf_token *in, struct orf_token *ou
|
|
|
out->seq = swab32 (in->seq);
|
|
out->seq = swab32 (in->seq);
|
|
|
out->token_seq = swab32 (in->token_seq);
|
|
out->token_seq = swab32 (in->token_seq);
|
|
|
out->aru = swab32 (in->aru);
|
|
out->aru = swab32 (in->aru);
|
|
|
- totemip_copy(&out->ring_id.rep, &in->ring_id.rep);
|
|
|
|
|
- out->ring_id.rep.family = swab16(in->ring_id.rep.family);
|
|
|
|
|
-
|
|
|
|
|
|
|
+ totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
|
|
|
|
|
+ totemip_copy_endian_convert(&out->aru_addr, &in->aru_addr);
|
|
|
out->ring_id.seq = swab64 (in->ring_id.seq);
|
|
out->ring_id.seq = swab64 (in->ring_id.seq);
|
|
|
out->fcc = swab32 (in->fcc);
|
|
out->fcc = swab32 (in->fcc);
|
|
|
out->retrans_flg = swab32 (in->retrans_flg);
|
|
out->retrans_flg = swab32 (in->retrans_flg);
|
|
|
out->rtr_list_entries = swab32 (in->rtr_list_entries);
|
|
out->rtr_list_entries = swab32 (in->rtr_list_entries);
|
|
|
for (i = 0; i < out->rtr_list_entries; i++) {
|
|
for (i = 0; i < out->rtr_list_entries; i++) {
|
|
|
- totemip_copy(&out->rtr_list[i].ring_id.rep, &in->rtr_list[i].ring_id.rep);
|
|
|
|
|
- out->rtr_list[i].ring_id.rep.family = swab16(in->rtr_list[i].ring_id.rep.family);
|
|
|
|
|
|
|
+ totemip_copy_endian_convert(&out->rtr_list[i].ring_id.rep, &in->rtr_list[i].ring_id.rep);
|
|
|
out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
|
|
out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
|
|
|
out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
|
|
out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
|
|
|
}
|
|
}
|
|
@@ -3410,13 +3423,23 @@ static void mcast_endian_convert (struct mcast *in, struct mcast *out)
|
|
|
out->header.endian_detector = ENDIAN_LOCAL;
|
|
out->header.endian_detector = ENDIAN_LOCAL;
|
|
|
out->header.nodeid = swab32 (in->header.nodeid);
|
|
out->header.nodeid = swab32 (in->header.nodeid);
|
|
|
out->seq = swab32 (in->seq);
|
|
out->seq = swab32 (in->seq);
|
|
|
- totemip_copy(&out->ring_id.rep, &in->ring_id.rep);
|
|
|
|
|
- out->ring_id.rep.family = swab16(in->ring_id.rep.family);
|
|
|
|
|
|
|
+ totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
|
|
|
|
|
+ totemip_copy_endian_convert(&out->source, &in->source);
|
|
|
out->ring_id.seq = swab64 (in->ring_id.seq);
|
|
out->ring_id.seq = swab64 (in->ring_id.seq);
|
|
|
- out->source = in->source;
|
|
|
|
|
out->guarantee = in->guarantee;
|
|
out->guarantee = in->guarantee;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+static void memb_merge_detect_endian_convert (
|
|
|
|
|
+ struct memb_merge_detect *in,
|
|
|
|
|
+ struct memb_merge_detect *out)
|
|
|
|
|
+{
|
|
|
|
|
+ out->header.type = in->header.type;
|
|
|
|
|
+ out->header.endian_detector = ENDIAN_LOCAL;
|
|
|
|
|
+ out->header.nodeid = swab32 (in->header.nodeid);
|
|
|
|
|
+ totemip_copy_endian_convert(&out->ring_id.rep, &in->ring_id.rep);
|
|
|
|
|
+ out->ring_id.seq = swab64 (in->ring_id.seq);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
static int message_handler_memb_join (
|
|
static int message_handler_memb_join (
|
|
|
struct totemsrp_instance *instance,
|
|
struct totemsrp_instance *instance,
|
|
|
struct totem_ip_address *system_from,
|
|
struct totem_ip_address *system_from,
|