|
|
@@ -114,7 +114,7 @@ struct totempg_mcast {
|
|
|
struct totempg_mcast_header header;
|
|
|
unsigned char fragmented;
|
|
|
unsigned char continuation;
|
|
|
- short msg_count;
|
|
|
+ unsigned short msg_count;
|
|
|
/*
|
|
|
* short msg_len[msg_count];
|
|
|
*/
|
|
|
@@ -153,6 +153,7 @@ struct assembly {
|
|
|
struct in_addr addr;
|
|
|
unsigned char data[MESSAGE_SIZE_MAX];
|
|
|
int index;
|
|
|
+ unsigned char last_frag_num;
|
|
|
};
|
|
|
|
|
|
struct assembly *assembly_list[16]; // MAX PROCESSORS TODO
|
|
|
@@ -254,6 +255,7 @@ static void totempg_deliver_fn (
|
|
|
int a_i = 0;
|
|
|
int msg_count;
|
|
|
int continuation;
|
|
|
+ int start;
|
|
|
|
|
|
assembly = find_assembly (source_addr);
|
|
|
assert (assembly);
|
|
|
@@ -336,19 +338,48 @@ printf ("Message fragmented %d count %d\n", mcast->fragmented, mcast->msg_count)
|
|
|
iov_delv.iov_base = &assembly->data[0];
|
|
|
iov_delv.iov_len = assembly->index + msg_lens[0];
|
|
|
|
|
|
- for (i = 0; i < msg_count; i++) {
|
|
|
- /*
|
|
|
- * If the first packed message is a continuation
|
|
|
- * of a previous message, but the assembly buffer
|
|
|
- * is empty, then we need to discard it since we can't
|
|
|
- * assemble a complete message.
|
|
|
- */
|
|
|
- if (continuation && (assembly->index == 0)) {
|
|
|
+ /*
|
|
|
+ * Make sure that if this message is a continuation, that it
|
|
|
+ * matches the sequence number of the previous fragment.
|
|
|
+ * Also, if the first packed message is a continuation
|
|
|
+ * of a previous message, but the assembly buffer
|
|
|
+ * is empty, then we need to discard it since we can't
|
|
|
+ * assemble a complete message. Likewise, if this message isn't a
|
|
|
+ * continuation and the assembly buffer is empty, we have to discard
|
|
|
+ * the continued message.
|
|
|
+ */
|
|
|
+ start = 0;
|
|
|
+ if (continuation) {
|
|
|
+
|
|
|
+ if (continuation != assembly->last_frag_num) {
|
|
|
+ printf("Message continuation doesn't match previous frag e: %u - a: %u\n",
|
|
|
+ assembly->last_frag_num, continuation);
|
|
|
continuation = 0;
|
|
|
- } else {
|
|
|
- app_deliver_fn(source_addr, &iov_delv, 1,
|
|
|
- endian_conversion_required);
|
|
|
}
|
|
|
+
|
|
|
+ if ((assembly->index == 0) ||
|
|
|
+ (!continuation && assembly->index)) {
|
|
|
+ printf("Throwing away broken message: continuation %u, index %u\n",
|
|
|
+ continuation, assembly->index);
|
|
|
+ continuation = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * we decided to throw away the first continued message
|
|
|
+ * in this buffer, if continuation was set to zero.
|
|
|
+ */
|
|
|
+ if (!continuation) {
|
|
|
+ assembly->index += msg_lens[0];
|
|
|
+ iov_delv.iov_base = &assembly->data[assembly->index];
|
|
|
+ iov_delv.iov_len = msg_lens[1];
|
|
|
+ start = 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ for (i = start; i < msg_count; i++) {
|
|
|
+ app_deliver_fn(source_addr, &iov_delv, 1,
|
|
|
+ endian_conversion_required);
|
|
|
assembly->index += msg_lens[i];
|
|
|
iov_delv.iov_base = &assembly->data[assembly->index];
|
|
|
if (i < (msg_count - 1)) {
|
|
|
@@ -357,6 +388,7 @@ printf ("Message fragmented %d count %d\n", mcast->fragmented, mcast->msg_count)
|
|
|
}
|
|
|
|
|
|
if (mcast->fragmented) {
|
|
|
+ assembly->last_frag_num = mcast->fragmented;
|
|
|
if (mcast->msg_count > 1) {
|
|
|
memmove (&assembly->data[0],
|
|
|
&assembly->data[assembly->index],
|
|
|
@@ -366,6 +398,7 @@ printf ("Message fragmented %d count %d\n", mcast->fragmented, mcast->msg_count)
|
|
|
}
|
|
|
assembly->index += msg_lens[msg_count];
|
|
|
} else {
|
|
|
+ assembly->last_frag_num = 0;
|
|
|
assembly->index = 0;
|
|
|
}
|
|
|
}
|
|
|
@@ -473,6 +506,7 @@ int totempg_initialize (
|
|
|
return (res);
|
|
|
}
|
|
|
|
|
|
+static unsigned char next_fragment = 1;
|
|
|
|
|
|
/*
|
|
|
* Multicast a message
|
|
|
@@ -533,8 +567,13 @@ int totempg_mcast (
|
|
|
*/
|
|
|
if ((i < (iov_len - 1)) ||
|
|
|
((copy_base + copy_len) < iovec[i].iov_len)) {
|
|
|
- mcast.fragmented = 1;
|
|
|
- fragment_continuation = 1;
|
|
|
+ if (!next_fragment) {
|
|
|
+ next_fragment++;
|
|
|
+ }
|
|
|
+ fragment_continuation = next_fragment;
|
|
|
+ mcast.fragmented = next_fragment++;
|
|
|
+ assert(fragment_continuation != 0);
|
|
|
+ assert(mcast.fragmented != 0);
|
|
|
} else {
|
|
|
fragment_continuation = 0;
|
|
|
}
|