Ver Fonte

enhancement 821
Global reorganization of totem code and also many feature enhancements
These include:
need to specify whether authentication is enabled or disabled in config file
need to specify frame size in config file
need redundant ring placeholder
need to specify version field of totem
need to support large frame sizes
need to break out threading code from totemsrp
need to break out network code from totemsrp
need to break out parser code from parse.c
and some others


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@794 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake há 20 anos atrás
pai
commit
5613db03a1
28 ficheiros alterados com 3615 adições e 882 exclusões
  1. 8 4
      conf/openais.conf
  2. 14 5
      exec/Makefile
  3. 28 0
      exec/aispoll.c
  4. 5 0
      exec/aispoll.h
  5. 0 3
      exec/evs.c
  6. 15 29
      exec/main.c
  7. 31 2
      exec/totem.h
  8. 417 0
      exec/totemconfig.c
  9. 58 0
      exec/totemconfig.h
  10. 1 1
      exec/totemmrp.c
  11. 1 1
      exec/totemmrp.h
  12. 1377 0
      exec/totemnet.c
  13. 102 0
      exec/totemnet.h
  14. 25 8
      exec/totempg.c
  15. 1 1
      exec/totempg.h
  16. 870 0
      exec/totemrrp.c
  17. 105 0
      exec/totemrrp.h
  18. 159 763
      exec/totemsrp.c
  19. 7 2
      exec/totemsrp.h
  20. 169 0
      exec/wthread.c
  21. 64 0
      exec/wthread.h
  22. 134 37
      man/openais.conf.5
  23. 9 10
      test/Makefile
  24. 4 4
      test/ckptbench.c
  25. 1 1
      test/ckptbenchth.c
  26. 6 8
      test/evsbench.c
  27. 2 2
      test/evtbench.c
  28. 2 1
      test/testevs.c

+ 8 - 4
conf/openais.conf

@@ -1,13 +1,17 @@
-totem {
-	bindnetaddr: 192.168.1.0
+# Please read the openais.conf.5 manual page
+
+network {
+	version: 1
+	secauth: off
+	bindnetaddr: 192.168.2.0
 	mcastaddr: 226.94.1.1
 	mcastport: 5405
+	netmtu: 1500
+	threads: 0
 }
 
 logging {
-	logoutput: file
 	logoutput: stderr
-	logoutput: syslog
 	logfile: /tmp/ais
 	debug: off
 	timestamp: on

+ 14 - 5
exec/Makefile

@@ -39,14 +39,14 @@ LDFLAGS = -lpthread
 
 # Profile mode flags
 #CFLAGS = -O3 -pg
-#LDFLAGS = -pg
+#LDFLAGS = -pg -lpthread
 
 # Code Coverage with lcov flgs
 #CFLAGS = -ftest-coverage -fprofile-arcs
 #LDFLAGS = -g
 
-TOTEM_SRC = aispoll.c totemsrp.c totemmrp.c totempg.c totemparse.c tlist.c hdb.c crypto.c
-TOTEM_OBJS = aispoll.o totemsrp.o totemmrp.o totempg.o totemparse.o tlist.o hdb.o crypto.o
+TOTEM_SRC = aispoll.c totemnet.c totemrrp.c totemsrp.c totemmrp.c totempg.c totemconfig.c tlist.c hdb.c crypto.c wthread.c
+TOTEM_OBJS = aispoll.o totemnet.o totemrrp.o totemsrp.o totemmrp.o totempg.o totemconfig.o tlist.o hdb.o crypto.o wthread.o
 
 EXEC_SRC = main.c clm.c amf.c ckpt.c evt.c evs.c mainparse.c print.c mempool.c \
 		util.c sync.c
@@ -83,10 +83,19 @@ depend:
 aispoll.o: aispoll.c
 	$(CC) $(CFLAGS) -fPIC -c -o $@ $(*F).c
 
+totempg.o: totempg.c
+	$(CC) $(CFLAGS) -fPIC -c -o $@ $(*F).c
+
 totemsrp.o: totemsrp.c
 	$(CC) $(CFLAGS) -fPIC -c -o $@ $(*F).c
 
-totempg.o: totempg.c
+totemrrp.o: totemrrp.c
+	$(CC) $(CFLAGS) -fPIC -c -o $@ $(*F).c
+
+totemnet.o: totemnet.c
+	$(CC) $(CFLAGS) -fPIC -c -o $@ $(*F).c
+
+wthread.o: wthread.c
 	$(CC) $(CFLAGS) -fPIC -c -o $@ $(*F).c
 
 tlist.o: tlist.c
@@ -101,7 +110,7 @@ crypto.o: crypto.c
 totemmrp.o: totemmrp.c
 	$(CC) $(CFLAGS) -fPIC -c -o $@ $(*F).c
 
-totemparse.o: totemparse.c
+totemconfig.o: totemconfig.c
 	$(CC) $(CFLAGS) -fPIC -c -o $@ $(*F).c
 
 # DO NOT DELETE

+ 28 - 0
exec/aispoll.c

@@ -454,3 +454,31 @@ error_exit:
 
 int poll_stop (
 	poll_handle handle);
+
+#ifdef COMPILE_OUT
+/*
+ * Debugging aid: print the poll entry registered for fd on the poll
+ * instance identified by handle.
+ *
+ * handle	poll instance to inspect
+ * fd		descriptor whose entry (fd, events, dispatch_fn, prio) is printed
+ *
+ * Exits the process if the handle cannot be resolved; prints nothing if no
+ * entry matches fd.
+ */
+void poll_print_state (
+	poll_handle handle,
+	int fd)
+{
+	struct poll_instance *poll_instance;
+	int i;
+	SaErrorT error;
+
+	error = saHandleInstanceGet (&poll_instance_database, handle,
+		(void *)&poll_instance);
+	if (error != SA_OK) {
+		exit (1);
+	}
+
+	for (i = 0; i < poll_instance->poll_entry_count; i++) {
+		if (poll_instance->poll_entries[i].ufd.fd == fd) {
+			printf ("fd %d\n", poll_instance->poll_entries[i].ufd.fd);
+			printf ("events %d\n", poll_instance->poll_entries[i].ufd.events);
+			/* %x with a pointer argument is undefined; print it as a pointer */
+			printf ("dispatch_fn %p\n",
+				(void *)poll_instance->poll_entries[i].dispatch_fn);
+			printf ("prio %d\n", poll_instance->poll_entries[i].prio);
+		}
+	}
+}
+	
+#endif

+ 5 - 0
exec/aispoll.h

@@ -81,4 +81,9 @@ int poll_run (
 int poll_stop (
 	poll_handle handle);
 
+#ifdef COMPILE_OUT
+void poll_print_state (
+	poll_handle handle, int fd);
+#endif
+
 #endif	/* POLL_H_DEFINED */

+ 0 - 3
exec/evs.c

@@ -342,10 +342,7 @@ static int message_handler_req_evs_mcast_joined (struct conn_info *conn_info, vo
 		// TODO
 	if (res == 0) {
 		error = EVS_OK;
-	} else {
-		printf ("res is wrong\n");
 	}
-error = EVS_OK;
 
 	res_lib_evs_mcast_joined.header.size = sizeof (struct res_lib_evs_mcast_joined);
 	res_lib_evs_mcast_joined.header.id = MESSAGE_RES_EVS_MCAST_JOINED;

+ 15 - 29
exec/main.c

@@ -64,7 +64,7 @@
 #include "totemsrp.h"
 #include "mempool.h"
 #include "mainparse.h"
-#include "totemparse.h"
+#include "totemconfig.h"
 #include "main.h"
 #include "handlers.h"
 #include "sync.h"
@@ -708,7 +708,7 @@ retry_recv:
 			send_ok =
 				(ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_NOT_REQUIRED) ||
 				((ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_REQUIRED) &&
-				(totempg_send_ok (1000 + header->size)) &&
+				(totempg_send_ok (header->size)) &&
 				(sync_in_process() == 0));
 
 			if (send_ok) {
@@ -773,7 +773,7 @@ void sigintr_handler (int signum)
 #endif
 
 	totempg_finalize ();
-	print_stats (totemsrp_handle_in);
+//	print_stats (totemsrp_handle_in);
 	ais_done (AIS_DONE_EXIT);
 }
 
@@ -1042,29 +1042,6 @@ static void aisexec_mlockall (void)
 	};
 }
 
-void aisexec_keyread (unsigned char *key)
-{
-	int fd;
-	int res;
-
-	fd = open ("/etc/ais/authkey", O_RDONLY);
-	if (fd == -1) {
-		log_printf (LOG_LEVEL_ERROR, "Could not open /etc/ais/authkey: %s\n", strerror (errno));
-		ais_done (AIS_DONE_READKEY);
-	}
-	res = read (fd, key, 128);
-	if (res == -1) {
-		log_printf (LOG_LEVEL_ERROR, "Could not read /etc/ais/authkey: %s\n", strerror (errno));
-		ais_done (AIS_DONE_READKEY);
-	}
-	if (res != 128) {
-		log_printf (LOG_LEVEL_ERROR, "Could only read %d bits of 1024 bits from /etc/ais/authkey.\n", res * 8);
-		ais_done (AIS_DONE_READKEY);
-	}
-
-	close (fd);
-}
-
 int message_source_is_local(struct message_source *source)
 {
 	int ret = 0;
@@ -1125,8 +1102,17 @@ int main (int argc, char **argv)
 		ais_done (AIS_DONE_MAINCONFIGREAD);
 	}
 
-	aisexec_keyread (openais_config.totem_config.private_key);
-	openais_config.totem_config.private_key_len = sizeof (openais_config.totem_config.private_key);
+	res = totem_config_keyread ("/etc/ais/authkey", &openais_config.totem_config, &error_string);
+	if (res == -1) {
+		log_printf (LOG_LEVEL_ERROR, error_string);
+		ais_done (AIS_DONE_MAINCONFIGREAD);
+	}
+
+	res = totem_config_validate (&openais_config.totem_config, &error_string);
+	if (res == -1) {
+		log_printf (LOG_LEVEL_ERROR, error_string);
+		ais_done (AIS_DONE_MAINCONFIGREAD);
+	}
 
 	res = log_setup (&error_string, openais_config.logmode, openais_config.logfile);
 	if (res == -1) {
@@ -1155,7 +1141,7 @@ int main (int argc, char **argv)
 	openais_config.totem_config.totem_logging_configuration.log_printf = internal_log_printf;
 
 	totempg_initialize (
-		&aisexec_poll_handle,
+		aisexec_poll_handle,
 		&totemsrp_handle_in,
 		&openais_config.totem_config,
 		deliver_fn,

+ 31 - 2
exec/totem.h

@@ -36,6 +36,8 @@
 
 #define MESSAGE_SIZE_MAX			256000
 #define PROCESSOR_COUNT_MAX			16
+#define FRAME_SIZE_MAX				9000
+#define SEND_THREADS_MAX			16
 
 /*
  * Array location of various timeouts as
@@ -72,6 +74,8 @@ struct totem_logging_configuration {
 };
 
 struct totem_config {
+	int version;
+
 	/*
 	 * network
 	 */
@@ -83,14 +87,39 @@ struct totem_config {
 	 * key information
 	 */
 	unsigned char private_key[128];
+
 	int private_key_len;
 
 	/*
-	 * Timeouts
+	 * Totem configuration parameters
 	 */
-	unsigned int timeouts[MAX_TOTEM_TIMEOUTS];
+	unsigned int token_timeout;
+
+	unsigned int token_retransmit_timeout;
+
+	unsigned int token_hold_timeout;
+
+	unsigned int token_retransmits_before_loss_const;
+
+	unsigned int join_timeout;
+
+	unsigned int consensus_timeout;
+
+	unsigned int merge_timeout;
+
+	unsigned int downcheck_timeout;
+
+	unsigned int fail_to_recv_const;
+
+	unsigned int seqno_unchanged_const;
 
 	struct totem_logging_configuration totem_logging_configuration;
+
+	unsigned int secauth;
+
+	unsigned int net_mtu;
+
+	unsigned int threads;
 };
 
 enum totem_configuration_type {

+ 417 - 0
exec/totemconfig.c

@@ -0,0 +1,417 @@
+/*
+ * Copyright (c) 2002-2004 MontaVista Software, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <assert.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/param.h>
+
+#include "../include/list.h"
+#include "util.h"
+#include "totem.h"
+#include "totemconfig.h"
+#include "print.h"
+
+#define LOG_SERVICE LOG_SERVICE_GMI
+/*
+ * Built-in defaults.  totem_config_validate() substitutes these for
+ * parameters that are still 0 after parsing.  TOKEN_RETRANSMIT_TIMEOUT and
+ * TOKEN_HOLD_TIMEOUT are derived from the token defaults defined above them.
+ */
+#define TOKEN_RETRANSMITS_BEFORE_LOSS_CONST	4
+#define TOKEN_TIMEOUT				1000
+#define TOKEN_RETRANSMIT_TIMEOUT		(int)(TOKEN_TIMEOUT / (TOKEN_RETRANSMITS_BEFORE_LOSS_CONST + 0.2))
+#define TOKEN_HOLD_TIMEOUT			(int)(TOKEN_RETRANSMIT_TIMEOUT * 0.8 - (1000/HZ))
+#define JOIN_TIMEOUT				100
+#define CONSENSUS_TIMEOUT			200
+#define MERGE_TIMEOUT				200
+#define DOWNCHECK_TIMEOUT			1000
+#define FAIL_TO_RECV_CONST			10
+#define	SEQNO_UNCHANGED_CONST			3000
+/* Shared buffer the totem_config_* functions format their error text into */
+static char error_string_response[512];
+/* Position of the configuration parser relative to the totem section */
+typedef enum {
+	MAIN_HEAD,	/* outside the network{} / totem{} section */
+	MAIN_TOTEM	/* inside the section, reading "key: value" lines */
+} main_parse_t;
+
+/*
+ * Locate a "key<delimiter>" token in a configuration line and return the
+ * value that follows it.
+ *
+ * needle is the key with its one-character delimiter appended, for example
+ * "mcastport:" or "network{".  The key must occur in haystack and the
+ * delimiter must occur somewhere after it; other text (typically blanks)
+ * may sit between the two.
+ *
+ * Returns a pointer into haystack just past the delimiter and any spaces or
+ * tabs following it, or NULL when the key or the delimiter is missing or
+ * needle is empty.
+ */
+static inline char *
+strstr_rs (const char *haystack, const char *needle)
+{
+	size_t needle_len = strlen (needle);
+	size_t key_len;
+	const char *pos;
+
+	if (needle_len == 0) {
+		return (NULL);
+	}
+	key_len = needle_len - 1;
+
+	/*
+	 * Find the first occurrence of the key, comparing in place so no
+	 * temporary copy of the needle has to be allocated.
+	 */
+	for (pos = haystack; ; pos++) {
+		if (strncmp (pos, needle, key_len) == 0) {
+			break;
+		}
+		if (*pos == '\0') {
+			return (NULL);
+		}
+	}
+
+	/* The delimiter may be separated from the key */
+	pos = strchr (pos + key_len, needle[key_len]);
+	if (pos == NULL) {
+		return (NULL);
+	}
+
+	pos += 1; /* skip past { or : */
+	while (*pos == '\t' || *pos == ' ') {
+		pos++;
+	}
+
+	return ((char *)pos);
+}
+
+/*
+ * Parse the network{} (or totem{}) section of /etc/ais/openais.conf into
+ * totem_config.
+ *
+ * totem_config		cleared, then filled from the file; its interfaces
+ *			array is allocated here with interface_max entries
+ * error_string		on failure, set to a static message (do not free)
+ * interface_max	maximum number of bindnetaddr entries accepted
+ *
+ * Returns 0 on success and -1 on allocation failure, an unreadable file,
+ * an unrecognized or malformed directive, or an unterminated section.
+ * Values are not range checked here; see totem_config_validate().
+ */
+extern int totem_config_read (
+	struct totem_config *totem_config,
+	char **error_string,
+	int interface_max)
+{
+	FILE *fp;
+	int line_number = 0;
+	main_parse_t parse = MAIN_HEAD;
+	int totem_parsed = 0;
+	int parse_done = 0;
+	char *loc;
+	size_t len;
+	char line[512];
+	char reason[256];
+	const char *error_reason = "unrecognized directive";
+
+	memset (totem_config, 0, sizeof (struct totem_config));
+	totem_config->interfaces = calloc (interface_max,
+		sizeof (struct totem_interface));
+	if (totem_config->interfaces == 0) {
+		*error_string = "Out of memory trying to allocate ethernet interface storage area";
+		return -1;
+	}
+
+	totem_config->mcast_addr.sin_family = AF_INET;
+	totem_config->secauth = 1;
+
+	fp = fopen ("/etc/ais/openais.conf", "r");
+	if (fp == 0) {
+		snprintf (error_string_response, sizeof (error_string_response),
+			"Can't read file reason = (%s)\n", strerror (errno));
+		*error_string = error_string_response;
+		return -1;
+	}
+
+	while (fgets (line, sizeof (line), fp)) {
+		/* Count every physical line exactly once so errors point at it */
+		line_number += 1;
+
+		/*
+		 * Clear out the line terminator (if present), white space
+		 * and tabs
+		 */
+		len = strlen (line);
+		while (len > 0 &&
+			(line[len - 1] == '\n' || line[len - 1] == '\r' ||
+			line[len - 1] == '\t' || line[len - 1] == ' ')) {
+			line[--len] = '\0';
+		}
+
+		/*
+		 * Clear out comments and empty lines
+		 */
+		if (line[0] == '#' || line[0] == '\0') {
+			continue;
+		}
+
+		switch (parse) {
+		case MAIN_HEAD:
+			if (totem_parsed == 0 &&
+				(strstr_rs (line, "network{") ||
+				strstr_rs (line, "totem{"))) {
+				totem_parsed = 1;
+				parse = MAIN_TOTEM;
+			}
+			break;
+
+		case MAIN_TOTEM:
+			if ((loc = strstr_rs (line, "version:"))) {
+				if (strcmp (loc, "1") == 0) {
+					totem_config->version = 1;
+				}
+			} else
+			if ((loc = strstr_rs (line, "secauth:"))) {
+				if (strcmp (loc, "on") == 0) {
+					totem_config->secauth = 1;
+				} else
+				if (strcmp (loc, "off") == 0) {
+					totem_config->secauth = 0;
+				}
+			} else
+			if ((loc = strstr_rs (line, "threads:"))) {
+				totem_config->threads = atoi (loc);
+			} else
+			if ((loc = strstr_rs (line, "netmtu:"))) {
+				totem_config->net_mtu = atoi (loc);
+			} else
+			if ((loc = strstr_rs (line, "mcastaddr:"))) {
+				if (inet_aton (loc, &totem_config->mcast_addr.sin_addr) == 0) {
+					error_reason = "invalid mcastaddr";
+					goto parse_error;
+				}
+			} else
+			if ((loc = strstr_rs (line, "mcastport:"))) {
+				totem_config->mcast_addr.sin_port = htons (atoi (loc));
+			} else
+			if ((loc = strstr_rs (line, "bindnetaddr:"))) {
+				if (interface_max == totem_config->interface_count) {
+					/*
+					 * Format into a local buffer: the final
+					 * message is built in error_string_response
+					 * and must not overlap its own argument.
+					 */
+					snprintf (reason, sizeof (reason),
+						"%d is too many interfaces in /etc/ais/openais.conf",
+						totem_config->interface_count);
+					error_reason = reason;
+					goto parse_error;
+				}
+				if (inet_aton (loc,
+					&totem_config->interfaces[totem_config->interface_count].bindnet.sin_addr) == 0) {
+					error_reason = "invalid bindnetaddr";
+					goto parse_error;
+				}
+				totem_config->interface_count += 1;
+			} else
+			/*
+			 * strstr_rs matches a key as a prefix, so the longer
+			 * token_* keys must be tested before "token:".
+			 */
+			if ((loc = strstr_rs (line, "token_retransmits_before_loss_const:"))) {
+				totem_config->token_retransmits_before_loss_const = atoi(loc);
+			} else if ((loc = strstr_rs (line, "token_retransmit:"))) {
+				totem_config->token_retransmit_timeout = atoi(loc);
+			} else if ((loc = strstr_rs (line, "token:"))) {
+				totem_config->token_timeout = atoi(loc);
+			} else if ((loc = strstr_rs (line, "hold:"))) {
+				totem_config->token_hold_timeout = atoi(loc);
+			} else if ((loc = strstr_rs (line, "join:"))) {
+				totem_config->join_timeout = atoi(loc);
+			} else if ((loc = strstr_rs (line, "consensus:"))) {
+				totem_config->consensus_timeout = atoi(loc);
+			} else if ((loc = strstr_rs (line, "merge:"))) {
+				totem_config->merge_timeout = atoi(loc);
+			} else if ((loc = strstr_rs (line, "downcheck:"))) {
+				totem_config->downcheck_timeout = atoi(loc);
+			} else if ((loc = strstr_rs (line, "fail_recv_const:"))) {
+				totem_config->fail_to_recv_const = atoi(loc);
+			} else if ((loc = strstr_rs (line, "}"))) {
+				parse = MAIN_HEAD;
+			} else {
+				goto parse_error;
+			}
+			break;
+
+		default:
+			assert (0 == 1); /* SHOULDN'T HAPPEN */
+			break;
+		}
+	}
+
+	if (parse != MAIN_HEAD) {
+		/* End of file reached inside the section: no line to blame */
+		parse_done = 1;
+		error_reason = "Missing closing brace";
+		goto parse_error;
+	}
+
+	fclose (fp);
+	return (0);
+
+parse_error:
+	if (parse_done) {
+		snprintf (error_string_response, sizeof (error_string_response),
+			"parse error in /etc/ais/openais.conf: %s.\n", error_reason);
+	} else {
+		snprintf (error_string_response, sizeof (error_string_response),
+			"parse error in /etc/ais/openais.conf: %s (line %d).\n",
+			error_reason, line_number);
+	}
+	*error_string = error_string_response;
+	fclose (fp);
+	return (-1);
+}
+
+/*
+ * Check a parsed totem configuration and fill in defaults.
+ *
+ * Fails when no multicast address, multicast port or bindnet address was
+ * given, when version is not 1, or when net_mtu exceeds FRAME_SIZE_MAX.
+ * Every timeout or constant still 0 is replaced by its built-in default,
+ * threads is clamped to SEND_THREADS_MAX, and threads is forced to 0 when
+ * secauth is off.
+ *
+ * totem_config		configuration to check; updated in place
+ * error_string		on failure, set to a static message (do not free)
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+int totem_config_validate (
+	struct totem_config *totem_config,
+	char **error_string)
+{
+	const char *error_reason;
+	int hold_timeout;
+
+	/*
+	 * Some error checking of parsed data to make sure its valid
+	 */
+	if (totem_config->mcast_addr.sin_addr.s_addr == 0) {
+		error_reason = "No multicast address specified";
+		goto parse_error;
+	}
+
+	if (totem_config->mcast_addr.sin_port == 0) {
+		error_reason = "No multicast port specified";
+		goto parse_error;
+	}
+
+	if (totem_config->interface_count == 0) {
+		error_reason = "No bindnet specified";
+		goto parse_error;
+	}
+
+	if (totem_config->version != 1) {
+		error_reason = "This totem parser can only parse version 1 configuration files.";
+		goto parse_error;
+	}
+
+	/*
+	 * Setup timeout values that are not setup by user.  The derived
+	 * values depend on the ones before them, so the order matters.
+	 */
+	if (totem_config->token_retransmits_before_loss_const == 0) {
+		totem_config->token_retransmits_before_loss_const =
+			TOKEN_RETRANSMITS_BEFORE_LOSS_CONST;
+	}
+	if (totem_config->token_timeout == 0) {
+		totem_config->token_timeout = TOKEN_TIMEOUT;
+	}
+	if (totem_config->token_retransmit_timeout == 0) {
+		totem_config->token_retransmit_timeout =
+			(int)(totem_config->token_timeout /
+			(totem_config->token_retransmits_before_loss_const + 0.2));
+	}
+	if (totem_config->token_hold_timeout == 0) {
+		/*
+		 * For a short retransmit timeout this expression is zero or
+		 * negative; storing a negative result in the unsigned field
+		 * would wrap to a huge timeout, so use the default instead.
+		 */
+		hold_timeout =
+			(int)(totem_config->token_retransmit_timeout * 0.8 -
+			(1000/HZ));
+		if (hold_timeout > 0) {
+			totem_config->token_hold_timeout = hold_timeout;
+		} else {
+			totem_config->token_hold_timeout = TOKEN_HOLD_TIMEOUT;
+		}
+	}
+	if (totem_config->join_timeout == 0) {
+		totem_config->join_timeout = JOIN_TIMEOUT;
+	}
+	if (totem_config->consensus_timeout == 0) {
+		totem_config->consensus_timeout = CONSENSUS_TIMEOUT;
+	}
+	if (totem_config->merge_timeout == 0) {
+		totem_config->merge_timeout = MERGE_TIMEOUT;
+	}
+	if (totem_config->downcheck_timeout == 0) {
+		totem_config->downcheck_timeout = DOWNCHECK_TIMEOUT;
+	}
+	if (totem_config->fail_to_recv_const == 0) {
+		totem_config->fail_to_recv_const = FAIL_TO_RECV_CONST;
+	}
+	if (totem_config->seqno_unchanged_const == 0) {
+		totem_config->seqno_unchanged_const = SEQNO_UNCHANGED_CONST;
+	}
+	if (totem_config->net_mtu == 0) {
+		totem_config->net_mtu = 1500;
+	}
+	if (totem_config->threads > SEND_THREADS_MAX) {
+		totem_config->threads = SEND_THREADS_MAX;
+	}
+	if (totem_config->secauth == 0) {
+		totem_config->threads = 0;
+	}
+	if (totem_config->net_mtu > FRAME_SIZE_MAX) {
+		error_reason = "This net_mtu parameter is greater then the maximum frame size";
+		goto parse_error;
+	}
+	return (0);
+
+parse_error:
+	snprintf (error_string_response, sizeof (error_string_response),
+		"parse error in /etc/ais/openais.conf: %s.\n", error_reason);
+	*error_string = error_string_response;
+	return (-1);
+}
+
+/*
+ * Load the shared private key used for authentication.
+ *
+ * Does nothing when totem_config->secauth is 0.  Otherwise reads exactly
+ * sizeof (totem_config->private_key) bytes from key_location into
+ * totem_config->private_key and records that length in private_key_len.
+ *
+ * key_location		path of the key file
+ * totem_config		receives the key
+ * error_string		on failure, set to a static message (do not free)
+ *
+ * Returns 0 on success (or when secauth is off), -1 if the file cannot be
+ * opened, cannot be read, or holds fewer bytes than the key size.
+ */
+int totem_config_keyread (
+	unsigned char *key_location,
+	struct totem_config *totem_config,
+	char **error_string)
+{
+	const char *path = (const char *)key_location;
+	const int key_len = (int)sizeof (totem_config->private_key);
+	int fd;
+	int res;
+	int saved_errno;
+
+	if (totem_config->secauth == 0) {
+		return (0);
+	}
+	fd = open (path, O_RDONLY);
+	if (fd == -1) {
+		snprintf (error_string_response, sizeof (error_string_response),
+			"Could not open %s: %s\n", path, strerror (errno));
+		goto parse_error;
+	}
+
+	res = read (fd, totem_config->private_key, key_len);
+	/* close() may overwrite errno, and the descriptor is done with either way */
+	saved_errno = errno;
+	close (fd);
+
+	if (res == -1) {
+		snprintf (error_string_response, sizeof (error_string_response),
+			"Could not read %s: %s\n", path, strerror (saved_errno));
+		goto parse_error;
+	}
+
+	if (res != key_len) {
+		snprintf (error_string_response, sizeof (error_string_response),
+			"Could only read %d bits of %d bits from %s.\n",
+			res * 8, key_len * 8, path);
+		goto parse_error;
+	}
+
+	totem_config->private_key_len = key_len;
+	return (0);
+
+parse_error:
+	*error_string = error_string_response;
+	return (-1);
+}

+ 58 - 0
exec/totemconfig.h

@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2005 MontaVista Software, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#include <netinet/in.h>
+#include "../include/ais_types.h"
+#include "../include/list.h"
+#include "aispoll.h"
+#include "totemsrp.h"
+#include "totempg.h"
+
+#ifndef TOTEMCONFIG_H_DEFINED
+#define TOTEMCONFIG_H_DEFINED
+/*
+ * Parse the totem section of /etc/ais/openais.conf into totem_config,
+ * accepting at most interface_max bindnetaddr entries.  Returns 0 on
+ * success; on failure returns -1 with *error_string set to a message.
+ */
+extern int totem_config_read (
+	struct totem_config *totem_config,
+	char **error_string,
+	int interface_max);
+/*
+ * Check the parsed configuration and fill unset (zero) parameters with
+ * defaults.  Returns 0 on success; on failure returns -1 with
+ * *error_string set to a message.
+ */
+extern int totem_config_validate (
+	struct totem_config *totem_config,
+	char **error_string);
+/*
+ * Read the private key from key_location into totem_config when secauth
+ * is enabled.  Returns 0 on success; on failure returns -1 with
+ * *error_string set to a message.
+ */
+int totem_config_keyread (
+	unsigned char *key_location,
+	struct totem_config *totem_config,
+	char **error_string);
+
+#endif /* TOTEMCONFIG_H_DEFINED */

+ 1 - 1
exec/totemmrp.c

@@ -102,7 +102,7 @@ void totemmrp_confchg_fn (
  * Initialize the totem multiple ring protocol
  */
 int totemmrp_initialize (
-	poll_handle *poll_handle,
+	poll_handle poll_handle,
 	totemsrp_handle *totemsrp_handle,
 	struct totem_config *totem_config,
 

+ 1 - 1
exec/totemmrp.h

@@ -58,7 +58,7 @@ void totemmrp_log_printf_init (
  * Initialize the group messaging interface
  */
 int totemmrp_initialize (
-	poll_handle *poll_handle,
+	poll_handle poll_handle,
 	totemsrp_handle *totemsrp_handle,
 	struct totem_config *totem_config,
 

+ 1377 - 0
exec/totemnet.c

@@ -0,0 +1,1377 @@
+#define WORKER_THREAD_COUNT 2
+/*
+ * Copyright (c) 2005 MontaVista Software, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include <assert.h>
+#include <pthread.h>
+#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <sys/un.h>
+#include <sys/sysinfo.h>
+#include <sys/ioctl.h>
+#include <sys/param.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <linux/if.h>
+#include <linux/sockios.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <signal.h>
+#include <sched.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/poll.h>
+
+#include "aispoll.h"
+#include "totemnet.h"
+#include "wthread.h"
+#include "../include/queue.h"
+#include "../include/sq.h"
+#include "../include/list.h"
+#include "hdb.h"
+#include "swab.h"
+
+#include "crypto.h"
+
+#define NETIF_STATE_REPORT_UP		1	
+#define NETIF_STATE_REPORT_DOWN		2
+
+#define BIND_STATE_UNBOUND	0
+#define BIND_STATE_REGULAR	1
+#define BIND_STATE_LOOPBACK	2
+
+#define LOCALHOST_IP inet_addr("127.0.0.1")
+
+#define HMAC_HASH_SIZE 20
+struct security_header {
+	unsigned char hash_digest[HMAC_HASH_SIZE]; /* The hash *MUST* be first in the data structure */
+	unsigned char salt[16]; /* random number */
+	char msg[0];
+} __attribute__((packed));
+
+/*
+ * State carried by each multicast worker thread: a scratch frame buffer and
+ * a PRNG state.
+ */
+struct totemnet_mcast_thread_state {
+	/* Sized by the shared frame limit instead of a literal 9000 */
+	char iobuf[FRAME_SIZE_MAX];
+	prng_state prng_state;
+};
+
+struct totemnet_socket {
+	int mcast;
+	int token;
+};
+
+struct totemnet_instance {
+	hmac_state totemnet_hmac_state;
+
+	prng_state totemnet_prng_state;
+
+	unsigned char totemnet_private_key[1024];
+
+	unsigned int totemnet_private_key_len;
+
+	poll_handle totemnet_poll_handle;
+
+	struct totem_interface *totemnet_interface;
+
+	int netif_state_report;
+
+	int netif_bind_state;
+
+	struct worker_thread_group worker_thread_group;
+
+	void *context;
+
+	void (*totemnet_deliver_fn) (
+		void *context,
+		struct in_addr *system_from,
+		void *msg,
+		int msg_len);
+
+	void (*totemnet_iface_change_fn) (
+		void *context,
+		struct sockaddr_in *iface_sockaddr_in);
+
+	/*
+	 * Function and data used to log messages
+	 */
+	int totemnet_log_level_security;
+
+	int totemnet_log_level_error;
+
+	int totemnet_log_level_warning;
+
+	int totemnet_log_level_notice;
+
+	int totemnet_log_level_debug;
+
+	void (*totemnet_log_printf) (int level, char *format, ...);
+
+	totemnet_handle handle;
+
+	char iov_buffer[FRAME_SIZE_MAX];
+
+	char iov_buffer_flush[FRAME_SIZE_MAX];
+
+	struct iovec totemnet_iov_recv;
+
+	struct iovec totemnet_iov_recv_flush;
+
+	struct totemnet_socket totemnet_sockets;
+
+	struct sockaddr_in sockaddr_in_mcast;
+
+	struct in_addr in_addr_mcast;
+
+	int stats_sent;
+
+	int stats_recv;
+
+	int stats_delv;
+
+	int stats_remcasts;
+
+	int stats_orf_token;
+
+	struct timeval stats_tv_start;
+
+	struct sockaddr_in my_id;
+
+	int firstrun;
+
+	poll_timer_handle timer_netif_check_timeout;
+
+	unsigned int my_memb_entries;
+
+	int flushing;
+
+	struct totem_config *totem_config;
+};
+
+struct work_item {
+	struct iovec iovec[20];
+	int iov_len;
+	struct totemnet_instance *instance;
+};
+
+/*
+ * All instances in one database
+ */
+static struct saHandleDatabase totemnet_instance_database = {
+	.handleCount			= 0,
+	.handles			= 0,
+	.handleInstanceDestructor	= 0
+};
+
+static int loopback_determine (struct sockaddr_in *bound_to);
+static void netif_down_check (struct totemnet_instance *instance);
+
+static int totemnet_build_sockets (
+	struct totemnet_instance *instance,
+	struct sockaddr_in *sockaddr_mcast,
+	struct sockaddr_in *sockaddr_bindnet,
+	struct totemnet_socket *sockets,
+	struct sockaddr_in *bound_to,
+	int *interface_up);
+
+static int totemnet_build_sockets_loopback (
+	struct totemnet_instance *instance,
+	struct sockaddr_in *sockaddr_mcast,
+	struct sockaddr_in *sockaddr_bindnet,
+	struct totemnet_socket *sockets,
+	struct sockaddr_in *bound_to);
+
+/*
+ * Reset a totemnet instance to its initial state: everything zeroed, both
+ * interface state reports enabled, and the two receive iovecs pointed at
+ * the instance's own frame buffers.
+ */
+static void totemnet_instance_initialize (struct totemnet_instance *instance)
+{
+	memset (instance, 0, sizeof (*instance));
+
+	instance->netif_state_report =
+		NETIF_STATE_REPORT_UP | NETIF_STATE_REPORT_DOWN;
+
+	/* Receive iovec covers all of iov_buffer */
+	instance->totemnet_iov_recv.iov_base = instance->iov_buffer;
+	instance->totemnet_iov_recv.iov_len = sizeof (instance->iov_buffer);
+
+	/* Flush iovec covers all of iov_buffer_flush */
+	instance->totemnet_iov_recv_flush.iov_base = instance->iov_buffer_flush;
+	instance->totemnet_iov_recv_flush.iov_len = sizeof (instance->iov_buffer_flush);
+}
+
+/*
+ * Authenticate a received frame and, if its digest matches, decrypt the
+ * payload in place.
+ *
+ * The frame in iov[0] starts with a struct security_header (digest, then
+ * salt).  The MAC, cipher and IV keys are regenerated from the instance's
+ * private key and the frame's salt; the HMAC is computed over everything
+ * after the digest and compared with the digest carried in the header.
+ *
+ * instance	supplies the private key, HMAC state and logging callback
+ * iov		iov[0] describes the received frame; the bytes after the
+ *		security header are overwritten with the decrypted payload
+ *
+ * Returns 0 on success, -1 if the frame is shorter than a security header
+ * or the digest does not match (the frame is then left undecrypted).
+ */
+static int authenticate_and_decrypt (
+	struct totemnet_instance *instance,
+	struct iovec *iov)
+{
+	char keys[48];
+	struct security_header *header = iov[0].iov_base;
+	char *frame = iov[0].iov_base;
+	prng_state keygen_prng_state;
+	prng_state stream_prng_state;
+	char *hmac_key = &keys[32];
+	char *cipher_key = &keys[16];
+	char *initial_vector = &keys[0];
+	char digest_comparison[HMAC_HASH_SIZE];
+	unsigned char digest_diff = 0;
+	unsigned long len;
+	unsigned long i;
+
+	/*
+	 * The frame comes off the network: without this check the length
+	 * subtractions below wrap around for a runt frame and the HMAC would
+	 * read far beyond the receive buffer.
+	 */
+	if (iov->iov_len < sizeof (struct security_header)) {
+		instance->totemnet_log_printf (instance->totemnet_log_level_security, "Received message is too short... ignoring.\n");
+		return (-1);
+	}
+
+	/*
+	 * Generate MAC, CIPHER, IV keys from private key
+	 */
+	memset (keys, 0, sizeof (keys));
+	sober128_start (&keygen_prng_state);
+	sober128_add_entropy (instance->totemnet_private_key,
+		instance->totemnet_private_key_len, &keygen_prng_state);
+	sober128_add_entropy (header->salt, sizeof (header->salt), &keygen_prng_state);
+
+	sober128_read (keys, sizeof (keys), &keygen_prng_state);
+
+	/*
+	 * Setup stream cipher
+	 */
+	sober128_start (&stream_prng_state);
+	sober128_add_entropy (cipher_key, 16, &stream_prng_state);
+	sober128_add_entropy (initial_vector, 16, &stream_prng_state);
+
+	/*
+	 * Authenticate contents of message
+	 */
+	hmac_init (&instance->totemnet_hmac_state, DIGEST_SHA1, hmac_key, 16);
+
+	hmac_process (&instance->totemnet_hmac_state,
+		frame + HMAC_HASH_SIZE,
+		iov->iov_len - HMAC_HASH_SIZE);
+
+	len = hash_descriptor[DIGEST_SHA1]->hashsize;
+	assert (HMAC_HASH_SIZE >= len);
+	hmac_done (&instance->totemnet_hmac_state, digest_comparison, &len);
+
+	/*
+	 * Compare every byte regardless of where the first mismatch is so
+	 * the time taken does not reveal how much of a forged digest matched
+	 */
+	for (i = 0; i < len; i++) {
+		digest_diff |= (unsigned char)digest_comparison[i] ^
+			header->hash_digest[i];
+	}
+	if (digest_diff != 0) {
+		instance->totemnet_log_printf (instance->totemnet_log_level_security, "Received message has invalid digest... ignoring.\n");
+		return (-1);
+	}
+
+	/*
+	 * Decrypt the contents of the message with the cipher key
+	 */
+	sober128_read (frame + sizeof (struct security_header),
+		iov->iov_len - sizeof (struct security_header),
+		&stream_prng_state);
+
+	return (0);
+}
+/*
+ * Build an encrypted, signed frame in buf from a scatter/gather list.
+ *
+ * instance      - supplies the private key
+ * buf           - output frame: security header followed by the encrypted
+ *                 payload; the caller provides the storage
+ * buf_len       - receives the total number of bytes written to buf
+ * iovec         - iovec[0] is a placeholder for the security header and is
+ *                 not read; iovec[1..iov_len-1] are concatenated as payload
+ * iov_len       - number of entries in iovec, including the placeholder
+ * prng_state_in - generator the per-frame salt is drawn from
+ *
+ * No bound is applied to the amount copied into buf.
+ */
+static void encrypt_and_sign_worker (
+	struct totemnet_instance *instance,
+	char *buf,
+	int *buf_len,
+	struct iovec *iovec,
+	int iov_len,
+	prng_state *prng_state_in)
+{
+	struct security_header *sec_hdr = (struct security_header *)buf;
+	char *payload = buf + sizeof (struct security_header);
+	char *cursor = payload;
+	char key_material[48];
+	char *iv = &key_material[0];
+	char *stream_key = &key_material[16];
+	char *mac_key = &key_material[32];
+	prng_state keygen_state;
+	prng_state stream_state;
+	hmac_state mac_state;
+	unsigned long digest_len;
+	int frame_len = sizeof (struct security_header);
+	int idx;
+
+	memset (key_material, 0, sizeof (key_material));
+	memset (sec_hdr->salt, 0, sizeof (sec_hdr->salt));
+
+	/*
+	 * Draw a fresh salt, then derive IV, cipher key and MAC key from the
+	 * private key mixed with that salt
+	 */
+	sober128_read (sec_hdr->salt, sizeof (sec_hdr->salt), prng_state_in);
+	sober128_start (&keygen_state);
+	sober128_add_entropy (instance->totemnet_private_key,
+		instance->totemnet_private_key_len,
+		&keygen_state);
+	sober128_add_entropy (sec_hdr->salt, sizeof (sec_hdr->salt),
+		&keygen_state);
+
+	sober128_read (key_material, sizeof (key_material), &keygen_state);
+
+	/*
+	 * Key the stream cipher
+	 */
+	sober128_start (&stream_state);
+	sober128_add_entropy (stream_key, 16, &stream_state);
+	sober128_add_entropy (iv, 16, &stream_state);
+
+	/*
+	 * Gather the payload behind the header; entry 0 is the header slot
+	 */
+	idx = 1;
+	while (idx < iov_len) {
+		memcpy (cursor, iovec[idx].iov_base, iovec[idx].iov_len);
+		cursor += iovec[idx].iov_len;
+		frame_len += iovec[idx].iov_len;
+		idx++;
+	}
+
+	/*
+	 * Encrypt the gathered payload in place with the stream cipher
+	 */
+	sober128_read (payload,
+		frame_len - sizeof (struct security_header),
+		&stream_state);
+
+	/*
+	 * Sign everything after the digest field and store the digest in the
+	 * header
+	 */
+	memset (&mac_state, 0, sizeof (mac_state));
+	hmac_init (&mac_state, DIGEST_SHA1, mac_key, 16);
+	hmac_process (&mac_state,
+		buf + HMAC_HASH_SIZE,
+		frame_len - HMAC_HASH_SIZE);
+
+	digest_len = hash_descriptor[DIGEST_SHA1]->hashsize;
+	hmac_done (&mac_state, sec_hdr->hash_digest, &digest_len);
+
+	*buf_len = frame_len;
+}
+
+
+/*
+ * Multicast the message described by iovec_in on the multicast socket.
+ *
+ * instance   - totemnet instance supplying configuration, socket and prng
+ * iovec_in   - scatter/gather list making up the message
+ * iov_len_in - number of entries in iovec_in
+ *
+ * When secauth is enabled the message is gathered into a local frame,
+ * encrypted and signed, and sent as one buffer.  A message that would not
+ * fit the local iovec array or the local frame buffer is logged and dropped
+ * instead of overrunning the stack.
+ *
+ * The result of sendmsg is intentionally not reported to the caller.
+ */
+void totemnet_iovec_send (
+	struct totemnet_instance *instance,
+	struct iovec *iovec_in,
+	int iov_len_in)
+{
+	struct msghdr msg_mcast;
+	int buf_len;
+	int i;
+	size_t payload_len = 0;
+	unsigned char sheader[sizeof (struct security_header)];
+	unsigned char encrypt_data[FRAME_SIZE_MAX];
+	struct iovec iovec_encrypt[20];
+	struct iovec *iovec_sendmsg;
+	int iov_len;
+
+	if (instance->totem_config->secauth == 1) {
+		/*
+		 * Slot 0 is reserved for the security header, so one entry
+		 * less than the array size is available for the message
+		 */
+		if (iov_len_in < 0 ||
+			(size_t)iov_len_in + 1 >
+			sizeof (iovec_encrypt) / sizeof (iovec_encrypt[0])) {
+
+			instance->totemnet_log_printf (instance->totemnet_log_level_error, "Message has too many iovecs to encrypt... dropping.\n");
+			return;
+		}
+
+		for (i = 0; i < iov_len_in; i++) {
+			payload_len += iovec_in[i].iov_len;
+		}
+		if (payload_len >
+			sizeof (encrypt_data) - sizeof (struct security_header)) {
+
+			instance->totemnet_log_printf (instance->totemnet_log_level_error, "Message is too large to encrypt... dropping.\n");
+			return;
+		}
+
+		iovec_encrypt[0].iov_base = sheader;
+		iovec_encrypt[0].iov_len = sizeof (struct security_header);
+		memcpy (&iovec_encrypt[1], &iovec_in[0],
+			sizeof (struct iovec) * iov_len_in);
+
+		/*
+		 * Encrypt and digest the message
+		 */
+		encrypt_and_sign_worker (
+			instance,
+			(char *)encrypt_data,
+			&buf_len,
+			iovec_encrypt,
+			iov_len_in + 1,
+			&instance->totemnet_prng_state);
+
+		iovec_encrypt[0].iov_base = encrypt_data;
+		iovec_encrypt[0].iov_len = buf_len;
+		iovec_sendmsg = &iovec_encrypt[0];
+		iov_len = 1;
+	} else {
+		iovec_sendmsg = iovec_in;
+		iov_len = iov_len_in;
+	}
+
+	/*
+	 * Build multicast message
+	 */
+	memset (&msg_mcast, 0, sizeof (msg_mcast));
+	msg_mcast.msg_name = &instance->sockaddr_in_mcast;
+	msg_mcast.msg_namelen = sizeof (struct sockaddr_in);
+	msg_mcast.msg_iov = iovec_sendmsg;
+	msg_mcast.msg_iovlen = iov_len;
+	msg_mcast.msg_control = 0;
+	msg_mcast.msg_controllen = 0;
+	msg_mcast.msg_flags = 0;
+
+	/*
+	 * Transmit multicast message
+	 * An error here is recovered by totemnet
+	 */
+	(void)sendmsg (instance->totemnet_sockets.mcast, &msg_mcast,
+		MSG_NOSIGNAL | MSG_DONTWAIT);
+}
+
+/*
+ * Send one contiguous message, either to a single processor or to the
+ * multicast group.
+ *
+ * instance  - totemnet instance supplying configuration, sockets and prng
+ * system_to - when non-zero, the message is sent from the token socket to
+ *             this address on the multicast port; when zero, the message is
+ *             sent on the multicast socket to the multicast address
+ * msg       - message bytes
+ * msg_len   - number of bytes in msg
+ *
+ * When secauth is enabled the message is encrypted and signed into a local
+ * frame first.  A message that would not fit that frame is logged and
+ * dropped instead of overrunning the stack.
+ *
+ * The result of sendmsg is intentionally not reported to the caller.
+ */
+void totemnet_msg_send (
+	struct totemnet_instance *instance,
+	struct in_addr *system_to,
+	void *msg,
+	int msg_len)
+{
+	struct sockaddr_in next_addr;
+	struct msghdr msg_mcast;
+	int buf_len;
+	unsigned char sheader[sizeof (struct security_header)];
+	unsigned char encrypt_data[FRAME_SIZE_MAX];
+	struct iovec iovec[2];
+	struct iovec iovec_sendmsg;
+	int fd;
+
+	if (instance->totem_config->secauth == 1) {
+		if (msg_len < 0 || (size_t)msg_len >
+			sizeof (encrypt_data) - sizeof (struct security_header)) {
+
+			instance->totemnet_log_printf (instance->totemnet_log_level_error, "Message is too large to encrypt... dropping.\n");
+			return;
+		}
+
+		iovec[0].iov_base = sheader;
+		iovec[0].iov_len = sizeof (struct security_header);
+		iovec[1].iov_base = msg;
+		iovec[1].iov_len = msg_len;
+
+		/*
+		 * Encrypt and digest the message
+		 */
+		encrypt_and_sign_worker (
+			instance,
+			(char *)encrypt_data,
+			&buf_len,
+			iovec,
+			2,
+			&instance->totemnet_prng_state);
+
+		iovec_sendmsg.iov_base = encrypt_data;
+		iovec_sendmsg.iov_len = buf_len;
+	} else {
+		iovec_sendmsg.iov_base = msg;
+		iovec_sendmsg.iov_len = msg_len;
+	}
+
+	/*
+	 * Build message header
+	 */
+	memset (&msg_mcast, 0, sizeof (msg_mcast));
+	if (system_to) {
+		/*
+		 * system_to is non-zero, so its a token send operation
+		 * Zero the whole address so no stack garbage is handed to
+		 * the kernel in the unused bytes
+		 */
+		memset (&next_addr, 0, sizeof (next_addr));
+		next_addr.sin_addr.s_addr = system_to->s_addr;
+		next_addr.sin_port = instance->sockaddr_in_mcast.sin_port;
+		next_addr.sin_family = AF_INET;
+
+		fd = instance->totemnet_sockets.token;
+		msg_mcast.msg_name = &next_addr;
+	} else {
+		/*
+		 * system_to is zero, so its a mcast send operation
+		 */
+		fd = instance->totemnet_sockets.mcast;
+		msg_mcast.msg_name = &instance->sockaddr_in_mcast;
+	}
+
+	msg_mcast.msg_namelen = sizeof (struct sockaddr_in);
+	msg_mcast.msg_iov = &iovec_sendmsg;
+	msg_mcast.msg_iovlen = 1;
+	msg_mcast.msg_control = 0;
+	msg_mcast.msg_controllen = 0;
+	msg_mcast.msg_flags = 0;
+
+	/*
+	 * Transmit token or multicast message
+	 * An error here is recovered by totemnet
+	 */
+	(void)sendmsg (fd, &msg_mcast, MSG_NOSIGNAL | MSG_DONTWAIT);
+}
+
+/*
+ * Constructor for the per-worker-thread state handed to
+ * worker_thread_group_init: zeroes the state and seeds its private prng,
+ * which totemnet_mcast_worker_fn later uses to encrypt messages.
+ *
+ * totemnet_mcast_thread_state_in - storage for one
+ *                                  struct totemnet_mcast_thread_state
+ */
+static void totemnet_mcast_thread_state_constructor (
+	void *totemnet_mcast_thread_state_in)
+{
+	struct totemnet_mcast_thread_state *totemnet_mcast_thread_state =
+		(struct totemnet_mcast_thread_state *)totemnet_mcast_thread_state_in;
+
+	/*
+	 * Size of the structure, not of the pointer to it
+	 */
+	memset (totemnet_mcast_thread_state, 0,
+		sizeof (*totemnet_mcast_thread_state));
+
+	rng_make_prng (128, PRNG_SOBER,
+		&totemnet_mcast_thread_state->prng_state, NULL);
+}
+
+
+/*
+ * Worker thread function: multicasts one queued work item, encrypting and
+ * signing it first when secauth is enabled.
+ *
+ * thread_state - this thread's struct totemnet_mcast_thread_state (output
+ *                buffer and prng)
+ * work_item_in - the struct work_item to send; when secauth is enabled its
+ *                iovec array is shifted up by one entry in place to make
+ *                room for the security header slot
+ *
+ * Bytes successfully sent are added to the instance's stats_sent counter.
+ */
+static void totemnet_mcast_worker_fn (void *thread_state, void *work_item_in)
+{
+	struct totemnet_mcast_thread_state *state = thread_state;
+	struct work_item *item = work_item_in;
+	struct totemnet_instance *instance = item->instance;
+	unsigned char sheader[sizeof (struct security_header)];
+	struct iovec encrypted_iov;
+	struct iovec *send_iov = item->iovec;
+	unsigned int send_iov_count = item->iov_len;
+	struct msghdr mcast_hdr;
+	int encrypted_len;
+	int sent = 0;
+
+	if (instance->totem_config->secauth == 1) {
+		/*
+		 * Open up entry 0 for the security header placeholder
+		 */
+		memmove (&item->iovec[1], &item->iovec[0],
+			item->iov_len * sizeof (struct iovec));
+		item->iovec[0].iov_len = sizeof (struct security_header);
+		item->iovec[0].iov_base = sheader;
+
+		/*
+		 * Encrypt and digest into this thread's private buffer
+		 */
+		encrypt_and_sign_worker (
+			instance,
+			state->iobuf, &encrypted_len,
+			item->iovec, item->iov_len + 1,
+			&state->prng_state);
+
+		encrypted_iov.iov_base = state->iobuf;
+		encrypted_iov.iov_len = encrypted_len;
+		send_iov = &encrypted_iov;
+		send_iov_count = 1;
+	}
+
+	mcast_hdr.msg_name = &instance->sockaddr_in_mcast;
+	mcast_hdr.msg_namelen = sizeof (struct sockaddr_in);
+	mcast_hdr.msg_iov = send_iov;
+	mcast_hdr.msg_iovlen = send_iov_count;
+	mcast_hdr.msg_control = 0;
+	mcast_hdr.msg_controllen = 0;
+	mcast_hdr.msg_flags = 0;
+
+	/*
+	 * Transmit multicast message
+	 * An error here is recovered by totemnet
+	 */
+	sent = sendmsg (instance->totemnet_sockets.mcast, &mcast_hdr,
+		MSG_NOSIGNAL | MSG_DONTWAIT);
+	if (sent > 0) {
+		instance->stats_sent += sent;
+	}
+}
+
+
+/*
+ * Shut down a totemnet instance.
+ *
+ * handle - handle returned by totemnet_initialize
+ *
+ * returns 0 on success, ENOENT if the handle does not refer to an instance.
+ *
+ * The worker thread group is only torn down when it was set up, i.e. when
+ * the configuration requested threaded sends (see totemnet_initialize).
+ */
+int totemnet_finalize (
+	totemnet_handle handle)
+{
+	struct totemnet_instance *instance;
+	SaAisErrorT error;
+	int res = 0;
+
+	error = saHandleInstanceGet (&totemnet_instance_database, handle,
+		(void *)&instance);
+	if (error != SA_OK) {
+		res = ENOENT;
+		goto error_exit;
+	}
+
+	if (instance->totem_config->threads) {
+		worker_thread_group_exit (&instance->worker_thread_group);
+	}
+
+	saHandleInstancePut (&totemnet_instance_database, handle);
+
+error_exit:
+	return (res);
+}
+
+/*
+ * Poll dispatch callback: receive one datagram from fd and hand it to the
+ * instance's deliver function.
+ * Only designed to work with a message with one iov
+ *
+ * handle  - poll handle (unused)
+ * fd      - socket that is readable
+ * revents - poll events (unused)
+ * data    - the struct totemnet_instance registered with the poll loop
+ * prio    - always set to UINT_MAX
+ *
+ * While instance->flushing is set the flush receive buffer is used instead
+ * of the regular one.  When secauth is enabled, datagrams shorter than the
+ * security header or failing authentication are dropped.
+ *
+ * Always returns 0.
+ */
+
+static int net_deliver_fn (
+	poll_handle handle,
+	int fd,
+	int revents,
+	void *data,
+	unsigned int *prio)
+{
+	struct totemnet_instance *instance = (struct totemnet_instance *)data;
+	struct msghdr msg_recv;
+	struct iovec *iovec;
+	struct sockaddr_in system_from;
+	int bytes_received;
+	int res = 0;
+	unsigned char *msg_offset;
+	unsigned int size_delv;
+
+	*prio = UINT_MAX;
+
+	if (instance->flushing == 1) {
+		iovec = &instance->totemnet_iov_recv_flush;
+	} else {
+		iovec = &instance->totemnet_iov_recv;
+	}
+
+	/*
+	 * Receive datagram
+	 */
+	msg_recv.msg_name = &system_from;
+	msg_recv.msg_namelen = sizeof (struct sockaddr_in);
+	msg_recv.msg_iov = iovec;
+	msg_recv.msg_iovlen = 1;
+	msg_recv.msg_control = 0;
+	msg_recv.msg_controllen = 0;
+	msg_recv.msg_flags = 0;
+
+	bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
+	if (bytes_received == -1) {
+		return (0);
+	} else {
+		instance->stats_recv += bytes_received;
+	}
+
+	/*
+	 * bytes_received is non-negative here, so the cast is safe
+	 */
+	if ((instance->totem_config->secauth == 1) &&
+		((size_t)bytes_received < sizeof (struct security_header))) {
+
+		instance->totemnet_log_printf (instance->totemnet_log_level_security, "Received message is too short...  ignoring %d.\n", bytes_received);
+		return (0);
+	}
+
+	/*
+	 * Temporarily shrink the iovec to the datagram size; it is restored
+	 * to FRAME_SIZE_MAX before returning so the next receive can use the
+	 * whole buffer
+	 */
+	iovec->iov_len = bytes_received;
+	if (instance->totem_config->secauth == 1) {
+		/*
+		 * Authenticate and if authenticated, decrypt datagram
+		 */
+
+		res = authenticate_and_decrypt (instance, iovec);
+		if (res == -1) {
+			printf ("Invalid packet data\n");
+			iovec->iov_len = FRAME_SIZE_MAX;
+			return 0;
+		}
+		msg_offset = (unsigned char *)iovec->iov_base +
+			sizeof (struct security_header);
+		size_delv = bytes_received - sizeof (struct security_header);
+	} else {
+		msg_offset = (unsigned char *)iovec->iov_base;
+		size_delv = bytes_received;
+	}
+
+	/*
+	 * Handle incoming message
+	 */
+	instance->totemnet_deliver_fn (
+		instance->context,
+		&system_from.sin_addr,
+		msg_offset,
+		size_delv);
+
+	iovec->iov_len = FRAME_SIZE_MAX;
+	return (0);
+}
+
+/*
+ * Find the local IPv4 interface whose address lies in the same
+ * 255.255.255.0 network as bindnet.
+ *
+ * instance     - unused
+ * bindnet      - network address to match against
+ * bound_to     - on a match, sin_addr receives the interface's address
+ * interface_up - set to non-zero when the matching interface has IFF_UP
+ *                set, zero otherwise (including when nothing matches or
+ *                the flags cannot be read)
+ *
+ * returns the index of the matching interface in the kernel's interface
+ * list, or -1 when nothing matches or the list cannot be obtained.
+ */
+static int netif_determine (
+	struct totemnet_instance *instance,
+	struct sockaddr_in *bindnet,
+	struct sockaddr_in *bound_to,
+	int *interface_up)
+{
+	struct sockaddr_in *sockaddr_in;
+	int id_fd;
+	struct ifconf ifc;
+	void *new_buf;
+	int numreqs = 0;
+	int res;
+	int i;
+	in_addr_t mask_addr = inet_addr ("255.255.255.0");
+
+	*interface_up = 0;
+
+	/*
+	 * Generate list of local interfaces in ifc.ifc_req structure
+	 */
+	id_fd = socket (AF_INET, SOCK_STREAM, 0);
+	if (id_fd == -1) {
+		return -1;
+	}
+	ifc.ifc_buf = 0;
+	do {
+		/*
+		 * Grow the buffer until the kernel's answer no longer fills
+		 * it completely, which means the list was not truncated
+		 */
+		numreqs += 32;
+		ifc.ifc_len = sizeof (struct ifreq) * numreqs;
+		new_buf = realloc (ifc.ifc_buf, ifc.ifc_len);
+		if (new_buf == 0) {
+			free (ifc.ifc_buf);
+			close (id_fd);
+			return -1;
+		}
+		ifc.ifc_buf = new_buf;
+		res = ioctl (id_fd, SIOCGIFCONF, &ifc);
+		if (res < 0) {
+			free (ifc.ifc_buf);
+			close (id_fd);
+			return -1;
+		}
+	} while (ifc.ifc_len == sizeof (struct ifreq) * numreqs);
+	res = -1;
+
+	/*
+	 * Find interface address to bind to
+	 */
+	for (i = 0; i < ifc.ifc_len / sizeof (struct ifreq); i++) {
+		sockaddr_in = (struct sockaddr_in *)&ifc.ifc_ifcu.ifcu_req[i].ifr_ifru.ifru_addr;
+
+		if ((sockaddr_in->sin_family == AF_INET) &&
+			(sockaddr_in->sin_addr.s_addr & mask_addr) ==
+			(bindnet->sin_addr.s_addr & mask_addr)) {
+
+			bound_to->sin_addr.s_addr = sockaddr_in->sin_addr.s_addr;
+			res = i;
+
+			/*
+			 * The flags share storage with the address, so they
+			 * are only meaningful when the ioctl succeeded
+			 */
+			if (ioctl(id_fd, SIOCGIFFLAGS, &ifc.ifc_ifcu.ifcu_req[i]) < 0) {
+				printf ("couldn't do ioctl\n");
+			} else {
+				*interface_up = ifc.ifc_ifcu.ifcu_req[i].ifr_ifru.ifru_flags & IFF_UP;
+			}
+			break; /* for */
+		}
+	}
+	free (ifc.ifc_buf);
+	close (id_fd);
+
+	return (res);
+}
+
+/*
+ * Select the loopback address as the address to bind to.
+ *
+ * bound_to - sin_addr receives LOCALHOST_IP
+ *
+ * returns 1 on success, -1 if the resulting address is zero.
+ */
+static int loopback_determine (struct sockaddr_in *bound_to)
+{
+
+	bound_to->sin_addr.s_addr = LOCALHOST_IP;
+	/*
+	 * Test the address value itself; the address of the field is never
+	 * null
+	 */
+	if (bound_to->sin_addr.s_addr == 0) {
+		return -1;
+	}
+	return 1;
+}
+
+
+/*
+ * Tear down the current sockets and rebuild them according to the state of
+ * the configured interface.
+ *
+ * If the interface is up, the multicast and token sockets are built and
+ * registered with the poll loop.  If the interface is down, a single
+ * loopback socket is built and registered, and this function is requeued in
+ * the timer list to retry building the sockets later.
+ *
+ * data - the struct totemnet_instance to reconfigure
+ *
+ * The interface change callback is invoked when the reported up/down state
+ * changes.
+ */
+static void timer_function_netif_check_timeout (
+	void *data)
+{
+	struct totemnet_instance *instance = (struct totemnet_instance *)data;
+	int res;
+	int interface_up;
+	int old_mcast;
+	int old_token;
+
+	/*
+	* Build sockets for every interface
+	*/
+	netif_determine (instance,
+		&instance->totemnet_interface->bindnet,
+		&instance->totemnet_interface->boundto,
+		&interface_up);
+
+	/*
+	 * Remove the old sockets from the poll loop before closing them.
+	 * In loopback mode mcast and token are the same descriptor, which
+	 * must only be removed and closed once.
+	 */
+	old_mcast = instance->totemnet_sockets.mcast;
+	old_token = instance->totemnet_sockets.token;
+
+	if (old_mcast > 0) {
+		poll_dispatch_delete (instance->totemnet_poll_handle,
+			old_mcast);
+		close (old_mcast);
+	}
+	if (old_token > 0 && old_token != old_mcast) {
+		poll_dispatch_delete (instance->totemnet_poll_handle,
+			old_token);
+		close (old_token);
+	}
+
+	/*
+	 * NOTE(review): res from the build functions is not examined; the
+	 * sockets are registered whether or not the build succeeded.
+	 */
+	if (!interface_up) {
+		instance->netif_bind_state = BIND_STATE_LOOPBACK;
+		res = totemnet_build_sockets_loopback(instance,
+			&instance->sockaddr_in_mcast,
+			&instance->totemnet_interface->bindnet,
+			&instance->totemnet_sockets,
+			&instance->totemnet_interface->boundto);
+
+		poll_dispatch_add (
+			instance->totemnet_poll_handle,
+			instance->totemnet_sockets.token,
+			POLLIN, instance, net_deliver_fn, UINT_MAX);
+
+		instance->netif_bind_state = BIND_STATE_REGULAR;
+	} else {
+		/*
+		* Create and bind the multicast and unicast sockets
+		*/
+		memcpy (&instance->sockaddr_in_mcast.sin_addr,
+			&instance->in_addr_mcast, sizeof (struct in_addr));
+		res = totemnet_build_sockets (instance,
+			&instance->sockaddr_in_mcast,
+			&instance->totemnet_interface->bindnet,
+			&instance->totemnet_sockets,
+			&instance->totemnet_interface->boundto,
+			&interface_up);
+
+		poll_dispatch_add (
+			instance->totemnet_poll_handle,
+			instance->totemnet_sockets.mcast,
+			POLLIN, instance, net_deliver_fn, UINT_MAX);
+
+		poll_dispatch_add (
+			instance->totemnet_poll_handle,
+			instance->totemnet_sockets.token,
+			POLLIN, instance, net_deliver_fn, UINT_MAX);
+	}
+
+	memcpy (&instance->my_id, &instance->totemnet_interface->boundto,
+		sizeof (struct sockaddr_in));
+
+	/*
+	* This stuff depends on totemnet_build_sockets
+	*/
+	if (interface_up) {
+		if (instance->netif_state_report & NETIF_STATE_REPORT_UP) {
+			instance->totemnet_log_printf (instance->totemnet_log_level_notice,
+				" The network interface [%s] is now up.\n",
+				inet_ntoa (instance->totemnet_interface->boundto.sin_addr));
+			instance->netif_state_report = NETIF_STATE_REPORT_DOWN;
+			instance->totemnet_iface_change_fn (instance->context, &instance->my_id);
+		}
+
+		/*
+		 * If this is a single processor, detect downs which may not 
+		 * be detected by token loss when the interface is downed
+		 */
+		/*
+		if (instance->my_memb_entries <= 1) {
+			poll_timer_add (instance->totemnet_poll_handle,
+				instance->timeout_downcheck,
+				(void *)instance,
+				timer_function_netif_check_timeout,
+				&instance->timer_netif_check_timeout);
+		}
+		*/
+	} else {
+		if (instance->netif_state_report & NETIF_STATE_REPORT_DOWN) {
+			instance->totemnet_log_printf (instance->totemnet_log_level_notice,
+				"The network interface is down.\n");
+			instance->totemnet_iface_change_fn (instance->context, &instance->my_id);
+		}
+		instance->netif_state_report = NETIF_STATE_REPORT_UP;
+
+		/*
+		 * Add a timer to retry building interfaces and request memb_gather_enter
+		 */
+		poll_timer_add (instance->totemnet_poll_handle,
+			instance->totem_config->downcheck_timeout,
+			(void *)instance,
+			timer_function_netif_check_timeout,
+			&instance->timer_netif_check_timeout);
+	}
+}
+
+
+/*
+ * Check if an interface is down and reconfigure
+ * totemnet waiting for it to come back up
+ *
+ * Runs the interface check immediately by calling the timer function
+ * directly rather than scheduling it.
+ *
+ * instance - the totemnet instance to check
+ */
+static void netif_down_check (struct totemnet_instance *instance)
+{
+	timer_function_netif_check_timeout (instance);
+}
+
+/*
+ * File scope address used when binding the loopback socket; it is zeroed by
+ * totemnet_initialize.
+ */
+	struct sockaddr_in sockaddr_in_test;
+
+/*
+ * Build the single socket used while the configured interface is down.
+ *
+ * instance         - instance whose my_id and sockaddr_in_mcast are updated
+ * sockaddr_mcast   - supplies the port to bind to
+ * sockaddr_bindnet - unused
+ * sockets          - receives the new descriptor in both token and mcast
+ * bound_to         - receives the loopback address
+ *
+ * returns 0 on success, -1 when the loopback address, the socket or the
+ * bind fails.  On success instance->sockaddr_in_mcast is overwritten with
+ * the loopback address and port, so multicast sends go to the local socket.
+ */
+static int totemnet_build_sockets_loopback (
+	struct totemnet_instance *instance,
+	struct sockaddr_in *sockaddr_mcast,
+	struct sockaddr_in *sockaddr_bindnet,
+	struct totemnet_socket *sockets,
+	struct sockaddr_in *bound_to)
+{
+	int bind_res;
+
+	/*
+	 * Determine the ip address bound to
+	 */
+	if (loopback_determine (bound_to) == -1) {
+		return (-1);
+	}
+
+	/* TODO this should be somewhere else */
+	instance->my_id.sin_addr.s_addr = bound_to->sin_addr.s_addr;
+	instance->my_id.sin_family = AF_INET;
+	instance->my_id.sin_port = sockaddr_mcast->sin_port;
+
+	/*
+	 * Create the unicast socket
+	 */
+	sockets->token = socket (AF_INET, SOCK_DGRAM, 0);
+	if (sockets->token == -1) {
+		perror ("socket2");
+		return (-1);
+	}
+
+	/*
+	 * Bind the socket to the loopback address on the totem port
+	 */
+	sockaddr_in_test.sin_addr.s_addr = bound_to->sin_addr.s_addr;
+	sockaddr_in_test.sin_family = AF_INET;
+	sockaddr_in_test.sin_port = sockaddr_mcast->sin_port;
+
+	bind_res = bind (sockets->token,
+		(struct sockaddr *)&sockaddr_in_test,
+		sizeof (struct sockaddr_in));
+	if (bind_res == -1) {
+		perror ("bind2 failed");
+		return (-1);
+	}
+
+	/*
+	 * The same socket serves as multicast destination and multicast
+	 * socket while in loopback mode
+	 */
+	memcpy(&instance->sockaddr_in_mcast, &sockaddr_in_test, sizeof(struct sockaddr_in));
+	sockets->mcast = sockets->token;
+
+	return (0);
+}
+
+
+/*
+ * Build the multicast and token (unicast) sockets on the configured
+ * interface.
+ *
+ * instance         - instance whose my_id is updated and whose logger is used
+ * sockaddr_mcast   - multicast group address and port
+ * sockaddr_bindnet - network address used to select the local interface
+ * sockets          - receives the multicast and token descriptors
+ * bound_to         - receives the address of the selected interface
+ * interface_up     - receives the up state of the selected interface
+ *
+ * returns 0 on success, -1 when no interface matches or a socket call fails.
+ * Descriptors created before a failure remain stored in sockets.
+ */
+static int totemnet_build_sockets (
+	struct totemnet_instance *instance,
+	struct sockaddr_in *sockaddr_mcast,
+	struct sockaddr_in *sockaddr_bindnet,
+	struct totemnet_socket *sockets,
+	struct sockaddr_in *bound_to,
+	int *interface_up)
+{
+	struct ip_mreq mreq;
+	struct sockaddr_in sockaddr_in_test;
+	char flag;
+	int res;
+
+	memset (&mreq, 0, sizeof (struct ip_mreq));
+	memset (&sockaddr_in_test, 0, sizeof (struct sockaddr_in));
+
+	/*
+	 * Determine the ip address bound to and the interface name
+	 */
+	res = netif_determine (instance,
+		sockaddr_bindnet,
+		bound_to,
+		interface_up);
+
+	if (res == -1) {
+		return (-1);
+	}
+
+	/* TODO this should be somewhere else */
+	instance->my_id.sin_addr.s_addr = bound_to->sin_addr.s_addr;
+	instance->my_id.sin_family = AF_INET;
+	instance->my_id.sin_port = sockaddr_mcast->sin_port;
+
+	/*
+	 * Create multicast socket
+	 */
+	sockets->mcast = socket (AF_INET, SOCK_DGRAM, 0);
+	if (sockets->mcast == -1) {
+		perror ("socket");
+		return (-1);
+	}
+
+	/*
+	 * IPPROTO_IP is used for every IP level option in this function
+	 */
+	if (setsockopt (sockets->mcast, IPPROTO_IP, IP_MULTICAST_IF,
+		&bound_to->sin_addr, sizeof (struct in_addr)) < 0) {
+
+		instance->totemnet_log_printf (instance->totemnet_log_level_warning, "Could not bind to device for multicast, group messaging may not work properly. (%s)\n", strerror (errno));
+	}
+
+	/*
+	 * Bind to multicast socket used for multicast send/receives
+	 */
+	sockaddr_in_test.sin_family = AF_INET;
+	sockaddr_in_test.sin_addr.s_addr = sockaddr_mcast->sin_addr.s_addr;
+	sockaddr_in_test.sin_port = sockaddr_mcast->sin_port;
+	printf ("binding to %s\n", inet_ntoa (sockaddr_in_test.sin_addr));
+	/* sin_port is in network byte order; print it in host order */
+	printf ("%d\n", ntohs (sockaddr_in_test.sin_port));
+	res = bind (sockets->mcast, (struct sockaddr *)&sockaddr_in_test,
+		sizeof (struct sockaddr_in));
+	if (res == -1) {
+		perror ("bind failed");
+		return (-1);
+	}
+
+	/*
+	 * Setup unicast socket
+	 */
+	sockets->token = socket (AF_INET, SOCK_DGRAM, 0);
+	if (sockets->token == -1) {
+		perror ("socket2");
+		return (-1);
+	}
+
+	/*
+	 * Bind to unicast socket used for token send/receives
+	 * This has the side effect of binding to the correct interface
+	 */
+	sockaddr_in_test.sin_family = AF_INET;
+	sockaddr_in_test.sin_addr.s_addr = bound_to->sin_addr.s_addr;
+	sockaddr_in_test.sin_port = sockaddr_mcast->sin_port;
+	printf ("binding to %s\n", inet_ntoa (sockaddr_in_test.sin_addr));
+	printf ("%d\n", ntohs (sockaddr_in_test.sin_port));
+	res = bind (sockets->token, (struct sockaddr *)&sockaddr_in_test,
+		sizeof (struct sockaddr_in));
+	if (res == -1) {
+		perror ("bind2 failed");
+		return (-1);
+	}
+
+#ifdef CONFIG_USE_BROADCAST
+/* This config option doesn't work */
+{
+	int on = 1;
+	setsockopt (sockets->mcast, SOL_SOCKET, SO_BROADCAST, (char *)&on, sizeof (on));
+}
+#else
+	/*
+	 * Join group membership on socket
+	 */
+	mreq.imr_multiaddr.s_addr = sockaddr_mcast->sin_addr.s_addr;
+	mreq.imr_interface.s_addr = bound_to->sin_addr.s_addr;
+
+	res = setsockopt (sockets->mcast, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+		&mreq, sizeof (mreq));
+	if (res == -1) {
+		perror ("join multicast group failed");
+		return (-1);
+	}
+
+#endif
+	/*
+	 * Turn on multicast loopback
+	 */
+	flag = 1;
+	res = setsockopt (sockets->mcast, IPPROTO_IP, IP_MULTICAST_LOOP,
+		&flag, sizeof (flag));
+	if (res == -1) {
+		perror ("turn on multicast loopback failed");
+		return (-1);
+	}
+
+	return (0);
+}
+	
+/*
+ * Totem Network interface - also does encryption/decryption
+ * depends on poll abstraction, POSIX, IPV4
+ */
+
+/*
+ * Create an instance
+ */
+/*
+ * Create a totemnet instance for one configured interface.
+ *
+ * poll_handle     - poll loop the instance's sockets and timers register with
+ * handle          - receives the handle of the new instance
+ * totem_config    - configuration; the pointer is retained by the instance
+ * interface_no    - index into totem_config->interfaces
+ * context         - opaque value passed back to both callbacks
+ * deliver_fn      - called for every received (and, with secauth,
+ *                   authenticated and decrypted) message
+ * iface_change_fn - called when the interface up/down state changes
+ *
+ * returns 0 on success, -1 if the instance handle cannot be created or
+ * referenced.
+ *
+ * The interface check is run before returning, so sockets are built and
+ * iface_change_fn may already be invoked from inside this call.
+ */
+int totemnet_initialize (
+	poll_handle poll_handle,
+	totemnet_handle *handle,
+	struct totem_config *totem_config,
+	int interface_no,
+	void *context,
+
+	void (*deliver_fn) (
+		void *context,
+		struct in_addr *system_from,
+		void *msg,
+		int msg_len),
+
+	void (*iface_change_fn) (
+		void *context,
+		struct sockaddr_in *iface_sockaddr_in))
+{
+	SaAisErrorT error;
+	struct totemnet_instance *instance;
+
+	memset (&sockaddr_in_test, 0, sizeof (struct sockaddr_in));
+	error = saHandleCreate (&totemnet_instance_database,
+		sizeof (struct totemnet_instance), handle);
+	if (error != SA_OK) {
+		/*
+		 * No handle exists, so there is nothing to put or destroy
+		 */
+		return (-1);
+	}
+	error = saHandleInstanceGet (&totemnet_instance_database, *handle,
+		(void *)&instance);
+	if (error != SA_OK) {
+		goto error_destroy;
+	}
+
+	totemnet_instance_initialize (instance);
+
+	instance->totem_config = totem_config;
+
+	/*
+	* Configure logging
+	*/
+	instance->totemnet_log_level_security = 1; //totem_config->totem_logging_configuration.log_level_security;
+	instance->totemnet_log_level_error = totem_config->totem_logging_configuration.log_level_error;
+	instance->totemnet_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
+	instance->totemnet_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
+	instance->totemnet_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
+	instance->totemnet_log_printf = totem_config->totem_logging_configuration.log_printf;
+
+	/*
+	* Remember the multicast address and port
+	*/
+	memcpy (&instance->sockaddr_in_mcast, &totem_config->mcast_addr,
+		sizeof (struct sockaddr_in));
+
+	memcpy (&instance->in_addr_mcast, &totem_config->mcast_addr.sin_addr,
+		sizeof (struct in_addr));
+
+	/*
+	* Copy the private key and initialize the random number generator
+	* for later use to generate salt
+	*/
+	memcpy (instance->totemnet_private_key, totem_config->private_key,
+		totem_config->private_key_len);
+
+	instance->totemnet_private_key_len = totem_config->private_key_len;
+
+	rng_make_prng (128, PRNG_SOBER, &instance->totemnet_prng_state, NULL);
+
+	/*
+	* Initialize local variables for totemnet
+	*/
+	memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);
+
+	/*
+	* If threaded send requested, initialize thread group data structure
+	*/
+	if (totem_config->threads) {
+		worker_thread_group_init (
+			&instance->worker_thread_group,
+			totem_config->threads, 128,
+			sizeof (struct work_item),
+			sizeof (struct totemnet_mcast_thread_state),
+			totemnet_mcast_thread_state_constructor,
+			totemnet_mcast_worker_fn);
+	}
+
+	instance->totemnet_interface = &totem_config->interfaces[interface_no];
+	instance->totemnet_poll_handle = poll_handle;
+
+	instance->context = context;
+	instance->totemnet_deliver_fn = deliver_fn;
+
+	instance->totemnet_iface_change_fn = iface_change_fn;
+
+	instance->handle = *handle;
+
+	netif_down_check (instance);
+
+	saHandleInstancePut (&totemnet_instance_database, *handle);
+	return (0);
+
+error_destroy:
+	saHandleDestroy (&totemnet_instance_database, *handle);
+	return (-1);
+}
+
+/*
+ * Record the number of processors in the current membership.
+ *
+ * handle          - instance handle
+ * processor_count - value stored in the instance's my_memb_entries
+ *
+ * returns 0 on success, ENOENT if the handle does not refer to an instance.
+ */
+int totemnet_processor_count_set (
+	totemnet_handle handle,
+	int processor_count)
+{
+	struct totemnet_instance *instance;
+
+	if (saHandleInstanceGet (&totemnet_instance_database, handle,
+		(void *)&instance) != SA_OK) {
+
+		return (ENOENT);
+	}
+
+	instance->my_memb_entries = processor_count;
+	saHandleInstancePut (&totemnet_instance_database, handle);
+
+	return (0);
+}
+
+/*
+ * Deliver every datagram currently pending on the multicast socket.
+ *
+ * handle - instance handle
+ *
+ * returns 0 on success, ENOENT if the handle does not refer to an instance.
+ *
+ * While flushing, instance->flushing is set so net_deliver_fn receives into
+ * the flush buffer.  The loop stops as soon as poll no longer reports the
+ * socket readable; any other poll result (error, hangup, nothing pending)
+ * ends the flush rather than spinning.
+ */
+int totemnet_recv_flush (totemnet_handle handle)
+{
+	SaAisErrorT error;
+	struct totemnet_instance *instance;
+	struct pollfd ufd;
+	int nfds;
+	int res = 0;
+	unsigned int prio;
+
+	error = saHandleInstanceGet (&totemnet_instance_database, handle,
+		(void *)&instance);
+	if (error != SA_OK) {
+		res = ENOENT;
+		goto error_exit;
+	}
+
+	instance->flushing = 1;
+
+	for (;;) {
+		ufd.fd = instance->totemnet_sockets.mcast;
+		ufd.events = POLLIN;
+		ufd.revents = 0;
+		nfds = poll (&ufd, 1, 0);
+		if (nfds != 1 || (ufd.revents & POLLIN) == 0) {
+			break;
+		}
+		net_deliver_fn (0, instance->totemnet_sockets.mcast,
+			ufd.revents, instance, &prio);
+	}
+
+	instance->flushing = 0;
+
+	saHandleInstancePut (&totemnet_instance_database, handle);
+
+error_exit:
+	return (res);
+}
+
+/*
+ * Wait on the instance's worker thread group.
+ *
+ * handle - instance handle
+ *
+ * returns 0 on success, ENOENT if the handle does not refer to an instance.
+ *
+ * The worker thread group only exists when the configuration requested
+ * threaded sends (see totemnet_initialize); without threads, sends are done
+ * in the calling thread and there is nothing to wait for.
+ */
+int totemnet_send_flush (totemnet_handle handle)
+{
+	SaAisErrorT error;
+	struct totemnet_instance *instance;
+	int res = 0;
+
+	error = saHandleInstanceGet (&totemnet_instance_database, handle,
+		(void *)&instance);
+	if (error != SA_OK) {
+		res = ENOENT;
+		goto error_exit;
+	}
+
+	if (instance->totem_config->threads) {
+		worker_thread_group_wait (&instance->worker_thread_group);
+	}
+
+	saHandleInstancePut (&totemnet_instance_database, handle);
+
+error_exit:
+	return (res);
+}
+
+/*
+ * Send a message through totemnet_msg_send to the address in system_to.
+ *
+ * handle    - instance handle
+ * system_to - destination address, passed through unchanged
+ * msg       - message bytes
+ * msg_len   - number of bytes in msg
+ *
+ * returns 0 once the send was attempted, ENOENT if the handle does not refer
+ * to an instance.  The outcome of the send itself is not reported.
+ */
+int totemnet_token_send (
+	totemnet_handle handle,
+	struct in_addr *system_to,
+	void *msg,
+	int msg_len)
+{
+	struct totemnet_instance *instance;
+
+	if (saHandleInstanceGet (&totemnet_instance_database, handle,
+		(void *)&instance) != SA_OK) {
+
+		return (ENOENT);
+	}
+
+	totemnet_msg_send (instance, system_to, msg, msg_len);
+	saHandleInstancePut (&totemnet_instance_database, handle);
+
+	return (0);
+}
+/*
+ * Multicast a contiguous message immediately from the calling thread.
+ *
+ * handle  - instance handle
+ * msg     - message bytes
+ * msg_len - number of bytes in msg
+ *
+ * returns 0 once the send was attempted, ENOENT if the handle does not refer
+ * to an instance.  The outcome of the send itself is not reported.
+ */
+int totemnet_mcast_flush_send (
+	totemnet_handle handle,
+	void *msg,
+	int msg_len)
+{
+	struct totemnet_instance *instance;
+
+	if (saHandleInstanceGet (&totemnet_instance_database, handle,
+		(void *)&instance) != SA_OK) {
+
+		return (ENOENT);
+	}
+
+	/* a zero destination selects the multicast path */
+	totemnet_msg_send (instance, 0, msg, msg_len);
+	saHandleInstancePut (&totemnet_instance_database, handle);
+
+	return (0);
+}
+
+
+/*
+ * Multicast a scatter/gather message, using the worker threads when the
+ * configuration enables them.
+ *
+ * handle  - instance handle
+ * iovec   - scatter/gather list making up the message
+ * iov_len - number of entries in iovec
+ *
+ * With threads configured, the iovec entries (not the data they point to)
+ * are copied into a work item that is handed to the worker thread group;
+ * otherwise the message is sent from the calling thread.
+ *
+ * returns 0 on success, ENOENT if the handle does not refer to an instance.
+ */
+int totemnet_mcast_noflush_send (
+	totemnet_handle handle,
+	struct iovec *iovec,
+	int iov_len)
+{
+	struct totemnet_instance *instance;
+	struct work_item queued;
+
+	if (saHandleInstanceGet (&totemnet_instance_database, handle,
+		(void *)&instance) != SA_OK) {
+
+		return (ENOENT);
+	}
+
+	if (!instance->totem_config->threads) {
+		totemnet_iovec_send (instance, iovec, iov_len);
+	} else {
+		memcpy (&queued.iovec[0], iovec, iov_len * sizeof (struct iovec));
+		queued.iov_len = iov_len;
+		queued.instance = instance;
+
+		worker_thread_group_work_add (&instance->worker_thread_group,
+			&queued);
+	}
+
+	saHandleInstancePut (&totemnet_instance_database, handle);
+	return (0);
+}
+/*
+ * Run the interface check for an instance immediately, which tears down and
+ * rebuilds its sockets.
+ *
+ * handle - instance handle
+ *
+ * returns 0 on success, ENOENT if the handle does not refer to an instance.
+ */
+extern int totemnet_iface_check (totemnet_handle handle)
+{
+	struct totemnet_instance *instance;
+
+	if (saHandleInstanceGet (&totemnet_instance_database, handle,
+		(void *)&instance) != SA_OK) {
+
+		return (ENOENT);
+	}
+
+	timer_function_netif_check_timeout (instance);
+	saHandleInstancePut (&totemnet_instance_database, handle);
+
+	return (0);
+}
+
+/*
+ * Reduce the configured net_mtu by the per-frame overhead: the UDP/IP
+ * headers and, when secauth is enabled, the security header.
+ *
+ * totem_config - configuration whose net_mtu is adjusted in place
+ */
+extern void totemnet_net_mtu_adjust (struct totem_config *totem_config)
+{
+#define UDPIP_HEADER_SIZE (20 + 8) /* 20 bytes for ip 8 bytes for udp */
+	int overhead = UDPIP_HEADER_SIZE;
+
+	if (totem_config->secauth == 1) {
+		overhead += sizeof (struct security_header);
+	}
+	totem_config->net_mtu -= overhead;
+
+	printf ("adjusted frame size is %d\n", totem_config->net_mtu);
+}

+ 102 - 0
exec/totemnet.h

@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2005 MontaVista Software, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef TOTEMNET_H_DEFINED
+#define TOTEMNET_H_DEFINED
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include "totem.h"
+#include "aispoll.h"
+
+/*
+ * Opaque reference to a totemnet instance, produced by totemnet_initialize
+ */
+typedef unsigned int totemnet_handle;
+
+#define TOTEMNET_NOFLUSH	0
+#define TOTEMNET_FLUSH		1
+/*
+ * Totem Network interface - also does encryption/decryption
+ * depends on poll abstraction, POSIX, IPV4
+ *
+ * Unless noted otherwise, the functions taking a handle return 0 on success
+ * and ENOENT when the handle does not refer to an instance.
+ */
+
+/*
+ * Create an instance
+ *
+ * poll_handle     - poll loop the instance registers its sockets with
+ * handle          - receives the handle of the new instance
+ * totem_config    - configuration used by the instance
+ * interface_no    - index into totem_config->interfaces
+ * context         - opaque value passed back to both callbacks
+ * deliver_fn      - called with each received message and its source address
+ * iface_change_fn - called when the interface up/down state changes
+ *
+ * Returns 0 on success; -1 is returned when the newly created handle cannot
+ * be referenced.
+ */
+extern int totemnet_initialize (
+	poll_handle poll_handle,
+	totemnet_handle *handle,
+	struct totem_config *totem_config,
+	int interface_no,
+	void *context,
+
+	void (*deliver_fn) (
+		void *context,
+		struct in_addr *system_from,
+		void *msg,
+		int msg_len),
+
+	void (*iface_change_fn) (
+		void *context,
+		struct sockaddr_in *iface_sockaddr_in));
+
+/*
+ * Record the number of processors in the current membership
+ */
+extern int totemnet_processor_count_set (
+	totemnet_handle handle,
+	int processor_count);
+
+/*
+ * Send msg to the single processor at system_to; the result of the
+ * underlying send is not reported
+ */
+extern int totemnet_token_send (
+	totemnet_handle handle,
+	struct in_addr *system_to,
+	void *msg,
+	int msg_len);
+
+/*
+ * Multicast msg immediately from the calling thread; the result of the
+ * underlying send is not reported
+ */
+extern int totemnet_mcast_flush_send (
+	totemnet_handle handle,
+	void *msg,
+	int msg_len);
+
+/*
+ * Multicast the message described by iovec; when the configuration enables
+ * threads the message is handed to the worker threads instead of being sent
+ * from the calling thread
+ */
+extern int totemnet_mcast_noflush_send (
+	totemnet_handle handle,
+	struct iovec *iovec,
+	int iov_len);
+
+/*
+ * Deliver the datagrams currently pending on the multicast socket
+ */
+extern int totemnet_recv_flush (totemnet_handle handle);
+
+/*
+ * Wait on the worker thread group - presumably until queued sends have
+ * completed; confirm against wthread.c
+ */
+extern int totemnet_send_flush (totemnet_handle handle);
+
+/*
+ * Re-run the interface check, rebuilding the instance's sockets
+ */
+extern int totemnet_iface_check (totemnet_handle handle);
+
+/*
+ * Shut down the instance's worker thread group
+ */
+extern int totemnet_finalize (totemnet_handle handle);
+
+/*
+ * Subtract the UDP/IP header size and, when secauth is enabled, the
+ * security header size from totem_config->net_mtu
+ */
+extern void totemnet_net_mtu_adjust (struct totem_config *totem_config);
+
+#endif /* TOTEMNET_H_DEFINED */

+ 25 - 8
exec/totempg.c

@@ -127,16 +127,18 @@ struct totempg_mcast {
 /*
  * Maximum packet size for totem pg messages
  */
-#define TOTEMPG_PACKET_SIZE (TOTEMSRP_PACKET_SIZE_MAX - \
+#define TOTEMPG_PACKET_SIZE (totempg_totem_config->net_mtu - \
 	sizeof (struct totempg_mcast))
 
 /*
  * Local variables used for packing small messages
  */
-static unsigned short mcast_packed_msg_lens[TOTEMSRP_PACKET_SIZE_MAX];
+static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
 
 static int mcast_packed_msg_count = 0;
 
+struct totem_config *totempg_totem_config;
+
 static void (*app_deliver_fn) (
 		struct in_addr source_addr,
 		struct iovec *iovec,
@@ -157,7 +159,7 @@ struct assembly {
 	unsigned char last_frag_num;
 };
 
-struct assembly *assembly_list[16]; // MAX PROCESSORS TODO
+struct assembly *assembly_list[PROCESSOR_COUNT_MAX];
 int assembly_list_entries = 0;
 
 /*
@@ -170,7 +172,7 @@ int assembly_list_entries = 0;
  * fragment_contuation indicates whether the first packed message in 
  * the buffer is a continuation of a previously packed fragment.
  */
-static unsigned char fragmentation_data[TOTEMPG_PACKET_SIZE];
+static unsigned char fragmentation_data[MESSAGE_SIZE_MAX];
 int fragment_size = 0;
 int fragment_continuation = 0;
 
@@ -251,7 +253,7 @@ static void totempg_deliver_fn (
 	unsigned short *msg_lens;
 	int i;
 	struct assembly *assembly;
-	char header[1500];
+	char header[FRAME_SIZE_MAX];
 	int h_index;
 	int a_i = 0;
 	int msg_count;
@@ -308,6 +310,7 @@ static void totempg_deliver_fn (
 
 		for (i = 2; i < iov_len; i++) {
 			a_i = assembly->index;
+			assert (iovec[i].iov_len + a_i <= MESSAGE_SIZE_MAX);
 			memcpy (&assembly->data[a_i], iovec[i].iov_base, iovec[i].iov_len);
 			a_i += msg_lens[i - 2];
 		}
@@ -338,6 +341,7 @@ printf ("Message fragmented %d count %d\n", mcast->fragmented, mcast->msg_count)
 	continuation = mcast->continuation;
 	iov_delv.iov_base = &assembly->data[0];
 	iov_delv.iov_len = assembly->index + msg_lens[0];
+//	printf ("%d %d %d\n", msg_count, continuation, assembly->last_frag_num);
 
 	/*
 	 * Make sure that if this message is a continuation, that it
@@ -399,7 +403,7 @@ printf ("Message fragmented %d count %d\n", mcast->fragmented, mcast->msg_count)
 		}
 		assembly->index += msg_lens[msg_count];
 	} else {
-			assembly->last_frag_num = 0;
+		assembly->last_frag_num = 0;
 		assembly->index = 0;
 	}
 }
@@ -450,7 +454,7 @@ int callback_token_received_fn (enum totem_callback_token_type type,
  * Initialize the totem process group abstraction
  */
 int totempg_initialize (
-	poll_handle *poll_handle,
+	poll_handle poll_handle,
 	totemsrp_handle *totemsrp_handle,
 	struct totem_config *totem_config,
 
@@ -472,6 +476,8 @@ int totempg_initialize (
 	app_deliver_fn = deliver_fn;
 	app_confchg_fn = confchg_fn;
 
+	totempg_totem_config = totem_config;
+
 	res = totemmrp_initialize (
 		poll_handle,
 		totemsrp_handle,
@@ -485,6 +491,8 @@ int totempg_initialize (
 		callback_token_received_fn,
 		0);
 
+	totemsrp_net_mtu_adjust (totem_config);
+
 	return (res);
 }
 
@@ -510,6 +518,7 @@ int totempg_mcast (
 	int max_packet_size = 0;
 	int copy_len = 0; 
 	int copy_base = 0;
+	int total_size = 0;
 
 	totemmrp_new_msg_signal ();
 
@@ -518,6 +527,10 @@ int totempg_mcast (
 
 	mcast_packed_msg_lens[mcast_packed_msg_count] = 0;
 
+	for (i = 0; i < iov_len; i++) {
+		total_size += iovec[i].iov_len;
+	}
+
 	for (i = 0; i < iov_len; ) {
 		mcast.fragmented = 0;
 		mcast.continuation = fragment_continuation;
@@ -625,9 +638,13 @@ int totempg_send_ok (
 	int msg_size)
 {
 	int avail = 0;
+	int total;
+
 	avail = totemmrp_avail ();
 
-	return (avail > 200);
+	total = (msg_size / (totempg_totem_config->net_mtu - 25) /* for totempg_mcat header */);
+
+	return (avail >= total);
 }
 /*
  *	vi: set autoindent tabstop=4 shiftwidth=4 :

+ 1 - 1
exec/totempg.h

@@ -52,7 +52,7 @@
  * Initialize the totem process group abstraction
  */
 int totempg_initialize (
-	poll_handle *poll_handle,
+	poll_handle poll_handle,
 	totemsrp_handle *totemsrp_handle,
 	struct totem_config *totem_config,
 

+ 870 - 0
exec/totemrrp.c

@@ -0,0 +1,870 @@
+/*
+ * Copyright (c) 2005 MontaVista Software, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include <assert.h>
+#include <pthread.h>
+#include <sys/mman.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <sys/un.h>
+#include <sys/sysinfo.h>
+#include <sys/ioctl.h>
+#include <sys/param.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <linux/if.h>
+#include <linux/sockios.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <signal.h>
+#include <sched.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/poll.h>
+
+#include "../include/queue.h"
+#include "../include/sq.h"
+#include "../include/list.h"
+#include "hdb.h"
+#include "swab.h"
+#include "aispoll.h"
+#include "totemnet.h"
+#include "totemrrp.h"
+
+struct totemrrp_instance;
+/*
+ * State for the passive redundant ring algorithm.
+ * faulty, last_seq and counter_problems are arrays with one entry per
+ * interface, allocated and zeroed by passive_instance_initialize().
+ */
+struct passive_instance {
+	unsigned int *faulty;
+	unsigned int *last_seq;
+	unsigned int *counter_problems;
+	unsigned char token[15000];
+	unsigned int token_len;
+};
+
+/*
+ * State for the active redundant ring algorithm.
+ * faulty, last_token_recv and counter_problems are arrays with one entry
+ * per interface, allocated by active_instance_initialize().
+ */
+struct active_instance {
+	struct totemrrp_instance *rrp_instance;	/* owning rrp instance */
+	unsigned int *faulty;	/* interfaces with a nonzero entry are skipped on send */
+	unsigned int *last_token_recv;	/* set to 1 once the token arrived on that interface */
+	unsigned int *counter_problems;
+	unsigned char token[15000];	/* copy of the token stored by active_token_recv */
+	unsigned int token_len;	/* number of bytes stored in token */
+        poll_timer_handle timer_active_token;	/* handle used by active_token_timer_start/cancel */
+};
+
+/*
+ * Dispatch table for a redundant ring algorithm.
+ * rrp_deliver_fn() calls token_recv or mcast_recv for each received
+ * message; the totemrrp_*_send entry points call the send members.
+ */
+struct rrp_algo {
+	/* called for every received message that is not a token */
+	void (*mcast_recv) (
+		struct totemrrp_instance *instance,
+		void *context,
+		struct in_addr *system_from,
+		void *msg,
+		unsigned int msg_len);
+
+	void (*mcast_noflush_send) (
+		struct totemrrp_instance *instance,
+		struct iovec *iovec,
+		unsigned int iov_len);
+
+	void (*mcast_flush_send) (
+		struct totemrrp_instance *instance,
+		void *msg,
+		unsigned int msg_len);
+
+	/* called for every received token; interface_no is the receiving interface */
+	void (*token_recv) (
+		struct totemrrp_instance *instance,
+		unsigned int interface_no,
+		void *context,
+		struct in_addr *system_from,
+		void *msg,
+		unsigned int msg_len,
+		unsigned int token_seqid);
+
+	void (*token_send) (
+		struct totemrrp_instance *instance,
+		struct in_addr *system_to,
+		void *msg,
+		unsigned int msg_len);
+};
+
+struct totemrrp_instance {
+	poll_handle totemrrp_poll_handle;
+
+	struct totem_interface *totemrrp_interfaces;
+
+	struct rrp_algo *rrp_algo;
+
+	void *context;
+	
+	void (*totemrrp_deliver_fn) (
+		void *context,
+		struct in_addr *system_from,
+		void *msg,
+		int msg_len);
+
+	void (*totemrrp_iface_change_fn) (
+		void *context,
+		struct sockaddr_in *iface_sockaddr_in);
+
+	void (*totemrrp_token_seqid_get) (
+		void *msg,
+		unsigned int *seqid,
+		unsigned int *token_is);
+
+	/*
+	 * Function and data used to log messages
+	 */
+	int totemrrp_log_level_security;
+
+	int totemrrp_log_level_error;
+
+	int totemrrp_log_level_warning;
+
+	int totemrrp_log_level_notice;
+
+	int totemrrp_log_level_debug;
+
+	void (*totemrrp_log_printf) (int level, char *format, ...);
+
+	totemrrp_handle handle;
+
+	totemnet_handle *net_handles;
+
+	totemnet_handle net_handle;
+
+	void *rrp_algo_instance;
+
+	int interface_count;
+
+	int poll_handle;
+
+	int processor_count;
+};
+
+void passive_mcast_recv (
+	struct totemrrp_instance *instance,
+	void *context,
+	struct in_addr *system_from,
+	void *msg,
+	unsigned int msg_len);
+
+void passive_mcast_noflush_send (
+	struct totemrrp_instance *instance,
+	struct iovec *iovec,
+	unsigned int iov_len);
+
+void passive_mcast_flush_send (
+	struct totemrrp_instance *instance,
+	void *msg,
+	unsigned int msg_len);
+
+/*
+ * Token receive handler for the passive algorithm.
+ * The parameter list matches the token_recv member of struct rrp_algo
+ * (including the context argument) so the function can be stored in a
+ * struct rrp_algo initializer.
+ */
+void passive_token_recv (
+	struct totemrrp_instance *instance,
+	unsigned int interface_no,
+	void *context,
+	struct in_addr *system_from,
+	void *msg,
+	unsigned int msg_len,
+	unsigned int token_seqid);
+
+void passive_token_send (
+	struct totemrrp_instance *instance,
+	struct in_addr *system_to,
+	void *msg,
+	unsigned int msg_len);
+
+void active_mcast_recv (
+	struct totemrrp_instance *instance,
+	void *context,
+	struct in_addr *system_from,
+	void *msg,
+	unsigned int msg_len);
+
+void active_mcast_noflush_send (
+	struct totemrrp_instance *instance,
+	struct iovec *iovec,
+	unsigned int iov_len);
+
+void active_mcast_flush_send (
+	struct totemrrp_instance *instance,
+	void *msg,
+	unsigned int msg_len);
+
+void active_token_recv (
+	struct totemrrp_instance *instance,
+	unsigned int interface_no,
+	void *context,
+	struct in_addr *system_from,
+	void *msg,
+	unsigned int msg_len,
+	unsigned int token_seqid);
+
+void active_token_send (
+	struct totemrrp_instance *instance,
+	struct in_addr *system_to,
+	void *msg,
+	unsigned int msg_len);
+
+/*
+struct rrp_algo passive_algo = {
+	.mcast_recv		= passive_mcast_recv,
+	.mcast_noflush_send	= passive_mcast_noflush_send,
+	.mcast_flush_send	= passive_mcast_flush_send,
+	.token_recv		= passive_token_recv,
+	.token_send		= passive_token_send
+};
+*/
+
+/*
+ * Dispatch table for the active algorithm; installed as the algorithm of
+ * every instance by totemrrp_instance_initialize().
+ */
+struct rrp_algo active_algo = {
+	.mcast_recv		= active_mcast_recv,
+	.mcast_noflush_send	= active_mcast_noflush_send,
+	.mcast_flush_send	= active_mcast_flush_send,
+	.token_recv		= active_token_recv,
+	.token_send		= active_token_send
+};
+
+/*
+ * All instances in one database
+ */
+static struct saHandleDatabase totemrrp_instance_database = {
+	.handleCount			= 0,
+	.handles			= 0,
+	.handleInstanceDestructor	= 0
+};
+
+/*
+ * Allocate a passive algorithm instance together with its three
+ * per-interface arrays and zero every array entry.
+ *
+ * interface_count - number of entries allocated in each array
+ *
+ * Returns the new instance, or 0 if any allocation fails (everything
+ * allocated up to that point is released first).
+ */
+struct passive_instance *passive_instance_initialize (
+	int interface_count)
+{
+	struct passive_instance *passive;
+	int iface;
+
+	passive = malloc (sizeof (struct passive_instance));
+	if (passive == 0) {
+		return (0);
+	}
+
+	passive->faulty = malloc (sizeof (int) * interface_count);
+	if (passive->faulty == 0) {
+		goto free_instance;
+	}
+
+	passive->last_seq = malloc (sizeof (int) * interface_count);
+	if (passive->last_seq == 0) {
+		goto free_faulty;
+	}
+
+	passive->counter_problems = malloc (sizeof (int) * interface_count);
+	if (passive->counter_problems == 0) {
+		goto free_last_seq;
+	}
+
+	for (iface = 0; iface < interface_count; iface++) {
+		passive->faulty[iface] = 0;
+		passive->last_seq[iface] = 0;
+		passive->counter_problems[iface] = 0;
+	}
+	return (passive);
+
+	/*
+	 * Unwind in reverse order of allocation
+	 */
+free_last_seq:
+	free (passive->last_seq);
+free_faulty:
+	free (passive->faulty);
+free_instance:
+	free (passive);
+	return (0);
+}
+
+
+/*
+ * Allocate an active algorithm instance together with its three
+ * per-interface arrays, zero every array entry and bind the instance to
+ * its owning rrp instance.
+ *
+ * rrp_instance    - rrp instance stored in the new object
+ * interface_count - number of entries allocated in each array
+ *
+ * Returns the new instance, or 0 if any allocation fails (everything
+ * allocated up to that point is released first).
+ */
+struct active_instance *active_instance_initialize (
+	struct totemrrp_instance *rrp_instance,
+	int interface_count)
+{
+	struct active_instance *instance;
+	int i;
+
+	instance = malloc (sizeof (struct active_instance));
+	if (instance == 0) {
+		goto error_exit;
+	}
+
+	instance->faulty = malloc (sizeof (int) * interface_count);
+	if (instance->faulty == 0) {
+		free (instance);
+		instance = 0;
+		goto error_exit;
+	}
+
+	instance->last_token_recv = malloc (sizeof (int) * interface_count);
+	if (instance->last_token_recv == 0) {
+		free (instance->faulty);
+		free (instance);
+		instance = 0;
+		goto error_exit;
+	}
+
+	instance->counter_problems = malloc (sizeof (int) * interface_count);
+	if (instance->counter_problems == 0) {
+		free (instance->last_token_recv);
+		free (instance->faulty);
+		free (instance);
+		instance = 0;
+		goto error_exit;
+	}
+
+	/*
+	 * The arrays are allocated exactly once above; allocating them a
+	 * second time here would leak the checked allocations and leave the
+	 * new pointers unchecked.
+	 */
+	for (i = 0; i < interface_count; i++) {
+		instance->faulty[i] = 0;
+		instance->last_token_recv[i] = 0;
+		instance->counter_problems[i] = 0;
+	}
+
+	/*
+	 * No token has been stored yet
+	 */
+	instance->token_len = 0;
+
+	instance->timer_active_token = 0;
+
+	instance->rrp_instance = rrp_instance;
+
+error_exit:
+	return (instance);
+}
+
+/*
+ * Timer callback registered by active_token_timer_start().
+ * context is the struct active_instance passed to poll_timer_add; the
+ * function currently only prints the instance pointer.
+ */
+static void timer_function_active_token (void *context)
+{
+	struct active_instance *instance = (struct active_instance *)context;
+
+	printf ("active instance %p\n", instance);
+}
+
+
+/*
+ * Register timer_function_active_token() with the owning rrp instance's
+ * poll handle and store the resulting timer handle in
+ * active_instance->timer_active_token.
+ */
+void active_token_timer_start (struct active_instance *active_instance)
+{
+        poll_timer_add (
+		active_instance->rrp_instance->poll_handle,
+		10, /* 10 msec */
+		(void *)active_instance,
+		timer_function_active_token,
+		&active_instance->timer_active_token);
+}
+
+/*
+ * Delete the timer whose handle is stored in
+ * active_instance->timer_active_token.
+ */
+void active_token_timer_cancel (struct active_instance *active_instance)
+{
+        poll_timer_delete (
+		active_instance->rrp_instance->poll_handle,
+		active_instance->timer_active_token);
+}
+
+/*
+ * mcast_recv handler of the active algorithm: passes the message
+ * unchanged to the deliver function registered in the rrp instance.
+ */
+void active_mcast_recv (
+	struct totemrrp_instance *instance,
+	void *context,
+	struct in_addr *system_from,
+	void *msg,
+	unsigned int msg_len)
+{
+	instance->totemrrp_deliver_fn (
+		context,
+		system_from,
+		msg,
+		msg_len);
+}
+
+/*
+ * mcast_flush_send handler of the active algorithm: sends msg on every
+ * interface that is not marked faulty.
+ *
+ * Each interface is addressed through its own entry in net_handles so the
+ * message goes out once per ring rather than repeatedly on interface 0.
+ */
+void active_mcast_flush_send (
+	struct totemrrp_instance *instance,
+	void *msg,
+	unsigned int msg_len)
+{
+	int i;
+	struct active_instance *rrp_algo_instance = (struct active_instance *)instance->rrp_algo_instance;
+
+	for (i = 0; i < instance->interface_count; i++) {
+		if (rrp_algo_instance->faulty[i] == 0) {
+			totemnet_mcast_flush_send (instance->net_handles[i], msg, msg_len);
+		}
+	}
+}
+/*
+ * mcast_noflush_send handler of the active algorithm: sends the iovec on
+ * every interface that is not marked faulty.
+ *
+ * Each interface is addressed through its own entry in net_handles so the
+ * message goes out once per ring rather than repeatedly on interface 0.
+ */
+void active_mcast_noflush_send (
+	struct totemrrp_instance *instance,
+	struct iovec *iovec,
+	unsigned int iov_len)
+{
+	int i;
+	struct active_instance *rrp_algo_instance = (struct active_instance *)instance->rrp_algo_instance;
+
+	for (i = 0; i < instance->interface_count; i++) {
+		if (rrp_algo_instance->faulty[i] == 0) {
+			totemnet_mcast_noflush_send (instance->net_handles[i], iovec, iov_len);
+		}
+	}
+}
+
+/*
+ * token_recv handler of the active algorithm.
+ *
+ * instance     - rrp instance the token arrived on
+ * interface_no - index of the interface that received the token
+ * context      - passed through to the deliver function
+ * system_from  - passed through to the deliver function
+ * msg, msg_len - the token
+ * token_seqid  - sequence id supplied by the caller
+ *
+ * The token is delivered once every interface that is not marked faulty
+ * has its last_token_recv entry set.
+ *
+ * NOTE(review): both sequence id lookups below read the same msg, so
+ * cur_token_seq and last_token_seq are always equal; rrp_deliver_fn() also
+ * derives token_seqid from that msg.  The comparison against a previously
+ * stored token looks unfinished - confirm the intended logic.
+ */
+void active_token_recv (
+	struct totemrrp_instance *instance,
+	unsigned int interface_no,
+	void *context,
+	struct in_addr *system_from,
+	void *msg,
+	unsigned int msg_len,
+	unsigned int token_seqid)
+{
+	unsigned int cur_token_seq;
+	unsigned int last_token_seq;
+	unsigned int token_is;	/* matches the unsigned int * the callback takes */
+	int i;
+	struct active_instance *active_instance = (struct active_instance *)instance->rrp_algo_instance;
+
+	instance->totemrrp_token_seqid_get (
+		msg,
+		&cur_token_seq,
+		&token_is);
+	assert (token_is);
+
+	if (token_seqid > cur_token_seq) {
+		/*
+		 * Never copy more than the token buffer can hold
+		 */
+		assert (msg_len <= sizeof (active_instance->token));
+		memcpy (active_instance->token, msg, msg_len);
+		active_instance->token_len = msg_len;
+		for (i = 0; i < instance->interface_count; i++) {
+			active_instance->last_token_recv[i] = 0;
+		}
+		active_instance->last_token_recv[interface_no] = 1;
+		active_token_timer_start (active_instance);
+	}
+
+	instance->totemrrp_token_seqid_get (
+		msg,
+		&last_token_seq,
+		&token_is);
+	assert (token_is);
+
+	if (cur_token_seq == last_token_seq) {
+		active_instance->last_token_recv[interface_no] = 1;
+		for (i = 0; i < instance->interface_count; i++) {
+			if ((active_instance->last_token_recv[i] == 0) &&
+				active_instance->faulty[i] == 0) {
+				return; /* don't deliver token */
+			}
+		}
+		active_token_timer_cancel (active_instance);
+
+		instance->totemrrp_deliver_fn (
+			context,
+			system_from,
+			msg,
+			msg_len);
+	}
+}
+
+/*
+ * token_send handler of the active algorithm: sends the token to
+ * system_to on every interface that is not marked faulty.
+ *
+ * Each interface is addressed through its own entry in net_handles so the
+ * token goes out once per ring rather than repeatedly on interface 0.
+ */
+void active_token_send (
+	struct totemrrp_instance *instance,
+	struct in_addr *system_to,
+	void *msg,
+	unsigned int msg_len)
+{
+	struct active_instance *rrp_algo_instance = (struct active_instance *)instance->rrp_algo_instance;
+	int i;
+
+	for (i = 0; i < instance->interface_count; i++) {
+		if (rrp_algo_instance->faulty[i] == 0) {
+			totemnet_token_send (
+				instance->net_handles[i],
+				system_to,
+				msg,
+				msg_len);
+		}
+	}
+}
+
+/*
+ * Context handed to totemnet for each interface by totemrrp_initialize().
+ * It lets rrp_deliver_fn() and rrp_iface_change_fn() recover the rrp
+ * instance, the caller's own context and the interface index.
+ */
+struct deliver_fn_context {
+	struct totemrrp_instance *instance;
+	void *context;
+	int interface_no;
+};
+
+/*
+ * Zero the whole instance and select the active algorithm as its
+ * redundant ring algorithm.
+ */
+static void totemrrp_instance_initialize (struct totemrrp_instance *instance)
+{
+	memset (instance, 0, sizeof (struct totemrrp_instance));
+	instance->rrp_algo = &active_algo;
+}
+
+/*
+ * Deliver callback registered with totemnet for every interface.
+ *
+ * context is the struct deliver_fn_context created in
+ * totemrrp_initialize().  The message is classified with the instance's
+ * token_seqid_get callback and dispatched to the token or mcast receiver
+ * of the instance's rrp algorithm.
+ */
+void rrp_deliver_fn (
+	void *context,
+	struct in_addr *system_from,
+	void *msg,
+	int msg_len)
+{
+	/*
+	 * unsigned int to match the pointer types the callback takes
+	 */
+	unsigned int token_seqid;
+	unsigned int token_is;
+
+	struct deliver_fn_context *deliver_fn_context = (struct deliver_fn_context *)context;
+
+	deliver_fn_context->instance->totemrrp_token_seqid_get (
+		msg,
+		&token_seqid,
+		&token_is);
+
+	if (token_is) {
+		/*
+		 * Deliver to the token receiver for this rrp algorithm 
+		 */
+		deliver_fn_context->instance->rrp_algo->token_recv (
+			deliver_fn_context->instance,
+			deliver_fn_context->interface_no,
+			deliver_fn_context->context,
+			system_from,
+			msg,
+			msg_len,
+			token_seqid);
+	} else {
+		/*
+		 * Deliver to the mcast receiver for this rrp algorithm 
+		 */
+		deliver_fn_context->instance->rrp_algo->mcast_recv (
+			deliver_fn_context->instance,
+			deliver_fn_context->context,
+			system_from,
+			msg,
+			msg_len);
+	}
+}
+
+/*
+ * Interface change callback registered with totemnet for every interface.
+ * context is the struct deliver_fn_context created in
+ * totemrrp_initialize(); the notification is forwarded to the instance's
+ * own iface_change_fn together with the caller's original context.
+ */
+void rrp_iface_change_fn (
+	void *context,
+	struct sockaddr_in *iface_sockaddr_in)
+{
+	struct deliver_fn_context *deliver_fn_context = (struct deliver_fn_context *)context;
+
+	deliver_fn_context->instance->totemrrp_iface_change_fn (
+		deliver_fn_context->context,
+		iface_sockaddr_in);
+}
+
+/*
+ * Finalize the totemnet instance of every interface that
+ * totemrrp_initialize() set up for this handle.
+ *
+ * Returns 0 on success or ENOENT if handle is not a valid rrp handle.
+ */
+int totemrrp_finalize (
+	totemrrp_handle handle)
+{
+	struct totemrrp_instance *instance;
+	SaAisErrorT error;
+	int res = 0;
+	int i;
+
+	error = saHandleInstanceGet (&totemrrp_instance_database, handle,
+		(void *)&instance);
+	if (error != SA_OK) {
+		res = ENOENT;
+		goto error_exit;
+	}
+
+	/*
+	 * One totemnet instance was initialized per interface, so each one
+	 * is finalized, not only interface 0
+	 */
+	for (i = 0; i < instance->interface_count; i++) {
+		totemnet_finalize (instance->net_handles[i]);
+	}
+
+	saHandleInstancePut (&totemrrp_instance_database, handle);
+
+error_exit:
+	return (res);
+}
+
+/*
+ * Totem Redundant Ring interface
+ * depends on poll abstraction, POSIX, IPV4
+ */
+
+/*
+ * Create an instance
+ */
+/*
+ * Create a redundant ring instance.
+ *
+ * poll_handle     - poll abstraction handle handed to every totemnet instance
+ * handle          - receives the handle of the new rrp instance
+ * totem_config    - supplies logging settings, interfaces and interface_count
+ * context         - opaque value passed back through deliver_fn and
+ *                   iface_change_fn
+ * deliver_fn      - called for delivered messages and tokens
+ * iface_change_fn - called when totemnet reports an interface change
+ * token_seqid_get - classifies a message as token/non-token and extracts
+ *                   the token sequence id
+ *
+ * Returns 0 on success and -1 if the handle or any required memory could
+ * not be obtained; on failure no handle is left allocated.
+ */
+int totemrrp_initialize (
+	poll_handle poll_handle,
+	totemrrp_handle *handle,
+	struct totem_config *totem_config,
+	void *context,
+
+	void (*deliver_fn) (
+		void *context,
+		struct in_addr *system_from,
+		void *msg,
+		int msg_len),
+
+	void (*iface_change_fn) (
+		void *context,
+		struct sockaddr_in *iface_sockaddr_in),
+
+	void (*token_seqid_get) (
+		void *msg,
+		unsigned int *seqid,
+		unsigned int *token_is))
+	
+{
+	SaAisErrorT error;
+	struct totemrrp_instance *instance;
+	int i;
+
+	error = saHandleCreate (&totemrrp_instance_database,
+		sizeof (struct totemrrp_instance), handle);
+	if (error != SA_OK) {
+		/*
+		 * No handle exists yet, so there is nothing to put or destroy
+		 */
+		return (-1);
+	}
+	error = saHandleInstanceGet (&totemrrp_instance_database, *handle,
+		(void *)&instance);
+	if (error != SA_OK) {
+		goto error_destroy;
+	}
+
+	totemrrp_instance_initialize (instance);
+
+	/*
+	 * Configure logging
+	 */
+	instance->totemrrp_log_level_security = totem_config->totem_logging_configuration.log_level_security;
+	instance->totemrrp_log_level_error = totem_config->totem_logging_configuration.log_level_error;
+	instance->totemrrp_log_level_warning = totem_config->totem_logging_configuration.log_level_warning;
+	instance->totemrrp_log_level_notice = totem_config->totem_logging_configuration.log_level_notice;
+	instance->totemrrp_log_level_debug = totem_config->totem_logging_configuration.log_level_debug;
+	instance->totemrrp_log_printf = totem_config->totem_logging_configuration.log_printf;
+
+	instance->totemrrp_interfaces = totem_config->interfaces;
+
+	instance->totemrrp_poll_handle = poll_handle;
+
+	instance->totemrrp_deliver_fn = deliver_fn;
+
+	instance->totemrrp_iface_change_fn = iface_change_fn;
+
+	instance->totemrrp_token_seqid_get = token_seqid_get;
+
+	instance->interface_count = totem_config->interface_count;
+
+	instance->context = context;
+
+	instance->poll_handle = poll_handle;
+
+	instance->net_handles = malloc (sizeof (totemnet_handle) * totem_config->interface_count);
+	if (instance->net_handles == 0) {
+		goto error_put;
+	}
+
+	/*
+	 * active_instance_initialize allocates the algorithm instance itself;
+	 * allocating one here first would only leak it
+	 */
+	instance->rrp_algo_instance = active_instance_initialize (
+		instance,
+		totem_config->interface_count);
+	if (instance->rrp_algo_instance == 0) {
+		goto error_free_handles;
+	}
+
+	/*
+	 * One totemnet instance per interface, each with its own context so
+	 * the receive path knows which interface a message arrived on
+	 */
+	for (i = 0; i < totem_config->interface_count; i++) {
+		struct deliver_fn_context *deliver_fn_context;
+
+		deliver_fn_context = malloc (sizeof (struct deliver_fn_context));
+		assert (deliver_fn_context);
+		deliver_fn_context->instance = instance;
+		deliver_fn_context->context = context;
+		deliver_fn_context->interface_no = i;
+
+		totemnet_initialize (
+			poll_handle,
+			&instance->net_handles[i],
+			totem_config,
+			i,
+			(void *)deliver_fn_context,
+			rrp_deliver_fn,
+			rrp_iface_change_fn);
+	}
+	instance->net_handle = instance->net_handles[0];
+
+	totemnet_net_mtu_adjust (totem_config);
+
+	saHandleInstancePut (&totemrrp_instance_database, *handle);
+	return (0);
+
+error_free_handles:
+	free (instance->net_handles);
+error_put:
+	saHandleInstancePut (&totemrrp_instance_database, *handle);
+error_destroy:
+	saHandleDestroy (&totemrrp_instance_database, *handle);
+	return (-1);
+}
+
+/*
+ * Record the processor count in the rrp instance and pass it on to the
+ * instance's totemnet handle.
+ *
+ * Returns 0 on success or ENOENT if handle is not a valid rrp handle.
+ */
+int totemrrp_processor_count_set (
+	totemrrp_handle handle,
+	int processor_count)
+{
+	struct totemrrp_instance *rrp;
+
+	if (saHandleInstanceGet (&totemrrp_instance_database, handle,
+		(void *)&rrp) != SA_OK) {
+		return (ENOENT);
+	}
+
+	totemnet_processor_count_set (rrp->net_handle, processor_count);
+	rrp->processor_count = processor_count;
+
+	saHandleInstancePut (&totemrrp_instance_database, handle);
+	return (0);
+}
+
+/*
+ * Forward a receive flush request to the instance's totemnet handle.
+ *
+ * Returns 0 on success or ENOENT if handle is not a valid rrp handle.
+ */
+int totemrrp_recv_flush (totemrrp_handle handle)
+{
+	struct totemrrp_instance *rrp;
+
+	if (saHandleInstanceGet (&totemrrp_instance_database, handle,
+		(void *)&rrp) != SA_OK) {
+		return (ENOENT);
+	}
+
+	totemnet_recv_flush (rrp->net_handle);
+
+	saHandleInstancePut (&totemrrp_instance_database, handle);
+	return (0);
+}
+
+/*
+ * Forward a send flush request to the instance's totemnet handle.
+ *
+ * Returns 0 on success or ENOENT if handle is not a valid rrp handle.
+ */
+int totemrrp_send_flush (totemrrp_handle handle)
+{
+	struct totemrrp_instance *rrp;
+
+	if (saHandleInstanceGet (&totemrrp_instance_database, handle,
+		(void *)&rrp) != SA_OK) {
+		return (ENOENT);
+	}
+
+	totemnet_send_flush (rrp->net_handle);
+
+	saHandleInstancePut (&totemrrp_instance_database, handle);
+	return (0);
+}
+
+/*
+ * Send a token to system_to through the token_send handler of the
+ * instance's rrp algorithm.
+ *
+ * Returns 0 on success or ENOENT if handle is not a valid rrp handle.
+ */
+int totemrrp_token_send (
+	totemrrp_handle handle,
+	struct in_addr *system_to,
+	void *msg,
+	int msg_len)
+{
+	struct totemrrp_instance *rrp;
+
+	if (saHandleInstanceGet (&totemrrp_instance_database, handle,
+		(void *)&rrp) != SA_OK) {
+		return (ENOENT);
+	}
+
+	rrp->rrp_algo->token_send (rrp, system_to, msg, msg_len);
+
+	saHandleInstancePut (&totemrrp_instance_database, handle);
+	return (0);
+}
+
+/*
+ * Send msg through the mcast_flush_send handler of the instance's rrp
+ * algorithm.
+ *
+ * Returns 0 on success or ENOENT if handle is not a valid rrp handle.
+ */
+int totemrrp_mcast_flush_send (
+	totemrrp_handle handle,
+	void *msg,
+	int msg_len)
+{
+	struct totemrrp_instance *rrp;
+
+	if (saHandleInstanceGet (&totemrrp_instance_database, handle,
+		(void *)&rrp) != SA_OK) {
+		return (ENOENT);
+	}
+
+	rrp->rrp_algo->mcast_flush_send (rrp, msg, msg_len);
+
+	saHandleInstancePut (&totemrrp_instance_database, handle);
+	return (0);
+}
+
+/*
+ * Send the iovec through the mcast_noflush_send handler of the
+ * instance's rrp algorithm, unless the recorded processor count is one
+ * or less, in which case nothing is sent.
+ *
+ * Returns 0 on success or ENOENT if handle is not a valid rrp handle.
+ */
+int totemrrp_mcast_noflush_send (
+	totemrrp_handle handle,
+	struct iovec *iovec,
+	int iov_len)
+{
+	struct totemrrp_instance *rrp;
+
+	if (saHandleInstanceGet (&totemrrp_instance_database, handle,
+		(void *)&rrp) != SA_OK) {
+		return (ENOENT);
+	}
+
+	/*
+	 * merge detects go out through mcast_flush_send so it is safe to
+	 * flush these messages if we are only one processor.  This avoids
+	 * an encryption/hmac and decryption/hmac
+	 */
+	if (rrp->processor_count > 1) {
+		rrp->rrp_algo->mcast_noflush_send (rrp, iovec, iov_len);
+	}
+
+	saHandleInstancePut (&totemrrp_instance_database, handle);
+	return (0);
+}
+/*
+ * Forward an interface check request to the instance's totemnet handle.
+ *
+ * Returns 0 on success or ENOENT if handle is not a valid rrp handle.
+ */
+int totemrrp_iface_check (totemrrp_handle handle)
+{
+	struct totemrrp_instance *rrp;
+
+	if (saHandleInstanceGet (&totemrrp_instance_database, handle,
+		(void *)&rrp) != SA_OK) {
+		return (ENOENT);
+	}
+
+	totemnet_iface_check (rrp->net_handle);
+
+	saHandleInstancePut (&totemrrp_instance_database, handle);
+	return (0);
+}

+ 105 - 0
exec/totemrrp.h

@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2005 MontaVista Software, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+#ifndef TOTEMRRP_H_DEFINED
+#define TOTEMRRP_H_DEFINED
+
+#include <sys/types.h>
+#include <sys/socket.h>
+
+#include "totem.h"
+#include "aispoll.h"
+
+typedef unsigned int totemrrp_handle;
+
+#define TOTEMRRP_NOFLUSH	0
+#define TOTEMRRP_FLUSH		1
+
+/*
+ * Totem Redundant Ring interface
+ * depends on poll abstraction, POSIX, IPV4
+ */
+
+/*
+ * Create an instance
+ */
+extern int totemrrp_initialize (
+	poll_handle poll_handle,
+	totemrrp_handle *handle,
+	struct totem_config *totem_config,
+	void *context,
+
+	void (*deliver_fn) (
+		void *context,
+		struct in_addr *system_from,
+		void *msg,
+		int msg_len),
+
+	void (*iface_change_fn) (
+		void *context,
+		struct sockaddr_in *iface_sockaddr_in),
+
+	void (*token_seqid_get) (
+		void *msg,
+		unsigned int *seqid,
+		unsigned int *token_is));
+
+extern int totemrrp_processor_count_set (
+	totemrrp_handle handle,
+	int processor_count);
+
+extern int totemrrp_token_send (
+	totemrrp_handle handle,
+	struct in_addr *system_to,
+	void *msg,
+	int msg_len);
+
+extern int totemrrp_mcast_noflush_send (
+	totemrrp_handle handle,
+	struct iovec *iovec,
+	int iov_len);
+
+extern int totemrrp_mcast_flush_send (
+	totemrrp_handle handle,
+	void *msg,
+	int msg_len);
+
+extern int totemrrp_recv_flush (totemrrp_handle handle);
+
+extern int totemrrp_send_flush (totemrrp_handle handle);
+
+extern int totemrrp_iface_check (totemrrp_handle handle);
+
+extern int totemrrp_finalize (totemrrp_handle handle);
+
+#endif /* TOTEMRRP_H_DEFINED */

Diff do ficheiro suprimidas por serem muito extensas
+ 159 - 763
exec/totemsrp.c


+ 7 - 2
exec/totemsrp.h

@@ -37,7 +37,10 @@
 #include "totem.h"
 #include "aispoll.h"
 
-#define TOTEMSRP_PACKET_SIZE_MAX	1404
+#define HEADERSIZE 78
+
+//#define TOTEMSRP_PACKET_SIZE_MAX	1404
+#define TOTEMSRP_PACKET_SIZE_MAX	(9000 - HEADERSIZE)	/* parenthesized so the expansion is safe inside larger expressions */
 
 typedef unsigned int totemsrp_handle;
 
@@ -50,7 +53,7 @@ typedef unsigned int totemsrp_handle;
  * Create a protocol instance
  */
 int totemsrp_initialize (
-	poll_handle *poll_handle,
+	poll_handle poll_handle,
 	totemsrp_handle *handle,
 	struct totem_config *totem_config,
 
@@ -98,4 +101,6 @@ int totemsrp_new_msg_signal (totemsrp_handle handle);
 
 extern struct sockaddr_in config_mcast_addr;
 
+extern void totemsrp_net_mtu_adjust (struct totem_config *totem_config);
+
 #endif /* TOTEMSRP_H_DEFINED */

+ 169 - 0
exec/wthread.c

@@ -0,0 +1,169 @@
+/*
+ * Copyright (c) 2005 MontaVista Software, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * Add work to a work group and have threads process the work
+ * Provide blocking for all work to complete 
+ */
+#include <stdlib.h>
+#include <pthread.h>
+#include <errno.h>
+#include "wthread.h"
+#include "../include/queue.h"
+
+struct thread_data {	/* argument bundle handed to each worker thread's start routine */
+	void *thread_state;	/* per-thread state buffer; passed as first argument to worker_fn */
+	void *data;		/* set by worker_thread_group_init to the owning struct worker_thread */
+};
+
+/*
+ * Per-thread bookkeeping for one worker in a worker_thread_group.
+ */
+struct worker_thread {
+	struct worker_thread_group *worker_thread_group;	/* owning group; supplies worker_fn */
+	pthread_mutex_t new_work_mutex;		/* held while the queue is changed or an item is processed */
+	pthread_cond_t new_work_cond;		/* signalled when an item is queued */
+	pthread_mutex_t done_work_mutex;	/* paired with done_work_cond */
+	pthread_cond_t done_work_cond;		/* signalled by the worker once its queue is empty */
+	pthread_t thread_id;			/* filled in by pthread_create */
+	struct queue queue;			/* pending work items */
+	void *thread_state;			/* private state passed to worker_fn */
+	struct thread_data thread_data;		/* argument given to pthread_create */
+};
+
+/*
+ * Thread start routine for one worker of a worker_thread_group.
+ *
+ * thread_data_in points at the struct thread_data stored inside this worker's
+ * struct worker_thread.  The routine never leaves its loop: it sleeps until
+ * the per-thread queue holds work, runs the group's worker_fn on the item
+ * returned by queue_item_get, removes that item, and signals done_work_cond
+ * when the queue has become empty.
+ */
+void *worker_thread (void *thread_data_in) {
+	struct thread_data *thread_data = (struct thread_data *)thread_data_in;
+	void *thread_state = thread_data->thread_state;
+	struct worker_thread *worker_thread =
+		(struct worker_thread *)thread_data->data;
+	void *data_for_worker_fn;
+
+	for (;;) {
+		pthread_mutex_lock (&worker_thread->new_work_mutex);
+		/*
+		 * Re-test the predicate after every wakeup: pthread_cond_wait
+		 * may return spuriously, and an item must not be fetched from
+		 * an empty queue.
+		 */
+		while (queue_is_empty (&worker_thread->queue) == 1) {
+			pthread_cond_wait (&worker_thread->new_work_cond,
+				&worker_thread->new_work_mutex);
+		}
+
+		data_for_worker_fn = queue_item_get (&worker_thread->queue);
+		worker_thread->worker_thread_group->worker_fn (thread_state, data_for_worker_fn);
+		/* The item stays queued until it has been processed */
+		queue_item_remove (&worker_thread->queue);
+		pthread_mutex_unlock (&worker_thread->new_work_mutex);
+		pthread_mutex_lock (&worker_thread->done_work_mutex);
+		if (queue_is_empty (&worker_thread->queue) == 1) {
+			pthread_cond_signal (&worker_thread->done_work_cond);
+		}
+		pthread_mutex_unlock (&worker_thread->done_work_mutex);
+	}
+	return (0);
+}
+
+/*
+ * Create a group of worker threads.
+ *
+ * worker_thread_group		group structure to fill in
+ * threads			number of worker threads to start
+ * items_max, item_size		passed to queue_init for each thread's queue
+ * thread_state_size		bytes allocated for each thread's private state
+ * thread_state_constructor	called once per thread on the new state buffer
+ * worker_fn			called by a worker for every queued work item
+ *
+ * Returns 0 on success, or -1 if an allocation or thread creation fails.
+ * When a per-thread step fails, threadcount is reduced to the number of
+ * threads that were actually started so the other group functions only touch
+ * initialized slots.
+ */
+int worker_thread_group_init (
+	struct worker_thread_group *worker_thread_group,
+	int threads,
+	int items_max,
+	int item_size,
+	int thread_state_size,
+	void (*thread_state_constructor)(void *),
+	void (*worker_fn)(void *thread_state, void *work_item))
+{
+	int i;
+
+	worker_thread_group->threadcount = threads;
+	worker_thread_group->last_scheduled = 0;
+	worker_thread_group->worker_fn = worker_fn;
+	worker_thread_group->threads = malloc (sizeof (struct worker_thread) *
+		threads);
+	if (worker_thread_group->threads == NULL) {
+		return (-1);
+	}
+
+	for (i = 0; i < threads; i++) {
+		struct worker_thread *thread = &worker_thread_group->threads[i];
+
+		thread->thread_state = malloc (thread_state_size);
+		if (thread->thread_state == NULL) {
+			/* Never hand a NULL state buffer to the constructor */
+			worker_thread_group->threadcount = i;
+			return (-1);
+		}
+		thread_state_constructor (thread->thread_state);
+		thread->worker_thread_group = worker_thread_group;
+		pthread_mutex_init (&thread->new_work_mutex, NULL);
+		pthread_cond_init (&thread->new_work_cond, NULL);
+		pthread_mutex_init (&thread->done_work_mutex, NULL);
+		pthread_cond_init (&thread->done_work_cond, NULL);
+		queue_init (&thread->queue, items_max, item_size);
+
+		thread->thread_data.thread_state = thread->thread_state;
+		thread->thread_data.data = thread;
+		if (pthread_create (&thread->thread_id, NULL, worker_thread,
+			&thread->thread_data) != 0) {
+			/*
+			 * No thread references this slot, so release what can
+			 * be released here.  The queue is left as initialized.
+			 */
+			pthread_cond_destroy (&thread->done_work_cond);
+			pthread_mutex_destroy (&thread->done_work_mutex);
+			pthread_cond_destroy (&thread->new_work_cond);
+			pthread_mutex_destroy (&thread->new_work_mutex);
+			free (thread->thread_state);
+			worker_thread_group->threadcount = i;
+			return (-1);
+		}
+	}
+	return (0);
+}
+
+/*
+ * Queue one work item on the next worker thread in round-robin order and
+ * signal that thread's new_work_cond.
+ */
+void worker_thread_group_work_add (
+	struct worker_thread_group *worker_thread_group,
+	void *item)
+{
+	struct worker_thread *target;
+	int next;
+
+	/* Advance the round-robin cursor, wrapping at threadcount */
+	next = worker_thread_group->last_scheduled + 1;
+	next %= worker_thread_group->threadcount;
+	worker_thread_group->last_scheduled = next;
+	target = &worker_thread_group->threads[next];
+
+	pthread_mutex_lock (&target->new_work_mutex);
+	queue_item_add (&target->queue, item);
+	pthread_cond_signal (&target->new_work_cond);
+	pthread_mutex_unlock (&target->new_work_mutex);
+}
+
+/*
+ * Block until the work queue of every thread in the group is empty.
+ *
+ * Threads are visited in index order; for each one the caller sleeps on
+ * done_work_cond, which the worker signals when its queue becomes empty.
+ */
+void worker_thread_group_wait (
+	struct worker_thread_group *worker_thread_group)
+{
+	int i;
+
+	for (i = 0; i < worker_thread_group->threadcount; i++) {
+		pthread_mutex_lock (&worker_thread_group->threads[i].done_work_mutex);
+		/*
+		 * Loop rather than test once: pthread_cond_wait may wake
+		 * spuriously while items are still queued.
+		 */
+		while (queue_is_empty (&worker_thread_group->threads[i].queue) == 0) {
+			pthread_cond_wait (&worker_thread_group->threads[i].done_work_cond,
+				&worker_thread_group->threads[i].done_work_mutex);
+		}
+		pthread_mutex_unlock (&worker_thread_group->threads[i].done_work_mutex);
+	}
+}
+
+/*
+ * Request cancellation of every worker thread in the group with
+ * pthread_cancel.  The threads are not joined here.
+ */
+void worker_thread_group_exit (
+	struct worker_thread_group *worker_thread_group)
+{
+	struct worker_thread *thread = worker_thread_group->threads;
+	int remaining = worker_thread_group->threadcount;
+
+	while (remaining-- > 0) {
+		pthread_cancel (thread->thread_id);
+		thread++;
+	}
+}

+ 64 - 0
exec/wthread.h

@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2005 MontaVista Software, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef CONFIG_WTHREAD_H_DEFINED
+#define CONFIG_WTHREAD_H_DEFINED
+
+struct worker_thread_group {	/* a set of worker threads sharing one work function */
+	int threadcount;	/* number of worker threads in the group */
+	int last_scheduled;	/* index of the thread that most recently received work */
+	struct worker_thread *threads;	/* array of per-thread state, allocated by worker_thread_group_init */
+	void (*worker_fn) (void *thread_state, void *work_item);	/* called by a worker for each queued item */
+};
+
+extern int worker_thread_group_init (
+	struct worker_thread_group *worker_thread_group,
+	int threads,
+	int items_max,
+	int item_size,
+	int thread_state_size,
+	void (*thread_state_constructor)(void *),
+	void (*worker_fn)(void *thread_state, void *work_item));
+
+extern void worker_thread_group_work_add (
+	struct worker_thread_group *worker_thread_group,
+	void *item);
+
+extern void worker_thread_group_wait (
+	struct worker_thread_group *worker_thread_group);
+
+extern void worker_thread_group_exit (
+	struct worker_thread_group *worker_thread_group);
+
+#endif /* CONFIG_WTHREAD_H_DEFINED */

+ 134 - 37
man/openais.conf.5

@@ -42,30 +42,35 @@ openais.conf - openais executive configuration file
 The openais.conf instructs the openais executive about various parameters
 needed to control the openais executive.  The configuration file consists of
 bracketed top level directives.  The possible directive choices are
-.IR network { } ", " logging { } ", " timeout { } ".  These directives are
-described below.
+.IR totem { } " and " logging { } ".  These directives are described below.
 
 .TP
-network { }
-This top level directive contains configuration options for the network.
+totem { }
+This top level directive contains configuration options for the totem protocol.
 .TP
 logging { }
 This top level directive contains configuration options for logging.
-.TP
-timeout { }
-This top level directive contains configuration options for protocol timeouts.
 
 .PP
 .PP
 Within the 
-.B network
-directive, there are three configuration options which are all required:
+.B totem
+directive, there are four configuration options which are all required:
+.TP
+version
+This specifies the version of the configuration file.  Currently the only
+valid version for this directive is 1.
+
 .TP
 bindnetaddr
 This specifies the address which the openais executive should bind.
-This address should always end in zero.  If the local interface traffic should
+This address should always end in zero.  If the totem traffic should
 be routed over 192.168.5.92, set bindnetaddr to 192.168.5.0.
 
+Multiple bindnetaddr directives may be specified.  When multiple bindnetaddr
+directives are specified, the totem redundant ring protocol will use multiple
+interfaces to replicate the network traffic.
+
 .TP
 mcastaddr
 This is the multicast address used by openais executive.  The default
@@ -79,40 +84,100 @@ This specifies the UDP port number.  It is possible to use the same multicast
 address on a network with the openais services configured for different
 UDP ports.
 
-.PP
 .PP
 Within the 
-.B logging
-directive, there are four configuration options which are all optional:
+.B totem 
+directive, there are four configuration options which are all optional.
+These control secrecy and authentication, the redundant ring mode of operation,
+the network MTU, and the number of sending threads.
+
 .TP
-logoutput
-This specifies the logging output.  The choices are file, which logs to a file,
-stderr, which logs to stderr, and syslog which logs to the system log.  It is
-possible to have multiple targets by including this directive with different
-options multiple times in the top level directive.
+secauth
+This specifies that HMAC/SHA1 authentication should be used to authenticate
+all messages.  It further specifies that all data should be encrypted with the
+sober128 encryption algorithm to protect data from eavesdropping.
+
+Enabling this option adds a 36 byte header to every message sent by totem which
+reduces total throughput.  Encryption and authentication consume 75% of CPU
+cycles in aisexec as measured with gprof when enabled.
+
+For 100mbit networks with 1500 MTU frame transmissions:
+A throughput of 9mb/sec is possible with 100% cpu utilization when this
+option is enabled on 3ghz cpus.
+A throughput of 10mb/sec is possible with 20% cpu utilization when this
+option is disabled on 3ghz cpus.
+
+For gig-e networks with large frame transmissions:
+A throughput of 20mb/sec is possible when this option is enabled on
+3ghz cpus.
+A throughput of 60mb/sec is possible when this option is disabled on
+3ghz cpus.
+
+The default is on.
 
 .TP
-logfile
-If the logoutput: file directive is set, this option specifies where the
-log file is written to.  
+redundantring
+This specifies the mode of redundant ring, which may be none, active, or
+passive.  Active replication offers slightly lower latency from transmit
+to delivery in faulty network environments but with poorer performance.
+Passive replication may nearly double the speed of the totem protocol
+if the protocol doesn't become cpu bound.  The final option is none, in
+which case only one network interface will be used to operate the totem
+protocol.
+
+At this time redundant ring is only partially implemented and not yet available.
+
+The default is none.
 
 .TP
-debug
-This specifies whether debug output is logged.  This is generally a bad idea, 
-unless there is some specific bug or problem that must be found in the
-executive.  Set the value to on to debug, off to turn of debugging.
+netmtu
+This specifies the network maximum transmission unit.  To set this value beyond
+1500, the regular frame MTU, requires ethernet devices that support large, or
+also called jumbo, frames.  If any device in the network doesn't support large
+frames, the protocol will not operate properly.  The hosts must also have their
+mtu size set from 1500 to whatever frame size is specified here.
+
+Please note while some NICs or switches claim large frame support, they support
+9000 MTU as the maximum frame size including the IP header.  Setting the netmtu
+and host MTUs to 9000 will cause totem to use the full 9000 bytes of frame room.
+Then Linux will add an 18 byte header, moving the full frame size to 9018.  As a
+result some hardware will not operate properly with this size of data.  A netmtu 
+of 8982 seems to work for the few large frame devices that have been tested.
+Some manufacturers claim large frame support when in fact they support frame
+sizes of 4500 bytes.
+
+Increasing the MTU from 1500 to 8982 doubles throughput performance from 30MB/sec
+to 60MB/sec as measured with evsbench with 175000 byte messages with the secauth 
+directive set to off.
+
+When sending multicast traffic, if the network frequently reconfigures, chances are
+that some device in the network doesn't support large frames.
+
+Choose hardware carefully if intending to use large frame support.
+
+The default is 1500.
 
 .TP
-timestamp
-This specifies that a timestamp is placed on all log messages.
-.PP
-.PP
+threads
+This directive controls how many threads are used to encrypt and send multicast
+messages.  If secauth is off, the protocol will never use threaded sending.
+If secauth is on, this directive allows systems to be configured to use
+multiple threads to encrypt and send multicast messages.
+
+A thread directive of 0 indicates that no threaded send should be used.  This
+mode offers best performance for non-SMP systems. 
+
+The default is 0.
+
 Within the 
-.B timeout
-directive, there are several configuration options which are all optional.
-It is generally not recommended to change any of these values.  Some networks
-may require larger values for token or some applications may require faster
-failure detection times (and lower values for token).
+.B totem 
+directive, there are several configuration options which are used to control
+the operation of the protocol.  It is generally not recommended to change any
+of these values without proper guidance and sufficient testing.  Some networks
+may require larger values if suffering from frequent reconfigurations.  Some
+applications may require faster failure detection times which can be achieved
+by reducing the token timeout.
+
 .TP
 token
 This timeout specifies in milliseconds until a token loss is declared after not
@@ -162,9 +227,41 @@ This timeout specifies in milliseconds how long to wait before checking
 that a network interface is back up after it has been downed.
 
 .TP
-fail_recv_const
-This value identifies how many rotations of the token without receiving any
-of the messages may occur before a new configuration is formed.
+fail_to_recv_const
+This constant specifies how many rotations of the token without receiving any
+of the messages when messages should be received may occur before a new
+configuration is formed.
+
+.TP
+seqno_unchanged_const
+This constant specifies how many rotations of the token without any multicast
+traffic should occur before the merge detection timeout is started.
+
+.PP
+Within the 
+.B logging
+directive, there are four configuration options which are all optional:
+.TP
+logoutput
+This specifies the logging output.  The choices are file, which logs to a file,
+stderr, which logs to stderr, and syslog which logs to the system log.  It is
+possible to have multiple targets by including this directive with different
+options multiple times in the top level directive.
+
+.TP
+logfile
+If the logoutput: file directive is set, this option specifies where the
+log file is written to.  
+
+.TP
+debug
+This specifies whether debug output is logged.  This is generally a bad idea, 
+unless there is some specific bug or problem that must be found in the
+executive.  Set the value to on to debug, off to turn off debugging.
+
+.TP
+timestamp
+This specifies that a timestamp is placed on all log messages.
 
 .SH "FILES"
 .TP

+ 9 - 10
test/Makefile

@@ -28,25 +28,24 @@
 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 # THE POSSIBILITY OF SUCH DAMAGE.
 
+LIBRARIES= ../lib/libSaClm.a ../lib/libSaAmf.a ../lib/libSaCkpt.a ../lib/libSaEvt.a ../lib/libevs.a
+LIBS = $(LIBRARIES) -lpthread
+
 # Production mode flags
-#CFLAGS = -c -O3 -Wall -I../include
-#CFLAGS = -c -O3 -Wall -I../include
-#LDFLAGS = -L../lib
+CFLAGS = -c -O3 -Wall -I../include
+CFLAGS = -c -O3 -Wall -I../include
+LDFLAGS = -L../lib
 #LIBRARIES= ../lib/libais.a ../lib/libevs.a
 #LIBS = $(LIBRARIES) -lpthread
 
 # Debug mode flags
-CFLAGS = -c -g -Wall -DDEBUG -I../include
-CPPFLAGS = -c -g -Wall -DDEBUG -I../include
-LDFLAGS = -g -L../lib
-LIBRARIES= ../lib/libSaClm.a ../lib/libSaAmf.a ../lib/libSaCkpt.a ../lib/libSaEvt.a ../lib/libevs.a
-LIBS = $(LIBRARIES) -lpthread
+#CFLAGS = -c -g -Wall -DDEBUG -I../include
+#CPPFLAGS = -c -g -Wall -DDEBUG -I../include
+#LDFLAGS = -g -L../lib
 
 # Profile mode flags
 #CFLAGS = -c -O3 -pg -DDEBUG -I../include
 #LDFLAGS = -pg -L../lib
-#LIBRARIES= ../lib/libais.a ../lib/libevs.a
-#LIBS = $(LIBRARIES)
 
 EXTRA_CFLAGS = -I../include
 TEST_SRC =  testclm.c testamf.c testamf1.c testamf2.c testamf3.c \

+ 4 - 4
test/ckptbench.c

@@ -74,10 +74,10 @@ SaNameT checkpointName = { 5, "abra\0" };
 
 SaCkptCheckpointCreationAttributesT checkpointCreationAttributes = {
         .creationFlags =        SA_CKPT_WR_ALL_REPLICAS,
-        .checkpointSize =       150000,
+        .checkpointSize =       250000,
         .retentionDuration =    0,
         .maxSections =          5,
-        .maxSectionSize =       150000,
+        .maxSectionSize =       250000,
         .maxSectionIdSize =     10
 };
 
@@ -127,7 +127,7 @@ SaCkptIOVectorElementT ReadVectorElements[] = {
 	}
 };
 
-#define DATASIZE 1000
+#define DATASIZE 200000
 #define LOOPS 5000
 
 char data[500000];
@@ -231,7 +231,7 @@ int main (void) {
 		"Initial Data #0",
 		strlen ("Initial Data #0") + 1);
 
-	size = 1;
+	size = 200000;
 
 	for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */
 		ckpt_benchmark (checkpointHandle, size);

+ 1 - 1
test/ckptbenchth.c

@@ -276,7 +276,7 @@ int main (void) {
 		assert (error == SA_AIS_OK);
 	}
 
-	for (i = 1; i < CHECKPOINT_THREADS; i++) {	/* i threads */
+	for (i = 25; i < CHECKPOINT_THREADS; i++) {	/* i threads */
 		count = 3000; /* initial write count */
 		size = 10000; /* initial size */
 		printf ("THREADS %d\n", i);

+ 6 - 8
test/evsbench.c

@@ -52,7 +52,7 @@
 #include "ais_types.h"
 #include "evs.h"
 
-static int alarm_notice = 0;
+volatile static int alarm_notice = 0;
 
 int outstanding = 0;
 
@@ -121,13 +121,11 @@ void evs_benchmark (evs_handle_t handle,
 	iov.iov_len = write_size;
 	do {
 		sprintf (buffer, "This is message %d\n", write_count);
-try_again:
+retry_mcast:
 		if (outstanding < 10) {
 			result = evs_mcast_joined (handle, EVS_TYPE_AGREED, &iov, 1);
-			if (result == EVS_ERR_TRY_AGAIN) {
-printf ("try again\n");
-				goto try_again;
-			} else {
+
+			if (result != EVS_ERR_TRY_AGAIN) {
 				write_count += 1;
 				outstanding++;
 			}
@@ -169,9 +167,9 @@ int main (void) {
 	result = evs_leave (handle, &groups[0], 1);
 	printf ("Leave result %d\n", result);
 
-	size = 1;
+	size = 170000;
 
-	for (i = 0; i < 50; i++) { /* number of repetitions - up to 50k */
+	for (i = 0; i < 225; i++) { /* number of repetitions - up to 50k */
 		evs_benchmark (handle, size);
 		/*
 		 * Adjust count to 95% of previous count

+ 2 - 2
test/evtbench.c

@@ -92,8 +92,8 @@ SaEvtEventPatternArrayT evt_pat_set_array = {
 };
 
 char user_data_file[256];
-char  user_data[65536];
-int user_data_size = 1024;
+char  user_data[100000];
+int user_data_size = 50000;
 
 uint64_t clust_time_now(void)
 {

+ 2 - 1
test/testevs.c

@@ -86,7 +86,7 @@ struct evs_group groups[3] = {
 	{ "key3" }
 };
 
-char buffer[100];
+char buffer[253000];
 struct iovec iov = {
 	.iov_base = buffer,
 	.iov_len = sizeof (buffer)
@@ -137,6 +137,7 @@ try_again_one:
 		result = evs_mcast_joined (handle, EVS_TYPE_AGREED,
 			&iov, 1);
 		if (result == EVS_ERR_TRY_AGAIN) {
+printf ("try again\n");
 			goto try_again_one;
 		}
 		result = evs_dispatch (handle, EVS_DISPATCH_ALL);

Alguns ficheiros não foram mostrados porque muitos ficheiros mudaram neste diff