Browse Source

defect 577
Implement token holding mode

(Logical change 1.207)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@670 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake 20 years ago
parent
commit
f047d8f44f
9 changed files with 260 additions and 81 deletions
  1. 3 1
      QUICKSTART
  2. 5 5
      exec/Makefile
  3. 13 8
      exec/parse.c
  4. 2 0
      exec/parse.h
  5. 2 0
      exec/totempg.c
  6. 190 46
      exec/totemsrp.c
  7. 2 0
      exec/totemsrp.h
  8. 14 6
      test/evsbench.c
  9. 29 15
      test/test.cpp

+ 3 - 1
QUICKSTART

@@ -46,7 +46,9 @@ logging {
 
 timeout {
 	token: 200
-	retransmit: 45
+	token_retransmit: 50
+	hold: 30
+	retransmits_before_loss: 4
 	join: 100
 	consensus: 200
 	merge: 200

+ 5 - 5
exec/Makefile

@@ -29,16 +29,16 @@
 # THE POSSIBILITY OF SUCH DAMAGE.
 
 # Production mode flags
-#CFLAGS = -O3 -Wall -fomit-frame-pointer
-#LDFLAGS = 
+CFLAGS = -O3 -Wall -fomit-frame-pointer
+LDFLAGS = 
 
 # Debug mode flags
-CFLAGS = -g -Wall
+#CFLAGS = -g -Wall
 ##-DDEBUG
-LDFLAGS = -g
+#LDFLAGS = -g
 
 # Profile mode flags
-#CFLAGS = -O2 -pg
+#CFLAGS = -O3 -pg
 #LDFLAGS = -pg
 
 # Code Coverage with lcov flgs

+ 13 - 8
exec/parse.c

@@ -564,19 +564,24 @@ extern int openais_main_config_read (char **error_string,
 			break;
 		case MAIN_TIMEOUT:
 			if ((loc = strstr_rs (line, "token:"))) {
-					openais_config->timeouts[TOTEM_TOKEN]= atoi(loc);
-			} else if ((loc = strstr_rs (line, "retransmit:"))) {
-					openais_config->timeouts[TOTEM_RETRANSMIT_TOKEN] = atoi(loc);
+				openais_config->timeouts[TOTEM_TOKEN]= atoi(loc);
+			} else if ((loc = strstr_rs (line, "token_retransmit:"))) {
+				openais_config->timeouts[TOTEM_RETRANSMIT_TOKEN] = atoi(loc);
+			} else if ((loc = strstr_rs (line, "hold:"))) {
+				openais_config->timeouts[TOTEM_HOLD_TOKEN] = atoi(loc);
+			} else if ((loc = strstr_rs (line, "retransmits_before_loss:"))) {
+				openais_config->timeouts[TOTEM_RETRANSMITS_BEFORE_LOSS] = atoi(loc);
+		
 			} else if ((loc = strstr_rs (line, "join:"))) {
-					openais_config->timeouts[TOTEM_JOIN] = atoi(loc);
+				openais_config->timeouts[TOTEM_JOIN] = atoi(loc);
 			} else if ((loc = strstr_rs (line, "consensus:"))) {
-					openais_config->timeouts[TOTEM_CONSENSUS] = atoi(loc);
+				openais_config->timeouts[TOTEM_CONSENSUS] = atoi(loc);
 			} else if ((loc = strstr_rs (line, "merge:"))) {
-					openais_config->timeouts[TOTEM_MERGE] = atoi(loc);
+				openais_config->timeouts[TOTEM_MERGE] = atoi(loc);
 			} else if ((loc = strstr_rs (line, "downcheck:"))) {
-					openais_config->timeouts[TOTEM_DOWNCHECK] = atoi(loc);
+				openais_config->timeouts[TOTEM_DOWNCHECK] = atoi(loc);
 			} else if ((loc = strstr_rs (line, "fail_recv_const:"))) {
-					openais_config->timeouts[TOTEM_FAIL_RECV_CONST] = atoi(loc);
+				openais_config->timeouts[TOTEM_FAIL_RECV_CONST] = atoi(loc);
 			} else if ((loc = strstr_rs (line, "}"))) {
 				parse = MAIN_HEAD;
 			} else {

+ 2 - 0
exec/parse.h

@@ -62,8 +62,10 @@ enum amfOperationalAdministrativeState {
  * needs to remain the last item in the list.
  */
 enum {
+	TOTEM_RETRANSMITS_BEFORE_LOSS,
 	TOTEM_TOKEN,
 	TOTEM_RETRANSMIT_TOKEN,
+	TOTEM_HOLD_TOKEN,
 	TOTEM_JOIN,
 	TOTEM_CONSENSUS,
 	TOTEM_MERGE,

+ 2 - 0
exec/totempg.c

@@ -524,6 +524,8 @@ int totempg_mcast (
 	int copy_len = 0; 
 	int copy_base = 0;
 
+	totemsrp_new_msg_signal ();
+
 	max_packet_size = TOTEMPG_PACKET_SIZE -
 		(sizeof (unsigned short) * (mcast_packed_msg_count + 1));
 
 

+ 190 - 46
exec/totemsrp.c

@@ -64,6 +64,7 @@ int totemsrp_brake;
 #include <sys/un.h>
 #include <sys/sysinfo.h>
 #include <sys/ioctl.h>
+#include <sys/param.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <linux/if.h>
@@ -101,8 +102,10 @@ int totemsrp_brake;
 #define MISSING_MCAST_WINDOW			128
 #define TIMEOUT_STATE_GATHER_JOIN		100
 #define TIMEOUT_STATE_GATHER_CONSENSUS	200
+#define TOKEN_RETRANSMITS_BEFORE_LOSS	4
 #define TIMEOUT_TOKEN					200
-#define TIMEOUT_TOKEN_RETRANSMIT		45
+#define TIMEOUT_TOKEN_RETRANSMIT		(int)(TIMEOUT_TOKEN / (TOKEN_RETRANSMITS_BEFORE_LOSS + 0.2))
+#define TIMEOUT_TOKEN_HOLD				(int)(TIMEOUT_TOKEN_RETRANSMIT * 0.8 - (1000/HZ))
 #define TIMEOUT_MERGE_DETECT			200
 #define PACKET_SIZE_MAX					2000
 #define FAIL_TO_RECV_CONST				250
@@ -150,6 +153,7 @@ enum message_type {
 	MESSAGE_TYPE_MEMB_MERGE_DETECT = 2,	/* merge rings if there are available rings */
 	MESSAGE_TYPE_MEMB_JOIN = 3, 		/* membership join message */
 	MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4,	/* membership commit token */
+	MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5,	/* cancel the holding of the token */
 };
 
 /* 
@@ -218,6 +222,10 @@ static unsigned int timeout_token = TIMEOUT_TOKEN;
 
 static unsigned int timeout_token_retransmit = TIMEOUT_TOKEN_RETRANSMIT;
 
+static unsigned int timeout_token_hold = 0;
+
+static unsigned int token_retransmits_before_loss = TOKEN_RETRANSMITS_BEFORE_LOSS;
+
 static unsigned int timeout_state_gather_join = TIMEOUT_STATE_GATHER_JOIN;
 
 static unsigned int timeout_state_gather_consensus = TIMEOUT_STATE_GATHER_CONSENSUS;
@@ -283,6 +291,8 @@ poll_timer_handle timer_orf_token_timeout = 0;
 
 poll_timer_handle timer_orf_token_retransmit_timeout = 0;
 
+poll_timer_handle timer_orf_token_hold_retransmit_timeout = 0;
+
 poll_timer_handle timer_merge_detect_timeout = 0;
 
 poll_timer_handle memb_timer_state_gather_join_timeout = 0;
@@ -373,6 +383,11 @@ struct memb_merge_detect {
 	struct memb_ring_id ring_id;
 } __attribute__((packed));
 
+struct token_hold_cancel {
+	struct message_header header;
+	struct memb_ring_id ring_id;
+} __attribute__((packed));
+
 struct memb_commit_token_memb_entry {
 	struct memb_ring_id ring_id;
 	int aru;
@@ -433,7 +448,7 @@ static struct iovec iov_encrypted = {
 
 struct message_handlers {
 	int count;
-	int (*handler_functions[5]) (struct sockaddr_in *, struct iovec *, int, int, int);
+	int (*handler_functions[6]) (struct sockaddr_in *, struct iovec *, int, int, int);
 };
 
 poll_handle *totemsrp_poll_handle;
@@ -467,12 +482,17 @@ static int message_handler_memb_join (struct sockaddr_in *, struct iovec *, int,
 
 static int message_handler_memb_commit_token (struct sockaddr_in *, struct iovec *, int, int, int);
 
+static int message_handler_token_hold_cancel (struct sockaddr_in *, struct iovec *, int, int, int);
+
 static void memb_ring_id_create_or_load (struct memb_ring_id *);
 static int recv_handler (poll_handle handle, int fd, int revents, void *data, unsigned int *prio);
 static int netif_determine (struct sockaddr_in *bindnet, struct sockaddr_in *bound_to,int *interface_up);
 static int loopback_determine (struct sockaddr_in *bound_to);
 static void netif_down_check (void);
 
+static void token_callbacks_execute (enum totem_callback_token_type type);
+
+
 #define NETIF_STATE_REPORT_UP		1	
 #define NETIF_STATE_REPORT_DOWN		2
 
@@ -508,19 +528,21 @@ static void memb_ring_id_store (struct memb_commit_token *commit_token);
 static void memb_state_commit_token_update (struct memb_commit_token *memb_commit_token);
 static int memb_state_commit_token_send (struct memb_commit_token *memb_commit_token);
 static void memb_state_commit_token_create (struct memb_commit_token *commit_token);
+static int token_hold_cancel_send (void);
 static void orf_token_endian_convert (struct orf_token *in, struct orf_token *out);
 static void memb_commit_token_endian_convert (struct memb_commit_token *in, struct memb_commit_token *out);
 static void memb_join_endian_convert (struct memb_join *in, struct memb_join *out);
 static void mcast_endian_convert (struct mcast *in, struct mcast *out);
 
 struct message_handlers totemsrp_message_handlers = {
-	4,
+	6,
 	{
 		message_handler_orf_token,
 		message_handler_mcast,
 		message_handler_memb_merge_detect,
 		message_handler_memb_join,
-		message_handler_memb_commit_token
+		message_handler_memb_commit_token,
+		message_handler_token_hold_cancel
 	}
 };
 
@@ -592,6 +614,8 @@ int totemsrp_initialize (
 
 	int i;
 
+	timeout_token_hold = (int)(timeout_token_retransmit * 0.8 - (1000/HZ));
+
 	/*
 	 * Initialize random number generator for later use to generate salt
 	 */
@@ -613,7 +637,7 @@ int totemsrp_initialize (
 	 * Update our timeout values if they were specified in the openais.conf
 	 * file.
 	 */
-	for (i = TOTEM_TOKEN; i < MAX_TOTEM_TIMEOUTS; i++) {
+	for (i = 0; i < MAX_TOTEM_TIMEOUTS; i++) {
 		if (!timeouts[i]) {
 			continue;
 		}
@@ -621,12 +645,24 @@ int totemsrp_initialize (
 		case TOTEM_TOKEN:
 			timeout_token = timeouts[i];
 			totemsrp_log_printf (totemsrp_log_level_notice,
-					"Token Timeout set to %u ms\n", timeouts[i]);
+					"Overriding token timeout to (%u ms)\n", timeouts[i]);
+			timeout_token_retransmit = (int)(timeout_token / (token_retransmits_before_loss + 0.2));
+			timeout_token_hold = (int)(timeout_token_retransmit * 0.8 - (1000/HZ));
 			break;
 		case TOTEM_RETRANSMIT_TOKEN:
 			timeout_token_retransmit = timeouts[i];
 			totemsrp_log_printf (totemsrp_log_level_notice,
-					"Token Retransmit Timeout set to %u ms\n", timeouts[i]);
+					"Overriding token retransmit timeout to (%u ms)\n", timeouts[i]);
+			break;
+		case TOTEM_RETRANSMITS_BEFORE_LOSS:
+			token_retransmits_before_loss = timeouts[i];
+			totemsrp_log_printf (totemsrp_log_level_notice,
+					"Overriding retransmits before loss (%u retrans)\n", timeouts[i]);
+			break;
+		case TOTEM_HOLD_TOKEN:
+			timeout_token_hold = timeouts[i];
+			totemsrp_log_printf (totemsrp_log_level_notice,
+					"Overriding token hold timeout to (%u ms)\n", timeouts[i]);
 			break;
 		case TOTEM_JOIN:
 			timeout_state_gather_join = timeouts[i];
@@ -660,6 +696,13 @@ int totemsrp_initialize (
 		}
 	}
 
+	totemsrp_log_printf (totemsrp_log_level_notice,
+		"Token Timeout (%d ms) retransmit timeout (%d ms)\n",
+		timeout_token, timeout_token_retransmit);
+	totemsrp_log_printf (totemsrp_log_level_notice,
+		"token hold (%d ms) retransmits before loss (%d retrans)\n",
+		timeout_token_hold, token_retransmits_before_loss);
+
 	queue_init (&new_message_queue, NEW_MESSAGE_QUEUE_SIZE_MAX,
 		sizeof (struct message_item));
 
@@ -922,15 +965,16 @@ void memb_set_print (char *string,
 
 static void timer_function_orf_token_timeout (void *data);
 static void timer_function_token_retransmit_timeout (void *data);
+static void timer_function_token_hold_retransmit_timeout (void *data);
 static void timer_function_merge_detect_timeout (void *data);
 
 void reset_token_retransmit_timeout (void)
 {
-			poll_timer_delete (*totemsrp_poll_handle,
-				timer_orf_token_retransmit_timeout);
-			poll_timer_add (*totemsrp_poll_handle, timeout_token_retransmit, 0,
-				timer_function_token_retransmit_timeout,
-				&timer_orf_token_retransmit_timeout);
+	poll_timer_delete (*totemsrp_poll_handle,
+		timer_orf_token_retransmit_timeout);
+	poll_timer_add (*totemsrp_poll_handle, timeout_token_retransmit, 0,
+		timer_function_token_retransmit_timeout,
+		&timer_orf_token_retransmit_timeout);
 
 }
 
@@ -1011,11 +1055,23 @@ void reset_token_timeout (void) {
 }
 
 void cancel_token_timeout (void) {
-		poll_timer_delete (*totemsrp_poll_handle, timer_orf_token_timeout);
+	poll_timer_delete (*totemsrp_poll_handle, timer_orf_token_timeout);
 }
 
 void cancel_token_retransmit_timeout (void) {
-		poll_timer_delete (*totemsrp_poll_handle, timer_orf_token_retransmit_timeout);
+	poll_timer_delete (*totemsrp_poll_handle, timer_orf_token_retransmit_timeout);
+}
+
+void start_token_hold_retransmit_timeout (void)
+{
+	poll_timer_add (*totemsrp_poll_handle, timeout_token_hold, (void *)9999,
+		timer_function_token_hold_retransmit_timeout,
+		&timer_orf_token_hold_retransmit_timeout);
+}
+void cancel_token_hold_retransmit_timeout (void)
+{
+	poll_timer_delete (*totemsrp_poll_handle,
+		timer_orf_token_hold_retransmit_timeout);
 }
 
 static void memb_state_consensus_timeout_expired (void)
@@ -1712,6 +1768,11 @@ printf ("received message size %d\n", iov->iov_len);
 	return (0);
 }
 
+void totemsrp_new_msg_signal (void)
+{
+	token_hold_cancel_send ();
+}
+
 int totemsrp_mcast (
 	struct iovec *iovec,
 	int iov_len,
@@ -1769,6 +1830,7 @@ int totemsrp_mcast (
 	queue_item_add (&new_message_queue, &message_item);
 
 	return (0);
+
 error_iovec:
 	for (j = 0; j < i; j++) {
 		free (message_item.iovec[j].iov_base);
@@ -2567,9 +2629,6 @@ void token_retransmit (void) {
  */
 void timer_function_token_retransmit_timeout (void *data)
 {
-struct timeval timeval;
-
-	gettimeofday (&timeval, 0);
 	switch (memb_state) {
 	case MEMB_STATE_GATHER:
 		break;
@@ -2583,6 +2642,19 @@ struct timeval timeval;
 	}
 }
 
+void timer_function_token_hold_retransmit_timeout (void *data)
+{
+	switch (memb_state) {
+	case MEMB_STATE_GATHER:
+		break;
+	case MEMB_STATE_COMMIT:
+		break;
+	case MEMB_STATE_OPERATIONAL:
+	case MEMB_STATE_RECOVERY:
+		token_retransmit ();
+		break;
+	}
+}
 void timer_function_merge_detect_timeout(void *data)
 {
 	my_merge_detect_timeout_outstanding = 0;
@@ -2660,6 +2732,51 @@ static int token_send (
 	return (res);
 }
 
+static int token_hold_cancel_send (void)
+{
+	struct token_hold_cancel token_hold_cancel;
+	struct iovec iov;
+	struct msghdr msghdr;
+
+	/*
+	 * Only cancel if the token is currently held
+	 */
+	if (my_token_held == 0) {
+		return (0);
+	}
+	my_token_held = 0;
+
+	/*
+	 * Build message
+	 */
+	token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
+	token_hold_cancel.header.endian_detector = ENDIAN_LOCAL;
+	memcpy (&token_hold_cancel.ring_id, &my_ring_id,
+		sizeof (struct memb_ring_id));
+
+	iov.iov_base = &token_hold_cancel;
+	iov.iov_len = sizeof (struct token_hold_cancel);
+
+	encrypt_and_sign (&iov, 1);
+
+	/*
+	 * Build multicast message
+	 */
+	msghdr.msg_name = (caddr_t)&sockaddr_in_mcast;
+	msghdr.msg_namelen = sizeof (struct sockaddr_in);
+	msghdr.msg_iov = &iov_encrypted;
+	msghdr.msg_iovlen = 1;
+	msghdr.msg_control = 0;
+	msghdr.msg_controllen = 0;
+	msghdr.msg_flags = 0;
+
+	/*
+	 * Multicast message
+	 */
+	sendmsg (totemsrp_sockets[0].mcast, &msghdr, MSG_NOSIGNAL | MSG_DONTWAIT);
+
+	return (0);
+}
 int orf_token_send_initial (void)
 {
 	struct orf_token orf_token;
@@ -2997,7 +3114,7 @@ void totem_callback_token_type (void *handle)
 	free (token_callback_instance);
 }
 
-void token_callbacks_execute (enum totem_callback_token_type type)
+static void token_callbacks_execute (enum totem_callback_token_type type)
 {
 	struct list_head *list;
 	struct list_head *list_next;
@@ -3085,7 +3202,6 @@ printf ("OTHERS %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
 }
 #endif
 
-	my_token_held = 1;
 	my_do_delivery = 0;
 
 #ifdef RANDOM_DROP
@@ -3093,24 +3209,17 @@ if (random () % 100 < 10) {
 	return (0);
 }
 #endif
-	/*
-	 * Hold onto token when there is no activity on ring and
-	 * this processor is the ring rep
-	 */
-	forward_token = 1;
-	if (my_ring_id.rep.s_addr == my_id.sin_addr.s_addr) {
-		if (my_seq_unchanged > SEQNO_UNCHANGED_CONST) {
-			forward_token = 0;
-		}
-	}
 
 	/*
 	 * Handle merge detection timeout
 	 */
 	if (token_ref->seq == my_last_seq) {
 		start_merge_detect_timeout ();
+		my_seq_unchanged += 1;
 	} else {
 		cancel_merge_detect_timeout ();
+		cancel_token_hold_retransmit_timeout ();
+		my_seq_unchanged = 0;
 	}
 
 	my_last_seq = token_ref->seq;
@@ -3145,6 +3254,29 @@ if (random () % 100 < 10) {
 		}
 	} while (nfds == 1);
 
+	/*
+	 * Determine if we should hold (in reality drop) the token
+	 */
+	my_token_held = 0;
+	if (my_ring_id.rep.s_addr == my_id.sin_addr.s_addr &&
+		my_seq_unchanged > SEQNO_UNCHANGED_CONST) {
+		my_token_held = 1;
+	} else
+	if (my_ring_id.rep.s_addr != my_id.sin_addr.s_addr &&
+		my_seq_unchanged >= SEQNO_UNCHANGED_CONST) {
+		my_token_held = 1;
+	}
+
+	/*
+	 * Hold onto token when there is no activity on ring and
+	 * this processor is the ring rep
+	 */
+	forward_token = 1;
+	if (my_ring_id.rep.s_addr == my_id.sin_addr.s_addr) {
+		if (my_token_held) {
+			forward_token = 0;
+		}
+	}
 
 	token_callbacks_execute (TOTEM_CALLBACK_TOKEN_RECEIVED);
 
@@ -3170,7 +3302,6 @@ if (random () % 100 < 10) {
 		if (memcmp (&token->ring_id, &my_ring_id,
 			sizeof (struct memb_ring_id)) != 0) {
 
-			my_token_held = 0;
 			return (0); /* discard token */
 		}
 
@@ -3178,26 +3309,17 @@ if (random () % 100 < 10) {
 		 * Discard retransmitted tokens
 		 */
 		if (my_token_seq >= token->token_seq) {
-			my_token_held = 0;
 			reset_token_retransmit_timeout ();
 			reset_token_timeout ();
 			return (0); /* discard token */
 		}		
 		transmits_allowed = 30;
 		mcasted = orf_token_rtr (token, &transmits_allowed);
-		if (mcasted) {
-			forward_token = 1;
-			my_seq_unchanged = 0;
-		}
 
-        if ((last_aru + MISSING_MCAST_WINDOW) < token->seq) {
-                transmits_allowed = 0;
-        }
-		mcasted = orf_token_mcast (token, transmits_allowed, system_from);
-		if (mcasted) {
-			forward_token = 1;
-			my_seq_unchanged = 0;
+	        if ((last_aru + MISSING_MCAST_WINDOW) < token->seq) {
+			transmits_allowed = 0;
 		}
+		mcasted = orf_token_mcast (token, transmits_allowed, system_from);
 		if (my_aru < token->aru ||
 			my_id.sin_addr.s_addr == token->aru_addr.s_addr || 
 			token->aru_addr.s_addr == 0) {
@@ -3292,7 +3414,7 @@ printf ("FAILED TO RECEIVE\n");
 				}
 			}
 	
-			token_send (token, 1 /* forward_token */);
+			token_send (token, forward_token); 
 
 #ifdef GIVEINFO
 gettimeofday (&tv_current, NULL);
@@ -3313,15 +3435,17 @@ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
 			 * to improve performance
 			 */
 			reset_token_timeout (); // REVIEWED
-			if (forward_token == 0) {
-				reset_token_retransmit_timeout (); // REVIEWED
+			reset_token_retransmit_timeout (); // REVIEWED
+			if (my_id.sin_addr.s_addr == my_ring_id.rep.s_addr &&
+				my_token_held == 1) {
+
+				start_token_hold_retransmit_timeout ();
 			}
 
 			token_callbacks_execute (TOTEM_CALLBACK_TOKEN_SENT);
 		}
 		break;
 	}
-			my_token_held = 0;
 	return (0);
 }
 
@@ -3871,6 +3995,26 @@ if (random()%100 < 10) {
 	return (0);
 }
 
+static int message_handler_token_hold_cancel (
+	struct sockaddr_in *system_from,
+	struct iovec *iovec,
+	int iov_len,
+	int bytes_received,
+	int endian_conversion_needed)
+{
+	struct token_hold_cancel *token_hold_cancel = (struct token_hold_cancel *)iovec->iov_base;
+
+	if (memcmp (&token_hold_cancel->ring_id, &my_ring_id,
+		sizeof (struct memb_ring_id)) == 0) {
+
+		my_seq_unchanged = 0;
+		if (my_ring_id.rep.s_addr == my_id.sin_addr.s_addr) {
+			timer_function_token_retransmit_timeout (0);
+		}
+	}
+	return (0);
+}
+
 static int recv_handler (poll_handle handle, int fd, int revents,
 	void *data, unsigned int *prio)
 {

+ 2 - 0
exec/totemsrp.h

@@ -101,6 +101,8 @@ int totemsrp_callback_token_create (
 void totemsrp_callback_token_destroy (
 	void **handle_out);
 
+void totemsrp_new_msg_signal (void);
+
 extern struct sockaddr_in config_mcast_addr;
 
 #endif /* TOTEMSRP_H_DEFINED */

+ 14 - 6
test/evsbench.c

@@ -54,10 +54,14 @@
 
 static int alarm_notice = 0;
 
+int outstanding = 0;
+
+
 
 void evs_deliver_fn (struct in_addr source_addr, void *msg, int msg_len)
 {
-//	printf ("Delivering message %s\n", buf);
+	outstanding--;
+// printf ("Delivering message %s\n", msg);
 }
 
 void evs_confchg_fn (
@@ -118,11 +122,15 @@ void evs_benchmark (evs_handle_t handle,
 	do {
 		sprintf (buffer, "This is message %d\n", write_count);
 try_again:
-		result = evs_mcast_joined (handle, EVS_TYPE_AGREED, &iov, 1);
-		if (result == EVS_ERR_TRY_AGAIN) {
-			goto try_again;
-		} else {
-			write_count += 1;
+		if (outstanding < 10) {
+			result = evs_mcast_joined (handle, EVS_TYPE_AGREED, &iov, 1);
+			if (result == EVS_ERR_TRY_AGAIN) {
+printf ("try again\n");
+				goto try_again;
+			} else {
+				write_count += 1;
+				outstanding++;
+			}
 		}
 		result = evs_dispatch (handle, EVS_DISPATCH_ALL);
 	} while (alarm_notice == 0);

+ 29 - 15
test/test.cpp

@@ -15,10 +15,15 @@
 
 
 #include "ais_types.h"
-#include "ais_ckpt.h"
+#include "saCkpt.h"
 
 
-//SaVersionT version = { 'A', 1, 1 };
+SaVersionT version = { 'B', 1, 1 };
+
+SaCkptCallbacksT callbacks = {
+    0,
+    0
+};
 
 SaCkptCheckpointCreationAttributesT checkpointCreationAttributes = {
         SA_CKPT_WR_ALL_REPLICAS,
@@ -30,8 +35,8 @@ SaCkptCheckpointCreationAttributesT checkpointCreationAttributes = {
 };
 
 SaCkptSectionIdT sectionId = {
-        (SaUint8T*)"section ID #1",
-        14
+        14,
+        (SaUint8T*)"section ID #1"
 };
 
 SaCkptSectionCreationAttributesT sectionCreationAttributes = {
@@ -60,7 +65,7 @@ char* getPayload(int psize) {
         return retVal;
 }
 
-SaCkptCheckpointHandleT* WriteCheckpointHandle;
+SaCkptCheckpointHandleT WriteCheckpointHandle;
 
 static long sendCount = 0;
 void process_message()
@@ -68,10 +73,11 @@ void process_message()
         struct timeval tv;
         long t1;
         long t2;
+	long told;
         SaCkptIOVectorElementT writeElement; // KJS
 
         SaUint32T erroroneousVectorIndex = 0;
-        SaErrorT error;
+        SaAisErrorT error;
         
         writeElement.sectionId = sectionId;
         writeElement.dataBuffer = getPayload(200); 
@@ -81,6 +87,7 @@ void process_message()
 
         gettimeofday(&tv, NULL);
         t1 = tv.tv_usec;
+	told = tv.tv_sec;
         
         do {
                 error = saCkptCheckpointWrite (WriteCheckpointHandle,
@@ -88,33 +95,40 @@ void process_message()
                                                1,
                                                &erroroneousVectorIndex);
 
-                if (error != SA_OK) {
+                if (error != SA_AIS_OK) {
                         fprintf(stderr,"saCkptCheckpointWrite result %d (should be 1)\n", error);
                 }
                 sendCount++;
                 fprintf(stderr,"sendCount = %d",(int)sendCount);
-        } while (error == SA_ERR_TRY_AGAIN);
+        } while (error == SA_AIS_ERR_TRY_AGAIN);
 
         gettimeofday(&tv, NULL);
         t2 = tv.tv_usec;        
-        fprintf(stderr," ,RTT::%d\n",(long)t2-t1);
+        fprintf(stderr," ,RTT::%d.%d\n",(long)tv.tv_sec - told, t2-t1);
 }
 
 int main () {
-	SaErrorT error;
+	SaAisErrorT error;
 	SaNameT* WriteCheckpointName = (SaNameT*) malloc(sizeof(SaNameT));
-        WriteCheckpointHandle = (SaCkptCheckpointHandleT*) malloc(sizeof(SaCkptCheckpointHandleT));
         char name[10];
+        SaCkptHandleT ckptHandle;
+
         sprintf(name,"ckpt%d",1);
         int namelen = strlen(name) + 1;
         memcpy(WriteCheckpointName->value, name, namelen);
         WriteCheckpointName->length = namelen;
-        error = saCkptCheckpointOpen (WriteCheckpointName,
+
+	error = saCkptInitialize (&ckptHandle, &callbacks, &version);
+
+        error = saCkptCheckpointOpen (
+			ckptHandle,
+			WriteCheckpointName,
                         &checkpointCreationAttributes,
                         SA_CKPT_CHECKPOINT_WRITE,
                         1000000000, /* 1 Second */
-                        WriteCheckpointHandle);
-        if (error != SA_OK) {
+                        &WriteCheckpointHandle);
+
+        if (error != SA_AIS_OK) {
                 fprintf(stderr,"saCkptCheckpointOpen result %d (should be 1)\n", error);
                 return error;
         }
@@ -123,7 +137,7 @@ int main () {
                                         &sectionCreationAttributes,
                                         "Initial Data #0",
                                          strlen ("Initial Data #0") + 1);
-        if (error != SA_OK) {
+        if (error != SA_AIS_OK) {
                 fprintf(stderr,"saCkptSectionCreate result = %d\n", error);
                 return error;
         }