Selaa lähdekoodia

Development, updates, and bug fixes to event service lib.

(Logical change 1.63)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@231 fd59a12c-fef9-0310-b244-a6a79926bd2f
Mark Haverkamp 21 vuotta sitten
vanhempi
commit
7f324361a7
1 muutettua tiedostoa jossa 239 lisäystä ja 82 poistoa
  1. 239 82
      lib/evt.c

+ 239 - 82
lib/evt.c

@@ -14,7 +14,7 @@
  * - Redistributions in binary form must reproduce the above copyright notice,
  *   this list of conditions and the following disclaimer in the documentation
  *   and/or other materials provided with the distribution.
- * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ * - Neither the name of the Open Source Developement Lab nor the names of its
  *   contributors may be used to endorse or promote products derived from this
  *   software without specific prior written permission.
  *
@@ -32,6 +32,7 @@
  */
 
 #include <sys/types.h>
+#include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
@@ -40,6 +41,7 @@
 #include <sys/socket.h>
 #include "../include/ais_evt.h"
 #include "../include/ais_msg.h"
+#include "../exec/gmi.h"
 #include "util.h"
 
 static void evtHandleInstanceDestructor(void *instance);
@@ -89,6 +91,11 @@ struct saHandleDatabase event_handle_db = {
 	.handleInstanceDestructor	= eventHandleInstanceDestructor
 };
 
+struct message_overlay {
+	struct res_header header;
+	char data[0];
+};
+
 /*
  * data required to support events for a given initialization
  *
@@ -103,14 +110,15 @@ struct saHandleDatabase event_handle_db = {
  *
  */
 struct event_instance {
-	int 			ei_fd;
-	SaEvtCallbacksT	ei_callback;
-	SaVersionT		ei_version;
-	SaClmNodeIdT	ei_node_id;
-	SaNameT			ei_node_name;
-	int				ei_finalize;
-	struct queue	ei_inq;
-	pthread_mutex_t	ei_mutex;
+	int 					ei_fd;
+	SaEvtCallbacksT			ei_callback;
+	SaVersionT				ei_version;
+	SaClmNodeIdT			ei_node_id;
+	SaNameT					ei_node_name;
+	int						ei_finalize;
+	struct queue			ei_inq;
+	char 					ei_message[MESSAGE_SIZE_MAX];
+	pthread_mutex_t			ei_mutex;
 };
 
 
@@ -170,13 +178,16 @@ struct event_data_instance {
 	pthread_mutex_t			edi_mutex;
 };
 
-struct message_overlay {
-	struct req_header header;
-	char data[4096];
-};
+
+#define PUB_SLEEP_NSEC 0L;
+#define PUB_SLEEP_SEC 1
+#define PUB_SLEEP_TRYS 20
 
 #define min(a,b) ((a) < (b) ? (a) : (b))
 
+/*
+ * Clean up function for an evt instance (saEvtInitialize) handle
+ */
 static void evtHandleInstanceDestructor(void *instance)
 {
 	struct event_instance *evti = instance;
@@ -208,10 +219,16 @@ static void evtHandleInstanceDestructor(void *instance)
 	}
 }
 
+/*
+ * Clean up function for an open channel handle
+ */
 static void chanHandleInstanceDestructor(void *instance)
 {
 }
 
+/*
+ * Clean up function for an event handle
+ */
 static void eventHandleInstanceDestructor(void *instance)
 {
 	struct event_data_instance *edi = instance;
@@ -278,7 +295,7 @@ saEvtInitialize(
 	 * An inq is needed to store async messages while waiting for a 
 	 * sync response
 	 */
-	error = saQueueInit(&evti->ei_inq, 512, sizeof(void *));
+	error = saQueueInit(&evti->ei_inq, 1024, sizeof(void *));
 	if (error != SA_OK) {
 		goto error_handle_put;
 	}
@@ -345,6 +362,78 @@ saEvtSelectionObjectGet(
 }
 
 
+/*
+ * Alocate an event data structure and associated handle to be
+ * used to supply event data to a call back function.
+ */
+static SaErrorT make_event(SaEvtEventHandleT *event_handle,
+				struct lib_event_data *evt)
+{
+	struct event_data_instance *edi;
+	SaEvtEventPatternT *pat;
+	SaUint8T *str;
+	SaErrorT error;
+	int i;
+
+	error = saHandleCreate(&event_handle_db, sizeof(*edi), 
+		(void*)event_handle);
+	if (error != SA_OK) {
+			goto make_evt_done;
+	}
+
+	error = saHandleInstanceGet(&event_handle_db, *event_handle,
+				(void**)&edi);
+	if (error != SA_OK) {
+			goto make_evt_done;
+	}
+
+	memset(edi, 0, sizeof(*edi));
+
+	pthread_mutex_init(&edi->edi_mutex, NULL);
+	edi->edi_freeing = 0;
+	edi->edi_channel_handle = evt->led_lib_channel_handle;
+	edi->edi_priority = evt->led_priority;
+	edi->edi_retention_time = evt->led_retention_time;
+	edi->edi_pub_node = evt->led_publisher_node_id;
+	edi->edi_pub_time = evt->led_publish_time;
+	edi->edi_event_data_size = evt->led_user_data_size;
+	edi->edi_event_id = evt->led_event_id;
+	edi->edi_pub_name = evt->led_publisher_name;
+	if (edi->edi_event_data_size) {
+		edi->edi_event_data = malloc(edi->edi_event_data_size);
+		memcpy(edi->edi_event_data, 
+				evt->led_body + evt->led_user_data_offset,
+				edi->edi_event_data_size);
+	}
+
+	/*
+	 * Move the pattern bits into the SaEvtEventPatternArrayT
+	 */
+	edi->edi_patterns.patternsNumber = evt->led_patterns_number;
+	edi->edi_patterns.patterns = malloc(sizeof(SaEvtEventPatternT) * 
+					edi->edi_patterns.patternsNumber);
+	pat = (SaEvtEventPatternT *)evt->led_body;
+	str = evt->led_body + sizeof(SaEvtEventPatternT) * 
+						edi->edi_patterns.patternsNumber;
+	for (i = 0; i < evt->led_patterns_number; i++) {
+		edi->edi_patterns.patterns[i].patternSize = pat->patternSize;
+		edi->edi_patterns.patterns[i].pattern = malloc(pat->patternSize);
+		if (!edi->edi_patterns.patterns[i].pattern) {
+			printf("make_event: couldn't alloc %d bytes\n", pat->patternSize);
+			error =  SA_ERR_NO_MEMORY;
+			break;
+		}
+		memcpy(edi->edi_patterns.patterns[i].pattern,
+				str, pat->patternSize);
+		str += pat->patternSize;
+		pat++; 
+	}
+	saHandleInstancePut (&event_handle_db, *event_handle);
+
+make_evt_done:
+	return error;
+}
+
 /*
  * The saEvtDispatch() function invokes, in the context of the calling 
  * thread, one or all of the pending callbacks for the handle evt_handle.
@@ -359,14 +448,17 @@ saEvtDispatch(
 	SaErrorT error;
 	int dispatch_avail;
 	struct event_instance *evti;
+	SaEvtEventHandleT event_handle;
 	SaEvtCallbacksT callbacks;
-	struct req_header **queue_msg;
-	struct req_header *msg;
+	struct res_header **queue_msg;
+	struct res_header *msg = 0;
 	int empty;
 	int ignore_dispatch = 0;
 	int cont = 1; /* always continue do loop except when set to 0 */
 	int poll_fd;
-	struct message_overlay dispatch_data;
+	struct message_overlay *dispatch_data;
+	struct lib_event_data *evt;
+	struct res_evt_event_data res;
 
 	error = saHandleInstanceGet(&evt_instance_handle_db, evt_handle,
 		(void **)&evti);
@@ -384,19 +476,24 @@ saEvtDispatch(
 	do {
 		poll_fd = evti->ei_fd;
 
-		/*
-		 * Read data directly from socket
-		 */
 		ufds.fd = poll_fd;
 		ufds.events = POLLIN;
 		ufds.revents = 0;
 
-		error = saPollRetry(&ufds, 1, timeout);
-		if (error != SA_OK) {
-			goto error_nounlock;
-		}
-
 		pthread_mutex_lock(&evti->ei_mutex);
+		saQueueIsEmpty(&evti->ei_inq, &empty);
+		/*
+		 * Read from the socket if there is nothing in
+		 * our queue.
+		 */
+		if (empty == 1) {
+			pthread_mutex_unlock(&evti->ei_mutex);
+			error = saPollRetry(&ufds, 1, timeout);
+			if (error != SA_OK) {
+				goto error_nounlock;
+			}
+			pthread_mutex_lock(&evti->ei_mutex);
+		}
 
 		/*
 		 * Handle has been finalized in another thread
@@ -407,7 +504,7 @@ saEvtDispatch(
 			goto error_unlock;
 		}
 
-		dispatch_avail = ufds.revents & POLLIN;
+		dispatch_avail = (ufds.revents & POLLIN) | (empty == 0);
 		if (dispatch_avail == 0 && dispatch_flags == SA_DISPATCH_ALL) {
 			pthread_mutex_unlock(&evti->ei_mutex);
 			break; /* exit do while cont is 1 loop */
@@ -424,26 +521,21 @@ saEvtDispatch(
 			 */
 			saQueueItemGet(&evti->ei_inq, (void *)&queue_msg);
 			msg = *queue_msg;
-			memcpy(&dispatch_data, msg, msg->size);
+			dispatch_data = (struct message_overlay *)msg;
 			saQueueItemRemove(&evti->ei_inq);
-			free(msg);
 		} else {
 			/*
 			 * Queue empty, read response from socket
 			 */
-			error = saRecvRetry(evti->ei_fd, 
-				&dispatch_data.header,
-				sizeof(struct req_header), 
-				MSG_WAITALL | MSG_NOSIGNAL);
+			dispatch_data = (struct message_overlay *)&evti->ei_message;
+			error = saRecvRetry(evti->ei_fd, &dispatch_data->header,
+				sizeof(struct res_header), MSG_WAITALL | MSG_NOSIGNAL);
 			if (error != SA_OK) {
 				goto error_unlock;
 			}
-			if (dispatch_data.header.size > 
-					sizeof(struct req_header)) {
-				error = saRecvRetry(evti->ei_fd, 
-					&dispatch_data.data,
-					dispatch_data.header.size - 
-					sizeof(struct req_header),
+			if (dispatch_data->header.size > sizeof(struct res_header)) {
+				error = saRecvRetry(evti->ei_fd, dispatch_data->data,
+					dispatch_data->header.size - sizeof(struct res_header),
 					MSG_WAITALL | MSG_NOSIGNAL);
 				if (error != SA_OK) {
 					goto error_unlock;
@@ -456,15 +548,14 @@ saEvtDispatch(
 		 * the callback routines may operate at the same time that 
 		 * EvtFinalize has been called in another thread.
 		 */
-		memcpy(&callbacks, &evti->ei_callback, 
-				sizeof(evti->ei_callback));
+		memcpy(&callbacks, &evti->ei_callback, sizeof(evti->ei_callback));
 		pthread_mutex_unlock(&evti->ei_mutex);
 
 
 		/*
 		 * Dispatch incoming response
 		 */
-		switch (dispatch_data.header.id) {
+		switch (dispatch_data->header.id) {
 		case MESSAGE_RES_LIB_ACTIVATEPOLL:
 			/*
 			 * This is a do nothing message which the node 
@@ -478,14 +569,50 @@ saEvtDispatch(
 			 *    DISPATCH_ONE case
 			 */
 			ignore_dispatch = 1;
-			printf("Dispatch: activate poll\n");
 			break;
 
-		case MESSAGE_RES_EVT_EVENT_DATA:
-			/* 
-			 * TODO: Do something here 
+		case MESSAGE_RES_EVT_AVAILABLE:
+			/*
+			 * There are events available.  Send a request for one and then
+			 * dispatch it.
 			 */
-			printf("Dispatch: Event Data\n");
+			evt = (struct lib_event_data *)&evti->ei_message;
+			res.evd_head.id = MESSAGE_REQ_EVT_EVENT_DATA;
+			res.evd_head.size = sizeof(res);
+			error = saSendRetry(evti->ei_fd, &res, sizeof(res), MSG_NOSIGNAL);
+			if (error != SA_OK) {
+				printf("MESSAGE_RES_EVT_AVAILABLE: send failed: %d\n", error);
+					break;
+			}
+			error = saRecvQueue(evti->ei_fd, evt, &evti->ei_inq, 
+											MESSAGE_RES_EVT_EVENT_DATA);
+			if (error != SA_OK) {
+				printf("MESSAGE_RES_EVT_AVAILABLE: receive failed: %d\n", 
+						error);
+				break;
+			}
+			/*
+			 * No data available.  This is OK.
+			 */
+			if (evt->led_head.error == SA_ERR_NOT_EXIST) {
+				// printf("MESSAGE_RES_EVT_AVAILABLE: No event data\n");
+				error = SA_OK;
+				break;
+			}
+
+			if (evt->led_head.error != SA_OK) {
+				error = evt->led_head.error;
+				printf("MESSAGE_RES_EVT_AVAILABLE: Error returned: %d\n", 
+						error);
+				break;
+			}
+
+			error = make_event(&event_handle, evt);
+			if (error != SA_OK) {
+					break;
+			}
+			callbacks.saEvtEventDeliverCallback(evt->led_sub_id, event_handle,
+							evt->led_user_data_size);
 			break;
 
 		case MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK:
@@ -497,12 +624,21 @@ saEvtDispatch(
 
 		default:
 			printf("Dispatch: Bad message type 0x%x\n",
-					dispatch_data.header.id);
+					dispatch_data->header.id);
 			error = SA_ERR_LIBRARY;	
 			goto error_nounlock;
 			break;
 		}
 
+		/*
+		 * If empty is zero it means the we got the 
+		 * message from the queue and we are responsible
+		 * for freeing it.
+		 */
+		if (empty == 0) {
+			free(msg);
+		}
+
 		/*
 		 * Determine if more messages should be processed
 		 */
@@ -918,26 +1054,24 @@ saEvtEventAttributesSet(
 	edi->edi_retention_time = retention_time;
 
 	/*
-	 * TODO: publisher_name or pattern_array not allowed to be NULL
+	 * publisher_name or pattern_array not allowed to be NULL
 	 */
-	if (publisher_name) {
-		edi->edi_pub_name = *publisher_name;
-	}
-	if (!pattern_array) {
-		goto attr_set_unlock;
+	if (!publisher_name || !pattern_array) {
+			error = SA_ERR_INVALID_PARAM;
+			goto attr_set_unlock;
 	}
 
+	edi->edi_pub_name = *publisher_name;
+
 	oldpatterns = edi->edi_patterns.patterns;
 	oldnumber = edi->edi_patterns.patternsNumber;
 	edi->edi_patterns.patterns = 0;
-
 	edi->edi_patterns.patterns = malloc(sizeof(SaEvtEventPatternT) * 
 					pattern_array->patternsNumber);
 	if (!edi->edi_patterns.patterns) {
 		error = SA_ERR_NO_MEMORY;
 		goto attr_set_done_reset;
 	}
-
 	edi->edi_patterns.patternsNumber = pattern_array->patternsNumber;
 
 	/*
@@ -1149,8 +1283,7 @@ static uint32_t aispatt_to_evt_patt(const SaEvtEventPatternArrayT *patterns,
 		void *data)
 {
 	int i;
-	SaEvtEventPatternArrayT *pata = data;
-	SaEvtEventPatternT *pats = data + sizeof(SaEvtEventPatternArrayT);
+	SaEvtEventPatternT *pats = data;
 	SaUint8T *str  = (SaUint8T *)pats + 
 				(patterns->patternsNumber * sizeof(*pats));
 
@@ -1158,9 +1291,6 @@ static uint32_t aispatt_to_evt_patt(const SaEvtEventPatternArrayT *patterns,
 	 * Pointers are replaced with offsets into the data array.  These
 	 * will be later converted back into pointers when received as events.
 	 */
-	pata->patterns = (SaEvtEventPatternT *)((void *)pats - data);
-	pata->patternsNumber = patterns->patternsNumber;
-
 	for (i = 0; i < patterns->patternsNumber; i++) {
 		memcpy(str, patterns->patterns[i].pattern, 
 			 	patterns->patterns[i].patternSize);
@@ -1187,6 +1317,11 @@ static size_t filt_size(const SaEvtEventFilterArrayT *filters)
 	return size;
 }
 
+/*
+ * Convert the Sa filters to a form that can be sent over the network
+ * i.e. replace pointers with offsets.  The pointers will be reconstituted
+ * by the receiver.
+ */
 static uint32_t aisfilt_to_evt_filt(const SaEvtEventFilterArrayT *filters, 
 		void *data)
 {
@@ -1259,6 +1394,7 @@ saEvtEventPublish(
 	size_t pattern_size;
 	struct event_pattern *patterns;
 	void   *data_start;
+	int pub_sleep_trys = PUB_SLEEP_TRYS;
 
 	if (event_data_size > SA_EVT_DATA_MAX_LEN) {
 		error = SA_ERR_INVALID_PARAM;
@@ -1321,9 +1457,6 @@ saEvtEventPublish(
 	 */
 	aispatt_to_evt_patt(&edi->edi_patterns, patterns);
 
-	/*
-	 * TODO: Is this needed anymore?
-	 */
 	req->led_patterns_number = edi->edi_patterns.patternsNumber;
 
 	req->led_user_data_offset = pattern_size;
@@ -1342,26 +1475,50 @@ saEvtEventPublish(
 	req->led_priority = edi->edi_priority;
 	req->led_publisher_name = edi->edi_pub_name;
 
-	pthread_mutex_lock(&evti->ei_mutex);
-	error = saSendRetry(evti->ei_fd, req, req->led_head.size, MSG_NOSIGNAL);
-	free(req);
-	if (error != SA_OK) {
-		pthread_mutex_unlock (&evti->ei_mutex);
-		goto pub_put3;
-	}
-	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
+	while (--pub_sleep_trys) {
+		pthread_mutex_lock(&evti->ei_mutex);
+		error = saSendRetry(evti->ei_fd, req, req->led_head.size, MSG_NOSIGNAL);
+		if (error != SA_OK) {
+			pthread_mutex_unlock (&evti->ei_mutex);
+			goto pub_put3_free;
+		}
+		error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
 					MESSAGE_RES_EVT_PUBLISH);
+	
+		pthread_mutex_unlock (&evti->ei_mutex);
 
-	pthread_mutex_unlock (&evti->ei_mutex);
-
-	if (error != SA_OK) {
-		goto pub_put3;
-	}
+		if (error != SA_OK) {
+			goto pub_put3_free;
+		}
 
+		error = res.iep_head.error;
+
+		if (error == SA_ERR_TRY_AGAIN) {
+			struct timespec ts;
+			struct timespec rem;
+			ts.tv_sec = PUB_SLEEP_SEC;
+			ts.tv_nsec = PUB_SLEEP_NSEC;
+pub_sleep:
+			if (nanosleep(&ts, &rem) < 0) {
+				if (errno == EINTR) {
+					ts = rem;
+					goto pub_sleep;
+				}
+				error = SA_ERR_TIMEOUT;
+				goto pub_put3_free;
+			}
+			continue;
+		}
 
-	error = res.iep_head.error;
-	*eventid = res.iep_event_id;
+		*eventid = res.iep_event_id;
+		break;
+	}
+	if (error == SA_ERR_TRY_AGAIN) {
+		error = SA_ERR_TIMEOUT;
+	}
 
+pub_put3_free:
+	free(req);
 pub_put3:
 	saHandleInstancePut (&evt_instance_handle_db, eci->eci_instance_handle);
 pub_put2:
@@ -1398,8 +1555,8 @@ saEvtEventSubscribe(
 	SaErrorT error;
 	struct event_instance *evti;
 	struct event_channel_instance *eci;
-	struct req_evt_channel_subscribe *req;
-	struct res_evt_channel_subscribe res;
+	struct req_evt_event_subscribe *req;
+	struct res_evt_event_subscribe res;
 	int	sz;
 
 	error = saHandleInstanceGet(&channel_handle_db, channel_handle,
@@ -1499,8 +1656,8 @@ saEvtEventUnsubscribe(
 	SaErrorT error;
 	struct event_instance *evti;
 	struct event_channel_instance *eci;
-	struct req_evt_channel_unsubscribe req;
-	struct res_evt_channel_unsubscribe res;
+	struct req_evt_event_unsubscribe req;
+	struct res_evt_event_unsubscribe res;
 
 	error = saHandleInstanceGet(&channel_handle_db, channel_handle,
 			(void**)&eci);