|
@@ -90,6 +90,7 @@
|
|
|
#include <pthread.h>
|
|
#include <pthread.h>
|
|
|
|
|
|
|
|
#include "../include/hdb.h"
|
|
#include "../include/hdb.h"
|
|
|
|
|
+#include "../include/list.h"
|
|
|
#include "totempg.h"
|
|
#include "totempg.h"
|
|
|
#include "totemmrp.h"
|
|
#include "totemmrp.h"
|
|
|
#include "totemsrp.h"
|
|
#include "totemsrp.h"
|
|
@@ -157,10 +158,12 @@ struct assembly {
|
|
|
unsigned char data[MESSAGE_SIZE_MAX];
|
|
unsigned char data[MESSAGE_SIZE_MAX];
|
|
|
int index;
|
|
int index;
|
|
|
unsigned char last_frag_num;
|
|
unsigned char last_frag_num;
|
|
|
|
|
+ struct list_head list;
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
-struct assembly *assembly_list[PROCESSOR_COUNT_MAX];
|
|
|
|
|
-int assembly_list_entries = 0;
|
|
|
|
|
|
|
+DECLARE_LIST_INIT(assembly_list_inuse);
|
|
|
|
|
+
|
|
|
|
|
+DECLARE_LIST_INIT(assembly_list_free);
|
|
|
|
|
|
|
|
/*
|
|
/*
|
|
|
* Staging buffer for packed messages. Messages are staged in this buffer
|
|
* Staging buffer for packed messages. Messages are staged in this buffer
|
|
@@ -181,6 +184,7 @@ static int fragment_continuation = 0;
|
|
|
static struct iovec iov_delv;
|
|
static struct iovec iov_delv;
|
|
|
|
|
|
|
|
static unsigned int totempg_max_handle = 0;
|
|
static unsigned int totempg_max_handle = 0;
|
|
|
|
|
+
|
|
|
struct totempg_group_instance {
|
|
struct totempg_group_instance {
|
|
|
void (*deliver_fn) (
|
|
void (*deliver_fn) (
|
|
|
unsigned int nodeid,
|
|
unsigned int nodeid,
|
|
@@ -220,16 +224,56 @@ static pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
|
|
|
#define log_printf(level, format, args...) \
|
|
#define log_printf(level, format, args...) \
|
|
|
totempg_log_printf (__FILE__, __LINE__, level, format, ##args)
|
|
totempg_log_printf (__FILE__, __LINE__, level, format, ##args)
|
|
|
|
|
|
|
|
-static struct assembly *find_assembly (unsigned int nodeid)
|
|
|
|
|
|
|
+static struct assembly *assembly_ref (unsigned int nodeid)
|
|
|
{
|
|
{
|
|
|
- int i;
|
|
|
|
|
|
|
+ struct assembly *assembly;
|
|
|
|
|
+ struct list_head *list;
|
|
|
|
|
+
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Search inuse list for node id and return assembly buffer if found
|
|
|
|
|
+ */
|
|
|
|
|
+ for (list = assembly_list_inuse.next;
|
|
|
|
|
+ list != &assembly_list_inuse;
|
|
|
|
|
+ list = list->next) {
|
|
|
|
|
|
|
|
- for (i = 0; i < assembly_list_entries; i++) {
|
|
|
|
|
- if (nodeid == assembly_list[i]->nodeid) {
|
|
|
|
|
- return (assembly_list[i]);
|
|
|
|
|
|
|
+ assembly = list_entry (list, struct assembly, list);
|
|
|
|
|
+
|
|
|
|
|
+ if (nodeid == assembly->nodeid) {
|
|
|
|
|
+ return (assembly);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- return (0);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Nothing found in inuse list get one from free list if available
|
|
|
|
|
+ */
|
|
|
|
|
+ if (list_empty (&assembly_list_free) == 0) {
|
|
|
|
|
+ assembly = list_entry (assembly_list_free.next, struct assembly, list);
|
|
|
|
|
+ list_del (&assembly->list);
|
|
|
|
|
+ list_add (&assembly->list, &assembly_list_inuse);
|
|
|
|
|
+ assembly->nodeid = nodeid;
|
|
|
|
|
+ return (assembly);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Nothing available in inuse or free list, so allocate a new one
|
|
|
|
|
+ */
|
|
|
|
|
+ assembly = malloc (sizeof (struct assembly));
|
|
|
|
|
+ memset (assembly, 0, sizeof (struct assembly));
|
|
|
|
|
+ /*
|
|
|
|
|
+ * TODO handle memory allocation failure here
|
|
|
|
|
+ */
|
|
|
|
|
+ assert (assembly);
|
|
|
|
|
+ assembly->nodeid = nodeid;
|
|
|
|
|
+ list_init (&assembly->list);
|
|
|
|
|
+ list_add (&assembly->list, &assembly_list_inuse);
|
|
|
|
|
+
|
|
|
|
|
+ return (assembly);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void assembly_deref (struct assembly *assembly)
|
|
|
|
|
+{
|
|
|
|
|
+ list_del (&assembly->list);
|
|
|
|
|
+ list_add (&assembly->list, &assembly_list_free);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
static inline void app_confchg_fn (
|
|
static inline void app_confchg_fn (
|
|
@@ -264,6 +308,7 @@ static inline void app_confchg_fn (
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
static inline void group_endian_convert (
|
|
static inline void group_endian_convert (
|
|
|
struct iovec *iovec)
|
|
struct iovec *iovec)
|
|
|
{
|
|
{
|
|
@@ -363,45 +408,7 @@ static void totempg_confchg_fn (
|
|
|
unsigned int *joined_list, int joined_list_entries,
|
|
unsigned int *joined_list, int joined_list_entries,
|
|
|
struct memb_ring_id *ring_id)
|
|
struct memb_ring_id *ring_id)
|
|
|
{
|
|
{
|
|
|
- int i;
|
|
|
|
|
- int j;
|
|
|
|
|
- int found;
|
|
|
|
|
-
|
|
|
|
|
- /*
|
|
|
|
|
- * Clean out the assembly area for nodes that have left the
|
|
|
|
|
- * membership. If they return, we don't want any stale message
|
|
|
|
|
- * data that may be there.
|
|
|
|
|
- */
|
|
|
|
|
- for (i = 0; i < left_list_entries; i++) {
|
|
|
|
|
- for (j = 0; j < assembly_list_entries; j++) {
|
|
|
|
|
- if (left_list[i] == assembly_list[j]->nodeid) {
|
|
|
|
|
- assembly_list[j]->index = 0;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /*
|
|
|
|
|
- * Create a message assembly area for any new members.
|
|
|
|
|
- */
|
|
|
|
|
- for (i = 0; i < member_list_entries; i++) {
|
|
|
|
|
- found = 0;
|
|
|
|
|
- for (j = 0; j < assembly_list_entries; j++) {
|
|
|
|
|
- if (member_list[i] == assembly_list[j]->nodeid) {
|
|
|
|
|
- found = 1;
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- if (found == 0) {
|
|
|
|
|
- assembly_list[assembly_list_entries] =
|
|
|
|
|
- malloc (sizeof (struct assembly));
|
|
|
|
|
- assert (assembly_list[assembly_list_entries]); // TODO
|
|
|
|
|
- assembly_list[assembly_list_entries]->nodeid =
|
|
|
|
|
- member_list[i];
|
|
|
|
|
- assembly_list[assembly_list_entries]->index = 0;
|
|
|
|
|
- assembly_list_entries += 1;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
|
|
+// TODO optimize this
|
|
|
app_confchg_fn (configuration_type,
|
|
app_confchg_fn (configuration_type,
|
|
|
member_list, member_list_entries,
|
|
member_list, member_list_entries,
|
|
|
left_list, left_list_entries,
|
|
left_list, left_list_entries,
|
|
@@ -426,7 +433,7 @@ static void totempg_deliver_fn (
|
|
|
int continuation;
|
|
int continuation;
|
|
|
int start;
|
|
int start;
|
|
|
|
|
|
|
|
- assembly = find_assembly (nodeid);
|
|
|
|
|
|
|
+ assembly = assembly_ref (nodeid);
|
|
|
assert (assembly);
|
|
assert (assembly);
|
|
|
|
|
|
|
|
/*
|
|
/*
|
|
@@ -550,7 +557,17 @@ static void totempg_deliver_fn (
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if (mcast->fragmented) {
|
|
|
|
|
|
|
+ if (mcast->fragmented == 0) {
|
|
|
|
|
+ /*
|
|
|
|
|
+ * End of messages, dereference assembly struct
|
|
|
|
|
+ */
|
|
|
|
|
+ assembly->last_frag_num = 0;
|
|
|
|
|
+ assembly->index = 0;
|
|
|
|
|
+ assembly_deref (assembly);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Message is fragmented, keep around assembly list
|
|
|
|
|
+ */
|
|
|
assembly->last_frag_num = mcast->fragmented;
|
|
assembly->last_frag_num = mcast->fragmented;
|
|
|
if (mcast->msg_count > 1) {
|
|
if (mcast->msg_count > 1) {
|
|
|
memmove (&assembly->data[0],
|
|
memmove (&assembly->data[0],
|
|
@@ -560,9 +577,6 @@ static void totempg_deliver_fn (
|
|
|
assembly->index = 0;
|
|
assembly->index = 0;
|
|
|
}
|
|
}
|
|
|
assembly->index += msg_lens[msg_count];
|
|
assembly->index += msg_lens[msg_count];
|
|
|
- } else {
|
|
|
|
|
- assembly->last_frag_num = 0;
|
|
|
|
|
- assembly->index = 0;
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|