Sfoglia il codice sorgente

This update allows retained events from a merging partition to
be delivered to applications with the associated channels already
open at the time of the merge.

(Logical change 1.160)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@553 fd59a12c-fef9-0310-b244-a6a79926bd2f

Mark Haverkamp 21 anni fa
parent
commit
790c36084c
1 ha cambiato i file con 48 aggiunte e 32 eliminazioni
  1. 48 32
      exec/evt.c

+ 48 - 32
exec/evt.c

@@ -2886,6 +2886,45 @@ static int evt_exec_init(void)
 	return 0;
 }
 
+static int
+try_deliver_event(struct event_data *evt, 
+		struct event_svr_channel_instance *eci)
+{
+	struct list_head *l, *l1;
+	struct event_svr_channel_open *eco;
+	struct event_svr_channel_subscr *ecs;
+	int delivered_event = 0;
+	/*
+	 * Check open channels
+	 */
+	for (l = eci->esc_open_chans.next; l != &eci->esc_open_chans; l = l->next) {
+		eco = list_entry(l, struct event_svr_channel_open, eco_entry);
+		/*
+		 * See if enabled to receive
+		 */
+		if (!(eco->eco_flags & SA_EVT_CHANNEL_SUBSCRIBER)) {
+				continue;
+		}
+
+		/*
+		 * Check subscriptions
+		 */
+		for (l1 = eco->eco_subscr.next; l1 != &eco->eco_subscr; l1 = l1->next) {
+			ecs = list_entry(l1, struct event_svr_channel_subscr, ecs_entry);
+			/*
+			 * Apply filter rules and deliver if patterns
+			 * match filters.
+			 * Only deliver one event per open channel
+			 */
+			if (event_match(evt, ecs) == SA_AIS_OK) {
+				deliver_event(evt, eco, ecs);
+				delivered_event++;
+				break;
+			}
+		}
+	}
+	return delivered_event;
+}
 
 /*
  * Receive the network event message and distribute it to local subscribers
@@ -2902,10 +2941,7 @@ static int evt_remote_evt(void *msg, struct in_addr source_addr,
 	 */
 	struct lib_event_data *evtpkt = msg;
 	struct event_svr_channel_instance *eci;
-	struct event_svr_channel_open *eco;
-	struct event_svr_channel_subscr *ecs;
 	struct event_data *evt;
-	struct list_head *l, *l1;
 	SaClmClusterNodeT *cn;
 
 	log_printf(LOG_LEVEL_DEBUG, "Remote event data received from %s\n",
@@ -2980,34 +3016,7 @@ static int evt_remote_evt(void *msg, struct in_addr source_addr,
 		retain_event(evt);
 	}
 
-	/*
-	 * Check open channels
-	 */
-	for (l = eci->esc_open_chans.next; l != &eci->esc_open_chans; l = l->next) {
-		eco = list_entry(l, struct event_svr_channel_open, eco_entry);
-		/*
-		 * See if enabled to receive
-		 */
-		if (!(eco->eco_flags & SA_EVT_CHANNEL_SUBSCRIBER)) {
-				continue;
-		}
-
-		/*
-		 * Check subscriptions
-		 */
-		for (l1 = eco->eco_subscr.next; l1 != &eco->eco_subscr; l1 = l1->next) {
-			ecs = list_entry(l1, struct event_svr_channel_subscr, ecs_entry);
-			/*
-			 * Apply filter rules and deliver if patterns
-			 * match filters.
-			 * Only deliver one event per open channel
-			 */
-			if (event_match(evt, ecs) == SA_AIS_OK) {
-				deliver_event(evt, eco, ecs);
-				break;
-			}
-		}
-	}
+	try_deliver_event(evt, eci);
 	free_event_data(evt);
 
 
@@ -3034,13 +3043,17 @@ static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr,
 		int endian_conversion_required)
 {
 	/*
-	 * - retain events that have a retention time
+	 * - calculate remaining retention time
 	 * - Find assocated channel
+	 * - Scan list of subscribers
+	 * - Apply filters
+	 * - Deliver events that pass the filter test
 	 */
 	struct lib_event_data *evtpkt = msg;
 	struct event_svr_channel_instance *eci;
 	struct event_data *evt;
 	struct member_node_data *md;
+	int num_delivered;
 	SaTimeT now;
 
 	now = clust_time_now();
@@ -3128,6 +3141,9 @@ static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr,
 		}
 			
 		retain_event(evt);
+		num_delivered = try_deliver_event(evt, eci);
+		log_printf(RECOVERY_EVENT_DEBUG, "Delivered to %d subscribers\n",
+				num_delivered);
 		free_event_data(evt);
 	}