|
|
@@ -152,16 +152,6 @@ struct totempg_mcast {
|
|
|
#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
|
|
|
sizeof (struct totempg_mcast))
|
|
|
|
|
|
-/*
|
|
|
- * Local variables used for packing small messages
|
|
|
- */
|
|
|
-static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
|
|
|
-
|
|
|
-static int mcast_packed_msg_count = 0;
|
|
|
-
|
|
|
-static int totempg_reserved = 1;
|
|
|
-
|
|
|
-static unsigned int totempg_size_limit;
|
|
|
|
|
|
/*
|
|
|
* Function and data used to log messages
|
|
|
@@ -211,24 +201,41 @@ DECLARE_LIST_INIT(assembly_list_free_trans);
|
|
|
DECLARE_LIST_INIT(assembly_list_free);
|
|
|
|
|
|
/*
|
|
|
- * Staging buffer for packed messages. Messages are staged in this buffer
|
|
|
- * before sending. Multiple messages may fit which cuts down on the
|
|
|
- * number of mcasts sent. If a message doesn't completely fit, then
|
|
|
- * the mcast header has a fragment bit set that says that there are more
|
|
|
- * data to follow. fragment_size is an index into the buffer. It indicates
|
|
|
- * the size of message data and where to place new message data.
|
|
|
- * fragment_contuation indicates whether the first packed message in
|
|
|
- * the buffer is a continuation of a previously packed fragment.
|
|
|
+ * Structure for storing totem_pg contexts so they can be switched
|
|
|
*/
|
|
|
-static unsigned char *fragmentation_data;
|
|
|
+struct totempg_context {
|
|
|
+ /*
|
|
|
+ * Staging buffer for packed messages. Messages are staged in this buffer
|
|
|
+ * before sending. Multiple messages may fit which cuts down on the
|
|
|
+ * number of mcasts sent. If a message doesn't completely fit, then
|
|
|
+ * the mcast header has a fragment bit set that says that there are more
|
|
|
+ * data to follow. fragment_size is an index into the buffer. It indicates
|
|
|
+ * the size of message data and where to place new message data.
|
|
|
+ * fragment_contuation indicates whether the first packed message in
|
|
|
+ * the buffer is a continuation of a previously packed fragment.
|
|
|
+ */
|
|
|
+ unsigned char *fragmentation_data;
|
|
|
|
|
|
-static int fragment_size = 0;
|
|
|
+ int fragment_size;
|
|
|
|
|
|
-static int fragment_continuation = 0;
|
|
|
+ int fragment_continuation;
|
|
|
|
|
|
-static int totempg_waiting_transack = 0;
|
|
|
+ unsigned char next_fragment;
|
|
|
|
|
|
-static unsigned int totempg_max_handle = 0;
|
|
|
+ /*
|
|
|
+ * Local variables used for packing small messages
|
|
|
+ */
|
|
|
+ unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
|
|
|
+
|
|
|
+ int mcast_packed_msg_count;
|
|
|
+
|
|
|
+ int totempg_reserved;
|
|
|
+};
|
|
|
+
|
|
|
+#define TOTEMPG_NO_CONTEXTS 2
|
|
|
+
|
|
|
+static struct totempg_context totempg_contexts_array[TOTEMPG_NO_CONTEXTS];
|
|
|
+static struct totempg_context *totempg_active_context;
|
|
|
|
|
|
struct totempg_group_instance {
|
|
|
void (*deliver_fn) (
|
|
|
@@ -251,7 +258,9 @@ struct totempg_group_instance {
|
|
|
|
|
|
DECLARE_HDB_DATABASE (totempg_groups_instance_database,NULL);
|
|
|
|
|
|
-static unsigned char next_fragment = 1;
|
|
|
+static unsigned int totempg_max_handle = 0;
|
|
|
+static int totempg_waiting_transack = 0;
|
|
|
+static unsigned int totempg_size_limit = 0;
|
|
|
|
|
|
static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
|
|
|
|
|
|
@@ -273,12 +282,23 @@ static int msg_count_send_ok (int msg_count);
|
|
|
|
|
|
static int byte_count_send_ok (int byte_count);
|
|
|
|
|
|
+static void totempg_set_context(int context_no)
|
|
|
+{
|
|
|
+ totempg_active_context = &totempg_contexts_array[context_no];
|
|
|
+}
|
|
|
+
|
|
|
static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
|
|
|
{
|
|
|
log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
|
|
|
totempg_waiting_transack = waiting_trans_ack;
|
|
|
+ if (!waiting_trans_ack) {
|
|
|
+ totempg_set_context(0);
|
|
|
+ } else {
|
|
|
+ totempg_set_context(1);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+
|
|
|
static struct assembly *assembly_ref (unsigned int nodeid)
|
|
|
{
|
|
|
struct assembly *assembly;
|
|
|
@@ -746,9 +766,10 @@ int callback_token_received_fn (enum totem_callback_token_type type,
|
|
|
struct totempg_mcast mcast;
|
|
|
struct iovec iovecs[3];
|
|
|
int res;
|
|
|
+ struct totempg_context *con = totempg_active_context;
|
|
|
|
|
|
pthread_mutex_lock (&mcast_msg_mutex);
|
|
|
- if (mcast_packed_msg_count == 0) {
|
|
|
+ if (con->mcast_packed_msg_count == 0) {
|
|
|
pthread_mutex_unlock (&mcast_msg_mutex);
|
|
|
return (0);
|
|
|
}
|
|
|
@@ -763,21 +784,21 @@ int callback_token_received_fn (enum totem_callback_token_type type,
|
|
|
* Was the first message in this buffer a continuation of a
|
|
|
* fragmented message?
|
|
|
*/
|
|
|
- mcast.continuation = fragment_continuation;
|
|
|
- fragment_continuation = 0;
|
|
|
+ mcast.continuation = con->fragment_continuation;
|
|
|
+ con->fragment_continuation = 0;
|
|
|
|
|
|
- mcast.msg_count = mcast_packed_msg_count;
|
|
|
+ mcast.msg_count = con->mcast_packed_msg_count;
|
|
|
|
|
|
iovecs[0].iov_base = (void *)&mcast;
|
|
|
iovecs[0].iov_len = sizeof (struct totempg_mcast);
|
|
|
- iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
|
|
|
- iovecs[1].iov_len = mcast_packed_msg_count * sizeof (unsigned short);
|
|
|
- iovecs[2].iov_base = (void *)&fragmentation_data[0];
|
|
|
- iovecs[2].iov_len = fragment_size;
|
|
|
+ iovecs[1].iov_base = (void *)con->mcast_packed_msg_lens;
|
|
|
+ iovecs[1].iov_len = con->mcast_packed_msg_count * sizeof (unsigned short);
|
|
|
+ iovecs[2].iov_base = (void *)&con->fragmentation_data[0];
|
|
|
+ iovecs[2].iov_len = con->fragment_size;
|
|
|
res = totemmrp_mcast (iovecs, 3, 0);
|
|
|
|
|
|
- mcast_packed_msg_count = 0;
|
|
|
- fragment_size = 0;
|
|
|
+ con->mcast_packed_msg_count = 0;
|
|
|
+ con->fragment_size = 0;
|
|
|
|
|
|
pthread_mutex_unlock (&mcast_msg_mutex);
|
|
|
return (0);
|
|
|
@@ -791,6 +812,8 @@ int totempg_initialize (
|
|
|
struct totem_config *totem_config)
|
|
|
{
|
|
|
int res;
|
|
|
+ int i;
|
|
|
+ struct totempg_context *con;
|
|
|
|
|
|
totempg_totem_config = totem_config;
|
|
|
totempg_log_level_security = totem_config->totem_logging_configuration.log_level_security;
|
|
|
@@ -801,9 +824,16 @@ int totempg_initialize (
|
|
|
totempg_log_printf = totem_config->totem_logging_configuration.log_printf;
|
|
|
totempg_subsys_id = totem_config->totem_logging_configuration.log_subsys_id;
|
|
|
|
|
|
- fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
|
|
|
- if (fragmentation_data == 0) {
|
|
|
- return (-1);
|
|
|
+ for (i = 0; i < TOTEMPG_NO_CONTEXTS; i++) {
|
|
|
+ con = &totempg_contexts_array[i];
|
|
|
+ memset(con, 0, sizeof(*con));
|
|
|
+
|
|
|
+ con->fragmentation_data = malloc (TOTEMPG_PACKET_SIZE);
|
|
|
+ if (con->fragmentation_data == 0) {
|
|
|
+ return (-1);
|
|
|
+ }
|
|
|
+ con->totempg_reserved = 1;
|
|
|
+ con->next_fragment = 1;
|
|
|
}
|
|
|
|
|
|
totemsrp_net_mtu_adjust (totem_config);
|
|
|
@@ -855,6 +885,7 @@ static int mcast_msg (
|
|
|
int copy_len = 0;
|
|
|
int copy_base = 0;
|
|
|
int total_size = 0;
|
|
|
+ struct totempg_context *con = totempg_active_context;
|
|
|
|
|
|
pthread_mutex_lock (&mcast_msg_mutex);
|
|
|
totemmrp_event_signal (TOTEM_EVENT_NEW_MSG, 1);
|
|
|
@@ -872,9 +903,9 @@ static int mcast_msg (
|
|
|
iov_len = dest;
|
|
|
|
|
|
max_packet_size = TOTEMPG_PACKET_SIZE -
|
|
|
- (sizeof (unsigned short) * (mcast_packed_msg_count + 1));
|
|
|
+ (sizeof (unsigned short) * (con->mcast_packed_msg_count + 1));
|
|
|
|
|
|
- mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
|
|
|
+ con->mcast_packed_msg_lens[con->mcast_packed_msg_count] = 0;
|
|
|
|
|
|
/*
|
|
|
* Check if we would overwrite new message queue
|
|
|
@@ -884,7 +915,7 @@ static int mcast_msg (
|
|
|
}
|
|
|
|
|
|
if (byte_count_send_ok (total_size + sizeof(unsigned short) *
|
|
|
- (mcast_packed_msg_count)) == 0) {
|
|
|
+ (con->mcast_packed_msg_count)) == 0) {
|
|
|
|
|
|
pthread_mutex_unlock (&mcast_msg_mutex);
|
|
|
return(-1);
|
|
|
@@ -893,7 +924,7 @@ static int mcast_msg (
|
|
|
mcast.header.version = 0;
|
|
|
for (i = 0; i < iov_len; ) {
|
|
|
mcast.fragmented = 0;
|
|
|
- mcast.continuation = fragment_continuation;
|
|
|
+ mcast.continuation = con->fragment_continuation;
|
|
|
copy_len = iovec[i].iov_len - copy_base;
|
|
|
|
|
|
/*
|
|
|
@@ -902,14 +933,14 @@ static int mcast_msg (
|
|
|
* fragment_buffer on exit so that max_packet_size + fragment_size
|
|
|
* doesn't exceed the size of the fragment_buffer on the next call.
|
|
|
*/
|
|
|
- if ((copy_len + fragment_size) <
|
|
|
+ if ((copy_len + con->fragment_size) <
|
|
|
(max_packet_size - sizeof (unsigned short))) {
|
|
|
|
|
|
- memcpy (&fragmentation_data[fragment_size],
|
|
|
+ memcpy (&con->fragmentation_data[con->fragment_size],
|
|
|
(char *)iovec[i].iov_base + copy_base, copy_len);
|
|
|
- fragment_size += copy_len;
|
|
|
- mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
|
|
|
- next_fragment = 1;
|
|
|
+ con->fragment_size += copy_len;
|
|
|
+ con->mcast_packed_msg_lens[con->mcast_packed_msg_count] += copy_len;
|
|
|
+ con->next_fragment = 1;
|
|
|
copy_len = 0;
|
|
|
copy_base = 0;
|
|
|
i++;
|
|
|
@@ -921,18 +952,18 @@ static int mcast_msg (
|
|
|
} else {
|
|
|
unsigned char *data_ptr;
|
|
|
|
|
|
- copy_len = min(copy_len, max_packet_size - fragment_size);
|
|
|
+ copy_len = min(copy_len, max_packet_size - con->fragment_size);
|
|
|
if( copy_len == max_packet_size )
|
|
|
data_ptr = (unsigned char *)iovec[i].iov_base + copy_base;
|
|
|
else {
|
|
|
- data_ptr = fragmentation_data;
|
|
|
- memcpy (&fragmentation_data[fragment_size],
|
|
|
+ data_ptr = con->fragmentation_data;
|
|
|
+ memcpy (&con->fragmentation_data[con->fragment_size],
|
|
|
(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
|
|
|
}
|
|
|
|
|
|
- memcpy (&fragmentation_data[fragment_size],
|
|
|
+ memcpy (&con->fragmentation_data[con->fragment_size],
|
|
|
(unsigned char *)iovec[i].iov_base + copy_base, copy_len);
|
|
|
- mcast_packed_msg_lens[mcast_packed_msg_count] += copy_len;
|
|
|
+ con->mcast_packed_msg_lens[con->mcast_packed_msg_count] += copy_len;
|
|
|
|
|
|
/*
|
|
|
* if we're not on the last iovec or the iovec is too large to
|
|
|
@@ -941,25 +972,25 @@ static int mcast_msg (
|
|
|
*/
|
|
|
if ((i < (iov_len - 1)) ||
|
|
|
((copy_base + copy_len) < iovec[i].iov_len)) {
|
|
|
- if (!next_fragment) {
|
|
|
- next_fragment++;
|
|
|
+ if (!con->next_fragment) {
|
|
|
+ con->next_fragment++;
|
|
|
}
|
|
|
- fragment_continuation = next_fragment;
|
|
|
- mcast.fragmented = next_fragment++;
|
|
|
- assert(fragment_continuation != 0);
|
|
|
+ con->fragment_continuation = con->next_fragment;
|
|
|
+ mcast.fragmented = con->next_fragment++;
|
|
|
+ assert(con->fragment_continuation != 0);
|
|
|
assert(mcast.fragmented != 0);
|
|
|
} else {
|
|
|
- fragment_continuation = 0;
|
|
|
+ con->fragment_continuation = 0;
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
* assemble the message and send it
|
|
|
*/
|
|
|
- mcast.msg_count = ++mcast_packed_msg_count;
|
|
|
+ mcast.msg_count = ++con->mcast_packed_msg_count;
|
|
|
iovecs[0].iov_base = (void *)&mcast;
|
|
|
iovecs[0].iov_len = sizeof(struct totempg_mcast);
|
|
|
- iovecs[1].iov_base = (void *)mcast_packed_msg_lens;
|
|
|
- iovecs[1].iov_len = mcast_packed_msg_count *
|
|
|
+ iovecs[1].iov_base = (void *)con->mcast_packed_msg_lens;
|
|
|
+ iovecs[1].iov_len = con->mcast_packed_msg_count *
|
|
|
sizeof(unsigned short);
|
|
|
iovecs[2].iov_base = (void *)data_ptr;
|
|
|
iovecs[2].iov_len = max_packet_size;
|
|
|
@@ -972,9 +1003,9 @@ static int mcast_msg (
|
|
|
/*
|
|
|
* Recalculate counts and indexes for the next.
|
|
|
*/
|
|
|
- mcast_packed_msg_lens[0] = 0;
|
|
|
- mcast_packed_msg_count = 0;
|
|
|
- fragment_size = 0;
|
|
|
+ con->mcast_packed_msg_lens[0] = 0;
|
|
|
+ con->mcast_packed_msg_count = 0;
|
|
|
+ con->fragment_size = 0;
|
|
|
max_packet_size = TOTEMPG_PACKET_SIZE - (sizeof(unsigned short));
|
|
|
|
|
|
/*
|
|
|
@@ -999,8 +1030,8 @@ static int mcast_msg (
|
|
|
* the last buffer just fit into the fragmentation_data buffer
|
|
|
* and we were at the last iovec.
|
|
|
*/
|
|
|
- if (mcast_packed_msg_lens[mcast_packed_msg_count]) {
|
|
|
- mcast_packed_msg_count++;
|
|
|
+ if (con->mcast_packed_msg_lens[con->mcast_packed_msg_count]) {
|
|
|
+ con->mcast_packed_msg_count++;
|
|
|
}
|
|
|
|
|
|
error_exit:
|
|
|
@@ -1015,11 +1046,12 @@ static int msg_count_send_ok (
|
|
|
int msg_count)
|
|
|
{
|
|
|
int avail = 0;
|
|
|
+ struct totempg_context *con = totempg_active_context;
|
|
|
|
|
|
avail = totemmrp_avail ();
|
|
|
totempg_stats.msg_queue_avail = avail;
|
|
|
|
|
|
- return ((avail - totempg_reserved) > msg_count);
|
|
|
+ return ((avail - con->totempg_reserved) > msg_count);
|
|
|
}
|
|
|
|
|
|
static int byte_count_send_ok (
|
|
|
@@ -1039,10 +1071,11 @@ static int send_reserve (
|
|
|
int msg_size)
|
|
|
{
|
|
|
unsigned int msg_count = 0;
|
|
|
+ struct totempg_context *con = totempg_active_context;
|
|
|
|
|
|
msg_count = (msg_size / (totempg_totem_config->net_mtu - sizeof (struct totempg_mcast) - 16)) + 1;
|
|
|
- totempg_reserved += msg_count;
|
|
|
- totempg_stats.msg_reserved = totempg_reserved;
|
|
|
+ con->totempg_reserved += msg_count;
|
|
|
+ totempg_stats.msg_reserved = con->totempg_reserved;
|
|
|
|
|
|
return (msg_count);
|
|
|
}
|
|
|
@@ -1050,8 +1083,10 @@ static int send_reserve (
|
|
|
static void send_release (
|
|
|
int msg_count)
|
|
|
{
|
|
|
- totempg_reserved -= msg_count;
|
|
|
- totempg_stats.msg_reserved = totempg_reserved;
|
|
|
+ struct totempg_context *con = totempg_active_context;
|
|
|
+
|
|
|
+ con->totempg_reserved -= msg_count;
|
|
|
+ totempg_stats.msg_reserved = con->totempg_reserved;
|
|
|
}
|
|
|
|
|
|
int totempg_callback_token_create (
|