Просмотр исходного кода

defect 932
This patch adds token sequence number and global sequence number rollover
support. A window is used to ensure comparisons are done properly.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@852 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake 20 лет назад
Родитель
Сommit
ecb97392f8
6 измененных файлов с 359 добавлено и 200 удалено
  1. 4 4
      exec/Makefile
  2. 2 2
      exec/print.c
  3. 0 5
      exec/totemnet.c
  4. 0 2
      exec/totemrrp.c
  5. 237 142
      exec/totemsrp.c
  6. 116 45
      include/sq.h

+ 4 - 4
exec/Makefile

@@ -29,13 +29,13 @@
 # THE POSSIBILITY OF SUCH DAMAGE.
 
 # Production mode flags
-#CFLAGS = -O3 -Wall -fomit-frame-pointer
-#LDFLAGS = -lpthread
+CFLAGS = -O3 -Wall -fomit-frame-pointer
+LDFLAGS = -lpthread
 
 # Debug mode flags
-CFLAGS = -g -Wall
+#CFLAGS = -g -Wall
 ##-DDEBUG
-LDFLAGS = -g -lpthread
+#LDFLAGS = -g -lpthread
 
 # Profile mode flags
 #CFLAGS = -O3 -pg

+ 2 - 2
exec/print.c

@@ -142,8 +142,8 @@ void log_syslog (char *log_string) {
 void internal_log_printf (int logclass, char *string, ...)
 {
 	va_list ap;
-	char newstring[1024];
-	char log_string[1024];
+	char newstring[4096];
+	char log_string[4096];
 	char char_time[512];
 	time_t curr_time;
 	int level;

+ 0 - 5
exec/totemnet.c

@@ -1026,8 +1026,6 @@ static int totemnet_build_sockets (
 	sockaddr_in_test.sin_family = AF_INET;
 	sockaddr_in_test.sin_addr.s_addr = sockaddr_mcast->sin_addr.s_addr;
 	sockaddr_in_test.sin_port = sockaddr_mcast->sin_port;
-	printf ("binding to %s\n", inet_ntoa (sockaddr_in_test.sin_addr));
-		printf ("%d\n", sockaddr_in_test.sin_port);
 	res = bind (sockets->mcast, (struct sockaddr *)&sockaddr_in_test,
 		sizeof (struct sockaddr_in));
 	if (res == -1) {
@@ -1051,8 +1049,6 @@ static int totemnet_build_sockets (
 	sockaddr_in_test.sin_family = AF_INET;
 	sockaddr_in_test.sin_addr.s_addr = bound_to->sin_addr.s_addr;
 	sockaddr_in_test.sin_port = sockaddr_mcast->sin_port;
-	printf ("binding to %s\n", inet_ntoa (sockaddr_in_test.sin_addr));
-		printf ("%d\n", sockaddr_in_test.sin_port);
 	res = bind (sockets->token, (struct sockaddr *)&sockaddr_in_test,
 		sizeof (struct sockaddr_in));
 	if (res == -1) {
@@ -1397,5 +1393,4 @@ extern void totemnet_net_mtu_adjust (struct totem_config *totem_config)
 	} else {
 		totem_config->net_mtu -= UDPIP_HEADER_SIZE;
 	}
-printf ("adjusted frame size is %d\n", totem_config->net_mtu);
 }

+ 0 - 2
exec/totemrrp.c

@@ -366,8 +366,6 @@ error_exit:
 static void timer_function_active_token (void *context)
 {
 	struct active_instance *instance = (struct active_instance *)context;
-
-	printf ("active instance %p\n", instance);
 }
 
 

+ 237 - 142
exec/totemsrp.c

@@ -93,6 +93,25 @@
 #define RETRANSMIT_ENTRIES_MAX			30
 #define MISSING_MCAST_WINDOW			128
 
+/*
+ * Rollover handling:
+ * SEQNO_START_MSG is the starting sequence number after a new configuration
+ *	This should remain zero, unless testing overflow in which case
+ *	0x7ffff000 and 0xfffff000 are good starting values.
+ *
+ * SEQNO_START_TOKEN is the starting sequence number after a new configuration
+ *	for a token.  This should remain zero, unless testing overflow in which
+ *	case 07fffff00 or 0xffffff00 are good starting values.
+ *
+ * SEQNO_START_MSG is the starting sequence number after a new configuration
+ *	This should remain zero, unless testing overflow in which case
+ *	0x7ffff000 and 0xfffff000 are good values to start with
+ */
+#define SEQNO_START_MSG 0x0
+#define SEQNO_START_TOKEN 0x0
+//#define SEQNO_START_MSG 0xfffffe00
+//#define SEQNO_START_TOKEN 0xfffffe00
+
 /*
  * we compare incoming messages to determine if their endian is
  * different - if so convert them
@@ -136,13 +155,12 @@ struct totemsrp_socket {
 struct message_header {
 	char type;
 	char encapsulated;
-//	unsigned short filler;
 	unsigned short endian_detector;
 } __attribute__((packed));
 
 struct mcast {
 	struct message_header header;
-	int seq;
+	unsigned int seq;
 	int this_seqno;
 	struct memb_ring_id ring_id;
 	struct in_addr source;
@@ -161,14 +179,14 @@ struct mcast {
 
 struct rtr_item  {
 	struct memb_ring_id ring_id;
-	int seq;
+	unsigned int seq;
 }__attribute__((packed));
 
 struct orf_token {
 	struct message_header header;
-	int seq;
-	int token_seq;
-	int aru;
+	unsigned int seq;
+	unsigned int token_seq;
+	unsigned int aru;
 	struct in_addr aru_addr;
 	struct memb_ring_id ring_id; 
 	short int fcc;
@@ -198,14 +216,14 @@ struct token_hold_cancel {
 
 struct memb_commit_token_memb_entry {
 	struct memb_ring_id ring_id;
-	int aru;
-	int high_delivered;
+	unsigned int aru;
+	unsigned int high_delivered;
 	int received_flg;
 }__attribute__((packed));
 
 struct memb_commit_token {
 	struct message_header header;
-	int token_seq;
+	unsigned int token_seq;
 	struct memb_ring_id ring_id;
 	unsigned int retrans_flg;
 	int memb_index;
@@ -287,15 +305,15 @@ struct totemsrp_instance {
 
 	int my_merge_detect_timeout_outstanding;
 
-	int my_last_aru;
+	unsigned int my_last_aru;
 
 	int my_seq_unchanged;
 
 	int my_received_flg;
 
-	int my_high_seq_received;
+	unsigned int my_high_seq_received;
 
-	int my_install_seq;
+	unsigned int my_install_seq;
 
 	int my_rotation_counter;
 
@@ -321,9 +339,9 @@ struct totemsrp_instance {
 	/*
 	 * Received up to and including
 	 */
-	int my_aru;
+	unsigned int my_aru;
 
-	int my_high_delivered;
+	unsigned int my_high_delivered;
 
 	struct list_head token_callback_received_listhead;
 
@@ -333,7 +351,7 @@ struct totemsrp_instance {
 
 	int orf_token_retransmit_size;
 
-	int my_token_seq;
+	unsigned int my_token_seq;
 
 	/*
 	 * Timers
@@ -407,21 +425,19 @@ struct totemsrp_instance {
 
 	unsigned long long token_ring_id_seq;
 
-	int last_released;
+	unsigned int last_released;
 
-	int set_aru;
-
-	int totemsrp_brake;	
+	unsigned int set_aru;
 
 	int old_ring_state_saved;
 
 	int old_ring_state_aru;
 
-	int old_ring_state_high_seq_received;
+	unsigned int old_ring_state_high_seq_received;
 
 	int ring_saved;
 
-	int my_last_seq;
+	unsigned int my_last_seq;
 
 	struct timeval tv_old;
 
@@ -491,10 +507,10 @@ static void memb_ring_id_create_or_load (struct totemsrp_instance *, struct memb
 
 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
 static void memb_state_gather_enter (struct totemsrp_instance *instance);
-static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, int end_point);
+static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
 	int fcc_mcasts_allowed, struct in_addr *system_from);
-static int messages_free (struct totemsrp_instance *instance, int token_aru);
+static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
 
 static void memb_ring_id_store (struct totemsrp_instance *instance, struct memb_commit_token *commit_token);
 static void memb_state_commit_token_update (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
@@ -552,13 +568,19 @@ void totemsrp_instance_initialize (struct totemsrp_instance *instance)
 
 	instance->my_received_flg = 1;
 
-	instance->my_token_seq = -1;
+	instance->my_token_seq = SEQNO_START_TOKEN - 1;
 
 	instance->orf_token_retransmit = malloc (15000);
 
 	instance->memb_state = MEMB_STATE_OPERATIONAL;
 
 	instance->set_aru = -1;
+
+	instance->my_aru = SEQNO_START_MSG;
+
+	instance->my_high_seq_received = SEQNO_START_MSG;
+
+	instance->my_high_delivered = SEQNO_START_MSG;
 }
 
 void main_token_seqid_get (
@@ -748,7 +770,6 @@ void totemsrp_finalize (
 	saHandleInstancePut (&totemsrp_instance_database, handle);
 }
 
-
 /*
  * Set operations for use by the membership algorithm
  */
@@ -1037,7 +1058,7 @@ static void old_ring_state_save (struct totemsrp_instance *instance)
 		instance->old_ring_state_aru = instance->my_aru;
 		instance->old_ring_state_high_seq_received = instance->my_high_seq_received;
 		instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
-			"Saving state aru %d high seq recieved %d\n",
+			"Saving state aru %x high seq received %x\n",
 			instance->my_aru, instance->my_high_seq_received);
 	}
 }
@@ -1063,7 +1084,7 @@ static void ring_state_restore (struct totemsrp_instance *instance)
 		instance->my_aru = instance->old_ring_state_aru;
 		instance->my_high_seq_received = instance->old_ring_state_high_seq_received;
 		instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-			"Restoring instance->my_aru %d my high seq received %d\n",
+			"Restoring instance->my_aru %x my high seq received %x\n",
 			instance->my_aru, instance->my_high_seq_received);
 	}
 }
@@ -1222,26 +1243,28 @@ static void memb_timer_function_gather_consensus_timeout (void *data)
 
 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
 {
-	int i;
+	unsigned int i;
 	struct sort_queue_item *recovery_message_item;
 	struct sort_queue_item regular_message_item;
+	unsigned int range = 0;
 	int res;
 	void *ptr;
 	struct mcast *mcast;
 
 	instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-		"recovery to regular %d-%d\n", 1, instance->my_aru);
+		"recovery to regular %x-%x\n", SEQNO_START_MSG + 1, instance->my_aru);
 
+	range = instance->my_aru - SEQNO_START_MSG;
 	/*
 	 * Move messages from recovery to regular sort queue
 	 */
 // todo should i be initialized to 0 or 1 ?
-	for (i = 1; i <= instance->my_aru; i++) {
-		res = sq_item_get (&instance->recovery_sort_queue, i, &ptr);
+	for (i = 1; i <= range; i++) {
+		res = sq_item_get (&instance->recovery_sort_queue,
+			i + SEQNO_START_MSG, &ptr);
 		if (res != 0) {
 			continue;
 		}
-printf ("Transferring message with seq id %d\n", i);
 		recovery_message_item = (struct sort_queue_item *)ptr;
 
 		/*
@@ -1254,9 +1277,6 @@ printf ("Transferring message with seq id %d\n", i);
 				sizeof (struct iovec) * recovery_message_item->iov_len);
 		} else {
 			mcast = recovery_message_item->iovec[0].iov_base;
-			instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
-				"encapsulated is %d\n",
-				mcast->header.encapsulated);
 			if (mcast->header.encapsulated == 1) {
 				/*
 				 * Message is a recovery message encapsulated
@@ -1269,7 +1289,6 @@ printf ("Transferring message with seq id %d\n", i);
 				regular_message_item.iov_len = 1;
 				mcast = regular_message_item.iovec[0].iov_base;
 			} else {
-printf ("not encapsulated\n");
 				continue; /* TODO this case shouldn't happen */
 				/*
 				 * Message is originated on new ring and not
@@ -1294,21 +1313,18 @@ printf ("not encapsulated\n");
 		if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
 			sizeof (struct memb_ring_id)) == 0) {
 
-		instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
-			"adding msg with seq no %d\n", mcast->seq, mcast->this_seqno);
-
 			regular_message_item.iov_len = recovery_message_item->iov_len;
 			res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
 			if (res == 0) {
 				sq_item_add (&instance->regular_sort_queue,
 					&regular_message_item, mcast->seq);
-				if (mcast->seq > instance->old_ring_state_high_seq_received) {
+				if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
 					instance->old_ring_state_high_seq_received = mcast->seq;
 				}
 			}
 		} else {
 			instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
-				"-not adding msg with seq no %d\n", mcast->seq);
+				"-not adding msg with seq no %x\n", mcast->seq);
 		}
 	}
 }
@@ -1322,14 +1338,14 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
 	int joined_list_entries = 0;
 	struct in_addr left_list[PROCESSOR_COUNT_MAX];
 	int left_list_entries = 0;
-	int aru_save;
+	unsigned int aru_save;
 
 	old_ring_state_reset (instance);
 	ring_reset (instance);
 	deliver_messages_from_recovery_to_regular (instance);
 
 	instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-		"Delivering to app %d to %d\n",
+		"Delivering to app %x to %x\n",
 		instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
 
 	aru_save = instance->my_aru;
@@ -1384,7 +1400,8 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
 	 * into the regular sort queue.
 	 */
 	sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
-	instance->my_last_aru = 0;
+	instance->my_last_aru = SEQNO_START_MSG;
+	sq_items_release (&instance->regular_sort_queue, SEQNO_START_MSG - 1);
 
 	instance->my_proc_list_entries = instance->my_new_memb_entries;
 	memcpy (instance->my_proc_list, instance->my_new_memb_list,
@@ -1494,18 +1511,23 @@ static void memb_state_recovery_enter (
 	int local_received_flg = 1;
 #endif
 	unsigned int low_ring_aru;
+	unsigned int range = 0;
 	unsigned int messages_originated = 0;
+	char is_originated[4096];
+	char not_originated[4096];
+	char seqno_string_hex[10];
 
 	instance->my_high_ring_delivered = 0;
 
-	sq_reinit (&instance->recovery_sort_queue, 0);
+	sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
 	queue_reinit (&instance->retrans_message_queue);
 
 	low_ring_aru = instance->old_ring_state_high_seq_received;
 
 	memb_state_commit_token_send (instance, commit_token);
 
-instance->my_token_seq = -1;
+	instance->my_token_seq = SEQNO_START_TOKEN - 1;
+
 	/*
 	 * Build regular configuration
 	 */
@@ -1534,7 +1556,7 @@ instance->my_token_seq = -1;
 			inet_ntoa (commit_token->memb_list[i].ring_id.rep));
 
 		instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
-			"aru %d high delivered %d received flag %d\n",
+			"aru %x high delivered %x received flag %d\n",
 			commit_token->memb_list[i].aru,
 			commit_token->memb_list[i].high_delivered,
 			commit_token->memb_list[i].received_flg);
@@ -1565,12 +1587,6 @@ instance->my_token_seq = -1;
 		 * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
 		 */
 		for (i = 0; i < commit_token->addr_entries; i++) {
-printf ("comparing %d old ring %s.%lld with commit ring %s.%lld.\n", i,
-	inet_ntoa (instance->my_old_ring_id.rep), instance->my_old_ring_id.seq,
-	inet_ntoa (commit_token->memb_list[i].ring_id.rep),
-	commit_token->memb_list[i].ring_id.seq);
-printf ("memb set subset %d\n", 
-	memb_set_subset (&instance->my_new_memb_list[i], 1, instance->my_deliver_memb_list, instance->my_deliver_memb_entries));
 			if (memb_set_subset (&instance->my_new_memb_list[i], 1,
 				instance->my_deliver_memb_list,
 				 instance->my_deliver_memb_entries) &&
@@ -1580,36 +1596,41 @@ printf ("memb set subset %d\n",
 				sizeof (struct memb_ring_id)) == 0) {
 	
 				if (low_ring_aru == 0 ||
-					low_ring_aru > commit_token->memb_list[i].aru) {
+					sq_lt_compare (commit_token->memb_list[i].aru, low_ring_aru)) {
 
 					low_ring_aru = commit_token->memb_list[i].aru;
 				}
-				if (instance->my_high_ring_delivered < commit_token->memb_list[i].high_delivered) {
+				if (sq_lt_compare (instance->my_high_ring_delivered, commit_token->memb_list[i].high_delivered)) {
 					instance->my_high_ring_delivered = commit_token->memb_list[i].high_delivered;
 				}
 			}
 		}
-		assert (low_ring_aru != 0xffffffff);
 		/*
-		 * Cpy all old ring messages to instance->retrans_message_queue
+		 * Copy all old ring messages to instance->retrans_message_queue
 		 */
 		instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
-			"copying all old ring messages from %d-%d.\n",
+			"copying all old ring messages from %x-%x.\n",
 			low_ring_aru + 1, instance->old_ring_state_high_seq_received);
+		strcpy (not_originated, "Not Originated for recovery: ");
+		strcpy (is_originated, "Originated for recovery: ");
 			
-		for (i = low_ring_aru + 1; i <= instance->old_ring_state_high_seq_received; i++) {
+		range = instance->old_ring_state_high_seq_received - low_ring_aru;
+		assert (range < 1024);
+		for (i = 1; i <= range; i++) {
 
 			struct sort_queue_item *sort_queue_item;
 			struct message_item message_item;
 			void *ptr;
 			int res;
 
-			res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
+			sprintf (seqno_string_hex, "%x ", low_ring_aru + i);
+			res = sq_item_get (&instance->regular_sort_queue,
+				low_ring_aru + i, &ptr);
 			if (res != 0) {
-printf ("-not copying %d-\n", i);
+				strcat (not_originated, seqno_string_hex);
 				continue;
 			}
-printf ("copying %d\n", i);
+			strcat (is_originated, seqno_string_hex);
 			sort_queue_item = ptr;
 			assert (sort_queue_item->iov_len > 0);
 			assert (sort_queue_item->iov_len <= MAXIOVS);
@@ -1630,15 +1651,19 @@ printf ("copying %d\n", i);
 		}
 		instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
 			"Originated %d messages in RECOVERY.\n", messages_originated);
+		strcat (not_originated, "\n");
+		strcat (is_originated, "\n");
+		instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, is_originated);
+		instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, not_originated);
 //	}
 
-	instance->my_aru = 0;
+	instance->my_aru = SEQNO_START_MSG;
 	instance->my_aru_count = 0;
 	instance->my_seq_unchanged = 0;
-	instance->my_high_seq_received = 0;
-	instance->my_install_seq = 0;
+	instance->my_high_seq_received = SEQNO_START_MSG;
+	instance->my_install_seq = SEQNO_START_MSG;
+	instance->last_released = SEQNO_START_MSG;
 
-	instance->totemsrp_log_printf (instance->totemsrp_log_level_notice, "entering RECOVERY state.\n");
 	reset_token_timeout (instance); // REVIEWED
 	reset_token_retransmit_timeout (instance); // REVIEWED
 
@@ -1797,13 +1822,18 @@ static int orf_token_remcast (
 
 	struct sq *sort_queue;
 
-//TODO	printf ("remcasting %d\n", seq);
 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
 		sort_queue = &instance->recovery_sort_queue;
 	} else {
 		sort_queue = &instance->regular_sort_queue;
 	}
 
+	res = sq_in_range (sort_queue, seq);
+	if (res == 0) {
+printf ("sq not in range\n");
+		return (-1);
+	}
+	
 	/*
 	 * Get RTR item at seq, if not available, return
 	 */
@@ -1825,54 +1855,73 @@ static int orf_token_remcast (
 /*
  * Free all freeable messages from ring
  */
-static int messages_free (
+static void messages_free (
 	struct totemsrp_instance *instance,
-	int token_aru)
+	unsigned int token_aru)
 {
 	struct sort_queue_item *regular_message;
-	int i, j;
+	unsigned int i, j;
 	int res;
 	int log_release = 0;
-	int release_to;
+	unsigned int release_to;
+	unsigned int range = 0;
+
+//printf ("aru %x last aru %x my high delivered %x last releaed %x\n",
+//		token_aru, instance->my_last_aru, instance->my_high_delivered, instance->last_released);
 
 	release_to = token_aru;
-	if (release_to > instance->my_last_aru) {
+	if (sq_lt_compare (instance->my_last_aru, release_to)) {
 		release_to = instance->my_last_aru;
 	}
-	if (release_to > instance->my_high_delivered) {
+	if (sq_lt_compare (instance->my_high_delivered, release_to)) {
 		release_to = instance->my_high_delivered;
 	}
 
+	/*
+	 * Ensure we dont try release before an already released point
+	 */
+	if (sq_lt_compare (release_to, instance->last_released)) {
+		return;
+	}
+
+	range = release_to - instance->last_released;
+	assert (range < 1024);
+
 	/*
 	 * Release retransmit list items if group aru indicates they are transmitted
 	 */
-	for (i = instance->last_released; i <= release_to; i++) {
+	for (i = 1; i <= range; i++) {
 		void *ptr;
-		res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
+
+		res = sq_item_get (&instance->regular_sort_queue,
+			instance->last_released + i, &ptr);
 		if (res == 0) {
 			regular_message = ptr;
 			for (j = 0; j < regular_message->iov_len; j++) {
 				free (regular_message->iovec[j].iov_base);
 			}
 		}
-		sq_items_release (&instance->regular_sort_queue, i);
-		instance->last_released = i + 1;
+		sq_items_release (&instance->regular_sort_queue,
+			instance->last_released + i);
+
 		log_release = 1;
 	}
+	instance->last_released += range;
 
  	if (log_release) {
 		instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-			"releasing messages up to and including %d\n", release_to);
+			"releasing messages up to and including %x\n", release_to);
 	}
-	return (0);
 }
 
 static void update_aru (
 	struct totemsrp_instance *instance)
 {
-	int i;
+	unsigned int i;
 	int res;
 	struct sq *sort_queue;
+	unsigned int range;
+	unsigned int my_aru_saved = 0;
 
 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
 		sort_queue = &instance->recovery_sort_queue;
@@ -1880,18 +1929,27 @@ static void update_aru (
 		sort_queue = &instance->regular_sort_queue;
 	}
 
-	for (i = instance->my_aru + 1; i <= instance->my_high_seq_received; i++) {
+	range = instance->my_high_seq_received - instance->my_aru;
+	if (range > 1024) {
+		return;
+	}
+
+	my_aru_saved = instance->my_aru;
+	for (i = 1; i <= range; i++) {
+
 		void *ptr;
 
-		res = sq_item_get (sort_queue, i, &ptr);
+		res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
 		/*
-		 * If hole, stop assembly
+		 * If hole, stop updating aru
 		 */
 		if (res != 0) {
 			break;
 		}
-		instance->my_aru = i;
 	}
+	instance->my_aru += i - 1;
+
+
 //	instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
 //		"setting received flag to FALSE %d %d\n",
 //		instance->my_aru, instance->my_high_seq_received);
@@ -1944,6 +2002,7 @@ static int orf_token_mcast (
 			token->seq);
 			return (0);
 		}
+
 		message_item->mcast->seq = ++token->seq;
 		message_item->mcast->this_seqno = instance->global_seqno++;
 
@@ -1988,7 +2047,7 @@ static int orf_token_mcast (
 	assert (instance->fcc_mcast_current < 100);
 
 	/*
-	 * If messages mcasted, deliver any new messages to totemg
+	 * If messages mcasted, deliver any new messages to totempg
 	 */
 	instance->my_high_seq_received = token->seq;
 		
@@ -2008,12 +2067,15 @@ static int orf_token_rtr (
 	struct orf_token *orf_token,
 	int *fcc_allowed)
 {
-	int res;
-	int i, j;
-	int found;
-	int total_entries;
+	unsigned int res;
+	unsigned int i, j;
+	unsigned int found;
+	unsigned int total_entries;
 	struct sq *sort_queue;
 	struct rtr_item *rtr_list;
+	unsigned int range = 0;
+	char retransmit_msg[1024];
+	char value[64];
 
 	if (instance->memb_state == MEMB_STATE_RECOVERY) {
 		sort_queue = &instance->recovery_sort_queue;
@@ -2022,14 +2084,17 @@ static int orf_token_rtr (
 	}
 
 	rtr_list = &orf_token->rtr_list[0];
+	strcpy (retransmit_msg, "Retransmit List: ");
 	if (orf_token->rtr_list_entries) {
 		instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
 			"Retransmit List %d\n", orf_token->rtr_list_entries);
 		for (i = 0; i < orf_token->rtr_list_entries; i++) {
-			instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-				"%d ", rtr_list[i].seq);
+			sprintf (value, "%x ", rtr_list[i].seq);
+			strcat (retransmit_msg, value);
 		}
-		instance->totemsrp_log_printf (instance->totemsrp_log_level_debug, "\n");
+		strcat (retransmit_msg, "\n");
+		instance->totemsrp_log_printf (instance->totemsrp_log_level_notice,
+			"%s", retransmit_msg);
 	}
 
 	total_entries = orf_token->rtr_list_entries;
@@ -2051,7 +2116,6 @@ static int orf_token_rtr (
 			continue;
 		}
 
-		assert (rtr_list[i].seq > 0);
 		res = orf_token_remcast (instance, rtr_list[i].seq);
 		if (res == 0) {
 			/*
@@ -2069,32 +2133,36 @@ static int orf_token_rtr (
 	}
 	*fcc_allowed = *fcc_allowed - instance->fcc_remcast_current - 1;
 
-#ifdef COMPILE_OUT
-for (i = 0; i < orf_token->rtr_list_entries; i++) {
-	assert (rtr_list_old[index_old].seq != -1);
-}
-#endif
-
 	/*
 	 * Add messages to retransmit to RTR list
 	 * but only retry if there is room in the retransmit list
 	 */
-	for (i = instance->my_aru + 1;
-		orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX &&
-		i <= instance->my_high_seq_received;
-		i++) {
+//printf ("high seq %x aru %x\n", instance->my_high_seq_received, instance->my_aru);
+	range = instance->my_high_seq_received - instance->my_aru;
+	assert (range < 100000);
+
+	for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
+		(i <= range); i++) {
+
+		/*
+		 * Ensure message is within the sort queue range
+		 */
+		res = sq_in_range (sort_queue, instance->my_aru + i);
+		if (res == 0) {
+			break;
+		}
 
 		/*
 		 * Find if a message is missing from this processor
 		 */
-		res = sq_item_inuse (sort_queue, i);
+		res = sq_item_inuse (sort_queue, instance->my_aru + i);
 		if (res == 0) {
 			/*
 			 * Determine if missing message is already in retransmit list
 			 */
 			found = 0;
 			for (j = 0; j < orf_token->rtr_list_entries; j++) {
-				if (i == rtr_list[j].seq) {
+				if (instance->my_aru + i == rtr_list[j].seq) {
 					found = 1;
 				}
 			}
@@ -2104,7 +2172,7 @@ for (i = 0; i < orf_token->rtr_list_entries; i++) {
 				 */
 				memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
 					&instance->my_ring_id, sizeof (struct memb_ring_id));
-				rtr_list[orf_token->rtr_list_entries].seq = i;
+				rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
 				orf_token->rtr_list_entries++;
 			}
 		}
@@ -2241,7 +2309,8 @@ static int orf_token_send_initial (struct totemsrp_instance *instance)
 	orf_token.header.endian_detector = ENDIAN_LOCAL;
 	orf_token.header.encapsulated = 0;
 	orf_token.seq = 0;
-	orf_token.token_seq = 0;
+	orf_token.seq = SEQNO_START_MSG;
+	orf_token.token_seq = SEQNO_START_TOKEN;
 	orf_token.retrans_flg = 1;
 	instance->my_set_retrans_flg = 1;
 /*
@@ -2254,7 +2323,7 @@ static int orf_token_send_initial (struct totemsrp_instance *instance)
 */
 		
 	orf_token.aru = 0;
-//	orf_token.aru_addr.s_addr = 0;//instance->my_id.sin_addr.s_addr;
+	orf_token.aru = SEQNO_START_MSG - 1;
 	orf_token.aru_addr.s_addr = instance->my_id.sin_addr.s_addr;
 	memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
 	orf_token.fcc = 0;
@@ -2611,8 +2680,8 @@ static int message_handler_orf_token (
 	int transmits_allowed;
 	int forward_token;
 	int mcasted;
-	int last_aru;
-	int low_water;
+	unsigned int last_aru;
+	unsigned int low_water;
 
 #ifdef GIVEINFO
 	struct timeval tv_current;
@@ -2628,7 +2697,7 @@ printf ("OTHERS %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
 #endif
 
 #ifdef RANDOM_DROP
-if (random () % 100 < 10) {
+if (random()%100 < 10) {
 	return (0);
 }
 #endif
@@ -2647,9 +2716,6 @@ if (random () % 100 < 10) {
 
 	instance->my_last_seq = token_ref->seq;
 
-//	assert (msg_len >= sizeof (struct orf_token));
-//	assert (msg_len == sizeof (struct orf_token) +
-//		(sizeof (struct rtr_item) * token_ref->rtr_list_entries));
 	/*
 	 * Make copy of token and retransmit list in case we have
 	 * to flush incoming messages from the kernel queue
@@ -2728,7 +2794,7 @@ if (random () % 100 < 10) {
 		/*
 		 * Discard retransmitted tokens
 		 */
-		if (instance->my_token_seq >= token->token_seq) {
+		if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
 			/*
 			 * If this processor receives a retransmitted token, it is sure
 		 	 * the previous processor is still alive.  As a result, it can
@@ -2751,11 +2817,12 @@ if (random () % 100 < 10) {
 		transmits_allowed = TRANSMITS_ALLOWED;
 		mcasted = orf_token_rtr (instance, token, &transmits_allowed);
 
-	        if ((last_aru + MISSING_MCAST_WINDOW) < token->seq) {
+		if (sq_lt_compare (instance->last_released + MISSING_MCAST_WINDOW, token->seq + TRANSMITS_ALLOWED)) {
 			transmits_allowed = 0;
+printf ("zero \n");
 		}
 		mcasted = orf_token_mcast (instance, token, transmits_allowed, system_from);
-		if (instance->my_aru < token->aru ||
+		if (sq_lt_compare (instance->my_aru, token->aru) ||
 			instance->my_id.sin_addr.s_addr == token->aru_addr.s_addr || 
 			token->aru_addr.s_addr == 0) {
 			
@@ -2796,7 +2863,7 @@ printf ("FAILED TO RECEIVE\n");
 				 * (ie: its retrans queue is empty)
 				 */
 				low_water = instance->my_aru;
-				if (low_water > last_aru) {
+				if (sq_lt_compare (last_aru, low_water)) {
 					low_water = last_aru;
 				}
 // TODO is this code right
@@ -2812,7 +2879,7 @@ printf ("FAILED TO RECEIVE\n");
 					token->retrans_flg = 0;
 				}
 				instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-					"token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, low_water %d aru %d\n", 
+					"token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, low_water %x aru %x\n", 
 					token->retrans_flg, instance->my_set_retrans_flg,
 					queue_is_empty (&instance->retrans_message_queue),
 					instance->my_retrans_flg_count, low_water, token->aru);
@@ -2825,7 +2892,7 @@ printf ("FAILED TO RECEIVE\n");
 					instance->my_install_seq = token->seq;
 				}
 				instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-					"install seq %d aru %d high seq received %d\n",
+					"install seq %x aru %x high seq received %x\n",
 					instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
 				if (instance->my_retrans_flg_count >= 2 && instance->my_aru >= instance->my_install_seq && instance->my_received_flg == 0) {
 					instance->my_received_flg = 1;
@@ -2840,7 +2907,7 @@ printf ("FAILED TO RECEIVE\n");
 				}
 				if (instance->my_rotation_counter == 2) {
 				instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-					"retrans flag count %d token aru %d install seq %d aru %d %d\n",
+					"retrans flag count %x token aru %x install seq %x aru %x %x\n",
 					instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
 					instance->my_aru, token->seq);
 
@@ -2862,7 +2929,8 @@ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
 }
 #endif
 			if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
-				messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
+				messages_deliver_to_app (instance, 0,
+					instance->my_high_seq_received);
 			}
 
 			/*
@@ -2896,37 +2964,56 @@ printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
 static void messages_deliver_to_app (
 	struct totemsrp_instance *instance,
 	int skip,
-	int end_point)
+	unsigned int end_point)
 {
-    struct sort_queue_item *sort_queue_item_p;
-    int i;
-    int res;
-    struct mcast *mcast;
+	struct sort_queue_item *sort_queue_item_p;
+	unsigned int i;
+	int res;
+	struct mcast *mcast;
+	unsigned int range = 0;
+	unsigned int my_high_delivered_stored = 0;
 
 	instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-		"Delivering %d to %d\n", instance->my_high_delivered + 1,
+		"Delivering %x to %x\n", instance->my_high_delivered,
 		end_point);
 
+	range = end_point - instance->my_high_delivered;
+
+	assert (range < 10240);
+	my_high_delivered_stored = instance->my_high_delivered;
+
 	/*
 	 * Deliver messages in order from rtr queue to pending delivery queue
 	 */
-	for (i = instance->my_high_delivered + 1; i <= end_point; i++) {
+	for (i = 1; i <= range; i++) {
+
 		void *ptr = 0;
 
-		res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
-		if (res != 0 && skip) {
-printf ("-skipping %d-\n", i);
-			instance->my_high_delivered = i;
-			continue;
+		/*
+		 * If out of range of sort queue, stop assembly
+		 */
+		res = sq_in_range (&instance->regular_sort_queue,
+			my_high_delivered_stored + i);
+		if (res == 0) {
+			break;
 		}
 
+		res = sq_item_get (&instance->regular_sort_queue,
+			my_high_delivered_stored + i, &ptr);
 		/*
 		 * If hole, stop assembly
 		 */
-		if (res != 0) {
+		if (res != 0 && skip == 0) {
 			break;
 		}
 
+		instance->my_high_delivered = my_high_delivered_stored + i;
+
+		if (res != 0) {
+			continue;
+
+		}
+
 		sort_queue_item_p = ptr;
 
 		mcast = sort_queue_item_p->iovec[0].iov_base;
@@ -2943,18 +3030,16 @@ printf ("-skipping %d-\n", i);
 				1,
 				instance->my_deliver_memb_list,
 				instance->my_deliver_memb_entries) == 0) {
+		instance->my_high_delivered = my_high_delivered_stored + i;
 
-printf ("-skipping %d - wrong ip", i);
-			instance->my_high_delivered = i;
 			continue;
 		}
-		instance->my_high_delivered = i;
 
 		/*
 		 * Message found
 		 */
 		instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-			"Delivering MCAST message with seq %d to pending delivery queue\n",
+			"Delivering MCAST message with seq %x to pending delivery queue\n",
 			mcast->seq);
 
 		/*
@@ -3006,17 +3091,25 @@ static int message_handler_mcast (
 	struct sq *sort_queue;
 	struct mcast mcast_header;
 	
+
 	if (endian_conversion_needed) {
 		mcast_endian_convert (msg, &mcast_header);
 	} else {
 		memcpy (&mcast_header, msg, sizeof (struct mcast));
 	}
 
+/*
 	if (mcast_header.header.encapsulated == 1) {
 		sort_queue = &instance->recovery_sort_queue;
 	} else {
 		sort_queue = &instance->regular_sort_queue;
 	}
+*/
+	if (instance->memb_state == MEMB_STATE_RECOVERY) {
+		sort_queue = &instance->recovery_sort_queue;
+	} else {
+		sort_queue = &instance->regular_sort_queue;
+	}
 	assert (msg_len < FRAME_SIZE_MAX);
 #ifdef RANDOM_DROP
 if (random()%100 < 50) {
@@ -3065,16 +3158,17 @@ if (random()%100 < 50) {
 	}
 
 	instance->totemsrp_log_printf (instance->totemsrp_log_level_debug,
-		"Received ringid(%s:%lld) seq %d\n",
+		"Received ringid(%s:%lld) seq %x\n",
 		inet_ntoa (mcast_header.ring_id.rep),
 		mcast_header.ring_id.seq,
 		mcast_header.seq);
+
 	/*
 	 * Add mcast message to rtr queue if not already in rtr queue
 	 * otherwise free io vectors
 	 */
 	if (msg_len > 0 && msg_len < FRAME_SIZE_MAX &&
-		instance->my_aru < mcast_header.seq &&
+		sq_in_range (sort_queue, mcast_header.seq) && 
 		sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
 
 		/*
@@ -3091,7 +3185,8 @@ if (random()%100 < 50) {
 		assert (sort_queue_item.iovec[0].iov_len < FRAME_SIZE_MAX);
 		sort_queue_item.iov_len = 1;
 		
-		if (mcast_header.seq > instance->my_high_seq_received) {
+		if (sq_lt_compare (instance->my_high_seq_received,
+			mcast_header.seq)) {
 			instance->my_high_seq_received = mcast_header.seq;
 		}
 

+ 116 - 45
include/sq.h

@@ -37,16 +37,57 @@
 #include "errno.h"
 
 struct sq {
-	int head;
-	int size;
+	unsigned int head;
+	unsigned int size;
 	void *items;
-	unsigned char *items_inuse;
-	int size_per_item;
-	int head_seqid;
-	int item_count;
-	int pos_max;
+	unsigned int *items_inuse;
+	unsigned int size_per_item;
+	unsigned int head_seqid;
+	unsigned int item_count;
+	unsigned int pos_max;
 };
 
+/*
+ * Compare a unsigned rollover-safe value to an unsigned rollover-safe value
+ */
+
+/*
+ * ADJUST_ROLLOVER_POINT is the value used to determine when a window should be
+ *	used to calculate a less-then or less-then-equal comparison.
+ *
+ * ADJUST_ROLLOVER_VALUE is the value by which both values in a comparison are
+ *	adjusted if either value in a comparison is greater then
+ *	ADJUST_ROLLOVER_POINT.
+ */
+#define ADJUST_ROLLOVER_POINT 0x80000000
+#define ADJUST_ROLLOVER_VALUE 0x10000
+
+static inline int sq_lt_compare (unsigned int a, unsigned int b) {
+	if ((a > ADJUST_ROLLOVER_POINT) || (b > ADJUST_ROLLOVER_POINT)) {
+		if ((a - ADJUST_ROLLOVER_VALUE) < (b - ADJUST_ROLLOVER_VALUE)) {
+			return (1);
+		}
+	} else {
+		if (a < b) {
+			return (1);
+		}
+	}
+	return (0);
+}
+
+static inline int sq_lte_compare (unsigned int a, unsigned int b) {
+	if ((a > ADJUST_ROLLOVER_POINT) || (b > ADJUST_ROLLOVER_POINT)) {
+		if ((a - ADJUST_ROLLOVER_VALUE) <= (b - ADJUST_ROLLOVER_VALUE)) {
+			return (1);
+		}
+	} else {
+		if (a <= b) {
+			return (1);
+		}
+	}
+	return (0);
+}
+
 static inline int sq_init (
 	struct sq *sq,
 	int item_count,
@@ -66,24 +107,24 @@ static inline int sq_init (
 	}
 	memset (sq->items, 0, item_count * size_per_item);
 
-	sq->items_inuse = (void *)malloc (item_count * sizeof (char));
-	memset (sq->items_inuse, 0, item_count * sizeof (char));
+	sq->items_inuse = (void *)malloc (item_count * sizeof (unsigned int));
+	memset (sq->items_inuse, 0, item_count * sizeof (unsigned int));
 	return (0);
 }
 
-static inline void sq_reinit (struct sq *sq, int head_seqid)
+static inline void sq_reinit (struct sq *sq, unsigned int head_seqid)
 {
 	sq->head = 0;
 	sq->head_seqid = head_seqid;
 	sq->pos_max = 0;
 
 	memset (sq->items, 0, sq->item_count * sq->size_per_item);
-	memset (sq->items_inuse, 0, sq->item_count * sizeof (char));
+	memset (sq->items_inuse, 0, sq->item_count * sizeof (unsigned int));
 }
 
-static inline void sq_assert (struct sq *sq, int pos)
+static inline void sq_assert (struct sq *sq, unsigned int pos)
 {
-	int i;
+	unsigned int i;
 
 //	printf ("Instrument[%d] Asserting from %d to %d\n",
 //		pos, sq->pos_max, sq->size);
@@ -103,7 +144,7 @@ static inline void sq_copy (struct sq *sq_dest, struct sq *sq_src)
 	memcpy (sq_dest->items, sq_src->items,
 		sq_src->item_count * sq_src->size_per_item);
 	memcpy (sq_dest->items_inuse, sq_src->items_inuse,
-		sq_src->item_count * sizeof (char));
+		sq_src->item_count * sizeof (unsigned int));
 }
 
 static inline void sq_free (struct sq *sq) {
@@ -114,35 +155,35 @@ static inline void sq_free (struct sq *sq) {
 static inline void *sq_item_add (
 	struct sq *sq,
 	void *item,
-	int seqid)
+	unsigned int seqid)
 {
 	char *sq_item;
-	int sq_position;
+	unsigned int sq_position;
 
-	if (seqid - sq->head_seqid >= sq->size) {
-		return(0);
-	}
 	sq_position = (sq->head + seqid - sq->head_seqid) % sq->size;
 	if (sq_position > sq->pos_max) {
 		sq->pos_max = sq_position;
 	}
 	assert (sq_position >= 0);
 
-//printf ("item add position %d seqid %d head seqid %d\n", sq_position, seqid, sq->head_seqid);
 	sq_item = sq->items;
 	sq_item += sq_position * sq->size_per_item;
 	assert(sq->items_inuse[sq_position] == 0);
 	memcpy (sq_item, item, sq->size_per_item);
-	sq->items_inuse[sq_position] = 1;
+	if (seqid == 0) {
+		sq->items_inuse[sq_position] = 1;
+	} else {
+		sq->items_inuse[sq_position] = seqid;
+	}
 
 	return (sq_item);
 }
 
-static inline int sq_item_inuse (
+static inline unsigned int sq_item_inuse (
 	struct sq *sq,
-	int seq_id) {
+	unsigned int seq_id) {
 
-	int sq_position;
+	unsigned int sq_position;
 
 	/*
 	 * We need to say that the seqid is in use if it shouldn't 
@@ -157,34 +198,68 @@ static inline int sq_item_inuse (
 	}
 #endif
 	sq_position = (sq->head - sq->head_seqid + seq_id) % sq->size;
-//printf ("in use %d\n", sq_position);
-	return (sq->items_inuse[sq_position]);
+	return (sq->items_inuse[sq_position] != 0);
 }
 
-static inline int sq_size_get (
+static inline unsigned int sq_size_get (
 	struct sq *sq)
 {
 	return sq->size;
 }
 
-static inline int sq_item_get (
+static inline unsigned int sq_in_range (
+	struct sq *sq,
+	unsigned int seq_id)
+{
+	int res = 1;
+
+	if (sq->head_seqid > ADJUST_ROLLOVER_POINT) {
+		if (seq_id - ADJUST_ROLLOVER_VALUE <
+			sq->head_seqid - ADJUST_ROLLOVER_VALUE) {
+
+			res = 0;
+		}
+		if ((seq_id - ADJUST_ROLLOVER_VALUE) >=
+			((sq->head_seqid - ADJUST_ROLLOVER_VALUE) + sq->size)) {
+
+			res = 0;
+		}
+	} else {
+		if (seq_id < sq->head_seqid) {
+			res = 0;
+		}
+		if ((seq_id) >= ((sq->head_seqid) + sq->size)) {
+			res = 0;
+		}
+	}
+	return (res);
+
+}
+
+static inline unsigned int sq_item_get (
 	struct sq *sq,
-	int seq_id,
+	unsigned int seq_id,
 	void **sq_item_out)
 {
 	char *sq_item;
-	int sq_position;
+	unsigned int sq_position;
 
-if (seq_id == -1) {
-	return (ENOENT);
-}
-	assert (seq_id < (sq->head_seqid + sq->size));
-	sq_position = (sq->head - sq->head_seqid + seq_id) % sq->size;
+	if (seq_id > ADJUST_ROLLOVER_POINT) {
+		assert ((seq_id - ADJUST_ROLLOVER_POINT) <
+			((sq->head_seqid - ADJUST_ROLLOVER_POINT) + sq->size));
+
+		sq_position = ((sq->head - ADJUST_ROLLOVER_VALUE) -
+			(sq->head_seqid - ADJUST_ROLLOVER_VALUE) + seq_id) % sq->size;
+	} else {
+		assert (seq_id < (sq->head_seqid + sq->size));
+		sq_position = (sq->head - sq->head_seqid + seq_id) % sq->size;
+	}
+//printf ("seqid %x head %x head %x pos %x\n", seq_id, sq->head, sq->head_seqid, sq_position);
+//	sq_position = (sq->head - sq->head_seqid + seq_id) % sq->size;
+//printf ("sq_position = %x\n", sq_position);
 //printf ("ITEMGET %d %d %d %d\n", sq_position, sq->head, sq->head_seqid, seq_id);
 assert (sq_position >= 0);
-//printf ("itme get in use %d\n", sq_position);
 	if (sq->items_inuse[sq_position] == 0) {
-//printf ("not in use %d\n", sq_position);
 		return (ENOENT);
 	}
 	sq_item = sq->items;
@@ -193,13 +268,9 @@ assert (sq_position >= 0);
 	return (0);
 }
 
-static inline void sq_items_release (struct sq *sq, int seqid)
+static inline void sq_items_release (struct sq *sq, unsigned int seqid)
 {
-	int oldhead;
-
-	if (seqid < sq->head_seqid) {
-		return;
-	}
+	unsigned int oldhead;
 
 	oldhead = sq->head;
 
@@ -208,11 +279,11 @@ static inline void sq_items_release (struct sq *sq, int seqid)
 //		printf ("releasing %d for %d\n", oldhead, sq->size - oldhead);
 //		printf ("releasing %d for %d\n", 0, sq->head);
 		memset (&sq->items_inuse[oldhead], 0, sq->size - oldhead);
-		memset (sq->items_inuse, 0, sq->head * sizeof (char));
+		memset (sq->items_inuse, 0, sq->head * sizeof (unsigned int));
 	} else {
 //		printf ("releasing %d for %d\n", oldhead, seqid - sq->head_seqid + 1);
 		memset (&sq->items_inuse[oldhead], 0,
-			(seqid - sq->head_seqid + 1) * sizeof (char));
+			(seqid - sq->head_seqid + 1) * sizeof (unsigned int));
 	}
 	sq->head_seqid = seqid + 1;
 }