Procházet zdrojové kódy

Add handler for channel open async

(Logical change 1.133)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@478 fd59a12c-fef9-0310-b244-a6a79926bd2f
Mark Haverkamp před 21 roky
rodič
revize
cd76f75d0a
3 změnil soubory, kde provedl 247 přidání a 22 odebrání
  1. 109 9
      exec/evt.c
  2. 22 5
      include/ipc_evt.h
  3. 116 8
      lib/evt.c

+ 109 - 9
exec/evt.c

@@ -65,6 +65,8 @@
 static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, 
 		void *message);
 static int lib_evt_open_channel(struct conn_info *conn_info, void *message);
+static int lib_evt_open_channel_async(struct conn_info *conn_info, 
+		void *message);
 static int lib_evt_close_channel(struct conn_info *conn_info, void *message);
 static int lib_evt_event_subscribe(struct conn_info *conn_info, 
 		void *message);
@@ -101,6 +103,11 @@ static struct libais_handler evt_libais_handlers[] = {
 	.response_id = 			MESSAGE_RES_EVT_OPEN_CHANNEL,
 	},
 	{
+	.libais_handler_fn = 	lib_evt_open_channel_async,
+	.response_size = 		sizeof(struct res_evt_channel_open),
+	.response_id = 			MESSAGE_RES_EVT_OPEN_CHANNEL,
+	},
+	{
 	.libais_handler_fn = 	lib_evt_close_channel,
 	.response_size = 		sizeof(struct res_evt_channel_close),
 	.response_id = 			MESSAGE_RES_EVT_CLOSE_CHANNEL,
@@ -123,7 +130,7 @@ static struct libais_handler evt_libais_handlers[] = {
 	{
 	.libais_handler_fn = 	lib_evt_event_clear_retentiontime,
 	.response_size = 		sizeof(struct res_evt_event_clear_retentiontime),
-	.response_id = 			MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME,
+	.response_id = 			MESSAGE_RES_EVT_CLEAR_RETENTIONTIME,
 	},
 	{
 	.libais_handler_fn = 	lib_evt_event_data_get,
@@ -1997,10 +2004,14 @@ static int lib_evt_open_channel(struct conn_info *conn_info, void *message)
 	ocp->ocp_chan_name = req->ico_channel_name;
 	ocp->ocp_open_flag = req->ico_open_flag;
 	ocp->ocp_conn_info = conn_info;
+	ocp->ocp_c_handle = req->ico_c_handle;
 	ocp->ocp_timer_handle = 0;
 	list_init(&ocp->ocp_entry);
 	list_add_tail(&ocp->ocp_entry, &open_pending);
 	if (req->ico_timeout != 0) {
+		/*
+		 * Time in nanoseconds - convert to miliseconds
+		 */
 		msec_in_future = (uint32_t)(req->ico_timeout / 1000000ULL);
 		ret = poll_timer_add(aisexec_poll_handle,
 				msec_in_future,
@@ -2025,6 +2036,83 @@ open_return:
 	return 0;
 }
 
+/*
+ * Handler for saEvtChannelOpen
+ */
+static int lib_evt_open_channel_async(struct conn_info *conn_info, 
+		void *message)
+{
+	SaErrorT error;
+	struct req_evt_channel_open *req;
+	struct res_evt_channel_open res;
+	struct open_chan_pending *ocp;
+	int msec_in_future;
+	int ret;
+
+	req = message;
+
+
+	log_printf(CHAN_OPEN_DEBUG, 
+			"saEvtChannelOpenAsync (Async Open channel request)\n");
+	log_printf(CHAN_OPEN_DEBUG, 
+			"handle 0x%x, to 0x%x\n",
+			req->ico_c_handle,
+			req->ico_invocation);
+	log_printf(CHAN_OPEN_DEBUG, "flags %x, channel name(%d)  %s\n",
+			req->ico_open_flag,
+			req->ico_channel_name.length,
+			req->ico_channel_name.value);
+	/*
+	 * Open the channel.
+	 *
+	 */
+	error = evt_open_channel(&req->ico_channel_name, req->ico_open_flag);
+
+	if (error != SA_AIS_OK) {
+		goto open_return;
+	}
+
+	ocp = malloc(sizeof(struct open_chan_pending));
+	if (!ocp) {
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto open_return;
+	}
+
+	ocp->ocp_async = 1;
+	ocp->ocp_invocation = req->ico_invocation;
+	ocp->ocp_c_handle = req->ico_c_handle;
+	ocp->ocp_chan_name = req->ico_channel_name;
+	ocp->ocp_open_flag = req->ico_open_flag;
+	ocp->ocp_conn_info = conn_info;
+	ocp->ocp_timer_handle = 0;
+	list_init(&ocp->ocp_entry);
+	list_add_tail(&ocp->ocp_entry, &open_pending);
+	if (req->ico_timeout != 0) {
+		/*
+		 * Time in nanoseconds - convert to miliseconds
+		 */
+		msec_in_future = (uint32_t)(req->ico_timeout / 1000000ULL);
+		ret = poll_timer_add(aisexec_poll_handle,
+				msec_in_future,
+				ocp,
+				chan_open_timeout,
+				&ocp->ocp_timer_handle);
+		if (ret != 0) {
+			log_printf(LOG_LEVEL_WARNING, 
+					"Error setting timeout for open channel %s\n",
+					req->ico_channel_name.value);
+		}
+	}
+
+open_return:
+	res.ico_head.size = sizeof(res);
+	res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL;
+	res.ico_head.error = error;
+	libais_send_response (conn_info, &res, sizeof(res));
+
+	return 0;
+}
+
 
 
 /*
@@ -2420,7 +2508,7 @@ static int lib_evt_event_clear_retentiontime(struct conn_info *conn_info,
 	}
 
 	res.iec_head.size = sizeof(res);
-	res.iec_head.id = MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME;
+	res.iec_head.id = MESSAGE_RES_EVT_CLEAR_RETENTIONTIME;
 	res.iec_head.error = error;
 	libais_send_response (conn_info, &res, sizeof(res));
 
@@ -2981,7 +3069,6 @@ static void evt_chan_open_finish(struct open_chan_pending *ocp,
 		struct event_svr_channel_instance *eci)
 {
 	uint32_t handle;
-	struct res_evt_channel_open res;
 	struct event_svr_channel_open *eco;
 	SaErrorT error;
 	struct libevt_ci *esip = &ocp->ocp_conn_info->ais_ci.u.libevt_ci;
@@ -2990,7 +3077,7 @@ static void evt_chan_open_finish(struct open_chan_pending *ocp,
 
 	log_printf(CHAN_OPEN_DEBUG, "Open channel finish %s\n", 
 											getSaNameT(&ocp->ocp_chan_name));
-	if (!ocp->ocp_async && ocp->ocp_timer_handle) {
+	if (ocp->ocp_timer_handle) {
 		ret = poll_timer_delete(aisexec_poll_handle, ocp->ocp_timer_handle);
 		if (ret != 0 ) {
 			log_printf(LOG_LEVEL_WARNING, 
@@ -3032,15 +3119,28 @@ static void evt_chan_open_finish(struct open_chan_pending *ocp,
 	 * open instance for later subscriptions, etc.
 	 */
 	saHandleInstancePut(&esip->esi_hdb, handle);
+
 open_return:
 	log_printf(CHAN_OPEN_DEBUG, "Open channel finish %s send response %d\n", 
 											getSaNameT(&ocp->ocp_chan_name),
 											error);
-	res.ico_head.size = sizeof(res);
-	res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL;
-	res.ico_head.error = error;
-	res.ico_channel_handle = handle;
-	libais_send_response (ocp->ocp_conn_info, &res, sizeof(res));
+	if (ocp->ocp_async) {
+		struct res_evt_open_chan_async resa;
+		resa.ica_head.size = sizeof(resa);
+		resa.ica_head.id = MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK;
+		resa.ica_head.error = error;
+		resa.ica_channel_handle = handle;
+		resa.ica_c_handle = ocp->ocp_c_handle;
+		resa.ica_invocation = ocp->ocp_invocation;
+		libais_send_response (ocp->ocp_conn_info, &resa, sizeof(resa));
+	} else {
+		struct res_evt_channel_open res;
+		res.ico_head.size = sizeof(res);
+		res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL;
+		res.ico_head.error = error;
+		res.ico_channel_handle = handle;
+		libais_send_response (ocp->ocp_conn_info, &res, sizeof(res));
+	}
 
 	if (ret == 0) {
 		list_del(&ocp->ocp_entry);

+ 22 - 5
include/ipc_evt.h

@@ -42,6 +42,7 @@
 
 enum req_evt_types {
 	MESSAGE_REQ_EVT_OPEN_CHANNEL = 1,
+	MESSAGE_REQ_EVT_OPEN_CHANNEL_ASYNC,
 	MESSAGE_REQ_EVT_CLOSE_CHANNEL,
 	MESSAGE_REQ_EVT_SUBSCRIBE,
 	MESSAGE_REQ_EVT_UNSUBSCRIBE,
@@ -64,6 +65,7 @@ enum res_evt_types {
 
 /* 
  * MESSAGE_REQ_EVT_OPEN_CHANNEL
+ * MESSAGE_REQ_EVT_OPEN_CHANNEL_ASYNC
  *
  * ico_head				Request head
  * ico_open_flag:		Channel open flags
@@ -78,13 +80,18 @@ struct req_evt_channel_open {
 	struct req_header		ico_head;
 	SaUint8T				ico_open_flag;
 	SaNameT					ico_channel_name;
-	SaEvtChannelHandleT		ico_c_handle;	/* client chan handle */
-	SaTimeT					ico_timeout;    /* open only */
-	SaInvocationT			ico_invocation; /* open async only */
+	SaEvtChannelHandleT		ico_c_handle;
+	SaTimeT					ico_timeout;
+	SaInvocationT			ico_invocation;
 };
 
 /*
  * MESSAGE_RES_EVT_OPEN_CHANNEL
+ *
+ * Used by both the blocing and non-blocking
+ * versions.  Only the error code in the header is used by the async
+ * open.  The channel handle will be returnd via the channel open
+ * callback.
  * 
  *
  * ico_head:			Results head
@@ -102,10 +109,20 @@ struct res_evt_channel_open {
 /*
  * MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK
  *
- * TODO: Define this
+ * ica_head:			Results head.
+ * ica_c_handle:		Lib size channel handle.  So we can look
+ * 						up the new open channel info from the callback.
+ * ica_channel_handle:	Server side handle.
+ * ica_invocation:		Supplied by the user in the open call.  Passed to
+ * 						the callback so the user can assocate the callback
+ * 						to the particular open.
  */
 struct res_evt_open_chan_async {
-	struct res_header	ico_head;
+	struct res_header	ica_head;
+	SaEvtChannelHandleT	ica_c_handle;
+	uint32_t			ica_channel_handle;
+	SaInvocationT		ica_invocation;
+
 };
 
 

+ 116 - 8
lib/evt.c

@@ -1,6 +1,6 @@
 /*
- * Copyright (c) 2004 Mark Haverkamp
- * Copyright (c) 2004 Open Source Development Lab
+ * Copyright (c) 2004-2005 Mark Haverkamp
+ * Copyright (c) 2004-2005 Open Source Development Lab
  *
  * All rights reserved.
  *
@@ -617,10 +617,32 @@ saEvtDispatch(
 			break;
 
 		case MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK:
-			/* 
-			 * TODO: do something here
+		{
+			struct res_evt_open_chan_async *resa = 
+				(struct res_evt_open_chan_async *)dispatch_data;
+			struct event_channel_instance *eci;
+
+			/*
+			 * Check for errors.  If there are none, then
+			 * look up the local channel via the handle that we
+			 * got from the callback request.  All we need to do 
+			 * is place in the handle from the server side and then 
+			 * we can call the callback.
 			 */
-			printf("Dispatch: Open callback\n");
+			error = resa->ica_head.error;
+			if (error == SA_AIS_OK) {
+				error = saHandleInstanceGet(&channel_handle_db, 
+						resa->ica_c_handle, (void*)&eci);
+				if (error == SA_AIS_OK) {
+					eci->eci_svr_channel_handle = resa->ica_channel_handle;
+					saHandleInstancePut (&channel_handle_db, 
+							resa->ica_c_handle);
+				}
+			}
+			callbacks.saEvtChannelOpenCallback(resa->ica_invocation,
+					resa->ica_c_handle, error);
+
+		}
 			break;
 
 		default:
@@ -903,16 +925,102 @@ chan_close_done:
 	return error;
 }
 
+/*
+ * The saEvtChannelOpenAsync() function creates a new event channel or open an 
+ * existing channel. The saEvtChannelOpenAsync() function is a non-blocking 
+ * operation. A new event channel handle is returned in the channel open
+ * callback function (SaEvtChannelOpenCallbackT).
+ */
 SaAisErrorT
 saEvtChannelOpenAsync(SaEvtHandleT evt_handle,
                        SaInvocationT invocation,
                        const SaNameT *channel_name,
                        SaEvtChannelOpenFlagsT channel_open_flags)
 {
-	/* 
-	 * TODO: Fill in code
+	struct event_instance *evti;
+	struct req_evt_channel_open req;
+	struct res_evt_channel_open res;
+	struct event_channel_instance *eci;
+	SaEvtChannelHandleT channel_handle;
+	SaAisErrorT error;
+
+	error = saHandleInstanceGet(&evt_instance_handle_db, evt_handle,
+			(void*)&evti);
+	
+	if (error != SA_AIS_OK) {
+		goto chan_open_done;
+	}
+
+	/*
+	 * create a handle for this open channel
 	 */
-	return SA_AIS_ERR_LIBRARY;
+	error = saHandleCreate(&channel_handle_db, sizeof(*eci), 
+			&channel_handle);
+	if (error != SA_AIS_OK) {
+		goto chan_open_put;
+	}
+
+	error = saHandleInstanceGet(&channel_handle_db, channel_handle,
+					(void*)&eci);
+	if (error != SA_AIS_OK) {
+		saHandleDestroy(&channel_handle_db, channel_handle);
+		goto chan_open_put;
+	}
+
+
+	/*
+	 * Send the request to the server.  The response isn't the open channel,
+	 * just an ack.  The open channel will be returned when the channel open
+	 * callback is called.
+	 */
+	req.ico_head.size = sizeof(req);
+	req.ico_head.id = MESSAGE_REQ_EVT_OPEN_CHANNEL_ASYNC;
+	req.ico_c_handle = channel_handle;
+	req.ico_timeout = 0;
+	req.ico_invocation = invocation;
+	req.ico_open_flag = channel_open_flags;
+	req.ico_channel_name = *channel_name;
+
+
+	pthread_mutex_lock(&evti->ei_mutex);
+
+	error = saSendRetry(evti->ei_fd, &req, sizeof(req), MSG_NOSIGNAL);
+	if (error != SA_AIS_OK) {
+		pthread_mutex_unlock (&evti->ei_mutex);
+		goto chan_open_free;
+	}
+	error = saRecvQueue(evti->ei_fd, &res, &evti->ei_inq, 
+					MESSAGE_RES_EVT_OPEN_CHANNEL);
+
+	pthread_mutex_unlock (&evti->ei_mutex);
+
+	if (error != SA_AIS_OK) {
+		goto chan_open_free;
+	}
+
+	error = res.ico_head.error;
+	if (error != SA_AIS_OK) {
+		goto chan_open_free;
+	}
+
+	eci->eci_svr_channel_handle = 0; /* filled in by callback */
+	eci->eci_channel_name = *channel_name;
+	eci->eci_open_flags = channel_open_flags;
+	eci->eci_instance_handle = evt_handle;
+	eci->eci_closing = 0;
+	pthread_mutex_init(&eci->eci_mutex, NULL);
+	saHandleInstancePut (&evt_instance_handle_db, evt_handle);
+	saHandleInstancePut (&channel_handle_db, channel_handle);
+
+	return SA_AIS_OK;
+
+chan_open_free:
+	saHandleDestroy(&channel_handle_db, channel_handle);
+	saHandleInstancePut (&channel_handle_db, channel_handle);
+chan_open_put:
+	saHandleInstancePut (&evt_instance_handle_db, evt_handle);
+chan_open_done:
+	return error;
 }
 
 SaAisErrorT