Просмотр исходного кода

This patch reworks IPC to use threads instead of the main poll loop

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1011 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 20 лет назад
Родитель
Сommit
3c7f8b7c05
14 измененных файлов с 163 добавлено и 893 удалено
  1. 2 2
      exec/Makefile
  2. 7 1
      exec/aispoll.c
  3. 1 1
      exec/aispoll.h
  4. 11 0
      exec/evt.c
  5. 52 878
      exec/main.c
  6. 2 1
      exec/objdb.c
  7. 2 1
      exec/totemnet.c
  8. 11 1
      exec/totempg.c
  9. 2 1
      exec/totemrrp.c
  10. 2 2
      exec/totemsrp.c
  11. 19 0
      include/hdb.h
  12. 45 4
      include/queue.h
  13. 0 1
      lib/evt.c
  14. 7 0
      test/subscription.c

+ 2 - 2
exec/Makefile

@@ -58,9 +58,9 @@ LCR_OBJS = evs.o clm.o amf.o ckpt.o evt.o lck.o msg.o cfg.o cpg.o amfconfig.o ai
 
 # main executive objects
 MAIN_SRC = main.c print.c mempool.c \
-		util.c sync.c ykd.c service.c totemconfig.c mainconfig.c
+		util.c sync.c ykd.c service.c ipc.c totemconfig.c mainconfig.c
 MAIN_OBJS = main.o print.o mempool.o \
-		util.o sync.o service.o totemconfig.o mainconfig.o ../lcr/lcr_ifact.o
+		util.o sync.o service.o ipc.o totemconfig.o mainconfig.o ../lcr/lcr_ifact.o
 OTHER_OBJS = objdb.o
 
 ifeq (${BUILD_DYNAMIC}, 1) 

+ 7 - 1
exec/aispoll.c

@@ -57,6 +57,7 @@ struct poll_instance {
 	struct pollfd *ufds;
 	int poll_entry_count;
 	struct timerlist timerlist;
+	pthread_mutex_t *serialize;
 };
 
 /*
@@ -68,7 +69,7 @@ static struct hdb_handle_database poll_instance_database = {
 	.iterator	= 0
 };
 
-poll_handle poll_create (void)
+poll_handle poll_create (pthread_mutex_t *serialize)
 {
 	poll_handle handle;
 	struct poll_instance *poll_instance;
@@ -88,6 +89,7 @@ poll_handle poll_create (void)
 	poll_instance->poll_entries = 0;
 	poll_instance->ufds = 0;
 	poll_instance->poll_entry_count = 0;
+	poll_instance->serialize = serialize;
 	timerlist_init (&poll_instance->timerlist);
 
 	return (handle);
@@ -386,7 +388,9 @@ int poll_run (
 				&poll_instance->poll_entries[i].ufd,
 				sizeof (struct pollfd));
 		}
+		pthread_mutex_lock (poll_instance->serialize);
 		timeout = timerlist_timeout_msec (&poll_instance->timerlist);
+		pthread_mutex_unlock (poll_instance->serialize);
 
 retry_poll:
 		res = poll (poll_instance->ufds,
@@ -402,6 +406,7 @@ retry_poll:
 		for (i = 0; i < poll_entry_count; i++) {
 			if (poll_instance->ufds[i].fd != -1 &&
 				poll_instance->ufds[i].revents) {
+				pthread_mutex_lock (poll_instance->serialize);
 
 				res = poll_instance->poll_entries[i].dispatch_fn (handle,
 					poll_instance->ufds[i].fd, 
@@ -415,6 +420,7 @@ retry_poll:
 				if (res == -1) {
 					poll_instance->poll_entries[i].ufd.fd = -1; /* empty entry */
 				}
+				pthread_mutex_unlock (poll_instance->serialize);
 			}
 		}
 		timerlist_expire (&poll_instance->timerlist);

+ 1 - 1
exec/aispoll.h

@@ -37,7 +37,7 @@
 typedef void * poll_timer_handle;
 typedef unsigned int poll_handle;
 
-poll_handle poll_create (void);
+poll_handle poll_create (pthread_mutex_t *mutex);
 
 int poll_destroy (poll_handle poll_handle);
 

+ 11 - 0
exec/evt.c

@@ -1838,6 +1838,7 @@ deliver_event(struct event_data *evt,
 		}
 	}
 
+	assert (esip->esi_nevents >= 0);
 	if (!esip->esi_queue_blocked && 
 							(esip->esi_nevents >= evt_delivery_queue_size)) {
 		log_printf(LOG_LEVEL_DEBUG, "block\n");
@@ -2106,6 +2107,11 @@ static int evt_lib_init(void *conn)
 	 */
 	memset(libevt_pd, 0, sizeof(*libevt_pd));
 
+	/*
+	 * Initialize the open channel handle database.
+	 */
+	hdb_create(&libevt_pd->esi_hdb);
+
 	/*
 	 * list of channels open on this instance
 	 */
@@ -3013,6 +3019,11 @@ static int evt_lib_exit(void *conn)
 		}
 	}
 
+	/*
+	 * Destroy the open channel handle database
+	 */
+	hdb_destroy(&esip->esi_hdb);
+
 	return 0;
 }
 

+ 52 - 878
exec/main.c

@@ -1,7 +1,6 @@
 /*
- * vi: set autoindent tabstop=4 shiftwidth=4 :
- *
  * Copyright (c) 2002-2006 MontaVista Software, Inc.
+ * Copyright (c) 2006 Red Hat, Inc..
  *
  * All rights reserved.
  *
@@ -72,6 +71,7 @@
 #include "swab.h"
 #include "objdb.h"
 #include "config.h"
+#include "ipc.h"
 #define LOG_SERVICE LOG_SERVICE_MAIN
 #include "print.h"
 
@@ -87,78 +87,12 @@ static int gid_valid = 0;
 
 static unsigned int service_count = 32;
 
-struct outq_item {
-	void *msg;
-	size_t mlen;
-};
+static struct totem_logging_configuration totem_logging_configuration;
 
-enum conn_state {
-	CONN_STATE_ACTIVE,
-	CONN_STATE_DISCONNECTING,
-	CONN_STATE_DISCONNECTING_DELAYED
-};
+static char delivery_data[MESSAGE_SIZE_MAX];
 
-struct conn_info {
-	int fd;			/* File descriptor  */
-	enum conn_state state;	/* State of this connection */
-	char *inb;		/* Input buffer for non-blocking reads */
-	int inb_nextheader;	/* Next message header starts here */
-	int inb_start;		/* Start location of input buffer */
-	int inb_inuse;		/* Bytes currently stored in input buffer */
-	struct queue outq;	/* Circular queue for outgoing requests */
-	int byte_start;		/* Byte to start sending from in head of queue */
-	enum service_types service;/* Type of service so dispatch knows how to route message */
-	int authenticated;	/* Is this connection authenticated? */
-	void *private_data;	/* library connection private data */
-	struct conn_info *conn_info_partner;	/* partner connection dispatch<->response */
-	int should_exit_fn;	/* Should call the exit function when closing this ipc */
-};
 SaClmClusterNodeT *(*main_clm_get_by_nodeid) (unsigned int node_id);
 
- /*
-  * IPC Initializers
-  */
-static int dispatch_init_send_response (struct conn_info *conn_info, void *message);
-
-static int response_init_send_response (struct conn_info *conn_info, void *message);
-
-static int (*ais_init_service[]) (struct conn_info *conn_info, void *message) = {
-	response_init_send_response,
-	dispatch_init_send_response
-};
-
-static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, void *data, unsigned int *prio);
-
-
-static inline struct conn_info *conn_info_create (int fd) {
-	struct conn_info *conn_info;
-	int res;
-
-	conn_info = malloc (sizeof (struct conn_info));
-	if (conn_info == 0) {
-		return (0);
-	}
-
-	memset (conn_info, 0, sizeof (struct conn_info));
-	res = queue_init (&conn_info->outq, SIZEQUEUE,
-		sizeof (struct outq_item));
-	if (res != 0) {
-		free (conn_info);
-		return (0);
-	}
-	conn_info->inb = malloc (sizeof (char) * SIZEINB);
-	if (conn_info->inb == 0) {
-		queue_free (&conn_info->outq);
-		free (conn_info);
-		return (0);
-	}
-
-	conn_info->state = CONN_STATE_ACTIVE;
-	conn_info->fd = fd;
-	conn_info->service = SOCKET_SERVICE_INIT;
-	return (conn_info);
-}
-
 #ifdef COMPILE_OUT
 static void sigusr2_handler (int num)
 {
@@ -179,18 +113,6 @@ struct totem_ip_address *this_ip;
 struct totem_ip_address this_non_loopback_ip;
 #define LOCALHOST_IP inet_addr("127.0.0.1")
 
-#if defined(OPENAIS_LINUX)
-/* SUN_LEN is broken for abstract namespace
- */
-#define AIS_SUN_LEN(a) sizeof(*(a))
-
-char *socketname = "libais.socket";
-#else
-#define AIS_SUN_LEN(a) SUN_LEN(a)
-
-char *socketname = "/var/run/libais.socket";
-#endif
-
 totempg_groups_handle openais_group_handle;
 
 struct totempg_group openais_group = {
@@ -198,673 +120,6 @@ struct totempg_group openais_group = {
 	.group_len	= 1
 };
 
-
-static int libais_connection_active (struct conn_info *conn_info)
-{
-	return (conn_info->state == CONN_STATE_ACTIVE);
-}
-
-static void libais_disconnect_delayed (struct conn_info *conn_info)
-{
-	conn_info->state = CONN_STATE_DISCONNECTING_DELAYED;
-	conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTING_DELAYED;
-}
-
-static int libais_disconnect (struct conn_info *conn_info)
-{
-	int res = 0;
-	struct outq_item *outq_item;
-
-	if (conn_info->should_exit_fn &&
-		ais_service[conn_info->service]->lib_exit_fn) {
-
-		res = ais_service[conn_info->service]->lib_exit_fn (conn_info);
-	}
-
-	/*
-	 * Call library exit handler and free private data
-	 */
-	if (conn_info->conn_info_partner &&
-		conn_info->conn_info_partner->should_exit_fn &&
-		ais_service[conn_info->conn_info_partner->service]->lib_exit_fn) {
-
-		res = ais_service[conn_info->conn_info_partner->service]->lib_exit_fn (conn_info->conn_info_partner);
-		if (conn_info->private_data) {
-			free (conn_info->private_data);
-		}
-	}
-
-	/*
-	 * Close the library connection and free its
-	 * data if it hasn't already been freed
-	 */
-	if (conn_info->state != CONN_STATE_DISCONNECTING) {
-		conn_info->state = CONN_STATE_DISCONNECTING;
-
-		close (conn_info->fd);
-
-		/*
-		 * Free the outq queued items
-		 */
-		while (!queue_is_empty (&conn_info->outq)) {
-			outq_item = queue_item_get (&conn_info->outq);
-			free (outq_item->msg);
-			queue_item_remove (&conn_info->outq);
-		}
-
-		queue_free (&conn_info->outq);
-		free (conn_info->inb);
-	}
-
-	/*
-	 * Close the library connection and free its
-	 * data if it hasn't already been freed
-	 */
-	if (conn_info->conn_info_partner &&
-		conn_info->conn_info_partner->state != CONN_STATE_DISCONNECTING) {
-
-		conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTING;
-
-		close (conn_info->conn_info_partner->fd);
-
-		/*
-		 * Free the outq queued items
-		 */
-		while (!queue_is_empty (&conn_info->conn_info_partner->outq)) {
-			outq_item = queue_item_get (&conn_info->conn_info_partner->outq);
-			free (outq_item->msg);
-			queue_item_remove (&conn_info->conn_info_partner->outq);
-		}
-
-		queue_free (&conn_info->conn_info_partner->outq);
-		if (conn_info->conn_info_partner->inb) {
-			free (conn_info->conn_info_partner->inb);
-		}
-	}
-
-	/*
-	 * If exit_fn didn't request a retry,
-	 * free the conn_info structure
-	 */
-	if (res != -1) {
-		if (conn_info->conn_info_partner) {
-			poll_dispatch_delete (aisexec_poll_handle,
-				conn_info->conn_info_partner->fd);
-		}
-		poll_dispatch_delete (aisexec_poll_handle, conn_info->fd);
-
-		free (conn_info->conn_info_partner);
-		free (conn_info);
-	}
-
-	/*
-	 * Inverse res from libais exit fn handler
-	 */
-	return (res != -1 ? -1 : 0);
-}
-
-static int cleanup_send_response (struct conn_info *conn_info) {
-	struct queue *outq;
-	int res = 0;
-	struct outq_item *queue_item;
-	struct msghdr msg_send;
-	struct iovec iov_send;
-	char *msg_addr;
-
-	if (!libais_connection_active (conn_info)) {
-		return (-1);
-	}
-	outq = &conn_info->outq;
-
-	msg_send.msg_iov = &iov_send;
-	msg_send.msg_name = 0;
-	msg_send.msg_namelen = 0;
-	msg_send.msg_iovlen = 1;
-	msg_send.msg_control = 0;
-	msg_send.msg_controllen = 0;
-	msg_send.msg_flags = 0;
-
-	while (!queue_is_empty (outq)) {
-		queue_item = queue_item_get (outq);
-		msg_addr = (char *)queue_item->msg;
-		msg_addr = &msg_addr[conn_info->byte_start];
-
-		iov_send.iov_base = msg_addr;
-		iov_send.iov_len = queue_item->mlen - conn_info->byte_start;
-
-retry_sendmsg:
-		res = sendmsg (conn_info->fd, &msg_send, MSG_NOSIGNAL);
-		if (res == -1 && errno == EINTR) {
-			goto retry_sendmsg;
-		}
-		if (res == -1 && errno == EAGAIN) {
-			break; /* outgoing kernel queue full */
-		}
-		if (res == -1) {
-			return (-1); /* message couldn't be sent */
-		}
-		if (res + conn_info->byte_start != queue_item->mlen) {
-			conn_info->byte_start += res;
-			break;
-		}
-
-		/*
-		 * Message sent, try sending another message
-		 */
-		queue_item_remove (outq);
-		conn_info->byte_start = 0;
-		free (queue_item->msg);
-	} /* while queue not empty */
-
-	if (queue_is_empty (outq)) {
-		poll_dispatch_modify (aisexec_poll_handle, conn_info->fd,
-			POLLIN|POLLNVAL, poll_handler_libais_deliver, 0);
-	}
-	return (0);
-}
-
-int openais_conn_send_response (
-	void *conn,
-	void *msg,
-	int mlen)
-{
-	struct queue *outq;
-	char *cmsg;
-	int res = 0;
-	int queue_empty;
-	struct outq_item *queue_item;
-	struct outq_item queue_item_out;
-	struct msghdr msg_send;
-	struct iovec iov_send;
-	char *msg_addr;
-	struct conn_info *conn_info = (struct conn_info *)conn;
-
-	if (conn_info == NULL) {
-		return -1;
-	}
-
-	if (!libais_connection_active (conn_info)) {
-		return (-1);
-	}
-	outq = &conn_info->outq;
-
-	msg_send.msg_iov = &iov_send;
-	msg_send.msg_name = 0;
-	msg_send.msg_namelen = 0;
-	msg_send.msg_iovlen = 1;
-	msg_send.msg_control = 0;
-	msg_send.msg_controllen = 0;
-	msg_send.msg_flags = 0;
-
-	if (queue_is_full (outq)) {
-		/*
-		 * Start a disconnect if we have not already started one
-		 * and report that the outgoing queue is full
-		 */
-		log_printf (LOG_LEVEL_ERROR, "Library queue is full, disconnecting library connection.\n");
-		libais_disconnect_delayed (conn_info);
-		return (-1);
-	}
-	while (!queue_is_empty (outq)) {
-		queue_item = queue_item_get (outq);
-		msg_addr = (char *)queue_item->msg;
-		msg_addr = &msg_addr[conn_info->byte_start];
-
-		iov_send.iov_base = msg_addr;
-		iov_send.iov_len = queue_item->mlen - conn_info->byte_start;
-
-retry_sendmsg:
-		res = sendmsg (conn_info->fd, &msg_send, MSG_NOSIGNAL);
-		if (res == -1 && errno == EINTR) {
-			goto retry_sendmsg;
-		}
-		if (res == -1 && errno == EAGAIN) {
-			break; /* outgoing kernel queue full */
-		}
-		if (res == -1) {
-			break; /* some other error, stop trying to send message */
-		}
-		if (res + conn_info->byte_start != queue_item->mlen) {
-			conn_info->byte_start += res;
-			break;
-		}
-
-		/*
-		 * Message sent, try sending another message
-		 */
-		queue_item_remove (outq);
-		conn_info->byte_start = 0;
-		free (queue_item->msg);
-	} /* while queue not empty */
-
-	res = -1;
-
-	queue_empty = queue_is_empty (outq);
-	/*
-	 * Send requested message
-	 */
-	if (queue_empty) {
-
-		iov_send.iov_base = msg;
-		iov_send.iov_len = mlen;
-retry_sendmsg_two:
-		res = sendmsg (conn_info->fd, &msg_send, MSG_NOSIGNAL);
-		if (res == -1 && errno == EINTR) {
-			goto retry_sendmsg_two;
-		}
-		if (res == -1 && errno == EAGAIN) {
-			conn_info->byte_start = 0;
-			poll_dispatch_modify (aisexec_poll_handle, conn_info->fd,
-				POLLIN|POLLNVAL, poll_handler_libais_deliver, 0);
-		}
-		if (res != -1) {
-			if (res + conn_info->byte_start != mlen) {
-				conn_info->byte_start += res;
-				res = -1;
-			} else {
-				conn_info->byte_start = 0;
-				poll_dispatch_modify (aisexec_poll_handle, conn_info->fd,
-					POLLIN|POLLNVAL, poll_handler_libais_deliver, 0);
-			}
-		}
-	}
-
-	/*
-	 * If res == -1 , errrno == EAGAIN which means kernel queue full
-	 */
-	if (res == -1)  {
-		cmsg = malloc (mlen);
-		if (cmsg == 0) {
-			log_printf (LOG_LEVEL_ERROR, "Library queue couldn't allocate a message, disconnecting library connection.\n");
-			libais_disconnect_delayed (conn_info);
-			return (-1);
-		}
-		queue_item_out.msg = cmsg;
-		queue_item_out.mlen = mlen;
-		memcpy (cmsg, msg, mlen);
-		queue_item_add (outq, &queue_item_out);
-
-		poll_dispatch_modify (aisexec_poll_handle, conn_info->fd,
-			POLLOUT|POLLIN|POLLNVAL, poll_handler_libais_deliver, 0);
-	}
-	return (0);
-}
-
-static int poll_handler_libais_accept (
-	poll_handle handle,
-	int fd,
-	int revent,
-	void *data,
-	unsigned int *prio)
-{
-	socklen_t addrlen;
-	struct conn_info *conn_info;
-	struct sockaddr_un un_addr;
-	int new_fd;
-#ifdef OPENAIS_LINUX
-	int on = 1;
-#endif
-	int res;
-
-	addrlen = sizeof (struct sockaddr_un);
-
-retry_accept:
-	new_fd = accept (fd, (struct sockaddr *)&un_addr, &addrlen);
-	if (new_fd == -1 && errno == EINTR) {
-		goto retry_accept;
-	}
-
-	if (new_fd == -1) {
-		log_printf (LOG_LEVEL_ERROR, "ERROR: Could not accept Library connection: %s\n", strerror (errno));
-		return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
-	}
-
-	totemip_nosigpipe(new_fd);
-	res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
-	if (res == -1) {
-		log_printf (LOG_LEVEL_ERROR, "Could not set non-blocking operation on library connection: %s\n", strerror (errno));
-		close (new_fd);
-		return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
-	}
-
-	/*
-	 * Valid accept
-	 */
-
-	/*
-	 * Request credentials of sender provided by kernel
-	 */
-#ifdef OPENAIS_LINUX
-	setsockopt(new_fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
-#endif
-
-	log_printf (LOG_LEVEL_DEBUG, "connection received from libais client %d.\n", new_fd);
-
-	conn_info = conn_info_create (new_fd);
-	if (conn_info == 0) {
-		close (new_fd);
-		return (0); /* This is an error, but -1 would indicate disconnect from poll */
-	}
-
-	poll_dispatch_add (aisexec_poll_handle, new_fd, POLLIN|POLLNVAL, conn_info,
-		poll_handler_libais_deliver, 0);
-	return (0);
-}
-
-static int dispatch_init_send_response (struct conn_info *conn_info, void *message)
-{
-	SaAisErrorT error = SA_AIS_ERR_ACCESS;
-	struct req_lib_dispatch_init *req_lib_dispatch_init = (struct req_lib_dispatch_init *)message;
-	struct res_lib_dispatch_init res_lib_dispatch_init;
-	struct conn_info *msg_conn_info;
-
-	if (conn_info->authenticated) {
-		conn_info->service = req_lib_dispatch_init->resdis_header.service;
-		if (!ais_service[req_lib_dispatch_init->resdis_header.service])
-			error = SA_AIS_ERR_NOT_SUPPORTED;
-		else
-			error = SA_AIS_OK;
-
-		conn_info->conn_info_partner = (struct conn_info *)req_lib_dispatch_init->conn_info;
-
-		msg_conn_info = (struct conn_info *)req_lib_dispatch_init->conn_info;
-		msg_conn_info->conn_info_partner = conn_info;
-
-		if (error == SA_AIS_OK) {
-			int private_data_size;
-
-			private_data_size = ais_service[req_lib_dispatch_init->resdis_header.service]->private_data_size;
-			if (private_data_size) {
-				conn_info->private_data = malloc (private_data_size);
-
-				conn_info->conn_info_partner->private_data = conn_info->private_data;
-				if (conn_info->private_data == NULL) {
-					error = SA_AIS_ERR_NO_MEMORY;
-				} else {
-					memset (conn_info->private_data, 0, private_data_size);
-				}
-			} else {
-				conn_info->private_data = NULL;
-				conn_info->conn_info_partner->private_data = NULL;
-			}
-		}
-
-	res_lib_dispatch_init.header.size = sizeof (struct res_lib_dispatch_init);
-	res_lib_dispatch_init.header.id = MESSAGE_RES_INIT;
-	res_lib_dispatch_init.header.error = error;
-
-	openais_conn_send_response (
-		conn_info,
-		&res_lib_dispatch_init,
-		sizeof (res_lib_dispatch_init));
-
-	if (error != SA_AIS_OK) {
-		return (-1);
-	}
-
-	}
-
-	conn_info->should_exit_fn = 1;
-	ais_service[req_lib_dispatch_init->resdis_header.service]->lib_init_fn (conn_info);
-	return (0);
-}
-
-void *openais_conn_partner_get (void *conn)
-{
-	struct conn_info *conn_info = (struct conn_info *)conn;
-
-	if (conn != NULL) {
-		return ((void *)conn_info->conn_info_partner);
-	} else {
-		return NULL;
-	}
-}
-
-void *openais_conn_private_data_get (void *conn)
-{
-	struct conn_info *conn_info = (struct conn_info *)conn;
-
-	if (conn != NULL) {
-		return ((void *)conn_info->private_data);
-	} else {
-		return NULL;
-	}
-}
-
-static int response_init_send_response (struct conn_info *conn_info, void *message)
-{
-	SaAisErrorT error = SA_AIS_ERR_ACCESS;
-	struct req_lib_response_init *req_lib_response_init = (struct req_lib_response_init *)message;
-	struct res_lib_response_init res_lib_response_init;
-
-	if (conn_info->authenticated) {
-		conn_info->service = req_lib_response_init->resdis_header.service;
-		error = SA_AIS_OK;
-	}
-	res_lib_response_init.header.size = sizeof (struct res_lib_response_init);
-	res_lib_response_init.header.id = MESSAGE_RES_INIT;
-	res_lib_response_init.header.error = error;
-	res_lib_response_init.conn_info = (unsigned long)conn_info;
-
-	openais_conn_send_response (
-		conn_info,
-		&res_lib_response_init,
-		sizeof (res_lib_response_init));
-
-	if (error == SA_AIS_ERR_ACCESS) {
-		return (-1);
-	}
-	conn_info->should_exit_fn = 0;
-	return (0);
-}
-
-struct res_overlay {
-	struct res_header header;
-	char buf[4096];
-};
-
-static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, void *data, unsigned int *prio)
-{
-	int res;
-	struct conn_info *conn_info = (struct conn_info *)data;
-	struct req_header *header;
-	int service;
-	struct msghdr msg_recv;
-	struct iovec iov_recv;
-#ifdef OPENAIS_LINUX
-	struct cmsghdr *cmsg;
-	char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
-	struct ucred *cred;
-	int on = 0;
-#else
-	uid_t euid;
-	gid_t egid;
-#endif
-	int send_ok = 0;
-	int send_ok_joined = 0;
-	struct iovec send_ok_joined_iovec;
-	struct res_overlay res_overlay;
-
-	if (revent & (POLLERR|POLLHUP)) {
-		res = libais_disconnect (conn_info);
-		return (res);
-	}
-
-	/*
-	 * Handle delayed disconnections
-	 */
-	if (conn_info->state == CONN_STATE_DISCONNECTING_DELAYED) {
-		res = libais_disconnect (conn_info);
-		return (res);
-	}
-
-	if (conn_info->state == CONN_STATE_DISCONNECTING) {
-		return (0);
-	}
-
-	if (revent & POLLOUT) {
-		cleanup_send_response (conn_info);
-	}
-
-	if ((revent & POLLIN) == 0) {
-		return (0);
-	}
-
-	msg_recv.msg_iov = &iov_recv;
-	msg_recv.msg_iovlen = 1;
-	msg_recv.msg_name = 0;
-	msg_recv.msg_namelen = 0;
-	msg_recv.msg_flags = 0;
-
-	if (conn_info->authenticated) {
-		msg_recv.msg_control = 0;
-		msg_recv.msg_controllen = 0;
-	} else {
-#ifdef OPENAIS_LINUX
-		msg_recv.msg_control = (void *)cmsg_cred;
-		msg_recv.msg_controllen = sizeof (cmsg_cred);
-#else
-		euid = -1; egid = -1;
-		if (getpeereid(fd, &euid, &egid) != -1 &&
-		    (euid == 0 || egid == gid_valid)) {
-				conn_info->authenticated = 1;
-		}
-		if (conn_info->authenticated == 0) {
-			log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", egid, gid_valid);
-		}
-#endif
-	}
-
-	iov_recv.iov_base = &conn_info->inb[conn_info->inb_start];
-	iov_recv.iov_len = (SIZEINB) - conn_info->inb_start;
-	assert (iov_recv.iov_len != 0);
-
-retry_recv:
-	res = recvmsg (fd, &msg_recv, MSG_NOSIGNAL);
-	if (res == -1 && errno == EINTR) {
-		goto retry_recv;
-	} else
-	if (res == -1 && errno != EAGAIN) {
-		goto error_disconnect;
-	} else
-	if (res == 0) {
-		goto error_disconnect;
-		return (-1);
-	}
-
-	/*
-	 * Authenticate if this connection has not been authenticated
-	 */
-#ifdef OPENAIS_LINUX
-	if (conn_info->authenticated == 0) {
-		cmsg = CMSG_FIRSTHDR (&msg_recv);
-		cred = (struct ucred *)CMSG_DATA (cmsg);
-		if (cred) {
-			if (cred->uid == 0 || cred->gid == gid_valid) {
-				setsockopt(fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof (on));
-				conn_info->authenticated = 1;
-			}
-		}
-		if (conn_info->authenticated == 0) {
-			log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", cred->gid, gid_valid);
-		}
-	}
-#endif
-	/*
-	 * Dispatch all messages received in recvmsg that can be dispatched
-	 * sizeof (struct req_header) needed at minimum to do any processing
-	 */
-	conn_info->inb_inuse += res;
-	conn_info->inb_start += res;
-
-	while (conn_info->inb_inuse >= sizeof (struct req_header) && res != -1) {
-		header = (struct req_header *)&conn_info->inb[conn_info->inb_start - conn_info->inb_inuse];
-
-		if (header->size > conn_info->inb_inuse) {
-			break;
-		}
-		service = conn_info->service;
-
-		/*
-		 * If this service is in init phase, initialize service
-		 * else handle message using service service
-		 */
-		if (service == SOCKET_SERVICE_INIT) {
-			res = ais_init_service[header->id] (conn_info, header);
-// TODO error in init_two_fn needs to be handled
-		} else  {
-			/*
-			 * Not an init service, but a standard service
-			 */
-			if (header->id < 0 || header->id > ais_service[service]->lib_service_count) {
-				log_printf (LOG_LEVEL_SECURITY, "Invalid header id is %d min 0 max %d\n",
-				header->id, ais_service[service]->lib_service_count);
-				res = -1;
-				goto error_disconnect;
-			}
-
-			/*
-			 * If flow control is required of the library handle, determine that
-			 * openais is not in synchronization and that totempg has room available
-			 * to queue a message, otherwise tell the library we are busy and to
-			 * try again later
-			 */
-			send_ok_joined_iovec.iov_base = header;
-			send_ok_joined_iovec.iov_len = header->size;
-			send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle,
-				&send_ok_joined_iovec, 1);
-
-			send_ok =
-				(sync_primary_designated() == 1) && (
-				(ais_service[service]->lib_service[header->id].flow_control == OPENAIS_FLOW_CONTROL_NOT_REQUIRED) ||
-				((ais_service[service]->lib_service[header->id].flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) &&
-				(send_ok_joined) &&
-				(sync_in_process() == 0)));
-
-			if (send_ok) {
-		//		*prio = 0;
-				ais_service[service]->lib_service[header->id].lib_handler_fn(conn_info, header);
-			} else {
-		//		*prio = (*prio) + 1;
-
-				/*
-				 * Overload, tell library to retry
-				 */
-				res_overlay.header.size =
-					ais_service[service]->lib_service[header->id].response_size;
-				res_overlay.header.id =
-					ais_service[service]->lib_service[header->id].response_id;
-				res_overlay.header.error = SA_AIS_ERR_TRY_AGAIN;
-				openais_conn_send_response (
-					conn_info,
-					&res_overlay,
-					res_overlay.header.size);
-			}
-		}
-		conn_info->inb_inuse -= header->size;
-	} /* while */
-
-	if (conn_info->inb_inuse == 0) {
-		conn_info->inb_start = 0;
-	} else
-// BUG	if (connections[fd].inb_start + connections[fd].inb_inuse >= SIZEINB) {
-	if (conn_info->inb_start >= SIZEINB) {
-		/*
-		 * If in buffer is full, move it back to start
-		 */
-		memmove (conn_info->inb,
-			&conn_info->inb[conn_info->inb_start - conn_info->inb_inuse],
-			sizeof (char) * conn_info->inb_inuse);
-		conn_info->inb_start = conn_info->inb_inuse;
-	}
-
-	return (0);
-
-error_disconnect:
-	res = libais_disconnect (conn_info);
-	return (res);
-}
-
 void sigintr_handler (int signum)
 {
 
@@ -910,56 +165,6 @@ static int openais_sync_callbacks_retrieve (int sync_id,
 	return (0);
 }
 
-char delivery_data[MESSAGE_SIZE_MAX];
-
-static void deliver_fn (
-	struct totem_ip_address *source_addr,
-	struct iovec *iovec,
-	int iov_len,
-	int endian_conversion_required)
-{
-	struct req_header *header;
-	int pos = 0;
-	int i;
-	int service;
-	int fn_id;
-
-	/*
-	 * Build buffer without iovecs to make processing easier
-	 * This is only used for messages which are multicast with iovecs
-	 * and self-delivered.  All other mechanisms avoid the copy.
-	 */
-	if (iov_len > 1) {
-		for (i = 0; i < iov_len; i++) {
-			memcpy (&delivery_data[pos], iovec[i].iov_base, iovec[i].iov_len);
-			pos += iovec[i].iov_len;
-			assert (pos < MESSAGE_SIZE_MAX);
-		}
-		header = (struct req_header *)delivery_data;
-	} else {
-		header = (struct req_header *)iovec[0].iov_base;
-	}
-	if (endian_conversion_required) {
-		header->id = swab32 (header->id);
-		header->size = swab32 (header->size);
-	}
-
-//	assert(iovec->iov_len == header->size);
-
-	/*
-	 * Call the proper executive handler
-	 */
-	service = header->id >> 16;
-	fn_id = header->id & 0xffff;
-	if (endian_conversion_required) {
-		ais_service[service]->exec_service[fn_id].exec_endian_convert_fn
-			(header);
-	}
-
-	ais_service[service]->exec_service[fn_id].exec_handler_fn
-		(header, source_addr);
-}
-
 static struct memb_ring_id aisexec_ring_id;
 
 static void confchg_fn (
@@ -1054,52 +259,6 @@ static void aisexec_tty_detach (void)
 #endif
 }
 
-static void aisexec_libais_bind (int *server_fd)
-{
-	int libais_server_fd;
-	struct sockaddr_un un_addr;
-	int res;
-
-	/*
-	 * Create socket for libais clients, name socket, listen for connections
-	 */
-	libais_server_fd = socket (PF_UNIX, SOCK_STREAM, 0);
-	if (libais_server_fd == -1) {
-		log_printf (LOG_LEVEL_ERROR ,"Cannot create libais client connections socket.\n");
-		openais_exit_error (AIS_DONE_LIBAIS_SOCKET);
-	};
-
-	totemip_nosigpipe(libais_server_fd);
-	res = fcntl (libais_server_fd, F_SETFL, O_NONBLOCK);
-	if (res == -1) {
-		log_printf (LOG_LEVEL_ERROR, "Could not set non-blocking operation on server socket: %s\n", strerror (errno));
-		openais_exit_error (AIS_DONE_LIBAIS_SOCKET);
-	}
-
-#if !defined(OPENAIS_LINUX)
-	unlink(socketname);
-#endif
-	memset (&un_addr, 0, sizeof (struct sockaddr_un));
-	un_addr.sun_family = AF_UNIX;
-#if defined(OPENAIS_BSD) || defined(OPENAIS_DARWIN)
-	un_addr.sun_len = sizeof(struct sockaddr_un);
-#endif
-#if defined(OPENAIS_LINUX)
-	strcpy (un_addr.sun_path + 1, socketname);
-#else
-	strcpy (un_addr.sun_path, socketname);
-#endif
-
-	res = bind (libais_server_fd, (struct sockaddr *)&un_addr, AIS_SUN_LEN(&un_addr));
-	if (res) {
-		log_printf (LOG_LEVEL_ERROR, "ERROR: Could not bind AF_UNIX: %s.\n", strerror (errno));
-		openais_exit_error (AIS_DONE_LIBAIS_BIND);
-	}
-	listen (libais_server_fd, SERVER_BACKLOG);
-
-	*server_fd = libais_server_fd;
-}
-
 static void aisexec_setscheduler (void)
 {
 #if defined(OPENAIS_BSD) || defined(OPENAIS_LINUX)
@@ -1145,33 +304,57 @@ static void aisexec_mlockall (void)
 #endif
 }
 
-int message_source_is_local(struct message_source *source)
+
+static void deliver_fn (
+	struct totem_ip_address *source_addr,
+	struct iovec *iovec,
+	int iov_len,
+	int endian_conversion_required)
 {
-	int ret = 0;
+	struct req_header *header;
+	int pos = 0;
+	int i;
+	int service;
+	int fn_id;
 
-	assert (source != NULL);
-	if ((totemip_localhost_check(&source->addr)
-	     ||(totemip_equal(&source->addr, &this_non_loopback_ip)))) {
-		ret = 1;
+	/*
+	 * Build buffer without iovecs to make processing easier
+	 * This is only used for messages which are multicast with iovecs
+	 * and self-delivered.  All other mechanisms avoid the copy.
+	 */
+	if (iov_len > 1) {
+		for (i = 0; i < iov_len; i++) {
+			memcpy (&delivery_data[pos], iovec[i].iov_base, iovec[i].iov_len);
+			pos += iovec[i].iov_len;
+			assert (pos < MESSAGE_SIZE_MAX);
+		}
+		header = (struct req_header *)delivery_data;
+	} else {
+		header = (struct req_header *)iovec[0].iov_base;
+	}
+	if (endian_conversion_required) {
+		header->id = swab32 (header->id);
+		header->size = swab32 (header->size);
 	}
-	return ret;
-}
 
-void message_source_set (
-	struct message_source *source,
-	void *conn)
-{
-	assert ((source != NULL) && (conn != NULL));
-	totemip_copy(&source->addr, this_ip);
-	source->conn = conn;
-}
+//	assert(iovec->iov_len == header->size);
 
+	/*
+	 * Call the proper executive handler
+	 */
+	service = header->id >> 16;
+	fn_id = header->id & 0xffff;
+	if (endian_conversion_required) {
+		ais_service[service]->exec_service[fn_id].exec_endian_convert_fn
+			(header);
+	}
 
-struct totem_logging_configuration totem_logging_configuration;
+	ais_service[service]->exec_service[fn_id].exec_handler_fn
+		(header, source_addr);
+}
 
 int main (int argc, char **argv)
 {
-	int libais_server_fd;
 	char *error_string;
 	struct main_config main_config;
 	struct totem_config totem_config;
@@ -1195,9 +378,7 @@ int main (int argc, char **argv)
 
 	totemip_localhost(AF_INET, &this_non_loopback_ip);
 
-	aisexec_poll_handle = poll_create ();
-
-//TODO	signal (SIGUSR2, sigusr2_handler);
+	aisexec_poll_handle = poll_create (openais_ipc_mutex_get());
 
 	/*
 	 * Load the object database interface
@@ -1308,6 +489,10 @@ int main (int argc, char **argv)
 	 * there is more then one interface in a system, so
 	 * in this case, only a warning is printed
 	 */
+	/*
+	 * Join multicast group and setup delivery
+	 *  and configuration change functions
+	 */
 	totempg_initialize (
 		aisexec_poll_handle,
 		&totem_config);
@@ -1349,23 +534,12 @@ int main (int argc, char **argv)
 
 	signal (SIGINT, sigintr_handler);
 
-	aisexec_libais_bind (&libais_server_fd);
+	openais_ipc_init (aisexec_poll_handle, gid_valid, &this_non_loopback_ip);
 
 	aisexec_tty_detach ();
 
 	log_printf (LOG_LEVEL_NOTICE, "AIS Executive Service: started and ready to provide service.\n");
 
-	/*
-	 * Setup libais connection dispatch routine
-	 */
-	poll_dispatch_add (aisexec_poll_handle, libais_server_fd,
-		POLLIN, 0, poll_handler_libais_accept, 0);
-
-	/*
-	 * Join multicast group and setup delivery
-	 *  and configuration change functions
-	 */
-
 	/*
 	 * Start main processing loop
 	 */

+ 2 - 1
exec/objdb.c

@@ -65,7 +65,8 @@ struct object_instance {
 static struct hdb_handle_database object_instance_database = {
 	.handle_count	= 0,
 	.handles	= 0,
-	.iterator	= 0
+	.iterator	= 0,
+	.mutex		= PTHREAD_MUTEX_INITIALIZER
 };
 
 static int objdb_init (void)

+ 2 - 1
exec/totemnet.c

@@ -202,7 +202,8 @@ static struct totem_ip_address localhost;
 static struct hdb_handle_database totemnet_instance_database = {
 	.handle_count	= 0,
 	.handles	= 0,
-	.iterator	= 0
+	.iterator	= 0,
+	.mutex		= PTHREAD_MUTEX_INITIALIZER
 };
 
 static void totemnet_instance_initialize (struct totemnet_instance *instance)

+ 11 - 1
exec/totempg.c

@@ -203,7 +203,8 @@ struct totempg_group_instance {
 static struct hdb_handle_database totempg_groups_instance_database = {
 	.handle_count	= 0,
 	.handles	= 0,
-	.iterator	= 0
+	.iterator	= 0,
+	.mutex		= PTHREAD_MUTEX_INITIALIZER
 };
 
 static int send_ok (int msg_size);
@@ -570,6 +571,8 @@ static void totempg_deliver_fn (
 
 void *callback_token_received_handle;
 
+pthread_mutex_t mcast_msg_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 int callback_token_received_fn (enum totem_callback_token_type type,
 	void *data)
 {
@@ -577,10 +580,13 @@ int callback_token_received_fn (enum totem_callback_token_type type,
 	struct iovec iovecs[3];
 	int res;
 
+	pthread_mutex_lock (&mcast_msg_mutex);
 	if (mcast_packed_msg_count == 0) {
+		pthread_mutex_unlock (&mcast_msg_mutex);
 		return (0);
 	}
 	if (totemmrp_avail() == 0) {
+		pthread_mutex_unlock (&mcast_msg_mutex);
 		return (0);
 	}
 	mcast.fragmented = 0;
@@ -605,6 +611,7 @@ int callback_token_received_fn (enum totem_callback_token_type type,
 	mcast_packed_msg_count = 0;
 	fragment_size = 0;
 
+	pthread_mutex_unlock (&mcast_msg_mutex);
 	return (0);
 }
 
@@ -672,6 +679,7 @@ static int mcast_msg (
 	int copy_base = 0;
 	int total_size = 0;
 
+	pthread_mutex_lock (&mcast_msg_mutex);
 	totemmrp_new_msg_signal ();
 
 	max_packet_size = TOTEMPG_PACKET_SIZE -
@@ -689,6 +697,7 @@ static int mcast_msg (
 	if (send_ok (total_size + sizeof(unsigned short) *
 		(mcast_packed_msg_count+1)) == 0) {
 
+		pthread_mutex_unlock (&mcast_msg_mutex);
 		return(-1);
 	}
 
@@ -800,6 +809,7 @@ static int mcast_msg (
 			mcast_packed_msg_count++;
 	}
 
+	pthread_mutex_unlock (&mcast_msg_mutex);
 	return (res);
 }
 

+ 2 - 1
exec/totemrrp.c

@@ -288,7 +288,8 @@ struct rrp_algo active_algo = {
 static struct hdb_handle_database totemrrp_instance_database = {
 	.handle_count	= 0,
 	.handles	= 0,
-	.iterator	= 0
+	.iterator	= 0,
+	.mutex		= PTHREAD_MUTEX_INITIALIZER
 };
 
 #define log_printf(level, format, args...) \

+ 2 - 2
exec/totemsrp.c

@@ -48,7 +48,6 @@
  */
 
 #include <assert.h>
-#include <pthread.h>
 #include <sys/mman.h>
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -571,7 +570,8 @@ void main_iface_change_fn (
 static struct hdb_handle_database totemsrp_instance_database = {
 	.handle_count	= 0,
 	.handles	= 0,
-	.iterator	= 0
+	.iterator	= 0,
+	.mutex		= PTHREAD_MUTEX_INITIALIZER
 };
 struct message_handlers totemsrp_message_handlers = {
 	6,

+ 19 - 0
include/hdb.h

@@ -38,6 +38,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <assert.h>
+#include <pthread.h>
 
 enum HDB_HANDLE_STATE {
 	HDB_HANDLE_STATE_EMPTY,
@@ -55,12 +56,14 @@ struct hdb_handle_database {
 	unsigned int handle_count;
 	struct hdb_handle *handles;
 	unsigned int iterator;
+	pthread_mutex_t mutex;
 };
 
 static inline void hdb_create (
 	struct hdb_handle_database *handle_database)
 {
 	memset (handle_database, 0, sizeof (struct hdb_handle_database));
+	pthread_mutex_init (&handle_database->mutex, NULL);
 }
 
 static inline void hdb_destroy (
@@ -83,6 +86,8 @@ static inline int hdb_handle_create (
 	int found = 0;
 	void *instance;
 
+	pthread_mutex_lock (&handle_database->mutex);
+
 	for (handle = 0; handle < handle_database->handle_count; handle++) {
 		if (handle_database->handles[handle].state == HDB_HANDLE_STATE_EMPTY) {
 			found = 1;
@@ -95,6 +100,7 @@ static inline int hdb_handle_create (
 		new_handles = (struct hdb_handle *)realloc (handle_database->handles,
 			sizeof (struct hdb_handle) * handle_database->handle_count);
 		if (new_handles == 0) {
+			pthread_mutex_unlock (&handle_database->mutex);
 			return (-1);
 		}
 		handle_database->handles = new_handles;
@@ -114,6 +120,8 @@ static inline int hdb_handle_create (
 
 	*handle_id_out = handle;
 
+	pthread_mutex_unlock (&handle_database->mutex);
+
 	return (0);
 }
 
@@ -122,18 +130,24 @@ static inline int hdb_handle_get (
 	unsigned int handle,
 	void **instance)
 {
+	pthread_mutex_lock (&handle_database->mutex);
+
 	*instance = NULL;
 	if (handle >= handle_database->handle_count) {
+		pthread_mutex_unlock (&handle_database->mutex);
 		return (-1);
 	}
 
 	if (handle_database->handles[handle].state != HDB_HANDLE_STATE_ACTIVE) {
+		pthread_mutex_unlock (&handle_database->mutex);
 		return (-1);
 	}
 
 	*instance = handle_database->handles[handle].instance;
 
 	handle_database->handles[handle].ref_count += 1;
+
+	pthread_mutex_unlock (&handle_database->mutex);
 	return (0);
 }
 
@@ -141,6 +155,7 @@ static inline void hdb_handle_put (
 	struct hdb_handle_database *handle_database,
 	unsigned int handle)
 {
+	pthread_mutex_lock (&handle_database->mutex);
 	handle_database->handles[handle].ref_count -= 1;
 	assert (handle_database->handles[handle].ref_count >= 0);
 
@@ -148,13 +163,17 @@ static inline void hdb_handle_put (
 		free (handle_database->handles[handle].instance);
 		memset (&handle_database->handles[handle], 0, sizeof (struct hdb_handle));
 	}
+	pthread_mutex_unlock (&handle_database->mutex);
 }
 
 static inline void hdb_handle_destroy (
 	struct hdb_handle_database *handle_database,
 	unsigned int handle)
 {
+	pthread_mutex_lock (&handle_database->mutex);
+
 	handle_database->handles[handle].state = HDB_HANDLE_STATE_PENDINGREMOVAL;
+	pthread_mutex_unlock (&handle_database->mutex);
 	hdb_handle_put (handle_database, handle);
 }
 

+ 45 - 4
include/queue.h

@@ -35,6 +35,7 @@
 #define QUEUE_H_DEFINED
 
 #include <string.h>
+#include <pthread.h>
 #include "assert.h"
 
 struct queue {
@@ -46,6 +47,7 @@ struct queue {
 	void *items;
 	int size_per_item;
 	int iterator;
+	pthread_mutex_t mutex;
 };
 
 static inline int queue_init (struct queue *queue, int queue_items, int size_per_item) {
@@ -61,17 +63,20 @@ static inline int queue_init (struct queue *queue, int queue_items, int size_per
 		return (-ENOMEM);
 	}
 	memset (queue->items, 0, queue_items * size_per_item);
+	pthread_mutex_init (&queue->mutex, NULL);
 	return (0);
 }
 
 static inline int queue_reinit (struct queue *queue)
 {
+	pthread_mutex_lock (&queue->mutex);
 	queue->head = 0;
 	queue->tail = queue->size - 1;
 	queue->used = 0;
 	queue->usedhw = 0;
 
 	memset (queue->items, 0, queue->size * queue->size_per_item);
+	pthread_mutex_unlock (&queue->mutex);
 	return (0);
 }
 
@@ -80,11 +85,21 @@ static inline void queue_free (struct queue *queue) {
 }
 
 static inline int queue_is_full (struct queue *queue) {
-	return (queue->size - 1 == queue->used);
+	int full;
+
+	pthread_mutex_lock (&queue->mutex);
+	full = queue->size - 1 == queue->used;
+	pthread_mutex_unlock (&queue->mutex);
+	return (full);
 }
 
 static inline int queue_is_empty (struct queue *queue) {
-	return (queue->used == 0);
+	int empty;
+
+	pthread_mutex_lock (&queue->mutex);
+	empty = queue->used == 0;
+	pthread_mutex_unlock (&queue->mutex);
+	return (empty);
 }
 
 static inline void queue_item_add (struct queue *queue, void *item)
@@ -92,6 +107,7 @@ static inline void queue_item_add (struct queue *queue, void *item)
 	char *queue_item;
 	int queue_position;
 
+	pthread_mutex_lock (&queue->mutex);
 	queue_position = queue->head;
 	queue_item = queue->items;
 	queue_item += queue_position * queue->size_per_item;
@@ -104,6 +120,7 @@ static inline void queue_item_add (struct queue *queue, void *item)
 	if (queue->used > queue->usedhw) {
 		queue->usedhw = queue->used;
 	}
+	pthread_mutex_unlock (&queue->mutex);
 }
 
 static inline void *queue_item_get (struct queue *queue)
@@ -111,34 +128,42 @@ static inline void *queue_item_get (struct queue *queue)
 	char *queue_item;
 	int queue_position;
 
+	pthread_mutex_lock (&queue->mutex);
 	queue_position = (queue->tail + 1) % queue->size;
 	queue_item = queue->items;
 	queue_item += queue_position * queue->size_per_item;
+	pthread_mutex_unlock (&queue->mutex);
 	return ((void *)queue_item);
 }
 
 static inline void queue_item_remove (struct queue *queue) {
+	pthread_mutex_lock (&queue->mutex);
 	queue->tail = (queue->tail + 1) % queue->size;
 	
 	assert (queue->tail != queue->head);
 
 	queue->used--;
 	assert (queue->used >= 0);
+	pthread_mutex_unlock (&queue->mutex);
 }
 
 static inline void queue_items_remove (struct queue *queue, int rel_count)
 {
+	pthread_mutex_lock (&queue->mutex);
 	queue->tail = (queue->tail + rel_count) % queue->size;
 	
 	assert (queue->tail != queue->head);
 
 	queue->used -= rel_count;
+	pthread_mutex_unlock (&queue->mutex);
 }
 
 
 static inline void queue_item_iterator_init (struct queue *queue)
 {
+	pthread_mutex_lock (&queue->mutex);
 	queue->iterator = (queue->tail + 1) % queue->size;
+	pthread_mutex_unlock (&queue->mutex);
 }
 
 static inline void *queue_item_iterator_get (struct queue *queue)
@@ -146,30 +171,46 @@ static inline void *queue_item_iterator_get (struct queue *queue)
 	char *queue_item;
 	int queue_position;
 
+	pthread_mutex_lock (&queue->mutex);
 	queue_position = (queue->iterator) % queue->size;
 	if (queue->iterator == queue->head) {
+		pthread_mutex_unlock (&queue->mutex);
 		return (0);
 	}
 	queue_item = queue->items;
 	queue_item += queue_position * queue->size_per_item;
+	pthread_mutex_unlock (&queue->mutex);
 	return ((void *)queue_item);
 }
 
 static inline int queue_item_iterator_next (struct queue *queue)
 {
+	int next_res;
+
+	pthread_mutex_lock (&queue->mutex);
 	queue->iterator = (queue->iterator + 1) % queue->size;
 
-	return (queue->iterator == queue->head);
+	next_res = queue->iterator == queue->head;
+	pthread_mutex_unlock (&queue->mutex);
+	return (next_res);
 }
 
 static inline void queue_avail (struct queue *queue, int *avail)
 {
+	pthread_mutex_lock (&queue->mutex);
 	*avail = queue->size - queue->used - 2;
 	assert (*avail >= 0);
+	pthread_mutex_unlock (&queue->mutex);
 }
 
 static inline int queue_used (struct queue *queue) {
-	return (queue->used);
+	int used;
+
+	pthread_mutex_lock (&queue->mutex);
+	used = queue->used;
+	pthread_mutex_unlock (&queue->mutex);
+
+	return (used);
 }
 
 #endif /* QUEUE_H_DEFINED */

+ 0 - 1
lib/evt.c

@@ -742,7 +742,6 @@ saEvtDispatch(
  			 * grabbed it.
 			 */
 			if (evt->led_head.error == SA_AIS_ERR_NOT_EXIST) {
-                DPRINT (("MESSAGE_RES_EVT_AVAILABLE: No event data\n"));
 				error = SA_AIS_OK;
 				break;
 			}

+ 7 - 0
test/subscription.c

@@ -11,6 +11,7 @@
 #include <sys/poll.h>
 #include <stdlib.h>
 #include <getopt.h>
+#include <sched.h>
 #include "saAis.h"
 #include "saEvt.h"
 
@@ -372,6 +373,10 @@ evt_free:
 
 static int err_wait_time = -1;
 
+static struct sched_param sched_param = {
+	sched_priority: 1
+};
+
 int main (int argc, char **argv)
 {
 	static const char opts[] = "c:s:n:qu:f:";
@@ -379,6 +384,8 @@ int main (int argc, char **argv)
 	int option;
 	char *p;
 
+	sched_setscheduler (0, SCHED_RR, &sched_param);
+
 	while (1) {
 		option = getopt(argc, argv, opts);
 		if (option == -1)