Kaynağa Gözat

Add waiting_trans_ack also to fragmentation layer

The patch adding support for waiting_trans_ack may fail if a synchronization
happens between the delivery of the fragments of a fragmented message. In such
a situation, the fragmentation layer waits for a message with the correct
number, but it will never arrive.

The solution is to handle changes of waiting_trans_ack (via a callback) and
use a different queue while it is set.

Signed-off-by: Jan Friesse <jfriesse@redhat.com>
Reviewed-by: Fabio M. Di Nitto <fdinitto@redhat.com>
(cherry picked from commit 92e0f9c7bb9b4b6a0da8d64bdf3b2e47ae55b1cc)
Jan Friesse 13 yıl önce
ebeveyn
işleme
da5ada8342
5 değiştirilmiş dosya ile 97 ekleme ve 17 silme
  1. 5 2
      exec/totemmrp.c
  2. 3 1
      exec/totemmrp.h
  3. 75 12
      exec/totempg.c
  4. 11 1
      exec/totemsrp.c
  5. 3 1
      exec/totemsrp.h

+ 5 - 2
exec/totemmrp.c

@@ -130,7 +130,9 @@ int totemmrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id))
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack))
 {
 	int result;
 	pg_deliver_fn = deliver_fn;
@@ -143,7 +145,8 @@ int totemmrp_initialize (
 		totem_config,
 		stats->mrp,
 		totemmrp_deliver_fn,
-		totemmrp_confchg_fn);
+		totemmrp_confchg_fn,
+		waiting_trans_ack_cb_fn);
 
 	return (result);
 }

+ 3 - 1
exec/totemmrp.h

@@ -74,7 +74,9 @@ extern int totemmrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id));
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack));
 
 extern void totemmrp_finalize (void);
 

+ 75 - 12
exec/totempg.c

@@ -211,6 +211,10 @@ DECLARE_LIST_INIT(assembly_list_inuse);
 
 DECLARE_LIST_INIT(assembly_list_free);
 
+DECLARE_LIST_INIT(assembly_list_inuse_trans);
+
+DECLARE_LIST_INIT(assembly_list_free_trans);
+
 DECLARE_LIST_INIT(totempg_groups_list);
 
 /*
@@ -231,6 +235,8 @@ static int fragment_continuation = 0;
 
 static struct iovec iov_delv;
 
+static int totempg_waiting_transack = 0;
+
 struct totempg_group_instance {
 	void (*deliver_fn) (
 		unsigned int nodeid,
@@ -273,16 +279,32 @@ static int msg_count_send_ok (int msg_count);
 
 static int byte_count_send_ok (int byte_count);
 
+static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
+{	/* Record the new waiting_trans_ack state; it selects which assembly lists are used. */
+	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %d", waiting_trans_ack);
+	totempg_waiting_transack = waiting_trans_ack;
+}
+
 static struct assembly *assembly_ref (unsigned int nodeid)
 {
 	struct assembly *assembly;
 	struct list_head *list;
+	struct list_head *active_assembly_list_inuse;
+	struct list_head *active_assembly_list_free;
+
+	if (totempg_waiting_transack) {
+		active_assembly_list_inuse = &assembly_list_inuse_trans;
+		active_assembly_list_free = &assembly_list_free_trans;
+	} else {
+		active_assembly_list_inuse = &assembly_list_inuse;
+		active_assembly_list_free = &assembly_list_free;
+	}
 
 	/*
 	 * Search inuse list for node id and return assembly buffer if found
 	 */
-	for (list = assembly_list_inuse.next;
-		list != &assembly_list_inuse;
+	for (list = active_assembly_list_inuse->next;
+		list != active_assembly_list_inuse;
 		list = list->next) {
 
 		assembly = list_entry (list, struct assembly, list);
@@ -295,10 +317,10 @@ static struct assembly *assembly_ref (unsigned int nodeid)
 	/*
 	 * Nothing found in inuse list get one from free list if available
 	 */
-	if (list_empty (&assembly_list_free) == 0) {
-		assembly = list_entry (assembly_list_free.next, struct assembly, list);
+	if (list_empty (active_assembly_list_free) == 0) {
+		assembly = list_entry (active_assembly_list_free->next, struct assembly, list);
 		list_del (&assembly->list);
-		list_add (&assembly->list, &assembly_list_inuse);
+		list_add (&assembly->list, active_assembly_list_inuse);
 		assembly->nodeid = nodeid;
 		assembly->index = 0;
 		assembly->last_frag_num = 0;
@@ -320,15 +342,56 @@ static struct assembly *assembly_ref (unsigned int nodeid)
 	assembly->last_frag_num = 0;
 	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
 	list_init (&assembly->list);
-	list_add (&assembly->list, &assembly_list_inuse);
+	list_add (&assembly->list, active_assembly_list_inuse);
 
 	return (assembly);
 }
 
 static void assembly_deref (struct assembly *assembly)
 {
+	struct list_head *active_assembly_list_free;
+
+	if (totempg_waiting_transack) {
+		active_assembly_list_free = &assembly_list_free_trans;
+	} else {
+		active_assembly_list_free = &assembly_list_free;
+	}
+
 	list_del (&assembly->list);
-	list_add (&assembly->list, &assembly_list_free);
+	list_add (&assembly->list, active_assembly_list_free);
+}
+
+/* Move every in-use assembly of nodeid, on both the normal and the trans lists, to the matching free list. */
+static void assembly_deref_from_normal_and_trans (unsigned int nodeid)
+{
+	int j;
+	struct list_head *list, *list_next;
+	struct list_head *active_assembly_list_inuse;
+	struct list_head *active_assembly_list_free;
+	struct assembly *assembly;
+
+	for (j = 0; j < 2; j++) {
+		if (j == 0) {
+			active_assembly_list_inuse = &assembly_list_inuse;
+			active_assembly_list_free = &assembly_list_free;
+		} else {
+			active_assembly_list_inuse = &assembly_list_inuse_trans;
+			active_assembly_list_free = &assembly_list_free_trans;
+		}
+
+		for (list = active_assembly_list_inuse->next;
+			list != active_assembly_list_inuse;
+			list = list_next) {
+			/* Remember the successor before this entry is relinked below. */
+			list_next = list->next;
+			assembly = list_entry (list, struct assembly, list);
+
+			if (nodeid == assembly->nodeid) {
+				list_del (&assembly->list);
+				list_add (&assembly->list, active_assembly_list_free);
+			}
+		}
+	}
+}
 
 static inline void app_confchg_fn (
@@ -340,7 +403,6 @@ static inline void app_confchg_fn (
 {
 	int i;
 	struct totempg_group_instance *instance;
-	struct assembly *assembly;
 	struct list_head *list;
 
 	/*
@@ -349,9 +411,7 @@ static inline void app_confchg_fn (
 	 * In the leaving processor's assembly buffer.
 	 */
 	for (i = 0; i < left_list_entries; i++) {
-		assembly = assembly_ref (left_list[i]);
-		list_del (&assembly->list);
-		list_add (&assembly->list, &assembly_list_free);
+		assembly_deref_from_normal_and_trans (left_list[i]);
 	}
 
 	for (list = totempg_groups_list.next;
@@ -645,6 +705,8 @@ static void totempg_deliver_fn (
 				}
 			}
 		} else {
+			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
+					continuation, assembly->last_frag_num);
 			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
 		}
 	}
@@ -759,7 +821,8 @@ int totempg_initialize (
 		totem_config,
 		&totempg_stats,
 		totempg_deliver_fn,
-		totempg_confchg_fn);
+		totempg_confchg_fn,
+		totempg_waiting_trans_ack_cb);
 
 	totemmrp_callback_token_create (
 		&callback_token_received_handle,

+ 11 - 1
exec/totemsrp.c

@@ -464,6 +464,9 @@ struct totemsrp_instance {
 
         void (*totemsrp_service_ready_fn) (void);
 
+	void (*totemsrp_waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack);
+
 	int global_seqno;
 
 	int my_token_held;
@@ -786,7 +789,9 @@ int totemsrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id))
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack))
 {
 	struct totemsrp_instance *instance;
 	unsigned int res;
@@ -813,6 +818,9 @@ int totemsrp_initialize (
 
 	totemsrp_instance_initialize (instance);
 
+	instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
+	instance->totemsrp_waiting_trans_ack_cb_fn (1);
+
 	stats->srp = &instance->stats;
 	instance->stats.latest_token = 0;
 	instance->stats.earliest_token = 0;
@@ -1837,6 +1845,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
 		left_list, instance->my_left_memb_entries,
 		0, 0, &instance->my_ring_id);
 	instance->waiting_trans_ack = 1;
+	instance->totemsrp_waiting_trans_ack_cb_fn (1);
 
 // TODO we need to filter to ensure we only deliver those
 // messages which are part of instance->my_deliver_memb
@@ -4626,4 +4635,5 @@ void totemsrp_trans_ack (void *context)
 	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
 
 	instance->waiting_trans_ack = 0;
+	instance->totemsrp_waiting_trans_ack_cb_fn (0);
 }

+ 3 - 1
exec/totemsrp.h

@@ -65,7 +65,9 @@ int totemsrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id));
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack));
 
 void totemsrp_finalize (void *srp_context);