Просмотр исходного кода

Revert a bunch of changes that were committed for testing scalability.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1177 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 19 лет назад
Родитель
Коммит
0add7e0611
8 изменённых файлов: 48 добавлений и 86 удалений
  1. 2 2
      Makefile.inc
  2. 6 12
      exec/ipc.c
  3. 1 1
      exec/main.c
  4. 4 11
      exec/totemconfig.c
  5. 1 2
      exec/totemip.c
  6. 0 1
      exec/totemnet.c
  7. 32 53
      exec/totemsrp.c
  8. 2 4
      test/testckpt.c

+ 2 - 2
Makefile.inc

@@ -29,7 +29,7 @@ endif
 # OPENAIS_BUILD can be defined as RELEASE or DEBUG
 #
 ifndef OPENAIS_BUILD
-	OPENAIS_BUILD=DEBUG
+	OPENAIS_BUILD=RELEASE
 endif
 
 # OPENAIS_PROFILE
@@ -46,7 +46,7 @@ DYFLAGS =
 # build CFLAGS, LDFLAGS
 #
 ifeq (${OPENAIS_BUILD}, RELEASE) 
-	CFLAGS += -O3 -Wall -DDEBUG
+	CFLAGS += -O3 -Wall
 # -Wstrict-aliasing=2 TODO sameday fix all of these
 ifndef OPENAIS_PROFILE
 	CFLAGS += -fomit-frame-pointer

+ 6 - 12
exec/ipc.c

@@ -507,19 +507,20 @@ retry_poll:
 	return (0);
 }
 
-#if defined(OPENAIS_LINUX)
+#if defined(OPENAIS_LINUX) || defined(OPENAIS_SOLARIS)
 /* SUN_LEN is broken for abstract namespace
  */
 #define AIS_SUN_LEN(a) sizeof(*(a))
-
-char socketname[20];
 #else
 #define AIS_SUN_LEN(a) SUN_LEN(a)
-
+#endif
+ 
+#if defined(OPENAIS_LINUX)
+char *socketname = "libais.socket";
+#else
 char *socketname = "/var/run/libais.socket";
 #endif
 
-
 static int conn_info_outq_flush (struct conn_info *conn_info) {
 	struct queue *outq;
 	int res = 0;
@@ -872,16 +873,9 @@ void openais_ipc_init (
 	int libais_server_fd;
 	struct sockaddr_un un_addr;
 	int res;
-	char *socket_number;
 
 	log_init ("IPC");
 
-	socket_number = getenv ("INTERFACE_NUMBER");
-	if (socket_number) {
-		sprintf (socketname, "libais.socket%s", socket_number);
-	} else {
-		strcpy (socketname, "libais.socket");
-	}
 	ipc_serialize_lock_fn = serialize_lock_fn;
 
 	ipc_serialize_unlock_fn = serialize_unlock_fn;

+ 1 - 1
exec/main.c

@@ -495,7 +495,7 @@ int main (int argc, char **argv)
 	 */
 	aisexec_setscheduler ();
 
-//	aisexec_mlockall ();
+	aisexec_mlockall ();
 
 	totem_config.totem_logging_configuration = totem_logging_configuration;
 	totem_log_service = _log_init ("TOTEM");

+ 4 - 11
exec/totemconfig.c

@@ -58,12 +58,12 @@
 #endif
 
 #define TOKEN_RETRANSMITS_BEFORE_LOSS_CONST	4
-#define TOKEN_TIMEOUT				10000
+#define TOKEN_TIMEOUT				1000
 #define TOKEN_RETRANSMIT_TIMEOUT		(int)(TOKEN_TIMEOUT / (TOKEN_RETRANSMITS_BEFORE_LOSS_CONST + 0.2))
 #define TOKEN_HOLD_TIMEOUT			(int)(TOKEN_RETRANSMIT_TIMEOUT * 0.8 - (1000/(int)HZ))
-#define JOIN_TIMEOUT				300
-#define CONSENSUS_TIMEOUT			30000
-#define MERGE_TIMEOUT				2000
+#define JOIN_TIMEOUT				100
+#define CONSENSUS_TIMEOUT			200
+#define MERGE_TIMEOUT				200
 #define DOWNCHECK_TIMEOUT			1000000
 #define FAIL_TO_RECV_CONST			50
 #define	SEQNO_UNCHANGED_CONST			30
@@ -125,7 +125,6 @@ extern int totem_config_read (
 	unsigned int object_interface_handle;
 	char *str;
 	unsigned int ringnumber = 0;
-	char *ring0;
 
 	memset (totem_config, 0, sizeof (struct totem_config));
 	totem_config->interfaces = malloc (sizeof (struct totem_interface) * INTERFACE_MAX);
@@ -239,11 +238,6 @@ extern int totem_config_read (
 			totem_config->interfaces[ringnumber].ip_port = atoi (str);
 		}
 
-		ring0 = getenv ("INTERFACE_RING0");
-		if (ring0) {
-			res = totemip_parse (&totem_config->interfaces[ringnumber].bindnet, ring0,
-					     totem_config->interfaces[ringnumber].mcast_addr.family);
-		} else {
 		/*
 		 * Get the bind net address
 		 */
@@ -252,7 +246,6 @@ extern int totem_config_read (
 			res = totemip_parse (&totem_config->interfaces[ringnumber].bindnet, str,
 					     totem_config->interfaces[ringnumber].mcast_addr.family);
 		}
-		}
 		totem_config->interface_count++;
 	}
 

+ 1 - 2
exec/totemip.c

@@ -532,9 +532,8 @@ int totemip_iface_check(struct totem_ip_address *bindnet,
 				parse_rtattr(tb, IFA_MAX, IFA_RTA(ifa), len);
 
 				memcpy(ipaddr.addr, RTA_DATA(tb[IFA_ADDRESS]), TOTEMIP_ADDRLEN);
-				if (totemip_equal(&ipaddr, bindnet)) {
+				if (totemip_equal(&ipaddr, bindnet))
 					found_if = 1;
-				}
 
 				/* If the address we have is an IPv4 network address, then
 				   substitute the actual IP address of this interface */

+ 0 - 1
exec/totemnet.c

@@ -663,7 +663,6 @@ static int net_deliver_fn (
 #endif
 
 	bytes_received = recvmsg (fd, &msg_recv, MSG_NOSIGNAL | MSG_DONTWAIT);
-//log_printf (instance->totemnet_log_level_notice, "bytes received %d\n", bytes_received);
 	if (bytes_received == -1) {
 		return (0);
 	} else {

+ 32 - 53
exec/totemsrp.c

@@ -89,7 +89,6 @@
 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX		500 /* allow 500 messages to be queued */
 #define MAXIOVS					5	
 #define RETRANSMIT_ENTRIES_MAX			30
-#define TOKEN_SIZE_MAX				64000 /* bytes */
 
 /*
  * Rollover handling:
@@ -544,7 +543,7 @@ static int message_handler_token_hold_cancel (
 static void memb_ring_id_create_or_load (struct totemsrp_instance *, struct memb_ring_id *);
 
 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
-static void memb_state_gather_enter (struct totemsrp_instance *instance, unsigned int gather_from);
+static void memb_state_gather_enter (struct totemsrp_instance *instance);
 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
 	int fcc_mcasts_allowed);
@@ -617,7 +616,7 @@ void totemsrp_instance_initialize (struct totemsrp_instance *instance)
 
 	instance->my_commit_token_seq = SEQNO_START_TOKEN - 1;
 
-	instance->orf_token_retransmit = malloc (TOKEN_SIZE_MAX);
+	instance->orf_token_retransmit = malloc (15000);
 
 	instance->memb_state = MEMB_STATE_OPERATIONAL;
 
@@ -1337,7 +1336,7 @@ static void memb_state_consensus_timeout_expired (
 
 		memb_set_merge (no_consensus_list, no_consensus_list_entries,
 			instance->my_failed_list, &instance->my_failed_list_entries);
-		memb_state_gather_enter (instance, 0);
+		memb_state_gather_enter (instance);
 	}
 }
 
@@ -1352,32 +1351,26 @@ static void timer_function_orf_token_timeout (void *data)
 {
 	struct totemsrp_instance *instance = (struct totemsrp_instance *)data;
 
+	log_printf (instance->totemsrp_log_level_notice,
+		"The token was lost in state %d from timer %p\n", instance->memb_state, data);
 	switch (instance->memb_state) {
 		case MEMB_STATE_OPERATIONAL:
-			log_printf (instance->totemsrp_log_level_notice,
-				"The token was lost in the OPERATIONAL state.\n");
 			totemrrp_iface_check (instance->totemrrp_handle);
-			memb_state_gather_enter (instance, 1);
+			memb_state_gather_enter (instance);
 			break;
 
 		case MEMB_STATE_GATHER:
-			log_printf (instance->totemsrp_log_level_notice,
-				"The consensus timeout expired.\n");
 			memb_state_consensus_timeout_expired (instance);
-			memb_state_gather_enter (instance, 2);
+			memb_state_gather_enter (instance);
 			break;
 
 		case MEMB_STATE_COMMIT:
-			log_printf (instance->totemsrp_log_level_notice,
-				"The token was lost in the COMMIT state.\n");
-			memb_state_gather_enter (instance, 3);
+			memb_state_gather_enter (instance);
 			break;
 		
 		case MEMB_STATE_RECOVERY:
-			log_printf (instance->totemsrp_log_level_notice,
-				"The token was lost in the RECOVERY state.\n");
 			ring_state_restore (instance);
-			memb_state_gather_enter (instance, 4);
+			memb_state_gather_enter (instance);
 			break;
 	}
 }
@@ -1614,9 +1607,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
 	return;
 }
 
-static void memb_state_gather_enter (
-	struct totemsrp_instance *instance,
-	unsigned int gather_from)
+static void memb_state_gather_enter (struct totemsrp_instance *instance)
 {
 	instance->my_commit_token_seq = SEQNO_START_TOKEN - 1;
 
@@ -1663,7 +1654,7 @@ static void memb_state_gather_enter (
 	memb_consensus_set (instance, &instance->my_id);
 
 	log_printf (instance->totemsrp_log_level_notice,
-		"entering GATHER state %d.\n", gather_from);
+		"entering GATHER state.\n");
 
 	instance->memb_state = MEMB_STATE_GATHER;
 
@@ -2404,6 +2395,7 @@ static void timer_function_token_retransmit_timeout (void *data)
 	case MEMB_STATE_GATHER:
 		break;
 	case MEMB_STATE_COMMIT:
+		break;
 	case MEMB_STATE_OPERATIONAL:
 	case MEMB_STATE_RECOVERY:
 		token_retransmit (instance);
@@ -2602,11 +2594,6 @@ static int memb_state_commit_token_send (struct totemsrp_instance *instance,
 	iovec.iov_len = sizeof (struct memb_commit_token) +
 		((sizeof (struct srp_addr) +
 			sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
-	/*
-	 * Make a copy for retransmission if necessary
-	 */
-	memcpy (instance->orf_token_retransmit, commit_token, iovec.iov_len);
-	instance->orf_token_retransmit_size = iovec.iov_len;
 
 	for (i = 0; i < instance->totem_config->interface_count; i++) {
 		totemrrp_token_target_set (
@@ -2619,10 +2606,6 @@ static int memb_state_commit_token_send (struct totemsrp_instance *instance,
 		&iovec,
 		1);
 
-	/*
-	 * Request retransmission of the commit token in case it is lost
-	 */
-	reset_token_retransmit_timeout (instance);
 	return (0);
 }
 
@@ -3019,7 +3002,6 @@ static void fcc_token_update (
  * Message Handlers
  */
 
-struct timeval tv_old;
 /*
  * message handler called when TOKEN message type received
  */
@@ -3047,10 +3029,9 @@ static int message_handler_orf_token (
 	timersub (&tv_current, &tv_old, &tv_diff);
 	memcpy (&tv_old, &tv_current, sizeof (struct timeval));
 
-	log_printf (instance->totemsrp_log_level_notice,
-	"Time since last token %0.4f ms\n",
-		(((float)tv_diff.tv_sec) * 1000) + ((float)tv_diff.tv_usec)
-			/ 1000.0);
+	if ((((float)tv_diff.tv_usec) / 100.0) > 5.0) {
+		printf ("OTHERS %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
+	}
 #endif
 
 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
@@ -3221,7 +3202,7 @@ static int message_handler_orf_token (
 			ring_state_restore (instance);
 
 printf ("gather 1");
-			memb_state_gather_enter (instance, 5);
+			memb_state_gather_enter (instance);
 		} else {
 			instance->my_token_seq = token->token_seq;
 			token->token_seq += 1;
@@ -3294,9 +3275,9 @@ printf ("gather 1");
 			gettimeofday (&tv_current, NULL);
 			timersub (&tv_current, &tv_old, &tv_diff);
 			memcpy (&tv_old, &tv_current, sizeof (struct timeval));
-			log_printf (instance->totemsrp_log_level_notice,
-				"I held %0.4f ms\n",
-				((float)tv_diff.tv_usec) / 1000.0);
+			if ((((float)tv_diff.tv_usec) / 100.0) > 5.0) {
+				printf ("I held %0.4f ms\n", ((float)tv_diff.tv_usec) / 100.0);
+			}
 #endif
 			if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
 				messages_deliver_to_app (instance, 0,
@@ -3398,8 +3379,7 @@ static void messages_deliver_to_app (
 				1,
 				instance->my_deliver_memb_list,
 				instance->my_deliver_memb_entries) == 0) {
-
-				instance->my_high_delivered = my_high_delivered_stored + i;
+		instance->my_high_delivered = my_high_delivered_stored + i;
 
 			continue;
 		}
@@ -3502,7 +3482,7 @@ static int message_handler_mcast (
 				&mcast_header.system_from, 1,
 				instance->my_proc_list, &instance->my_proc_list_entries);
 printf ("gather 2");
-			memb_state_gather_enter (instance, 6);
+			memb_state_gather_enter (instance);
 			break;
 
 		case MEMB_STATE_GATHER:
@@ -3514,7 +3494,7 @@ printf ("gather 2");
 
 				memb_set_merge (&mcast_header.system_from, 1,
 					instance->my_proc_list, &instance->my_proc_list_entries);
-				memb_state_gather_enter (instance, 7);
+				memb_state_gather_enter (instance);
 				return (0);
 			}
 			break;
@@ -3603,7 +3583,8 @@ static int message_handler_memb_merge_detect (
 	case MEMB_STATE_OPERATIONAL:
 		memb_set_merge (&memb_merge_detect->system_from, 1,
 			instance->my_proc_list, &instance->my_proc_list_entries);
-		memb_state_gather_enter (instance, 8);
+printf ("gather 3");
+		memb_state_gather_enter (instance);
 		break;
 
 	case MEMB_STATE_GATHER:
@@ -3615,7 +3596,8 @@ static int message_handler_memb_merge_detect (
 
 			memb_set_merge (&memb_merge_detect->system_from, 1,
 				instance->my_proc_list, &instance->my_proc_list_entries);
-			memb_state_gather_enter (instance, 9);
+printf ("gather 4");
+			memb_state_gather_enter (instance);
 			return (0);
 		}
 		break;
@@ -3635,7 +3617,7 @@ static int memb_join_process (
 	struct totemsrp_instance *instance,
 	struct memb_join *memb_join)
 {
-	unsigned char *commit_token_storage[TOKEN_SIZE_MAX];
+	unsigned char *commit_token_storage[32000];
 	struct memb_commit_token *my_commit_token =
 		(struct memb_commit_token *)commit_token_storage;
 	struct srp_addr *proc_list;
@@ -3699,7 +3681,7 @@ static int memb_join_process (
 				memb_join->failed_list_entries,
 				instance->my_failed_list, &instance->my_failed_list_entries);
 		}
-		memb_state_gather_enter (instance, 10);
+		memb_state_gather_enter (instance);
 		return (1); /* gather entered */
 	}
 	return (0); /* gather not entered */
@@ -3843,7 +3825,7 @@ static int message_handler_memb_join (
 			gather_entered = memb_join_process (instance,
 				memb_join);
 			if (gather_entered == 0) {
-				memb_state_gather_enter (instance, 11);
+				memb_state_gather_enter (instance);
 			}
 			break;
 
@@ -3860,7 +3842,7 @@ static int message_handler_memb_join (
 				memb_join->ring_seq >= instance->my_ring_id.seq) {
 
 				memb_join_process (instance, memb_join);
-				memb_state_gather_enter (instance, 12);
+				memb_state_gather_enter (instance);
 			}
 			break;
 
@@ -3875,7 +3857,7 @@ static int message_handler_memb_join (
 				ring_state_restore (instance);
 
 				memb_join_process (instance, memb_join);
-				memb_state_gather_enter (instance, 13);
+				memb_state_gather_enter (instance);
 			}
 			break;
 	}
@@ -3896,9 +3878,6 @@ static int message_handler_memb_commit_token (
 	struct srp_addr *addr;
 	struct memb_commit_token_memb_entry *memb_list;
 
-	log_printf (instance->totemsrp_log_level_debug,
-		"got commit token\n");
-
 	if (endian_conversion_needed) {
 		memb_commit_token = memb_commit_token_convert;
 		memb_commit_token_endian_convert (msg, memb_commit_token);
@@ -4029,7 +4008,7 @@ void main_iface_change_fn (
 
 	}
 	if (instance->iface_changes >= instance->totem_config->interface_count) {
-		memb_state_gather_enter (instance, 14);
+		memb_state_gather_enter (instance);
 	}
 }
 

+ 2 - 4
test/testckpt.c

@@ -220,16 +220,14 @@ int main (void) {
 	int i;
 	
 	error = saCkptInitialize (&ckptHandle, &callbacks, &version);
-	printf ("%s: checkpoint initialize\n",
-		get_test_output (error, SA_AIS_OK));
 
 	error = saCkptCheckpointOpenAsync (ckptHandle,
 		open_invocation,
 		&checkpointName,
 		&checkpointCreationAttributes,
 		SA_CKPT_CHECKPOINT_CREATE|SA_CKPT_CHECKPOINT_READ|SA_CKPT_CHECKPOINT_WRITE);
-	printf ("%s: initial asynchronous open of checkpoint\n",
-		get_test_output (error, SA_AIS_OK));
+    printf ("%s: initial asynchronous open of checkpoint\n",
+        get_test_output (error, SA_AIS_OK));
 
 	error = saCkptSelectionObjectGet (ckptHandle, &sel_fd);