Просмотр исходного кода

Add waiting_trans_ack also to fragmentation layer

Patch for support waiting_trans_ack may fail if there is synchronization
happening between delivery of fragmented message. In such situation,
fragmentation layer is waiting for message with correct number, but it
will never arrive.

Solution is to handle (callback) change of waiting_trans_ack and use
different queue.

Signed-off-by: Jan Friesse <jfriesse@redhat.com>
Reviewed-by: Fabio M. Di Nitto <fdinitto@redhat.com>
Jan Friesse 13 лет назад
Родитель
Сommit
92e0f9c7bb
5 измененных файлов с 97 добавлено и 17 удалено
  1. 5 2
      exec/totemmrp.c
  2. 3 1
      exec/totemmrp.h
  3. 75 12
      exec/totempg.c
  4. 11 1
      exec/totemsrp.c
  5. 3 1
      exec/totemsrp.h

+ 5 - 2
exec/totemmrp.c

@@ -130,7 +130,9 @@ int totemmrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id))
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack))
 {
 {
 	int result;
 	int result;
 	pg_deliver_fn = deliver_fn;
 	pg_deliver_fn = deliver_fn;
@@ -143,7 +145,8 @@ int totemmrp_initialize (
 		totem_config,
 		totem_config,
 		stats->mrp,
 		stats->mrp,
 		totemmrp_deliver_fn,
 		totemmrp_deliver_fn,
-		totemmrp_confchg_fn);
+		totemmrp_confchg_fn,
+		waiting_trans_ack_cb_fn);
 
 
 	return (result);
 	return (result);
 }
 }

+ 3 - 1
exec/totemmrp.h

@@ -74,7 +74,9 @@ extern int totemmrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id));
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack));
 
 
 extern void totemmrp_finalize (void);
 extern void totemmrp_finalize (void);
 
 

+ 75 - 12
exec/totempg.c

@@ -214,6 +214,10 @@ DECLARE_LIST_INIT(assembly_list_inuse);
 
 
 DECLARE_LIST_INIT(assembly_list_free);
 DECLARE_LIST_INIT(assembly_list_free);
 
 
+DECLARE_LIST_INIT(assembly_list_inuse_trans);
+
+DECLARE_LIST_INIT(assembly_list_free_trans);
+
 DECLARE_LIST_INIT(totempg_groups_list);
 DECLARE_LIST_INIT(totempg_groups_list);
 
 
 /*
 /*
@@ -234,6 +238,8 @@ static int fragment_continuation = 0;
 
 
 static struct iovec iov_delv;
 static struct iovec iov_delv;
 
 
+static int totempg_waiting_transack = 0;
+
 struct totempg_group_instance {
 struct totempg_group_instance {
 	void (*deliver_fn) (
 	void (*deliver_fn) (
 		unsigned int nodeid,
 		unsigned int nodeid,
@@ -276,16 +282,32 @@ static int msg_count_send_ok (int msg_count);
 
 
 static int byte_count_send_ok (int byte_count);
 static int byte_count_send_ok (int byte_count);
 
 
+static void totempg_waiting_trans_ack_cb (int waiting_trans_ack)
+{
+	log_printf(LOG_DEBUG, "waiting_trans_ack changed to %u", waiting_trans_ack);
+	totempg_waiting_transack = waiting_trans_ack;
+}
+
 static struct assembly *assembly_ref (unsigned int nodeid)
 static struct assembly *assembly_ref (unsigned int nodeid)
 {
 {
 	struct assembly *assembly;
 	struct assembly *assembly;
 	struct list_head *list;
 	struct list_head *list;
+	struct list_head *active_assembly_list_inuse;
+	struct list_head *active_assembly_list_free;
+
+	if (totempg_waiting_transack) {
+		active_assembly_list_inuse = &assembly_list_inuse_trans;
+		active_assembly_list_free = &assembly_list_free_trans;
+	} else {
+		active_assembly_list_inuse = &assembly_list_inuse;
+		active_assembly_list_free = &assembly_list_free;
+	}
 
 
 	/*
 	/*
 	 * Search inuse list for node id and return assembly buffer if found
 	 * Search inuse list for node id and return assembly buffer if found
 	 */
 	 */
-	for (list = assembly_list_inuse.next;
-		list != &assembly_list_inuse;
+	for (list = active_assembly_list_inuse->next;
+		list != active_assembly_list_inuse;
 		list = list->next) {
 		list = list->next) {
 
 
 		assembly = list_entry (list, struct assembly, list);
 		assembly = list_entry (list, struct assembly, list);
@@ -298,10 +320,10 @@ static struct assembly *assembly_ref (unsigned int nodeid)
 	/*
 	/*
 	 * Nothing found in inuse list get one from free list if available
 	 * Nothing found in inuse list get one from free list if available
 	 */
 	 */
-	if (list_empty (&assembly_list_free) == 0) {
-		assembly = list_entry (assembly_list_free.next, struct assembly, list);
+	if (list_empty (active_assembly_list_free) == 0) {
+		assembly = list_entry (active_assembly_list_free->next, struct assembly, list);
 		list_del (&assembly->list);
 		list_del (&assembly->list);
-		list_add (&assembly->list, &assembly_list_inuse);
+		list_add (&assembly->list, active_assembly_list_inuse);
 		assembly->nodeid = nodeid;
 		assembly->nodeid = nodeid;
 		assembly->index = 0;
 		assembly->index = 0;
 		assembly->last_frag_num = 0;
 		assembly->last_frag_num = 0;
@@ -323,15 +345,56 @@ static struct assembly *assembly_ref (unsigned int nodeid)
 	assembly->last_frag_num = 0;
 	assembly->last_frag_num = 0;
 	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
 	assembly->throw_away_mode = THROW_AWAY_INACTIVE;
 	list_init (&assembly->list);
 	list_init (&assembly->list);
-	list_add (&assembly->list, &assembly_list_inuse);
+	list_add (&assembly->list, active_assembly_list_inuse);
 
 
 	return (assembly);
 	return (assembly);
 }
 }
 
 
 static void assembly_deref (struct assembly *assembly)
 static void assembly_deref (struct assembly *assembly)
 {
 {
+	struct list_head *active_assembly_list_free;
+
+	if (totempg_waiting_transack) {
+		active_assembly_list_free = &assembly_list_free_trans;
+	} else {
+		active_assembly_list_free = &assembly_list_free;
+	}
+
 	list_del (&assembly->list);
 	list_del (&assembly->list);
-	list_add (&assembly->list, &assembly_list_free);
+	list_add (&assembly->list, active_assembly_list_free);
+}
+
+static void assembly_deref_from_normal_and_trans (int nodeid)
+{
+	int j;
+	struct list_head *list, *list_next;
+	struct list_head *active_assembly_list_inuse;
+	struct list_head *active_assembly_list_free;
+	struct assembly *assembly;
+
+	for (j = 0; j < 2; j++) {
+		if (j == 0) {
+			active_assembly_list_inuse = &assembly_list_inuse;
+			active_assembly_list_free = &assembly_list_free;
+		} else {
+			active_assembly_list_inuse = &assembly_list_inuse_trans;
+			active_assembly_list_free = &assembly_list_free_trans;
+		}
+
+		for (list = active_assembly_list_inuse->next;
+			list != active_assembly_list_inuse;
+			list = list_next) {
+
+			list_next = list->next;
+			assembly = list_entry (list, struct assembly, list);
+
+			if (nodeid == assembly->nodeid) {
+				list_del (&assembly->list);
+				list_add (&assembly->list, active_assembly_list_free);
+			}
+		}
+	}
+
 }
 }
 
 
 static inline void app_confchg_fn (
 static inline void app_confchg_fn (
@@ -343,7 +406,6 @@ static inline void app_confchg_fn (
 {
 {
 	int i;
 	int i;
 	struct totempg_group_instance *instance;
 	struct totempg_group_instance *instance;
-	struct assembly *assembly;
 	struct list_head *list;
 	struct list_head *list;
 
 
 	/*
 	/*
@@ -352,9 +414,7 @@ static inline void app_confchg_fn (
 	 * In the leaving processor's assembly buffer.
 	 * In the leaving processor's assembly buffer.
 	 */
 	 */
 	for (i = 0; i < left_list_entries; i++) {
 	for (i = 0; i < left_list_entries; i++) {
-		assembly = assembly_ref (left_list[i]);
-		list_del (&assembly->list);
-		list_add (&assembly->list, &assembly_list_free);
+		assembly_deref_from_normal_and_trans (left_list[i]);
 	}
 	}
 
 
 	for (list = totempg_groups_list.next;
 	for (list = totempg_groups_list.next;
@@ -648,6 +708,8 @@ static void totempg_deliver_fn (
 				}
 				}
 			}
 			}
 		} else {
 		} else {
+			log_printf (LOG_DEBUG, "fragmented continuation %u is not equal to assembly last_frag_num %u",
+					continuation, assembly->last_frag_num);
 			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
 			assembly->throw_away_mode = THROW_AWAY_ACTIVE;
 		}
 		}
 	}
 	}
@@ -762,7 +824,8 @@ int totempg_initialize (
 		totem_config,
 		totem_config,
 		&totempg_stats,
 		&totempg_stats,
 		totempg_deliver_fn,
 		totempg_deliver_fn,
-		totempg_confchg_fn);
+		totempg_confchg_fn,
+		totempg_waiting_trans_ack_cb);
 
 
 	totemmrp_callback_token_create (
 	totemmrp_callback_token_create (
 		&callback_token_received_handle,
 		&callback_token_received_handle,

+ 11 - 1
exec/totemsrp.c

@@ -463,6 +463,9 @@ struct totemsrp_instance {
 
 
         void (*totemsrp_service_ready_fn) (void);
         void (*totemsrp_service_ready_fn) (void);
 
 
+	void (*totemsrp_waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack);
+
 	int global_seqno;
 	int global_seqno;
 
 
 	int my_token_held;
 	int my_token_held;
@@ -785,7 +788,9 @@ int totemsrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id))
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack))
 {
 {
 	struct totemsrp_instance *instance;
 	struct totemsrp_instance *instance;
 	unsigned int res;
 	unsigned int res;
@@ -812,6 +817,9 @@ int totemsrp_initialize (
 
 
 	totemsrp_instance_initialize (instance);
 	totemsrp_instance_initialize (instance);
 
 
+	instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
+	instance->totemsrp_waiting_trans_ack_cb_fn (1);
+
 	stats->srp = &instance->stats;
 	stats->srp = &instance->stats;
 	instance->stats.latest_token = 0;
 	instance->stats.latest_token = 0;
 	instance->stats.earliest_token = 0;
 	instance->stats.earliest_token = 0;
@@ -1837,6 +1845,7 @@ static void memb_state_operational_enter (struct totemsrp_instance *instance)
 		left_list, instance->my_left_memb_entries,
 		left_list, instance->my_left_memb_entries,
 		0, 0, &instance->my_ring_id);
 		0, 0, &instance->my_ring_id);
 	instance->waiting_trans_ack = 1;
 	instance->waiting_trans_ack = 1;
+	instance->totemsrp_waiting_trans_ack_cb_fn (1);
 
 
 // TODO we need to filter to ensure we only deliver those
 // TODO we need to filter to ensure we only deliver those
 // messages which are part of instance->my_deliver_memb
 // messages which are part of instance->my_deliver_memb
@@ -4618,4 +4627,5 @@ void totemsrp_trans_ack (void *context)
 	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
 	struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
 
 
 	instance->waiting_trans_ack = 0;
 	instance->waiting_trans_ack = 0;
+	instance->totemsrp_waiting_trans_ack_cb_fn (0);
 }
 }

+ 3 - 1
exec/totemsrp.h

@@ -65,7 +65,9 @@ int totemsrp_initialize (
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *member_list, size_t member_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *left_list, size_t left_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
 		const unsigned int *joined_list, size_t joined_list_entries,
-		const struct memb_ring_id *ring_id));
+		const struct memb_ring_id *ring_id),
+	void (*waiting_trans_ack_cb_fn) (
+		int waiting_trans_ack));
 
 
 void totemsrp_finalize (void *srp_context);
 void totemsrp_finalize (void *srp_context);