Sfoglia il codice sorgente

defect 188 - use two fds instead of one fd for I/Os to executive

(Logical change 1.175)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@594 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 21 anni fa
parent
commit
7d1b8e0d11
20 ha cambiato i file con 450 aggiunte e 400 eliminazioni
  1. 4 4
      exec/Makefile
  2. 14 2
      exec/aispoll.c
  3. 43 88
      exec/amf.c
  4. 35 72
      exec/ckpt.c
  5. 20 54
      exec/clm.c
  6. 32 47
      exec/evt.c
  7. 1 0
      exec/handlers.h
  8. 156 33
      exec/main.c
  9. 7 12
      exec/main.h
  10. 1 0
      exec/print.c
  11. 50 14
      include/ais_msg.h
  12. 1 2
      include/ais_types.h
  13. 34 34
      include/ipc_ckpt.h
  14. 10 10
      include/ipc_clm.h
  15. 19 19
      include/ipc_evt.h
  16. 10 8
      include/ipc_gen.h
  17. 1 0
      include/queue.h
  18. 4 0
      test/Makefile
  19. 4 1
      test/testamf4.c
  20. 4 0
      test/testamfth.c

+ 4 - 4
exec/Makefile

@@ -29,13 +29,13 @@
 # THE POSSIBILITY OF SUCH DAMAGE.
 
 # Production mode flags
-CFLAGS = -O3 -Wall -fomit-frame-pointer
-LDFLAGS = 
+#CFLAGS = -O3 -Wall -fomit-frame-pointer
+#LDFLAGS = 
 
 # Debug mode flags
-#CFLAGS = -g -Wall
+CFLAGS = -g -Wall
 ##-DDEBUG
-#LDFLAGS = -g
+LDFLAGS = -g
 
 # Profile mode flags
 #CFLAGS = -O2 -pg

+ 14 - 2
exec/aispoll.c

@@ -265,11 +265,23 @@ int poll_dispatch_delete (
 
 	if (found) {
 		poll_instance->poll_entries[i].ufd.fd = -1;
-		saHandleInstancePut (&poll_instance_database, handle);
-		return (0);
+		poll_instance->poll_entries[i].ufd.revents = 0;
+	}
+
+	for (i = 0; i < poll_instance->poll_entry_count; i++) {
+		if (poll_instance->ufds[i].fd == fd) {
+			found = 1;
+			break;
+		}
+	}
+
+	if (found) {
+		poll_instance->ufds[i].fd = -1;
+		poll_instance->ufds[i].revents = 0;
 	}
 
 	saHandleInstancePut (&poll_instance_database, handle);
+	return (0);
 
 error_exit:
 	errno = EBADF;

+ 43 - 88
exec/amf.c

@@ -250,6 +250,8 @@ static int amf_exit_fn (struct conn_info *conn_info);
 
 static int amf_exec_init_fn (void);
 
+static int amf_init_two_fn (struct conn_info *conn_info);
+
 static void amf_synchronize (void *message, struct in_addr source_addr);
 
 static int message_handler_req_exec_amf_componentregister (void *message, struct in_addr source_addr, int endian_conversion_required);
@@ -264,10 +266,6 @@ static int message_handler_req_exec_amf_readinessstateset (void *message, struct
 
 static int message_handler_req_exec_amf_hastateset (void *message, struct in_addr source_addr, int endian_conversion_required);
 
-static int message_handler_req_amf_init (struct conn_info *conn_info, void *message);
-
-static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, void *message);
-
 static int message_handler_req_amf_componentregister (struct conn_info *conn_info, void *message);
 
 static int message_handler_req_amf_componentunregister (struct conn_info *conn_info, void *message);
@@ -290,92 +288,69 @@ static int message_handler_req_amf_response (struct conn_info *conn_info, void *
 
 static int message_handler_req_amf_componentcapabilitymodelget (struct conn_info *conn_info, void *message);
 
-/*
-int (*amf_libais_handler_fns[]) (struct conn_info *conn_info, void *) = {
-	message_handler_req_lib_activatepoll,
-	message_handler_req_amf_componentregister,
-	message_handler_req_amf_componentunregister,
-	message_handler_req_amf_readinessstateget,
-	message_handler_req_amf_hastateget,
-	message_handler_req_amf_protectiongrouptrackstart,
-	message_handler_req_amf_protectiongrouptrackstop,
-	message_handler_req_amf_errorreport,
-	message_handler_req_amf_errorcancelall,
-	message_handler_req_amf_stoppingcomplete,
-	message_handler_req_amf_response,
-	message_handler_req_amf_componentcapabilitymodelget
-};
-*/
-
 struct libais_handler amf_libais_handlers[] =
 {
 	{ /* 0 */
-		.libais_handler_fn	= message_handler_req_lib_activatepoll,
-		.response_size		= sizeof (struct res_lib_activatepoll),
-		.response_id		= MESSAGE_RES_LIB_ACTIVATEPOLL, // TODO RESPONSE
-		.flow_control		= FLOW_CONTROL_REQUIRED
-	},
-	{ /* 1 */
 		.libais_handler_fn	= message_handler_req_amf_componentregister,
 		.response_size		= sizeof (struct res_lib_amf_componentregister),
 		.response_id		= MESSAGE_RES_AMF_COMPONENTREGISTER,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 2 */
+	{ /* 1 */
 		.libais_handler_fn	= message_handler_req_amf_componentunregister,
 		.response_size		= sizeof (struct res_lib_amf_componentunregister),
 		.response_id		= MESSAGE_RES_AMF_COMPONENTUNREGISTER,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 3 */
+	{ /* 2 */
 		.libais_handler_fn	= message_handler_req_amf_readinessstateget,
 		.response_size		= sizeof (struct res_lib_amf_readinessstateget),
 		.response_id		= MESSAGE_RES_AMF_READINESSSTATEGET,
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 4 */
+	{ /* 3 */
 		.libais_handler_fn	= message_handler_req_amf_hastateget,
 		.response_size		= sizeof (struct res_lib_amf_hastateget),
 		.response_id		= MESSAGE_RES_AMF_READINESSSTATEGET,
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 5 */
+	{ /* 4 */
 		.libais_handler_fn	= message_handler_req_amf_protectiongrouptrackstart,
 		.response_size		= sizeof (struct res_lib_amf_protectiongrouptrackstart),
 		.response_id		= MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTART,
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 6 */
+	{ /* 5 */
 		.libais_handler_fn	= message_handler_req_amf_protectiongrouptrackstop,
 		.response_size		= sizeof (struct res_lib_amf_protectiongrouptrackstop),
 		.response_id		= MESSAGE_RES_AMF_PROTECTIONGROUPTRACKSTOP,
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 7 */
+	{ /* 6 */
 		.libais_handler_fn	= message_handler_req_amf_errorreport,
 		.response_size		= sizeof (struct res_lib_amf_errorreport),
 		.response_id		= MESSAGE_RES_AMF_ERRORREPORT,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 8 */
+	{ /* 7 */
 		.libais_handler_fn	= message_handler_req_amf_errorcancelall,
 		.response_size		= sizeof (struct res_lib_amf_errorcancelall),
 		.response_id		= MESSAGE_RES_AMF_ERRORCANCELALL,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 9 */
+	{ /* 8 */
 		.libais_handler_fn	= message_handler_req_amf_stoppingcomplete,
 		.response_size		= sizeof (struct res_lib_amf_stoppingcomplete),
 		.response_id		= MESSAGE_RES_AMF_STOPPINGCOMPLETE, // TODO 
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 10 */
+	{ /* 9 */
 		.libais_handler_fn	= message_handler_req_amf_response,
 		.response_size		= sizeof (struct res_lib_amf_response),
 		.response_id		= MESSAGE_RES_AMF_RESPONSE, // TODO
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 11 */
+	{ /* 10 */
 		.libais_handler_fn	= message_handler_req_amf_componentcapabilitymodelget,
 		.response_size		= sizeof (struct res_lib_amf_componentcapabilitymodelget),
 		.response_id		= MESSAGE_RES_AMF_COMPONENTCAPABILITYMODELGET,
@@ -401,7 +376,7 @@ struct service_handler amf_service_handler = {
 	.aisexec_handler_fns		= amf_aisexec_handler_fns,
 	.aisexec_handler_fns_count	= sizeof (amf_aisexec_handler_fns) / sizeof (int (*)),
 	.confchg_fn					= amf_confchg_fn,
-	.libais_init_fn				= message_handler_req_amf_init,
+	.libais_init_two_fn			= amf_init_two_fn,
 	.libais_exit_fn				= amf_exit_fn,
 	.exec_init_fn				= amf_exec_init_fn,
 	.exec_dump_fn				= amf_dump
@@ -657,7 +632,7 @@ void CSIRemove (struct conn_info *conn_info)
 	struct res_lib_amf_csiremovecallback res_lib_amf_csiremovecallback;
 		
 	if (conn_info->active == 0 ||
-		conn_info->service != SOCKET_SERVICE_AMF) {
+		conn_info->service != AMF_SERVICE) {
 
 		return;
 	}
@@ -701,7 +676,7 @@ void ha_state_api_set (struct saAmfComponent *component, SaAmfHAStateT haState)
 	 * this should be an assertion
 	 */
 	if (component->conn_info->state != CONN_STATE_ACTIVE ||
-		component->conn_info->service != SOCKET_SERVICE_AMF) {
+		component->conn_info->service != AMF_SERVICE) {
 		return;
 	}
 
@@ -729,7 +704,8 @@ void ha_state_api_set (struct saAmfComponent *component, SaAmfHAStateT haState)
 
 	component->newHAState = haState;
 
-	libais_send_response (component->conn_info, &res_lib_amf_csisetcallback,
+	libais_send_response (component->conn_info->conn_info_partner,
+		&res_lib_amf_csisetcallback,
 		sizeof (struct res_lib_amf_csisetcallback));
 }
 
@@ -774,7 +750,7 @@ void readiness_state_api_set (struct saAmfComponent *component,
 	 * this should be an assertion
 	 */
 	if (component->conn_info->state != CONN_STATE_ACTIVE ||
-		component->conn_info->service != SOCKET_SERVICE_AMF) {
+		component->conn_info->service != AMF_SERVICE) {
 
 		return;
 	}
@@ -796,7 +772,8 @@ void readiness_state_api_set (struct saAmfComponent *component,
 
 	log_printf (LOG_LEVEL_DEBUG, "Setting conn_info %p to readiness state %d\n", component->conn_info, readinessState);
 
-	libais_send_response (component->conn_info, &res_lib_amf_readinessstatesetcallback,
+	libais_send_response (component->conn_info->conn_info_partner,
+		&res_lib_amf_readinessstatesetcallback,
 		sizeof (struct res_lib_amf_readinessstatesetcallback));
 }
 
@@ -1163,7 +1140,7 @@ static void dsmEnabledUnlockedActiveRequested (
 	struct saAmfComponent *component)
 {
 	if (component->local == 1) {
-		log_printf (LOG_LEVEL_DEBUG, "Adding healthcheck timer\n");
+		log_printf (LOG_LEVEL_DEBUG, "Adding healthcheck timer1\n");
 		poll_timer_add (aisexec_poll_handle,
 			component->healthcheckInterval,
 			(void *)component->conn_info,
@@ -1179,7 +1156,7 @@ static void dsmEnabledUnlockedStandbyRequested (
 {
 	if (component->local == 1) {
 
-		log_printf (LOG_LEVEL_DEBUG, "Adding healthcheck timer\n");
+		log_printf (LOG_LEVEL_DEBUG, "Adding healthcheck timer2\n");
 
 		poll_timer_add (aisexec_poll_handle,
 			component->healthcheckInterval,
@@ -1490,7 +1467,7 @@ void timer_function_libamf_healthcheck (void *data) {
 
 	log_printf (LOG_LEVEL_DEBUG, "Sending instance %d\n", healthcheck_instance);
 	res_lib_amf_healthcheckcallback.instance = healthcheck_instance++;
-		libais_send_response (conn_info,
+		libais_send_response (conn_info->conn_info_partner,
 			&res_lib_amf_healthcheckcallback,
 			sizeof (struct res_lib_amf_healthcheckcallback));
 
@@ -1954,21 +1931,22 @@ int amf_exit_fn (struct conn_info *conn_info)
 	/*
 	 * Unregister all components registered to this file descriptor
 	 */
-	if (conn_info->service == SOCKET_SERVICE_AMF) {
+	if (conn_info->service == AMF_SERVICE) {
 
-		component_unregister (conn_info->component);
+		component_unregister (conn_info->conn_info_partner->component);
 
-		if (conn_info->component && conn_info->component->timer_healthcheck) {
+		if (conn_info->conn_info_partner->component &&
+			conn_info->conn_info_partner->component->timer_healthcheck) {
 			poll_timer_delete (aisexec_poll_handle,
-				conn_info->component->timer_healthcheck);
+				conn_info->conn_info_partner->component->timer_healthcheck);
 
-			conn_info->component->timer_healthcheck = 0;
+			conn_info->conn_info_partner->component->timer_healthcheck = 0;
 		}
 
-		if (conn_info->ais_ci.u.libamf_ci.tracks) {
-			mempool_free (conn_info->ais_ci.u.libamf_ci.tracks);
-			conn_info->ais_ci.u.libamf_ci.tracks = 0;
-			list_del (&conn_info->conn_list);
+		if (conn_info->conn_info_partner->ais_ci.u.libamf_ci.tracks) {
+			mempool_free (conn_info->conn_info_partner->ais_ci.u.libamf_ci.tracks);
+			conn_info->conn_info_partner->ais_ci.u.libamf_ci.tracks = 0;
+			list_del (&conn_info->conn_info_partner->conn_list);
 		}
 	}
 
@@ -2357,46 +2335,16 @@ static int message_handler_req_exec_amf_hastateset (void *message, struct in_add
 	return (0);
 }
 
-static int message_handler_req_amf_init (struct conn_info *conn_info, void *message)
+static int amf_init_two_fn (struct conn_info *conn_info)
 {
-	struct res_lib_init res_lib_init;
-	SaErrorT error = SA_ERR_SECURITY;
-
-	log_printf (LOG_LEVEL_DEBUG, "Got AMF request to initalize availability management framework service.\n");
-
-	if (conn_info->authenticated) {
-		conn_info->service = SOCKET_SERVICE_AMF;
-		error = SA_OK;
-	}
-
-	res_lib_init.header.size = sizeof (struct res_lib_init);
-	res_lib_init.header.id = MESSAGE_RES_INIT;
-	res_lib_init.header.error = error;
-
-	libais_send_response (conn_info, &res_lib_init, sizeof (res_lib_init));
+	log_printf (LOG_LEVEL_DEBUG, "Got request to initalize availability management framework service.\n"); 
 
 	list_init (&conn_info->conn_list);
 
-	if (conn_info->authenticated) {
-		return (0);
-	}
-	return (-1);
-}
-
-static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, void *message)
-{
-	struct res_lib_activatepoll res_lib_activatepoll;
-
-	log_printf (LOG_LEVEL_FROM_LIB, "Handle : message_handler_req_lib_activatepoll()\n");
-
-	memset (&res_lib_activatepoll,0,sizeof(res_lib_activatepoll));
-	res_lib_activatepoll.header.size = sizeof (struct res_lib_activatepoll);
-	res_lib_activatepoll.header.id = MESSAGE_RES_LIB_ACTIVATEPOLL;
-	libais_send_response (conn_info, &res_lib_activatepoll, sizeof (struct res_lib_activatepoll));
-
 	return (0);
 }
 
+
 static int message_handler_req_amf_componentregister (struct conn_info *conn_info, void *message)
 {
 	struct req_amf_componentregister *req_lib_amf_componentregister = (struct req_amf_componentregister *)message;
@@ -2696,6 +2644,7 @@ void response_handler_healthcheckcallback (struct conn_info *conn_info,
 static int message_handler_req_amf_response (struct conn_info *conn_info_nouse, void *message)
 {
 	struct req_amf_response *req_amf_response = (struct req_amf_response *)message;
+	struct res_lib_amf_response res_lib_amf_response;
 	struct conn_info *conn_info;
 	int interface;
 	int res;
@@ -2728,6 +2677,12 @@ static int message_handler_req_amf_response (struct conn_info *conn_info_nouse,
 		break;
 	}
 
+	res_lib_amf_response.header.id = MESSAGE_RES_AMF_RESPONSE;
+	res_lib_amf_response.header.size = sizeof (struct res_lib_amf_response);
+	res_lib_amf_response.header.error = SA_OK;
+	libais_send_response (conn_info_nouse, &res_lib_amf_response,
+		sizeof (struct res_lib_amf_response));
+
 	return (0);
 }
 

+ 35 - 72
exec/ckpt.c

@@ -82,7 +82,7 @@ static int ckpt_exec_init_fn (void);
 
 static int ckpt_exit_fn (struct conn_info *conn_info);
 
-static int message_handler_req_lib_activatepoll (struct conn_info *, void *message);
+static int ckpt_init_two_fn (struct conn_info *conn_info);
 
 static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct in_addr source_addr, int endian_conversion_required);
 
@@ -110,8 +110,6 @@ static int message_handler_req_exec_ckpt_sectionoverwrite (void *message, struct
 
 static int message_handler_req_exec_ckpt_sectionread (void *message, struct in_addr source_addr, int endian_conversion_required);
 
-static int message_handler_req_lib_ckpt_init (struct conn_info *conn_info, void *message);
-
 static int message_handler_req_lib_ckpt_checkpointopen (struct conn_info *conn_info, void *message);
 
 static int message_handler_req_lib_ckpt_checkpointopenasync (struct conn_info *conn_info, void *message);
@@ -185,108 +183,102 @@ static int ckpt_confchg_fn(
 struct libais_handler ckpt_libais_handlers[] =
 {
 	{ /* 0 */
-		.libais_handler_fn	= message_handler_req_lib_activatepoll,
-		.response_size		= sizeof (struct res_lib_activatepoll),
-		.response_id		= MESSAGE_RES_LIB_ACTIVATEPOLL,
-		.flow_control		= FLOW_CONTROL_REQUIRED
-	},
-	{ /* 1 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_checkpointopen,
 		.response_size		= sizeof (struct res_lib_ckpt_checkpointopen),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 2 */
+	{ /* 1 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_checkpointopenasync,
 		.response_size		= sizeof (struct res_lib_ckpt_checkpointopenasync),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 3 */
+	{ /* 2 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_checkpointclose,
 		.response_size		= sizeof (struct res_lib_ckpt_checkpointclose),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTCLOSE,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 4 */
+	{ /* 3 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_checkpointunlink,
 		.response_size		= sizeof (struct res_lib_ckpt_checkpointunlink),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTUNLINK,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 5 */
+	{ /* 4 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_checkpointretentiondurationset,
 		.response_size		= sizeof (struct res_lib_ckpt_checkpointretentiondurationset),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 6 */
+	{ /* 5 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_activereplicaset,
 		.response_size		= sizeof (struct res_lib_ckpt_activereplicaset),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_ACTIVEREPLICASET,
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 7 */
+	{ /* 6 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_checkpointstatusget,
 		.response_size		= sizeof (struct res_lib_ckpt_checkpointstatusget),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET,
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 8 */
+	{ /* 7 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_sectioncreate,
 		.response_size		= sizeof (struct res_lib_ckpt_sectioncreate),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 9 */
+	{ /* 8 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_sectiondelete,
 		.response_size		= sizeof (struct res_lib_ckpt_sectiondelete),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 10 */
+	{ /* 9 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_sectionexpirationtimeset,
 		.response_size		= sizeof (struct res_lib_ckpt_sectionexpirationtimeset),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 11 */
+	{ /* 10 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_sectionwrite,
 		.response_size		= sizeof (struct res_lib_ckpt_sectionwrite),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_SECTIONWRITE,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 12 */
+	{ /* 11 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_sectionoverwrite,
 		.response_size		= sizeof (struct res_lib_ckpt_sectionoverwrite),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 13 */
+	{ /* 12 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_sectionread,
 		.response_size		= sizeof (struct res_lib_ckpt_sectionread),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_SECTIONREAD,
 		.flow_control		= FLOW_CONTROL_REQUIRED
 	},
-	{ /* 14 */
+	{ /* 13 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_checkpointsynchronize,
 		.response_size		= sizeof (struct res_lib_ckpt_checkpointsynchronize),
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE,
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 15 */
+	{ /* 14 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_checkpointsynchronizeasync,
 		.response_size		= sizeof (struct res_lib_ckpt_checkpointsynchronizeasync), /* TODO RESPONSE */
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC,
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 16 */
+	{ /* 15 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_sectioniteratorinitialize,
 		.response_size		= sizeof (struct res_lib_ckpt_sectioniteratorinitialize),
 		.response_id		= MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE,
 		.flow_control		= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 17 */
+	{ /* 16 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_sectioniteratornext,
 		.response_size		= sizeof (struct res_lib_ckpt_sectioniteratornext),
 		.response_id		= MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT,
@@ -317,7 +309,7 @@ struct service_handler ckpt_service_handler = {
 	.aisexec_handler_fns		= ckpt_aisexec_handler_fns,
 	.aisexec_handler_fns_count	= sizeof (ckpt_aisexec_handler_fns) / sizeof (int (*)),
 	.confchg_fn					= ckpt_confchg_fn,
-	.libais_init_fn				= message_handler_req_lib_ckpt_init,
+	.libais_init_two_fn			= ckpt_init_two_fn,
 	.libais_exit_fn				= ckpt_exit_fn,
 	.exec_init_fn				= ckpt_exec_init_fn,
 	.exec_dump_fn				= 0,
@@ -982,18 +974,18 @@ static int ckpt_exit_fn (struct conn_info *conn_info)
 	struct checkpoint_cleanup *checkpoint_cleanup;
 	struct list_head *cleanup_list;
 	
-	if (conn_info->service != SOCKET_SERVICE_CKPT) {
+	if (conn_info->conn_info_partner->service != CKPT_SERVICE) {
 		return 0;
 	}
 	
 	/*
 	 * close all checkpoints opened on this fd
 	 */
-	cleanup_list = conn_info->ais_ci.u.libckpt_ci.checkpoint_list.next;	
-	while (!list_empty(&conn_info->ais_ci.u.libckpt_ci.checkpoint_list)) {
+	cleanup_list = conn_info->conn_info_partner->ais_ci.u.libckpt_ci.checkpoint_list.next;	
+	while (!list_empty(&conn_info->conn_info_partner->ais_ci.u.libckpt_ci.checkpoint_list)) {
 		
 		checkpoint_cleanup = list_entry (cleanup_list,
-										struct checkpoint_cleanup, list);
+			struct checkpoint_cleanup, list);
 		
 		if (checkpoint_cleanup->checkpoint.name.length > 0)	{
 			ckpt_checkpoint_close (&checkpoint_cleanup->checkpoint);
@@ -1002,26 +994,13 @@ static int ckpt_exit_fn (struct conn_info *conn_info)
 		list_del (&checkpoint_cleanup->list);		
 		free (checkpoint_cleanup);
                 
-		cleanup_list = conn_info->ais_ci.u.libckpt_ci.checkpoint_list.next;
+		cleanup_list = conn_info->conn_info_partner->ais_ci.u.libckpt_ci.checkpoint_list.next;
 	}
 
-	if (conn_info->ais_ci.u.libckpt_ci.sectionIterator.sectionIteratorEntries) {
+	if (conn_info->conn_info_partner->ais_ci.u.libckpt_ci.sectionIterator.sectionIteratorEntries) {
 		free (conn_info->ais_ci.u.libckpt_ci.sectionIterator.sectionIteratorEntries);
 	}
-	list_del (&conn_info->ais_ci.u.libckpt_ci.sectionIterator.list);
-	return (0);
-}
-
-static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, void *message)
-{
-	struct res_lib_activatepoll res_lib_activatepoll;
-
-	res_lib_activatepoll.header.size = sizeof (struct res_lib_activatepoll);
-	res_lib_activatepoll.header.id = MESSAGE_RES_LIB_ACTIVATEPOLL;
-	res_lib_activatepoll.header.error = SA_AIS_OK;
-	libais_send_response (conn_info, &res_lib_activatepoll,
-		sizeof (struct res_lib_activatepoll));
-
+	list_del (&conn_info->conn_info_partner->ais_ci.u.libckpt_ci.sectionIterator.list);
 	return (0);
 }
 
@@ -2170,35 +2149,19 @@ error_exit:
 	return (0);
 }
 
-static int message_handler_req_lib_ckpt_init (struct conn_info *conn_info, void *message)
+static int ckpt_init_two_fn (struct conn_info *conn_info)
 {
-	struct res_lib_init res_lib_init;
-	SaErrorT error = SA_AIS_ERR_ACCESS;
-
-	log_printf (LOG_LEVEL_DEBUG, "Got request to initialize CKPT checkpoint.\n");
+	list_init (&conn_info->conn_info_partner->ais_ci.u.libckpt_ci.sectionIterator
+.list);
+	conn_info->conn_info_partner->ais_ci.u.libckpt_ci.sectionIterator.sectionIteratorEntries = 0;
+	conn_info->conn_info_partner->ais_ci.u.libckpt_ci.sectionIterator.iteratorCount = 0;
+	conn_info->conn_info_partner->ais_ci.u.libckpt_ci.sectionIterator.iteratorPos = 0;
+	list_add (&conn_info->conn_info_partner->ais_ci.u.libckpt_ci.sectionIterator.list,
+		&checkpoint_iterator_list_head);
+	list_init (&conn_info->conn_info_partner->ais_ci.u.libckpt_ci.checkpoint_list);
 
-	if (conn_info->authenticated) {
-    	conn_info->service = SOCKET_SERVICE_CKPT;
-		list_init (&conn_info->ais_ci.u.libckpt_ci.sectionIterator.list);
-		conn_info->ais_ci.u.libckpt_ci.sectionIterator.sectionIteratorEntries = 0;
-		conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorCount = 0;
-		conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorPos = 0;
-		list_add (&conn_info->ais_ci.u.libckpt_ci.sectionIterator.list,
-			&checkpoint_iterator_list_head);
-		list_init (&conn_info->ais_ci.u.libckpt_ci.checkpoint_list);
-		error = SA_AIS_OK;
-	}
-
-	res_lib_init.header.size = sizeof (struct res_lib_init);
-	res_lib_init.header.id = MESSAGE_RES_INIT;
-	res_lib_init.header.error = error;
-
-	libais_send_response (conn_info, &res_lib_init, sizeof (res_lib_init));
+       return (0);
 
-	if (conn_info->authenticated) {
-		return (0);
-	}
-	return (-1);
 }
 
 static int message_handler_req_lib_ckpt_checkpointopen (struct conn_info *conn_info, void *message)

+ 20 - 54
exec/clm.c

@@ -1,4 +1,6 @@
 /*
+ * vi: set autoindent tabstop=4 shiftwidth=4 :
+ *
  * Copyright (c) 2002-2005 MontaVista Software, Inc.
  *
  * All rights reserved.
@@ -96,8 +98,6 @@ SaClmClusterNodeT *clm_get_by_nodeid (struct in_addr node_id)
 /*
  * Service Interfaces required by service_message_handler struct
  */
-static int clm_exec_init_fn (void);
-
 static int clm_confchg_fn (
 	enum totem_configuration_type configuration_type,
     struct in_addr *member_list, int member_list_entries,
@@ -113,13 +113,14 @@ static void clm_sync_activate (void);
 
 static void clm_sync_abort (void);
 
-static int message_handler_req_exec_clm_nodejoin (void *message, struct in_addr source_addr, int endian_conversion_required);
+static int clm_exec_init_fn (void);
+
+static int clm_init_two_fn (struct conn_info *conn_info);
 
-static int message_handler_req_clm_init (struct conn_info *conn_info,
-	void *message);
+static int clm_exit_fn (struct conn_info *conn_info);
 
-static int message_handler_req_lib_activatepoll (struct conn_info *conn_info,
-	void *message);
+static int message_handler_req_exec_clm_nodejoin (void *message, struct in_addr source_addr,
+	int endian_conversion_required);
 
 static int message_handler_req_clm_clustertrack (struct conn_info *conn_info,
 	void *message);
@@ -138,30 +139,24 @@ static int clm_exit_fn (struct conn_info *conn_info);
 struct libais_handler clm_libais_handlers[] =
 {
 	{ /* 0 */
-		.libais_handler_fn			= message_handler_req_lib_activatepoll,
-		.response_size				= sizeof (struct res_lib_activatepoll),
-		.response_id				= MESSAGE_RES_LIB_ACTIVATEPOLL, // TODO RESPONSE
-		.flow_control				= FLOW_CONTROL_NOT_REQUIRED
-	},
-	{ /* 1 */
 		.libais_handler_fn			= message_handler_req_clm_clustertrack,
 		.response_size				= sizeof (struct res_clm_clustertrack),
 		.response_id				= MESSAGE_RES_CLM_TRACKSTART, // TODO RESPONSE
 		.flow_control				= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 2 */
+	{ /* 1 */
 		.libais_handler_fn			= message_handler_req_clm_trackstop,
 		.response_size				= sizeof (struct res_clm_trackstop),
 		.response_id				= MESSAGE_RES_CLM_TRACKSTOP, // TODO RESPONSE
 		.flow_control				= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 3 */
+	{ /* 2 */
 		.libais_handler_fn			= message_handler_req_clm_nodeget,
 		.response_size				= sizeof (struct res_clm_nodeget),
 		.response_id				= MESSAGE_RES_CLM_NODEGET, // TODO RESPONSE
 		.flow_control				= FLOW_CONTROL_NOT_REQUIRED
 	},
-	{ /* 4 */
+	{ /* 3 */
 		.libais_handler_fn			= message_handler_req_clm_nodegetasync,
 		.response_size				= sizeof (struct res_clm_nodegetasync),
 		.response_id				= MESSAGE_RES_CLM_NODEGETCALLBACK, // TODO RESPONSE
@@ -179,7 +174,7 @@ struct service_handler clm_service_handler = {
 	.aisexec_handler_fns		= clm_aisexec_handler_fns,
 	.aisexec_handler_fns_count	= sizeof (clm_aisexec_handler_fns) / sizeof (int (*)),
 	.confchg_fn					= clm_confchg_fn,
-	.libais_init_fn				= message_handler_req_clm_init,
+	.libais_init_two_fn			= clm_init_two_fn,
 	.libais_exit_fn				= clm_exit_fn,
 	.exec_init_fn				= clm_exec_init_fn,
 	.exec_dump_fn				= 0,
@@ -303,7 +298,6 @@ void library_notification_send (SaClmClusterNotificationT *cluster_notification_
     }
 }
 
-
 static void libraryNotificationJoin (SaClmNodeIdT node)
 {
 	SaClmClusterNotificationT clusterNotification;
@@ -481,7 +475,8 @@ static void exec_clm_nodejoin_endian_conversion (struct req_exec_clm_nodejoin *i
 		SA_MAX_NAME_LENGTH);
 }
 
-static int message_handler_req_exec_clm_nodejoin (void *message, struct in_addr source_addr, int endian_conversion_required)
+static int message_handler_req_exec_clm_nodejoin (void *message, struct in_addr source_addr,
+	int endian_conversion_required)
 {
 	struct req_exec_clm_nodejoin *req_exec_clm_nodejoin = (struct req_exec_clm_nodejoin *)message;
 	struct req_exec_clm_nodejoin req_exec_clm_nodejoin_storage;
@@ -524,42 +519,12 @@ static int message_handler_req_exec_clm_nodejoin (void *message, struct in_addr
 	return (0);
 }
 
-static int message_handler_req_clm_init (struct conn_info *conn_info, void *message)
+static int clm_init_two_fn (struct conn_info *conn_info)
 {
-	SaErrorT error = SA_ERR_SECURITY;
-	struct res_lib_init res_lib_init;
-
 	log_printf (LOG_LEVEL_DEBUG, "Got request to initalize cluster membership service.\n");
-	if (conn_info->authenticated) {
-		conn_info->service = SOCKET_SERVICE_CLM;
-		error = SA_OK;
-	}
-
-	res_lib_init.header.size = sizeof (struct res_lib_init);
-	res_lib_init.header.id = MESSAGE_RES_INIT;
-	res_lib_init.header.error = error;
-
-	libais_send_response (conn_info, &res_lib_init, sizeof (res_lib_init));
 
 	list_init (&conn_info->conn_list);
 
-	if (conn_info->authenticated) {
-		return (0);
-	}
-
-	return (-1);
-}
-
-static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, void *message)
-{
-	struct res_lib_activatepoll res_lib_activatepoll;
-
-	res_lib_activatepoll.header.size = sizeof (struct res_lib_activatepoll);
-	res_lib_activatepoll.header.id = MESSAGE_RES_LIB_ACTIVATEPOLL;
-	res_lib_activatepoll.header.error = SA_OK;
-	libais_send_response (conn_info, &res_lib_activatepoll,
-		sizeof (struct res_lib_activatepoll));
-
 	return (0);
 }
 
@@ -568,11 +533,11 @@ int message_handler_req_clm_clustertrack (struct conn_info *conn_info, void *mes
 	struct req_clm_clustertrack *req_clm_clustertrack = (struct req_clm_clustertrack *)message;
 
 
-	conn_info->ais_ci.u.libclm_ci.trackFlags = req_clm_clustertrack->trackFlags;
+	conn_info->conn_info_partner->ais_ci.u.libclm_ci.trackFlags = req_clm_clustertrack->trackFlags;
 
-	list_add (&conn_info->conn_list, &library_notification_send_listhead);
+	list_add (&conn_info->conn_info_partner->conn_list, &library_notification_send_listhead);
 
-	libraryNotificationCurrentState (conn_info);
+	libraryNotificationCurrentState (conn_info->conn_info_partner);
 
 	return (0);
 }
@@ -582,7 +547,7 @@ static int message_handler_req_clm_trackstop (struct conn_info *conn_info, void
 {
 	conn_info->ais_ci.u.libclm_ci.trackFlags = 0;
 
-	list_del (&conn_info->conn_list);
+	list_del (&conn_info->conn_info_partner->conn_list);
 
 	return (0);
 }
@@ -615,6 +580,7 @@ static int message_handler_req_clm_nodeget (struct conn_info *conn_info, void *m
 	res_clm_nodeget.header.error = SA_OK;
 	res_clm_nodeget.invocation = req_clm_nodeget->invocation;
 	res_clm_nodeget.valid = valid;
+	printf ("valid is %d\n", res_clm_nodeget.valid);
 	if (valid) {
 		memcpy (&res_clm_nodeget.clusterNode, clusterNode, sizeof (SaClmClusterNodeT));
 	}

+ 32 - 47
exec/evt.c

@@ -64,8 +64,6 @@
 #define LOG_SERVICE LOG_SERVICE_EVT
 #include "print.h"
 
-static int message_handler_req_lib_activatepoll (struct conn_info *conn_info, 
-		void *message);
 static int lib_evt_open_channel(struct conn_info *conn_info, void *message);
 static int lib_evt_open_channel_async(struct conn_info *conn_info, 
 		void *message);
@@ -88,7 +86,7 @@ static int evt_conf_change(
 		struct in_addr *joined_list, int joined_list_entries,
 		struct memb_ring_id *ring_id);
 
-static int evt_initialize(struct conn_info *conn_info, void *msg);
+static int evt_initialize(struct conn_info *conn_info);
 static int evt_finalize(struct conn_info *conn_info);
 static int evt_exec_init(void);
 
@@ -102,12 +100,6 @@ static void evt_sync_abort(void);
 
 
 static struct libais_handler evt_libais_handlers[] = {
-	{
-	.libais_handler_fn = 	message_handler_req_lib_activatepoll,
-	.response_size = 		sizeof(struct res_lib_activatepoll),
-	.response_id = 			MESSAGE_RES_LIB_ACTIVATEPOLL,
-	.flow_control =			FLOW_CONTROL_REQUIRED
-	},
 	{
 	.libais_handler_fn = 	lib_evt_open_channel,
 	.response_size = 		sizeof(struct res_evt_channel_open),
@@ -187,7 +179,7 @@ struct service_handler evt_service_handler = {
 	.aisexec_handler_fns_count	= sizeof(evt_exec_handler_fns) /
 									sizeof(int (*)),
 	.confchg_fn					= evt_conf_change,
-	.libais_init_fn				= evt_initialize,
+	.libais_init_two_fn			= evt_initialize,
 	.libais_exit_fn				= evt_finalize,
 	.exec_init_fn				= evt_exec_init,
 	.exec_dump_fn				= 0,
@@ -1261,22 +1253,6 @@ static int check_last_event(struct lib_event_data *evtpkt,
 	return 1;
 }
 
-/*
- * Send a message to the app to wake it up if it is polling
- */
-static int message_handler_req_lib_activatepoll(struct conn_info *conn_info, 
-		void *message)
-{
-	struct res_lib_activatepoll res;
-
-	res.header.error = SA_AIS_OK;
-	res.header.size = sizeof (struct res_lib_activatepoll);
-	res.header.id = MESSAGE_RES_LIB_ACTIVATEPOLL;
-	libais_send_response(conn_info, &res, sizeof(res));
-
-	return (0);
-}
-
 /*
  * event id generating code.  We use the node ID for this node for the
  * upper 32 bits of the event ID to make sure that we can generate a cluster
@@ -1655,7 +1631,7 @@ static void __notify_event(struct conn_info *conn_info)
 		res.evd_head.size = sizeof(res);
 		res.evd_head.id = MESSAGE_RES_EVT_AVAILABLE;
 		res.evd_head.error = SA_AIS_OK;
-		libais_send_response(conn_info, &res, sizeof(res));
+		libais_send_response(conn_info->conn_info_partner, &res, sizeof(res));
 	}
 
 }
@@ -1960,34 +1936,42 @@ static struct event_svr_channel_subscr *find_subscr(
 /*
  * Handler for saEvtInitialize
  */
-static int evt_initialize(struct conn_info *conn_info, void *msg)
+static int evt_initialize(struct conn_info *conn_info)
 {
-	struct res_lib_init res;
-	struct libevt_ci *libevt_ci = &conn_info->ais_ci.u.libevt_ci;
+	struct libevt_ci *libevt_ci;
+	struct conn_info *resp_conn_info;
 	int i;
 
-	
-	res.header.size = sizeof (struct res_lib_init);
-	res.header.id = MESSAGE_RES_INIT;
-	res.header.error = SA_AIS_OK;
 
 	log_printf(LOG_LEVEL_DEBUG, "saEvtInitialize request.\n");
-	if (!conn_info->authenticated) {
-		log_printf(LOG_LEVEL_ERROR, "event service: Not authenticated\n");
-		res.header.error = SA_AIS_ERR_LIBRARY;
-		libais_send_response(conn_info, &res, sizeof(res));
-		return -1;
-	}
+	list_init (&conn_info->conn_list);
+	resp_conn_info = conn_info->conn_info_partner;
+	list_init (&resp_conn_info->conn_list);
+
+	libevt_ci = &resp_conn_info->ais_ci.u.libevt_ci;
+
+	/*
+	 * Initailze event instance data
+	 */
 
 	memset(libevt_ci, 0, sizeof(*libevt_ci));
+
+	/*
+	 * list of channels open on this instance
+	 */
 	list_init(&libevt_ci->esi_open_chans);
+
+	/*
+	 * pending event lists for each piriority
+	 */
 	for (i = SA_EVT_HIGHEST_PRIORITY; i <= SA_EVT_LOWEST_PRIORITY; i++) {
 		list_init(&libevt_ci->esi_events[i]);
 	}
-	conn_info->service = SOCKET_SERVICE_EVT;
-	list_init (&conn_info->conn_list);
-	list_add_tail(&conn_info->conn_list, &ci_head);
-	libais_send_response (conn_info, &res, sizeof(res));
+
+	/*
+	 * Keep track of all event service connections
+	 */
+	list_add_tail(&resp_conn_info->conn_list, &ci_head);
 
 	return 0;
 }
@@ -2819,7 +2803,7 @@ static int evt_conf_change(
 static int evt_finalize(struct conn_info *conn_info)
 {
 
-	struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci;
+	struct libevt_ci *esip = &conn_info->conn_info_partner->ais_ci.u.libevt_ci;
 	struct event_svr_channel_open	*eco;
 	struct list_head *l, *nxt;
 
@@ -2841,7 +2825,7 @@ static int evt_finalize(struct conn_info *conn_info)
 	/*
 	 * Delete track entry if there is one
 	 */
-	list_del (&conn_info->conn_list);
+	list_del (&conn_info->conn_info_partner->conn_list);
 
 	return 0;
 }
@@ -3232,7 +3216,8 @@ open_return:
 		resa.ica_channel_handle = handle;
 		resa.ica_c_handle = ocp->ocp_c_handle;
 		resa.ica_invocation = ocp->ocp_invocation;
-		libais_send_response (ocp->ocp_conn_info, &resa, sizeof(resa));
+		libais_send_response (ocp->ocp_conn_info->conn_info_partner, 
+				&resa, sizeof(resa));
 	} else {
 		struct res_evt_channel_open res;
 		res.ico_head.size = sizeof(res);

+ 1 - 0
exec/handlers.h

@@ -63,6 +63,7 @@ struct service_handler {
 		struct in_addr *joined_list, int joined_list_entries,
 		struct memb_ring_id *ring_id);
 	int (*libais_init_fn) (struct conn_info *conn_info, void *msg);
+	int (*libais_init_two_fn) (struct conn_info *conn_info);
 	int (*libais_exit_fn) (struct conn_info *conn_info);
 	int (*exec_init_fn) (void);
 	void (*exec_dump_fn) (void);

+ 156 - 33
exec/main.c

@@ -1,4 +1,6 @@
 /*
+ * vi: set autoindent tabstop=4 shiftwidth=4 :
+ *
  * Copyright (c) 2002-2004 MontaVista Software, Inc.
  *
  * All rights reserved.
@@ -97,6 +99,18 @@ int sync_callback_count;
 #define AIS_SERVICE_HANDLERS_COUNT 5
 #define AIS_SERVICE_HANDLER_AISEXEC_FUNCTIONS_MAX 40
 
+ /*
+  * IPC Initializers
+  */
+static int dispatch_init_send_response (struct conn_info *conn_info, void *message);
+
+static int response_init_send_response (struct conn_info *conn_info, void *message);
+
+static int (*ais_init_handlers[]) (struct conn_info *conn_info, void *message) = {
+	response_init_send_response,
+	dispatch_init_send_response
+};
+
 static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, void *data, unsigned int *prio);
 
 enum e_ais_done {
@@ -178,6 +192,7 @@ static int libais_connection_active (struct conn_info *conn_info)
 static void libais_disconnect_delayed (struct conn_info *conn_info)
 {
 	conn_info->state = CONN_STATE_DISCONNECTING_DELAYED;
+	conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTING_DELAYED;
 }
 
 static int libais_disconnect (struct conn_info *conn_info)
@@ -185,8 +200,17 @@ static int libais_disconnect (struct conn_info *conn_info)
 	int res = 0;
 	struct outq_item *outq_item;
 
-	if (ais_service_handlers[conn_info->service - 1]->libais_exit_fn) {
-		res = ais_service_handlers[conn_info->service - 1]->libais_exit_fn (conn_info);
+	if (conn_info->should_exit_fn &&
+		ais_service_handlers[conn_info->service]->libais_exit_fn) {
+
+		res = ais_service_handlers[conn_info->service]->libais_exit_fn (conn_info);
+	}
+
+	if (conn_info->conn_info_partner && 
+		conn_info->conn_info_partner->should_exit_fn &&
+		ais_service_handlers[conn_info->conn_info_partner->service]->libais_exit_fn) {
+
+		res = ais_service_handlers[conn_info->conn_info_partner->service]->libais_exit_fn (conn_info->conn_info_partner);
 	}
 
 	/*
@@ -197,6 +221,7 @@ static int libais_disconnect (struct conn_info *conn_info)
 		conn_info->state = CONN_STATE_DISCONNECTING;
 
 		close (conn_info->fd);
+
 		/*
 		 * Free the outq queued items
 		 */
@@ -210,11 +235,44 @@ static int libais_disconnect (struct conn_info *conn_info)
 		free (conn_info->inb);
 	}
 
+	/*
+	 * Close the library connection and free its
+	 * data if it hasn't already been freed
+	 */
+	if (conn_info->conn_info_partner &&
+		conn_info->conn_info_partner->state != CONN_STATE_DISCONNECTING) {
+
+		conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTING;
+
+		close (conn_info->conn_info_partner->fd);
+
+		/*
+		 * Free the outq queued items
+		 */
+		while (!queue_is_empty (&conn_info->conn_info_partner->outq)) {
+			outq_item = queue_item_get (&conn_info->conn_info_partner->outq);
+			free (outq_item->msg);
+			queue_item_remove (&conn_info->conn_info_partner->outq);
+		}
+
+		queue_free (&conn_info->conn_info_partner->outq);
+		if (conn_info->conn_info_partner->inb) {
+			free (conn_info->conn_info_partner->inb);
+		}
+	}
+
 	/*
 	 * If exit_fn didn't request a retry,
 	 * free the conn_info structure
 	 */
 	if (res != -1) {
+		if (conn_info->conn_info_partner) {
+			poll_dispatch_delete (aisexec_poll_handle,
+				conn_info->conn_info_partner->fd);
+		}
+		poll_dispatch_delete (aisexec_poll_handle, conn_info->fd);
+
+		free (conn_info->conn_info_partner);
 		free (conn_info);
 	}
 
@@ -363,7 +421,6 @@ retry_sendmsg:
 		iov_send.iov_len = mlen;
 retry_sendmsg_two:
 		res = sendmsg (conn_info->fd, &msg_send, MSG_DONTWAIT | MSG_NOSIGNAL);
-
 		if (res == -1 && errno == EINTR) {
 			goto retry_sendmsg_two;
 		}
@@ -456,7 +513,65 @@ retry_accept:
 	return (0);
 }
 
-struct message_overlay {
+static int dispatch_init_send_response (struct conn_info *conn_info, void *message)
+{
+	SaErrorT error = SA_ERR_ACCESS;
+	struct req_lib_dispatch_init *req_lib_dispatch_init = (struct req_lib_dispatch_init *)message;
+	struct res_lib_dispatch_init res_lib_dispatch_init;
+	struct conn_info *msg_conn_info;
+
+	if (conn_info->authenticated) {
+		conn_info->service = req_lib_dispatch_init->resdis_header.service;
+		error = SA_OK;
+
+		conn_info->conn_info_partner = (struct conn_info *)req_lib_dispatch_init->conn_info;
+
+		msg_conn_info = (struct conn_info *)req_lib_dispatch_init->conn_info;
+		msg_conn_info->conn_info_partner = conn_info;
+	}
+
+	res_lib_dispatch_init.header.size = sizeof (struct res_lib_dispatch_init);
+	res_lib_dispatch_init.header.id = MESSAGE_RES_INIT;
+	res_lib_dispatch_init.header.error = error;
+	
+	libais_send_response (conn_info, &res_lib_dispatch_init,
+		sizeof (res_lib_dispatch_init));
+
+	if (error == SA_ERR_ACCESS) {
+		return (-1);
+	}
+
+	conn_info->should_exit_fn = 1;
+	ais_service_handlers[req_lib_dispatch_init->resdis_header.service]->libais_init_two_fn (conn_info);
+	return (0);
+}
+
+static int response_init_send_response (struct conn_info *conn_info, void *message)
+{
+	SaErrorT error = SA_ERR_ACCESS;
+	struct req_lib_response_init *req_lib_response_init = (struct req_lib_response_init *)message;
+	struct res_lib_response_init res_lib_response_init;
+
+	if (conn_info->authenticated) {
+		conn_info->service = req_lib_response_init->resdis_header.service;
+		error = SA_OK;
+	}
+	res_lib_response_init.header.size = sizeof (struct res_lib_response_init);
+	res_lib_response_init.header.id = MESSAGE_RES_INIT;
+	res_lib_response_init.header.error = error;
+	res_lib_response_init.conn_info = (unsigned long)conn_info;
+
+	libais_send_response (conn_info, &res_lib_response_init,
+		sizeof (res_lib_response_init));
+
+	if (error == SA_ERR_ACCESS) {
+		return (-1);
+	}
+	conn_info->should_exit_fn = 0;
+	return (0);
+}
+
+struct res_overlay {
 	struct res_header header;
 	char buf[4096];
 };
@@ -474,29 +589,39 @@ static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent,
 	struct ucred *cred;
 	int on = 0;
 	int send_ok = 0;
-	struct message_overlay msg_overlay;
+	struct res_overlay res_overlay;
 
-	msg_recv.msg_iov = &iov_recv;
-	msg_recv.msg_iovlen = 1;
-	msg_recv.msg_name = 0;
-	msg_recv.msg_namelen = 0;
-	msg_recv.msg_flags = 0;
-
-	if (revent & POLLOUT) {
-		cleanup_send_response (conn_info);
-	}
-	if ((revent & POLLIN) == 0) {
-		return (0);
+	if (revent & (POLLERR|POLLHUP)) {
+		res = libais_disconnect (conn_info);
+		return (res);
 	}
 
 	/*
 	 * Handle delayed disconnections
 	 */
-	if (conn_info->state != CONN_STATE_ACTIVE) {
+	if (conn_info->state == CONN_STATE_DISCONNECTING_DELAYED) {
 		res = libais_disconnect (conn_info);
 		return (res);
 	}
 
+	if (conn_info->state == CONN_STATE_DISCONNECTING) {
+		return (0);
+	}
+
+	if (revent & POLLOUT) {
+		cleanup_send_response (conn_info);
+	}
+
+	if ((revent & POLLIN) == 0) {
+		return (0);
+	}
+
+	msg_recv.msg_iov = &iov_recv;
+	msg_recv.msg_iovlen = 1;
+	msg_recv.msg_name = 0;
+	msg_recv.msg_namelen = 0;
+	msg_recv.msg_flags = 0;
+
 	if (conn_info->authenticated) {
 		msg_recv.msg_control = 0;
 		msg_recv.msg_controllen = 0;
@@ -558,17 +683,15 @@ retry_recv:
 		 * else handle message using service handlers
 		 */
 		if (service == SOCKET_SERVICE_INIT) {
-			/*
-			 * Initializing service
-			 */
-			res = ais_service_handlers[header->id]->libais_init_fn (conn_info, header);
+			res = ais_init_handlers[header->id] (conn_info, header);
+// TODO error in init_two_fn needs to be handled
 		} else  {
 			/*
 			 * Not an init service, but a standard service
 			 */
-			if (header->id < 0 || header->id > ais_service_handlers[service - 1]->libais_handlers_count) {
+			if (header->id < 0 || header->id > ais_service_handlers[service]->libais_handlers_count) {
 				log_printf (LOG_LEVEL_SECURITY, "Invalid header id is %d min 0 max %d\n",
-				header->id, ais_service_handlers[service - 1]->libais_handlers_count);
+				header->id, ais_service_handlers[service]->libais_handlers_count);
 				res = -1;
 				goto error_disconnect;
 			}
@@ -580,27 +703,27 @@ retry_recv:
 			 * try again later
 			 */
 			send_ok =
-				(ais_service_handlers[service - 1]->libais_handlers[header->id].flow_control == FLOW_CONTROL_NOT_REQUIRED) ||
-				((ais_service_handlers[service - 1]->libais_handlers[header->id].flow_control == FLOW_CONTROL_REQUIRED) &&
+				(ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_NOT_REQUIRED) ||
+				((ais_service_handlers[service]->libais_handlers[header->id].flow_control == FLOW_CONTROL_REQUIRED) &&
 				(totempg_send_ok (1000 + header->size)) &&
 				(sync_in_process() == 0));
 
 			if (send_ok) {
 		//		*prio = 0;
-				res = ais_service_handlers[service - 1]->libais_handlers[header->id].libais_handler_fn(conn_info, header);
+				res = ais_service_handlers[service]->libais_handlers[header->id].libais_handler_fn(conn_info, header);
 			} else {
 		//		*prio = (*prio) + 1;
 
 				/*
 				 * Overload, tell library to retry
 				 */
-				msg_overlay.header.size = 
-					ais_service_handlers[service - 1]->libais_handlers[header->id].response_size;
-				msg_overlay.header.id = 
-					ais_service_handlers[service - 1]->libais_handlers[header->id].response_id;
-				msg_overlay.header.error = SA_ERR_TRY_AGAIN;
-				libais_send_response (conn_info, &msg_overlay,
-					msg_overlay.header.size);
+				res_overlay.header.size = 
+					ais_service_handlers[service]->libais_handlers[header->id].response_size;
+				res_overlay.header.id = 
+					ais_service_handlers[service]->libais_handlers[header->id].response_id;
+				res_overlay.header.error = SA_ERR_TRY_AGAIN;
+				libais_send_response (conn_info, &res_overlay,
+					res_overlay.header.size);
 			}
 		}
 		conn_info->inb_inuse -= header->size;

+ 7 - 12
exec/main.h

@@ -50,16 +50,9 @@
  * Size of the queue (entries) for I/O's to the API over socket IPC.
  */
 
-#define SIZEQUEUE 8192
-
-enum socket_service_type {
-	SOCKET_SERVICE_INIT,
-	SOCKET_SERVICE_EVS,
-	SOCKET_SERVICE_CLM,
-	SOCKET_SERVICE_AMF,
-	SOCKET_SERVICE_CKPT,
-	SOCKET_SERVICE_EVT
-};
+#define SIZEQUEUE 256
+
+#define SOCKET_SERVICE_INIT 254
 
 struct aisexec_ci {
 	struct sockaddr_in in_addr;	/* address of AF_INET socket, MUST BE FIRST IN STRUCTURE */
@@ -98,7 +91,7 @@ enum conn_state {
 };
 
 struct conn_info {
-	int fd;				/* File descriptor for this connection */
+	int fd;				/* File descriptor  */
 	enum conn_state state;			/* State of this connection */
 	char *inb;			/* Input buffer for non-blocking reads */
 	int inb_nextheader;	/* Next message header starts here */
@@ -106,11 +99,13 @@ struct conn_info {
 	int inb_inuse;		/* Bytes currently stored in input buffer */
 	struct queue outq;		/* Circular queue for outgoing requests */
 	int byte_start;			/* Byte to start sending from in head of queue */
-	enum socket_service_type service;/* Type of service so dispatch knows how to route message */
+	enum service_types service;/* Type of service so dispatch knows how to route message */
 	struct saAmfComponent *component;	/* Component for which this connection relates to  TODO shouldn't this be in the ci structure */
 	int authenticated;		/* Is this connection authenticated? */
 	struct list_head conn_list;
 	struct ais_ci ais_ci;	/* libais connection information */
+	struct conn_info *conn_info_partner;	/* partner connection dispatch<->response */
+	int should_exit_fn;			/* Should call the exit function when closing this ipc */
 };
 
 extern struct sockaddr_in *this_ip;

+ 1 - 0
exec/print.c

@@ -181,6 +181,7 @@ void internal_log_printf (int logclass, char *string, ...)
 	if (logmode & LOG_MODE_STDERR) {
 		fprintf (stderr, "%s", log_string);
 	}
+	fflush (stdout);
 
 	va_end(ap);
 }

+ 50 - 14
include/ais_msg.h

@@ -52,23 +52,23 @@ enum req_amf_response_interfaces {
 };
 
 enum req_lib_evs_types {
-	MESSAGE_REQ_EVS_JOIN = 1,
-	MESSAGE_REQ_EVS_LEAVE,
-	MESSAGE_REQ_EVS_MCAST_JOINED,
-	MESSAGE_REQ_EVS_MCAST_GROUPS
+	MESSAGE_REQ_EVS_JOIN = 0,
+	MESSAGE_REQ_EVS_LEAVE = 1,
+	MESSAGE_REQ_EVS_MCAST_JOINED = 2,
+	MESSAGE_REQ_EVS_MCAST_GROUPS = 3
 };
 
 enum res_lib_evs_types {
-	MESSAGE_RES_EVS_DELIVER_CALLBACK = 1,
-	MESSAGE_RES_EVS_CONFCHG_CALLBACK,
-	MESSAGE_RES_EVS_JOIN,
-	MESSAGE_RES_EVS_LEAVE,
-	MESSAGE_RES_EVS_MCAST_JOINED,
-	MESSAGE_RES_EVS_MCAST_GROUPS
+	MESSAGE_RES_EVS_DELIVER_CALLBACK = 0,
+	MESSAGE_RES_EVS_CONFCHG_CALLBACK = 1,
+	MESSAGE_RES_EVS_JOIN = 2,
+	MESSAGE_RES_EVS_LEAVE = 3,
+	MESSAGE_RES_EVS_MCAST_JOINED = 4,
+	MESSAGE_RES_EVS_MCAST_GROUPS = 5
 };
 
 enum req_amf_types {
-	MESSAGE_REQ_AMF_COMPONENTREGISTER = 1,
+	MESSAGE_REQ_AMF_COMPONENTREGISTER = 0,
 	MESSAGE_REQ_AMF_COMPONENTUNREGISTER,
 	MESSAGE_REQ_AMF_READINESSSTATEGET,
 	MESSAGE_REQ_AMF_HASTATEGET,
@@ -82,7 +82,7 @@ enum req_amf_types {
 };
 
 enum res_lib_amf_types {
-	MESSAGE_RES_AMF_COMPONENTREGISTER = 1,
+	MESSAGE_RES_AMF_COMPONENTREGISTER = 0,
 	MESSAGE_RES_AMF_COMPONENTUNREGISTER,
 	MESSAGE_RES_AMF_READINESSSTATEGET,
 	MESSAGE_RES_AMF_HASTATEGET,
@@ -170,14 +170,50 @@ struct req_exec_evs_mcast {
 	/* data goes here */
 };
 
+struct req_lib_resdis_init {
+	int size;
+	int id;
+	int service;
+};
+
+// TODO REMOVE THIS
+enum req_init_types_a {
+    MESSAGE_REQ_EVS_INIT,
+    MESSAGE_REQ_CLM_INIT,
+    MESSAGE_REQ_AMF_INIT,
+    MESSAGE_REQ_CKPT_INIT,
+    MESSAGE_REQ_CKPT_CHECKPOINT_INIT,
+    MESSAGE_REQ_CKPT_SECTIONITERATOR_INIT,
+    MESSAGE_REQ_EVT_INIT
+};
+
+struct req_lib_response_init {
+	struct req_lib_resdis_init resdis_header;
+};
+
+struct req_lib_dispatch_init {
+	struct req_lib_resdis_init resdis_header;
+	unsigned long conn_info;
+};
+
+	
 struct req_lib_init {
-	struct req_header header;
+	struct res_header header;
 };
 
 struct res_lib_init {
 	struct res_header header;
 };
 
+struct res_lib_response_init {
+	struct res_header header;
+	unsigned long conn_info;
+};
+
+struct res_lib_dispatch_init {
+	struct res_header header;
+};
+
 struct req_lib_amf_componentregister {
 	struct req_header header;
 	SaNameT compName;
@@ -370,7 +406,7 @@ struct req_amf_stoppingcomplete {
 };
 
 struct res_lib_amf_stoppingcomplete {
-	struct req_header header;
+	struct res_header header;
 };
 
 struct req_amf_componentcapabilitymodelget {

+ 1 - 2
include/ais_types.h

@@ -106,8 +106,7 @@ typedef enum {
 	SA_ERR_QUEUE_FULL = 25,
 	SA_ERR_QUEUE_NOT_AVAILABLE = 26,
 	SA_ERR_BAD_CHECKPOINT = 27,
-	SA_ERR_BAD_FLAGS = 28,
-	SA_ERR_SECURITY = 29
+	SA_ERR_BAD_FLAGS = 28
 } SaErrorT;
 
 typedef enum {

+ 34 - 34
include/ipc_ckpt.h

@@ -41,43 +41,43 @@
 #include "../exec/ckpt.h"
 
 enum req_lib_ckpt_checkpoint_types {
-	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN = 1,
-	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC,
-	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTCLOSE,
-	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTUNLINK,
-	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET,
-	MESSAGE_REQ_CKPT_CHECKPOINT_ACTIVEREPLICASET,
-	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET,
-	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONCREATE,
-	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONDELETE,
-	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET,
-	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONWRITE,
-	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONOVERWRITE,
-	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONREAD,
-	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE,
-	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC,
-	MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE,
-	MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT
+	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN = 0,
+	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC = 1,
+	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTCLOSE = 2,
+	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTUNLINK = 3,
+	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET = 4,
+	MESSAGE_REQ_CKPT_CHECKPOINT_ACTIVEREPLICASET = 5,
+	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET = 6,
+	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONCREATE = 7,
+	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONDELETE = 8,
+	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET = 9,
+	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONWRITE = 10,
+	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONOVERWRITE = 11,
+	MESSAGE_REQ_CKPT_CHECKPOINT_SECTIONREAD = 12,
+	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE = 13,
+	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC = 14,
+	MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE = 15,
+	MESSAGE_REQ_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT = 16
 };
 
 enum res_lib_ckpt_checkpoint_types {
-	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN,
-	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC,
-	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTCLOSE,
-	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTUNLINK,
-	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET,
-	MESSAGE_RES_CKPT_CHECKPOINT_ACTIVEREPLICASET,
-	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET,
-	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE,
-	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE,
-	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET,
-	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONWRITE,
-	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE,
-	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONREAD,
-	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE,
-	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC,
-	MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE,
-	MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT
+	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPEN = 0,
+	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTOPENASYNC = 1,
+	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTCLOSE = 2,
+	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTUNLINK = 3,
+	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTRETENTIONDURATIONSET = 4,
+	MESSAGE_RES_CKPT_CHECKPOINT_ACTIVEREPLICASET = 5,
+	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSTATUSGET = 6,
+	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONCREATE = 7,
+	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONDELETE = 8,
+	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONEXPIRATIONTIMESET = 9,
+	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONWRITE = 10,
+	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONOVERWRITE = 11,
+	MESSAGE_RES_CKPT_CHECKPOINT_SECTIONREAD = 12,
+	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZE = 13,
+	MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC = 14,
+	MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORINITIALIZE = 15,
+	MESSAGE_RES_CKPT_SECTIONITERATOR_SECTIONITERATORNEXT = 16
 };
 
 struct req_exec_ckpt_checkpointclose {

+ 10 - 10
include/ipc_clm.h

@@ -40,19 +40,19 @@
 #include "ipc_gen.h"
 
 enum req_clm_types {
-	MESSAGE_REQ_CLM_TRACKSTART = 1,
-	MESSAGE_REQ_CLM_TRACKSTOP,
-	MESSAGE_REQ_CLM_NODEGET,
-	MESSAGE_REQ_CLM_NODEGETASYNC
+	MESSAGE_REQ_CLM_TRACKSTART = 0,
+	MESSAGE_REQ_CLM_TRACKSTOP = 1,
+	MESSAGE_REQ_CLM_NODEGET = 2,
+	MESSAGE_REQ_CLM_NODEGETASYNC = 3
 };
 
 enum res_clm_types {
-	MESSAGE_RES_CLM_TRACKCALLBACK = 1,
-	MESSAGE_RES_CLM_TRACKSTART,
-	MESSAGE_RES_CLM_TRACKSTOP,
-	MESSAGE_RES_CLM_NODEGET,
-	MESSAGE_RES_CLM_NODEGETASYNC,
-	MESSAGE_RES_CLM_NODEGETCALLBACK
+	MESSAGE_RES_CLM_TRACKCALLBACK = 0,
+	MESSAGE_RES_CLM_TRACKSTART = 1,
+	MESSAGE_RES_CLM_TRACKSTOP = 2,
+	MESSAGE_RES_CLM_NODEGET = 3,
+	MESSAGE_RES_CLM_NODEGETASYNC = 4,
+	MESSAGE_RES_CLM_NODEGETCALLBACK = 5
 };
 
 struct req_clm_clustertrack {

+ 19 - 19
include/ipc_evt.h

@@ -41,28 +41,28 @@
 #include "ipc_gen.h"
 
 enum req_evt_types {
-	MESSAGE_REQ_EVT_OPEN_CHANNEL = 1,
-	MESSAGE_REQ_EVT_OPEN_CHANNEL_ASYNC,
-	MESSAGE_REQ_EVT_CLOSE_CHANNEL,
-	MESSAGE_REQ_EVT_UNLINK_CHANNEL,
-	MESSAGE_REQ_EVT_SUBSCRIBE,
-	MESSAGE_REQ_EVT_UNSUBSCRIBE,
-	MESSAGE_REQ_EVT_PUBLISH,
-	MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME,
-	MESSAGE_REQ_EVT_EVENT_DATA
+	MESSAGE_REQ_EVT_OPEN_CHANNEL = 0,
+	MESSAGE_REQ_EVT_OPEN_CHANNEL_ASYNC = 1,
+	MESSAGE_REQ_EVT_CLOSE_CHANNEL = 2,
+	MESSAGE_REQ_EVT_UNLINK_CHANNEL = 3,
+	MESSAGE_REQ_EVT_SUBSCRIBE = 4,
+	MESSAGE_REQ_EVT_UNSUBSCRIBE = 5,
+	MESSAGE_REQ_EVT_PUBLISH = 6,
+	MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME = 7,
+	MESSAGE_REQ_EVT_EVENT_DATA = 8
 };
 
 enum res_evt_types {
-	MESSAGE_RES_EVT_OPEN_CHANNEL = 1,
-	MESSAGE_RES_EVT_CLOSE_CHANNEL,
-	MESSAGE_RES_EVT_UNLINK_CHANNEL,
-	MESSAGE_RES_EVT_SUBSCRIBE,
-	MESSAGE_RES_EVT_UNSUBSCRIBE,
-	MESSAGE_RES_EVT_PUBLISH,
-	MESSAGE_RES_EVT_CLEAR_RETENTIONTIME,
-	MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK,
-	MESSAGE_RES_EVT_EVENT_DATA,
-	MESSAGE_RES_EVT_AVAILABLE
+	MESSAGE_RES_EVT_OPEN_CHANNEL = 0,
+	MESSAGE_RES_EVT_CLOSE_CHANNEL = 1,
+	MESSAGE_RES_EVT_UNLINK_CHANNEL = 2,
+	MESSAGE_RES_EVT_SUBSCRIBE = 3,
+	MESSAGE_RES_EVT_UNSUBSCRIBE = 4,
+	MESSAGE_RES_EVT_PUBLISH = 5,
+	MESSAGE_RES_EVT_CLEAR_RETENTIONTIME = 6,
+	MESSAGE_RES_EVT_CHAN_OPEN_CALLBACK = 7,
+	MESSAGE_RES_EVT_EVENT_DATA = 8,
+	MESSAGE_RES_EVT_AVAILABLE = 9
 };
 
 /* 

+ 10 - 8
include/ipc_gen.h

@@ -34,21 +34,23 @@
 #ifndef IPC_GEN_H_DEFINED
 #define IPC_GEN_H_DEFINED
 
+enum service_types {
+	EVS_SERVICE = 0,
+	CLM_SERVICE = 1,
+	AMF_SERVICE = 2,
+	CKPT_SERVICE = 3,
+	EVT_SERVICE = 4
+};
+
 enum req_init_types {
-	MESSAGE_REQ_EVS_INIT,
-	MESSAGE_REQ_CLM_INIT,
-	MESSAGE_REQ_AMF_INIT,
-	MESSAGE_REQ_CKPT_INIT,
-	MESSAGE_REQ_EVT_INIT
+	MESSAGE_REQ_RESPONSE_INIT = 0,
+	MESSAGE_REQ_DISPATCH_INIT = 1
 };
 
 enum res_init_types {
 	MESSAGE_RES_INIT
 };
 
-#define	MESSAGE_REQ_LIB_ACTIVATEPOLL 0
-#define	MESSAGE_RES_LIB_ACTIVATEPOLL 50
-
 enum nodeexec_message_types {
 		MESSAGE_REQ_EXEC_SYNC_BARRIER = 0,
 		MESSAGE_REQ_EXEC_EVS_MCAST = 1,

+ 1 - 0
include/queue.h

@@ -123,6 +123,7 @@ static inline void queue_item_remove (struct queue *queue) {
 	assert (queue->tail != queue->head);
 
 	queue->used--;
+	assert (queue->used >= 0);
 }
 
 static inline void queue_items_remove (struct queue *queue, int rel_count)

+ 4 - 0
test/Makefile

@@ -97,6 +97,9 @@ testevt: testevt.o sa_error.o $(LIBRARIES)
 testevs: testevs.o $(LIBS)
 	$(CC) $(LDFLAGS) -o testevs testevs.o $(LIBS)
 
+testevsth: testevsth.o $(LIBS)
+	$(CC) $(LDFLAGS) -o testevsth testevsth.o $(LIBS)
+
 evsbench: evsbench.o $(LIBS)
 	$(CC) $(LDFLAGS) -o evsbench evsbench.o $(LIBS)
 
@@ -165,6 +168,7 @@ ckptbench.o: ../include/ais_types.h ../include/saCkpt.h
 ckptbenchth.o: ../include/ais_types.h ../include/saCkpt.h
 testevt.o: ../include/ais_types.h ../include/saEvt.h
 testevs.o: ../include/evs.h
+testevsth.o: ../include/evs.h
 evsbench.o: ../include/ais_types.h ../include/evs.h
 subscription.o: ../include/ais_types.h ../include/saEvt.h
 publish.o: ../include/ais_types.h ../include/saEvt.h

+ 4 - 1
test/testamf4.c

@@ -83,12 +83,15 @@ void ReadinessStateSetCallback (SaInvocationT invocation,
 	const SaNameT *compName,
 	SaAmfReadinessStateT readinessState)
 {
+	SaErrorT res;
+
 	switch (readinessState) {
 	case SA_AMF_IN_SERVICE:
 		printf ("ReadinessStateSetCallback: '");
 		printSaNameT ((SaNameT *)compName);
 		printf ("' requested to enter operational state SA_AMF_IN_SERVICE.\n");
-		saAmfResponse (invocation, SA_OK);
+		res = saAmfResponse (invocation, SA_OK);
+printf ("res is %d\n", res);
 		break;
 	case SA_AMF_OUT_OF_SERVICE:
 		printf ("ReadinessStateSetCallback: '");

+ 4 - 0
test/testamfth.c

@@ -244,6 +244,7 @@ int main (void) {
 	int result;
 	SaNameT compName;
 	pthread_t dispatch_thread;
+	pthread_attr_t dispatch_thread_attribute;
 
 	result = saAmfInitialize (&handle, &amfCallbacks, &version);
 	if (result != SA_OK) {
@@ -256,6 +257,9 @@ int main (void) {
 	result = saAmfComponentRegister (&handle, &compName, NULL);
 	printf ("register result is %d (should be 1)\n", result);
 
+	pthread_attr_init (&dispatch_thread_attribute);
+	pthread_attr_setschedpolicy (&dispatch_thread_attribute, SCHED_FIFO);
+	pthread_attr_setschedparam (&dispatch_thread_attribute, 99);
 	pthread_create (&dispatch_thread, NULL, th_dispatch, &handle);
 
 	sleep (5);