Просмотр исходного кода

BUG 37. Fix several leaks during configuration changes. Also fixes
a possible assert with many large messages being sent from multiple
processors at the same time.

(Logical change 1.68)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@247 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake 21 лет назад
Родитель
Сommit
75268b5e32
1 измененных файлов с 128 добавлено и 38 удалено
  1. 128 38
      exec/gmi.c

+ 128 - 38
exec/gmi.c

@@ -92,7 +92,7 @@
 #define QUEUE_ASSEMBLY_SIZE_MAX		((MESSAGE_SIZE_MAX / 1472) + 1)
 #define QUEUE_RTR_ITEMS_SIZE_MAX	256
 #define QUEUE_PEND_TRANS_SIZE_MAX	((MESSAGE_SIZE_MAX / 1472) + 1)
-#define MAXIOVS						8
+#define MAXIOVS						4
 #define RTR_TOKEN_SIZE_MAX			32
 #define MISSING_MCAST_WINDOW		64
 #define TIMEOUT_STATE_GATHER		100
@@ -142,12 +142,17 @@ enum message_type {
  */
 struct queue queues_pend_trans[PRIORITY_MAX];
 
+struct reftwo {
+	int refcount;
+};
+
 /*
  * In-order pending delivery queue
  */
 struct assembly_queue_item {
 	struct iovec iovec[MAXIOVS];
 	int iov_len;
+	struct reftwo *reftwo;
 };
 
 struct assembly_queue {
@@ -156,10 +161,18 @@ struct assembly_queue {
 	struct queue queue;
 };
 
+struct pend_queue_msg_item {
+	struct reftwo *reftwo;
+	char *iov_base[MAXIOVS];
+	int iovec_entries;
+};
+
 struct pend_queue_item {
 	int seqid;
-	struct iovec iovec[256];
+	struct iovec iovec[QUEUE_PEND_SIZE_MAX * MAXIOVS];
 	int iov_len;
+	struct pend_queue_msg_item pend_queue_msg_items[QUEUE_PEND_SIZE_MAX];
+	int pend_queue_msg_item_count;
 };
 
 struct queue_frag {
@@ -199,10 +212,6 @@ int gmi_arut = 0;
 /*
  * Delivered up to and including
  */
-int gmi_adut = 0;
-
-int gmi_adut_old = 0;
-
 int gmi_original_arut = 0;
 
 int gmi_highest_seq = 0;
@@ -279,6 +288,15 @@ struct mcast {
 	struct gmi_groupname groupname;
 };
 
+/*
+ * MTU - multicast message header - IP header - UDP header
+ *
+ * On lossy switches, making use of the DF UDP flag can lead to loss of
+ * forward progress.  So the packets must be fragmented by the algorithm
+ * and reassembled at the receiver.
+ */
+#define FRAGMENT_SIZE (PACKET_SIZE_MAX - sizeof (struct mcast) - 20 - 8)
+
 struct rtr_item  {
 	struct memb_conf_id conf_id;
 	int seqid;
@@ -329,14 +347,15 @@ struct memb_join {
 
 struct gmi_pend_trans_item {
 	struct mcast *mcast;
-
 	struct iovec iovec[MAXIOVS];
 	int iov_len;
+	struct reftwo *reftwo;
 };
 
 struct gmi_rtr_item {
 	struct iovec iovec[MAXIOVS+2]; /* +2 is for mcast msg + group name  TODO is this right */
 	int iov_len;
+	struct reftwo *reftwo;
 };
 
 enum memb_state {
@@ -611,6 +630,12 @@ static int gmi_pend_trans_item_store (
 		goto error_mcast;
 	}
 
+	gmi_pend_trans_item.reftwo = malloc (sizeof (struct reftwo));
+	if (gmi_pend_trans_item.reftwo == 0) {
+		goto error_reftwo;
+	}
+	gmi_pend_trans_item.reftwo->refcount = 2;
+
 	/*
 	 * Set mcast header
 	 */
@@ -650,10 +675,46 @@ error_iovec:
 		free (gmi_pend_trans_item.iovec[j].iov_base);
 	}
 	return (-1);
+error_reftwo:
+	free (gmi_pend_trans_item.mcast);
 error_mcast:
 	return (0);
 }
 
+static void release_reftwo_char (struct reftwo *reftwo,
+	char **iovec,
+	int iovec_entries)
+{
+	int i;
+
+	assert (reftwo > 0);
+	if ((--reftwo->refcount) == 0) {
+		for (i = 0; i < iovec_entries; i++) {
+			free (iovec[i]);
+			iovec[i] = (char *)0xdeadbeef;
+		}
+		reftwo->refcount = -1;
+		free (reftwo);
+	}
+}
+
+static void release_reftwo_iovec (struct reftwo *reftwo,
+	struct iovec *iovec,
+	int iovec_entries)
+{
+	int i;
+
+	assert (reftwo > 0);
+	if ((--reftwo->refcount) == 0) {
+		for (i = 0; i < iovec_entries; i++) {
+			free (iovec[i].iov_base);
+			iovec[i].iov_base = (char *)0xdeadbeef;
+		}
+		reftwo->refcount = -1;
+		free (reftwo);
+	}
+}
+
 static void encrypt_and_sign (struct iovec *iovec, int iov_len)
 {
 	char *addr = iov_encrypted.iov_base + sizeof (struct security_header);
@@ -822,15 +883,6 @@ print_digest ("calculated digest", digest_comparison);
 	return (0);
 }
 
-/*
- * MTU - multicast message header - IP header - UDP header
- *
- * On lossy switches, making use of the DF UDP flag can lead to loss of
- * forward progress.  So the packets must be fragmented by the algorithm
- * and reassembled at the receiver.
- */
-#define FRAGMENT_SIZE (PACKET_SIZE_MAX - sizeof (struct mcast) - 20 - 8)
-
 static void timer_function_single_member (void *data);
 
 /*
@@ -872,9 +924,9 @@ static void timer_function_single_member (void *data)
 }
 
 int gmi_mcast (
-    struct gmi_groupname *groupname,
-    struct iovec *iovec,
-    int iov_len,
+	struct gmi_groupname *groupname,
+	struct iovec *iovec,
+	int iov_len,
 	int priority)
 {
 	int res;
@@ -1214,7 +1266,7 @@ int gmi_brake;
 static int messages_free (int group_arut)
 {
 	struct gmi_rtr_item *gmi_rtr_item_p;
-	int i, j;
+	int i;
 	int res;
 	int lesser;
 
@@ -1227,13 +1279,7 @@ static int messages_free (int group_arut)
 		gmi_brake = last_group_arut;
 	}
 	
-	/*
-	 * Determine low water mark for messages to be freed
-	 */
 	lesser = gmi_brake;
-	if (lesser > gmi_adut) {
-		lesser = gmi_adut;
-	}
 
 //printf ("Freeing lesser %d %d %d\n", lesser, group_arut, last_group_arut);
 //printf ("lesser %d gropu arut %d last group arut %d\n", lesser, group_arut, last_group_arut);
@@ -1253,16 +1299,14 @@ static int messages_free (int group_arut)
 	for (i = last_released; i <= lesser; i++) {
 		res = sq_item_get (&queue_rtr_items, i, (void **)&gmi_rtr_item_p);
 		if (res == 0) {
-			for (j = 0; j < gmi_rtr_item_p->iov_len; j++) {
-				free (gmi_rtr_item_p->iovec[j].iov_base);
-				gmi_rtr_item_p->iovec[j].iov_base = (void *)0xdeadbeef;
-				gmi_rtr_item_p->iovec[j].iov_len = i;
-			}
+			release_reftwo_iovec (gmi_rtr_item_p->reftwo,
+				gmi_rtr_item_p->iovec,
+				gmi_rtr_item_p->iov_len);
 		}
+		sq_items_release (&queue_rtr_items, i);
 		last_released = i + 1;
 	}
 
-	sq_items_release (&queue_rtr_items, lesser);
 	gmi_log_printf (gmi_log_level_debug, "releasing messages up to and including %d\n", lesser);
 	return (0);
 }
@@ -1340,6 +1384,7 @@ static int orf_token_mcast (
 		 * Build IO vector
 		 */
 		memset (&gmi_rtr_item, 0, sizeof (struct gmi_rtr_item));
+		gmi_rtr_item.reftwo = gmi_pend_trans_item->reftwo;
 		gmi_rtr_item.iovec[0].iov_base = gmi_pend_trans_item->mcast;
 		gmi_rtr_item.iovec[0].iov_len = sizeof (struct mcast);
 
@@ -1558,6 +1603,23 @@ static void queues_queue_frag_memb_new (void)
 
 	memset (queues_frag_new, 0, sizeof (struct queue_frag) * MAX_MEMBERS);
 
+	/*
+	 * Free queues that are no longer part of the configuration
+	 */
+	for (i = 0; i < MAX_MEMBERS; i++) {
+		found = 0;
+		for (j = 0; j < memb_list_entries_confchg; j++) {
+			if (memb_list[j].sin_addr.s_addr == queues_frag[i].source_addr.s_addr) {
+				found = 1;
+				break;
+			}
+		}
+		if (found == 0) {
+			queue_free (&queues_frag[i].assembly.queue);
+			queue_free (&queues_frag[i].pend_queue);
+		}
+	}
+
 	/*
 	 * Build new pending list
 	 */
@@ -1662,19 +1724,18 @@ printf ("EVS STATE group arut %d gmi arut %d highest %d barrier %d starting grou
 // TODO
 	if (memb_state == MEMB_STATE_EVS && gmi_arut == gmi_barrier_seq && orf_token->group_arut == gmi_barrier_seq) {
 		gmi_log_printf (gmi_log_level_notice, "EVS recovery of messages complete, transitioning to operational.\n");
+		messages_free (gmi_barrier_seq - 1);
 		/*
 		 * EVS recovery complete, reset local variables
 		 */
 		gmi_arut = 0;
 
-		gmi_adut_old = gmi_adut;
-		gmi_adut = 0;
-
 //		gmi_token_seqid = 0;
 
 		gmi_highest_seq_old = gmi_highest_seq;
 		gmi_highest_seq = 0;
 		last_group_arut = 0;
+		last_released = 0;
 		sq_reinit (&queue_rtr_items, 0);
 
 		memb_failed_list_entries = 0;
@@ -2820,6 +2881,7 @@ printf ("Queue empty[%d] %d queues seqid %d lowest so far %d\n", i,
 static void app_deliver (void) {
 	struct queue_frag *queue_frag;
 	struct pend_queue_item *pend_queue_item;
+	int i;
 
 	do {
 		queue_frag = queue_frag_delivery_find ();
@@ -2849,9 +2911,14 @@ assert (queue_frag);
 			pend_queue_item->iov_len);
 
 		/*
-		 * Release messages that can be freed
-		 */
-		gmi_adut = queue_frag->seqid;
+		 * Reduce ref count on these delivered messages and free them if their
+		 * reference count is zero
+	 	 */
+		for (i = 0; i < pend_queue_item->pend_queue_msg_item_count; i++) {
+			release_reftwo_char (pend_queue_item->pend_queue_msg_items[i].reftwo,
+				pend_queue_item->pend_queue_msg_items[i].iov_base,
+				pend_queue_item->pend_queue_msg_items[i].iovec_entries);
+		}
 
 		/*
 		 * Reset lowest seqid for this pending queue from next assembled message
@@ -2876,6 +2943,8 @@ static void assembly_deliver (struct queue_frag *queue_frag)
 	struct iovec iovec_delv[256];
 	int iov_len_delv = 0;
 	struct mcast *mcast = 0;
+	int pend_queue_msg_item_count;
+	int i;
 
 	memset (iovec_delv, 0, sizeof (iovec_delv));
 
@@ -2887,9 +2956,21 @@ static void assembly_deliver (struct queue_frag *queue_frag)
 	/*
 	 * Assemble all of the message iovectors into one iovector for delivery
 	 */
+	pend_queue_msg_item_count = 0;
 	do {
 		assembly_queue_item = queue_item_iterator_get (&queue_frag->assembly.queue);
 
+		/*
+		 * Assemble the refcounting structure to free the messages if appropriate
+		 */
+		for (i = 0; i < assembly_queue_item->iov_len; i++) {
+			pend_queue_item.pend_queue_msg_items[pend_queue_msg_item_count].iov_base[i] = 
+				assembly_queue_item->iovec[i].iov_base;
+		}
+		pend_queue_item.pend_queue_msg_items[pend_queue_msg_item_count].iovec_entries = i;
+		pend_queue_item.pend_queue_msg_items[pend_queue_msg_item_count].reftwo = assembly_queue_item->reftwo;
+		pend_queue_msg_item_count++;
+		
 		/*
 		 * Assemble io vector
 		 */
@@ -2924,6 +3005,7 @@ static void assembly_deliver (struct queue_frag *queue_frag)
 
 		res = queue_item_iterator_next (&queue_frag->assembly.queue);
 	} while (res == 0);
+	pend_queue_item.pend_queue_msg_item_count = pend_queue_msg_item_count;
 
 	/*
 	 * assert that this really is the end of the packet
@@ -3002,6 +3084,7 @@ static void pending_queues_deliver (void)
 		/*
 		 * Create pending delivery item
 		 */
+		assembly_queue_item.reftwo = gmi_rtr_item_p->reftwo;
 		assembly_queue_item.iov_len = gmi_rtr_item_p->iov_len;
 		memcpy (&assembly_queue_item.iovec, gmi_rtr_item_p->iovec,
 			sizeof (struct iovec) * gmi_rtr_item_p->iov_len);
@@ -3080,6 +3163,13 @@ static int message_handler_mcast (
 		if (gmi_rtr_item.iovec[0].iov_base == 0) {
 			return (-1); /* error here is corrected by the algorithm */
 		}
+		gmi_rtr_item.reftwo = malloc (sizeof (struct reftwo));
+		if (gmi_rtr_item.reftwo == 0) {
+			free (gmi_rtr_item.iovec[0].iov_base);
+			return (-1);
+		}
+		gmi_rtr_item.reftwo->refcount = 2;
+
 		memcpy (gmi_rtr_item.iovec[0].iov_base, mcast, bytes_received);
 		gmi_rtr_item.iovec[0].iov_len = bytes_received;
 		assert (gmi_rtr_item.iovec[0].iov_len > 0);