Bladeren bron

Message service implemntation - more apis now supported.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1552 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 17 jaren geleden
bovenliggende
commit
f323f47343
6 gewijzigde bestanden met toevoegingen van 987 en 242 verwijderingen
  1. 344 95
      exec/msg.c
  2. 30 23
      include/ipc_msg.h
  3. 11 2
      include/saAis.h
  4. 67 4
      include/saMsg.h
  5. 436 109
      lib/msg.c
  6. 99 9
      test/testmsg.c

+ 344 - 95
exec/msg.c

@@ -32,6 +32,7 @@
  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  * THE POSSIBILITY OF SUCH DAMAGE.
  */
+
 #include <sys/types.h>
 #include <sys/uio.h>
 #include <sys/socket.h>
@@ -52,6 +53,7 @@
 #include "../include/list.h"
 #include "../include/queue.h"
 #include "../lcr/lcr_comp.h"
+
 #include "objdb.h"
 #include "totem.h"
 #include "service.h"
@@ -84,10 +86,17 @@ enum msg_exec_message_req_types {
 	MESSAGE_REQ_EXEC_MSG_MESSAGEREPLY = 14
 };
 
+struct message_entry {
+	SaTimeT time;
+	SaMsgMessageT message;
+	struct list_head list;
+};
+
 struct message_queue {
 	SaNameT name;
 	int refcount;
 	struct list_head list;
+	struct list_head message_list_head;
 };
 
 struct queue_group {
@@ -254,7 +263,8 @@ static void message_handler_req_lib_msg_messagereplyasync (
 
 #ifdef TODO
 static void msg_sync_init (void);
-#endif
+#endif	/* TODO */
+
 static void msg_sync_activate (void);
 static int  msg_sync_process (void);
 static void msg_sync_abort(void);
@@ -262,18 +272,17 @@ static void msg_sync_abort(void);
 void queue_release (struct message_queue *queue);
 
 static void msg_confchg_fn (
-		enum totem_configuration_type configuration_type,
-		unsigned int *member_list, int member_list_entries,
-		unsigned int *left_list, int left_list_entries,
-		unsigned int *joined_list, int joined_list_entries,
-		struct memb_ring_id *ring_id);
+	enum totem_configuration_type configuration_type,
+	unsigned int *member_list, int member_list_entries,
+	unsigned int *left_list, int left_list_entries,
+	unsigned int *joined_list, int joined_list_entries,
+	struct memb_ring_id *ring_id);
 
 struct msg_pd {
 	struct list_head queue_list;
 	struct list_head queue_cleanup_list;
 };
 
-
 /*
  * Executive Handler Definition
  */
@@ -532,54 +541,68 @@ struct req_exec_msg_queuegroupcreate {
 	mar_message_source_t source;
 	SaNameT queue_group_name;
 };
+
 struct req_exec_msg_queuegroupinsert {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_name;
 	SaNameT queue_group_name;
 };
+
 struct req_exec_msg_queuegroupremove {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_name;
 	SaNameT queue_group_name;
 };
+
 struct req_exec_msg_queuegroupdelete {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_group_name;
 };
+
 struct req_exec_msg_queuegrouptrack {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_group_name;
 };
+
 struct req_exec_msg_queuegrouptrackstop {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_group_name;
 };
+
 struct req_exec_msg_messagesend {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT destination;
+	SaTimeT timeout;
+	SaMsgMessageT message;
+	SaInvocationT invocation;
+	SaMsgAckFlagsT ack_flags;
 	int async_call;
 };
+
 struct req_exec_msg_messageget {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_name;
 };
+
 struct req_exec_msg_messagecancel {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_name;
 };
+
 struct req_exec_msg_messagesendreceive {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_name;
 };
+
 struct req_exec_msg_messagereply {
 	mar_req_header_t header;
 	mar_message_source_t source;
@@ -592,7 +615,7 @@ static void msg_sync_init (void)
 {
 	return;
 }
-#endif
+#endif	/* TODO */
 
 static int msg_sync_process (void) 
 {
@@ -616,6 +639,23 @@ static void msg_confchg_fn (
 	unsigned int *joined_list, int joined_list_entries,
 	struct memb_ring_id *ring_id) 
 {
+	return;
+}
+
+static void print_message_list (struct message_queue *queue)
+{
+	struct list_head *list;
+	struct message_entry *entry;
+
+	for (list = queue->message_list_head.next;
+	     list != &queue->message_list_head;
+	     list = list->next)
+	{
+		entry = list_entry (list, struct message_entry, list);
+
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_message_list (%s) (%llu)\n",
+			    (char *)(entry->message.data), (unsigned long long)(entry->time));
+	}
 }
 
 static struct message_queue *queue_find (SaNameT *name)
@@ -624,9 +664,9 @@ static struct message_queue *queue_find (SaNameT *name)
 	struct message_queue *queue;
 
 	for (list = queue_list_head.next;
-		list != &queue_list_head;
-		list = list->next) {
-
+	     list != &queue_list_head;
+	     list = list->next)
+	{
 	        queue = list_entry (list, struct message_queue, list);
 
 		if (name_match (name, &queue->name)) {
@@ -642,9 +682,9 @@ static struct queue_group *queue_group_find (SaNameT *name)
 	struct queue_group *queue_group;
 
 	for (list = queue_group_list_head.next;
-		list != &queue_group_list_head;
-		list = list->next) {
-
+	     list != &queue_group_list_head;
+	     list = list->next)
+	{
 	        queue_group = list_entry (list, struct queue_group, list);
 
 		if (name_match (name, &queue_group->name)) {
@@ -662,11 +702,11 @@ static struct queue_group_entry *queue_group_entry_find (
 	struct queue_group_entry *queue_group_entry;
 
 	for (list = queue_group->message_queue_head.next;
-		list != &queue_group->message_queue_head;
-		list = list->next) {
-
+	     list != &queue_group->message_queue_head;
+	     list = list->next)
+	{
 	        queue_group_entry = list_entry (list, struct queue_group_entry, list);
-
+	
 		if (queue_group_entry->message_queue == queue) {
 			return (queue_group_entry);
 		}
@@ -679,23 +719,23 @@ static int msg_exec_init_fn (struct objdb_iface_ver0 *objdb)
 	/*
 	 *  Initialize the saved ring ID.
 	 */
-//	saved_ring_id.seq = 0;
-//	saved_ring_id.rep.s_addr = this_ip->sin_addr.s_addr;		
-	
+
+	/* saved_ring_id.seq = 0; */
+	/* saved_ring_id.rep.s_addr = this_ip->sin_addr.s_addr; */
+
 	return (0);
 }
 
 static int msg_lib_exit_fn (void *conn)
 {
-//	struct msg_pd *msg_pd = (struct msg_pd *)openais_conn_private_data_get (conn);
+	/*
+	 * struct msg_pd *msg_pd = (struct msg_pd *)openais_conn_private_data_get (conn);
+	 */
+
 #ifdef COMPILE_OUT
 	struct queue_cleanup *queue_cleanup;
 	struct list_head *list;
 	
-printf ("exit_fn\n");
-
-	log_printf(LOG_LEVEL_NOTICE, "msg_exit_fn conn_info = %p\n", conn);
-	
 	/*
 	 * close all queues opened on this fd
 	 */
@@ -703,20 +743,18 @@ printf ("exit_fn\n");
 	while (!list_empty(&conn_info->conn_info_partner->ais_ci.u.libmsg_ci.queue_cleanup_list)) {
 		
 		queue_cleanup = list_entry (list, struct queue_cleanup, list);
-		
-printf ("queue to cleanup\n");
+
 		if (queue_cleanup->queue->name.length > 0)	{
 			msg_queue_cleanup_lock_remove (queue_cleanup);
 			msg_queue_close (queue_cleanup->queue);
 		}
-		
-printf ("queue cleanup %x\n", queue_cleanup);
+
 		list_del (&queue_cleanup->list);	
 		free (queue_cleanup);
                 
 		list = conn_info->conn_info_partner->ais_ci.u.libmsg_ci.queue_cleanup_list.next;
 	}
-#endif
+#endif	/* COMPILE_OUT */
 
 	return (0);
 }
@@ -727,6 +765,7 @@ static int msg_lib_init_fn (void *conn)
 
 	list_init (&msg_pd->queue_list);
 	list_init (&msg_pd->queue_cleanup_list);
+
 	return (0);
 }
 
@@ -734,11 +773,12 @@ static void message_handler_req_exec_msg_queueopen (
 	void *message,
 	unsigned int nodeid)
 {
-	struct req_exec_msg_queueopen *req_exec_msg_queueopen = (struct req_exec_msg_queueopen *)message;
+	struct req_exec_msg_queueopen *req_exec_msg_queueopen =
+		(struct req_exec_msg_queueopen *)message;
 	struct res_lib_msg_queueopen res_lib_msg_queueopen;
 	struct res_lib_msg_queueopenasync res_lib_msg_queueopenasync;
 	struct message_queue *queue;
-//	struct queue_cleanup *queue_cleanup;
+	/* struct queue_cleanup *queue_cleanup; */
 	SaAisErrorT error = SA_AIS_OK;
 
 	log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgQueueOpen %s\n",
@@ -746,7 +786,6 @@ static void message_handler_req_exec_msg_queueopen (
 	
 	queue = queue_find (&req_exec_msg_queueopen->queue_name);
 
-	printf ("queue %p\n", queue);
 	/*
 	 * If queue doesn't exist, create one
 	 */
@@ -766,16 +805,16 @@ static void message_handler_req_exec_msg_queueopen (
 			&req_exec_msg_queueopen->queue_name,
 			sizeof (SaNameT));
 		list_init (&queue->list);
+		list_init (&queue->message_list_head);
 		list_add (&queue->list, &queue_list_head);
 		queue->refcount = 0;
 	}
 	queue->refcount += 1;
-	printf ("Incrementing queue refcount to %d\n", queue->refcount);
+
 #ifdef COMPILE_OUT
 	/*
 	 * Setup connection information and mark queue as referenced
 	 */
-	log_printf (LOG_LEVEL_DEBUG, "Lock queue opened is %p\n", queue);
 	queue_cleanup = malloc (sizeof (struct queue_cleanup));
 	if (queue_cleanup == 0) {
 		free (queue);
@@ -790,9 +829,7 @@ static void message_handler_req_exec_msg_queueopen (
 			&req_exec_msg_queueopen->source.conn_info->ais_ci.u.libmsg_ci.queue_cleanup_list);
 	}
 	queue->refcount += 1;
-printf ("refcount == %d\n", queue->refcount);
-#endif
-	
+#endif	/* COMPILE_OUT */
 	
 	/*
 	 * Send error result to MSG library
@@ -805,11 +842,19 @@ error_exit:
 		/*
 		 * If its an async call respond with the invocation and handle
 		 */
-		if (req_exec_msg_queueopen->async_call) {
-			res_lib_msg_queueopenasync.header.size = sizeof (struct res_lib_msg_queueopenasync);
-			res_lib_msg_queueopenasync.header.id = MESSAGE_RES_MSG_QUEUEOPENASYNC;
+		if (req_exec_msg_queueopen->async_call)
+		{
+			res_lib_msg_queueopenasync.header.size =
+				sizeof (struct res_lib_msg_queueopenasync);
+			res_lib_msg_queueopenasync.header.id =
+				MESSAGE_RES_MSG_QUEUEOPENASYNC;
 			res_lib_msg_queueopenasync.header.error = error;
-			res_lib_msg_queueopenasync.invocation = req_exec_msg_queueopen->invocation;
+
+			res_lib_msg_queueopenasync.invocation =
+				req_exec_msg_queueopen->invocation;
+			res_lib_msg_queueopenasync.queueHandle =
+				req_exec_msg_queueopen->queue_handle;
+
 			memcpy (&res_lib_msg_queueopenasync.source,
 				&req_exec_msg_queueopen->source,
 				sizeof (mar_message_source_t));
@@ -818,6 +863,7 @@ error_exit:
 				req_exec_msg_queueopen->source.conn,
 				&res_lib_msg_queueopenasync,
 				sizeof (struct res_lib_msg_queueopenasync));
+
 			openais_conn_send_response (
 				openais_conn_partner_get (req_exec_msg_queueopen->source.conn),
 				&res_lib_msg_queueopenasync,
@@ -826,9 +872,15 @@ error_exit:
 			/*
 			 * otherwise respond with the normal queueopen response
 			 */
-			res_lib_msg_queueopen.header.size = sizeof (struct res_lib_msg_queueopen);
-			res_lib_msg_queueopen.header.id = MESSAGE_RES_MSG_QUEUEOPEN;
+			res_lib_msg_queueopen.header.size =
+				sizeof (struct res_lib_msg_queueopen);
+			res_lib_msg_queueopen.header.id =
+				MESSAGE_RES_MSG_QUEUEOPEN;
 			res_lib_msg_queueopen.header.error = error;
+
+			res_lib_msg_queueopen.queueHandle =
+				req_exec_msg_queueopen->queue_handle;
+
 			memcpy (&res_lib_msg_queueopen.source,
 				&req_exec_msg_queueopen->source,
 				sizeof (mar_message_source_t));
@@ -845,7 +897,8 @@ static void message_handler_req_exec_msg_queueclose (
 	void *message,
 	unsigned int nodeid)
 {
-	struct req_exec_msg_queueclose *req_exec_msg_queueclose = (struct req_exec_msg_queueclose *)message;
+	struct req_exec_msg_queueclose *req_exec_msg_queueclose =
+		(struct req_exec_msg_queueclose *)message;
 	struct res_lib_msg_queueclose res_lib_msg_queueclose;
 	struct message_queue *queue = 0;
 	SaAisErrorT error = SA_AIS_OK;
@@ -859,21 +912,33 @@ static void message_handler_req_exec_msg_queueclose (
 	}
 		
 	queue->refcount -= 1;
-	printf ("decrementing queue refcount to %d\n", queue->refcount);
+
 	if (queue->refcount == 0) {
-		printf ("should free queue\n");
+		/* free queue */
 	}
+
 error_exit:
-	if (message_source_is_local(&req_exec_msg_queueclose->source)) {
-// TODO		msg_queue_cleanup_remove (
-//			req_exec_msg_queueclose->source.conn_info,
-//			req_exec_msg_queueclose->queue_handle);
+	if (message_source_is_local(&req_exec_msg_queueclose->source))
+	{
 
-		res_lib_msg_queueclose.header.size = sizeof (struct res_lib_msg_queueclose);
-		res_lib_msg_queueclose.header.id = MESSAGE_RES_MSG_QUEUECLOSE;
+		/* TODO */
+
+		/*
+		 * msg_queue_cleanup_remove (
+		 *	req_exec_msg_queueclose->source.conn_info,
+		 *	req_exec_msg_queueclose->queue_handle);
+		 */
+
+		res_lib_msg_queueclose.header.size =
+			sizeof (struct res_lib_msg_queueclose);
+		res_lib_msg_queueclose.header.id =
+			MESSAGE_RES_MSG_QUEUECLOSE;
 		res_lib_msg_queueclose.header.error = error;
-		openais_conn_send_response (req_exec_msg_queueclose->source.conn,
-			&res_lib_msg_queueclose, sizeof (struct res_lib_msg_queueclose));
+
+		openais_conn_send_response (
+			req_exec_msg_queueclose->source.conn,
+			&res_lib_msg_queueclose,
+			sizeof (struct res_lib_msg_queueclose));
 	}
 }
 
@@ -884,7 +949,7 @@ static void message_handler_req_exec_msg_queuestatusget (
 #if 0
 	struct req_exec_msg_queuestatusget *req_exec_msg_queuestatusget =
 		(struct req_exec_msg_queuestatusget *)message;
-	struct res_lib_msg_queueclose res_lib_msg_queuestatusget;
+	struct res_lib_msg_queuestatusget res_lib_msg_queuestatusget;
 #endif
 }
 
@@ -895,7 +960,7 @@ static void message_handler_req_exec_msg_queueunlink (
 #if 0
 	struct req_exec_msg_queueunlink *req_exec_msg_queueunlink =
 		(struct req_exec_msg_queueunlink *)message;
-	struct res_lib_msg_queueclose res_lib_msg_queueunlink;
+	struct res_lib_msg_queueunlink res_lib_msg_queueunlink;
 #endif
 }
 
@@ -910,6 +975,7 @@ static void message_handler_req_exec_msg_queuegroupcreate (
 	SaAisErrorT error = SA_AIS_OK;
 
 	queue_group = queue_group_find (&req_exec_msg_queuegroupcreate->queue_group_name);
+
 	if (queue_group == 0) {
 		queue_group = malloc (sizeof (struct queue_group));
 		if (queue_group == 0) {
@@ -930,8 +996,10 @@ static void message_handler_req_exec_msg_queuegroupcreate (
 
 error_exit:
 	if (message_source_is_local(&req_exec_msg_queuegroupcreate->source)) {
-		res_lib_msg_queuegroupcreate.header.size = sizeof (struct res_lib_msg_queuegroupcreate);
-		res_lib_msg_queuegroupcreate.header.id = MESSAGE_RES_MSG_QUEUEGROUPCREATE;
+		res_lib_msg_queuegroupcreate.header.size =
+			sizeof (struct res_lib_msg_queuegroupcreate);
+		res_lib_msg_queuegroupcreate.header.id =
+			MESSAGE_RES_MSG_QUEUEGROUPCREATE;
 		res_lib_msg_queuegroupcreate.header.error = error;
 
 		openais_conn_send_response (
@@ -954,22 +1022,24 @@ static void message_handler_req_exec_msg_queuegroupinsert (
 	SaAisErrorT error = SA_AIS_OK;
 
 	queue_group = queue_group_find (&req_exec_msg_queuegroupinsert->queue_group_name);
+
 	if (queue_group == 0) {
-printf ("a\n");
 		error = SA_AIS_ERR_NOT_EXIST;
 		goto error_exit;
 	}
+
 	queue = queue_find (&req_exec_msg_queuegroupinsert->queue_name);
 	if (queue == 0) {
 		error = SA_AIS_ERR_NOT_EXIST;
 		goto error_exit;
 	}
+
 	queue_group_entry = malloc (sizeof (struct queue_group_entry));
 	if (queue_group_entry == 0) {
-printf ("c\n");
 		error = SA_AIS_ERR_NO_MEMORY;
 		goto error_exit;
 	}	
+
 	list_init (&queue_group_entry->list);
 	list_add (&queue_group_entry->list, &queue_group->message_queue_head);
 	list_add (&queue->list, &queue_list_head);
@@ -977,8 +1047,10 @@ printf ("c\n");
 
 error_exit:
 	if (message_source_is_local(&req_exec_msg_queuegroupinsert->source)) {
-		res_lib_msg_queuegroupinsert.header.size = sizeof (struct res_lib_msg_queuegroupinsert);
-		res_lib_msg_queuegroupinsert.header.id = MESSAGE_RES_MSG_QUEUEGROUPCREATE;
+		res_lib_msg_queuegroupinsert.header.size =
+			sizeof (struct res_lib_msg_queuegroupinsert);
+		res_lib_msg_queuegroupinsert.header.id =
+			MESSAGE_RES_MSG_QUEUEGROUPINSERT;
 		res_lib_msg_queuegroupinsert.header.error = error;
 
 		openais_conn_send_response (
@@ -1011,6 +1083,7 @@ static void message_handler_req_exec_msg_queuegroupremove (
 		error = SA_AIS_ERR_NOT_EXIST;
 		goto error_exit;
 	}
+
 	queue_group_entry = queue_group_entry_find (queue_group, queue);
 	if (queue_group_entry == 0) {
 		error = SA_AIS_ERR_NOT_EXIST;
@@ -1021,8 +1094,10 @@ static void message_handler_req_exec_msg_queuegroupremove (
 
 error_exit:
 	if (message_source_is_local(&req_exec_msg_queuegroupremove->source)) {
-		res_lib_msg_queuegroupremove.header.size = sizeof (struct res_lib_msg_queuegroupremove);
-		res_lib_msg_queuegroupremove.header.id = MESSAGE_RES_MSG_QUEUEGROUPCREATE;
+		res_lib_msg_queuegroupremove.header.size =
+			sizeof (struct res_lib_msg_queuegroupremove);
+		res_lib_msg_queuegroupremove.header.id =
+			MESSAGE_RES_MSG_QUEUEGROUPREMOVE;
 		res_lib_msg_queuegroupremove.header.error = error;
 
 		openais_conn_send_response (
@@ -1040,10 +1115,10 @@ static void message_handler_req_exec_msg_queuegroupdelete (
 		(struct req_exec_msg_queuegroupdelete *)message;
 	struct res_lib_msg_queuegroupdelete res_lib_msg_queuegroupdelete;
 	struct queue_group *queue_group;
-
 	SaAisErrorT error = SA_AIS_OK;
 
 	queue_group = queue_group_find (&req_exec_msg_queuegroupdelete->queue_group_name);
+
 	if (queue_group) {
 		list_del (&queue_group->list);
 		free (queue_group);
@@ -1052,8 +1127,10 @@ static void message_handler_req_exec_msg_queuegroupdelete (
 	}
 
 	if (message_source_is_local(&req_exec_msg_queuegroupdelete->source)) {
-		res_lib_msg_queuegroupdelete.header.size = sizeof (struct res_lib_msg_queuegroupdelete);
-		res_lib_msg_queuegroupdelete.header.id = MESSAGE_RES_MSG_QUEUEGROUPCREATE;
+		res_lib_msg_queuegroupdelete.header.size =
+			sizeof (struct res_lib_msg_queuegroupdelete);
+		res_lib_msg_queuegroupdelete.header.id =
+			MESSAGE_RES_MSG_QUEUEGROUPDELETE;
 		res_lib_msg_queuegroupdelete.header.error = error;
 
 		openais_conn_send_response (
@@ -1070,7 +1147,7 @@ static void message_handler_req_exec_msg_queuegrouptrack (
 #if 0
 	struct req_exec_msg_queuegrouptrack *req_exec_msg_queuegrouptrack =
 		(struct req_exec_msg_queuegrouptrack *)message;
-	struct res_lib_msg_queueclose res_lib_msg_queuegrouptrack;
+	struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
 #endif
 }
 
@@ -1081,7 +1158,7 @@ static void message_handler_req_exec_msg_queuegrouptrackstop (
 #if 0
 	struct req_exec_msg_queuegrouptrackstop *req_exec_msg_queuegrouptrackstop =
 		(struct req_exec_msg_queuegrouptrackstop *)message;
-	struct res_lib_msg_queueclose res_lib_msg_queuegrouptrackstop;
+	struct res_lib_msg_queuegrouptrackstop res_lib_msg_queuegrouptrackstop;
 #endif
 }
 
@@ -1089,22 +1166,154 @@ static void message_handler_req_exec_msg_messagesend (
 	void *message,
 	unsigned int nodeid)
 {
-#if 0
 	struct req_exec_msg_messagesend *req_exec_msg_messagesend =
 		(struct req_exec_msg_messagesend *)message;
-	struct res_lib_msg_queueclose res_lib_msg_messagesend;
-#endif
+	struct res_lib_msg_messagesend res_lib_msg_messagesend;
+	struct res_lib_msg_messagesendasync res_lib_msg_messagesendasync;
+	struct message_queue *queue;
+	struct message_entry *entry;
+	SaAisErrorT error = SA_AIS_OK;
+
+	char *data = ((char *)(req_exec_msg_messagesend) +
+		      sizeof (struct req_exec_msg_messagesend));
+
+	log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgMessageSend %s\n",
+		getSaNameT (&req_exec_msg_messagesend->destination));
+
+	queue = queue_find (&req_exec_msg_messagesend->destination);
+	if (queue == NULL) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	entry = malloc (sizeof (struct message_entry));
+	if (entry == NULL) {
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto error_exit;
+	}
+
+	memset (entry, 0, sizeof (struct message_entry));
+	memcpy (&entry->message, &req_exec_msg_messagesend->message,
+		sizeof (SaMsgMessageT));
+
+	entry->message.data = malloc (entry->message.size);
+	if (entry->message.data == NULL) {
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto error_exit;
+	}
+
+	memset (entry->message.data, 0, entry->message.size);
+	memcpy (entry->message.data, (void *)(data), entry->message.size);
+
+	entry->time = clust_time_now();
+
+	list_add_tail (&entry->list, &queue->message_list_head);
+
+	/* DEBUG */
+	print_message_list (queue);
+
+error_exit:
+
+	if (message_source_is_local(&req_exec_msg_messagesend->source)) {
+		if (req_exec_msg_messagesend->async_call) {
+			res_lib_msg_messagesendasync.header.size =
+				sizeof (struct res_lib_msg_messagesendasync);
+			res_lib_msg_messagesendasync.header.id =
+				MESSAGE_RES_MSG_MESSAGESENDASYNC;
+			res_lib_msg_messagesendasync.header.error = error;
+			res_lib_msg_messagesendasync.invocation =
+				req_exec_msg_messagesend->invocation;
+
+			memcpy (&res_lib_msg_messagesendasync.source,
+				&req_exec_msg_messagesend->source,
+				sizeof (mar_message_source_t));
+
+			openais_conn_send_response (
+				req_exec_msg_messagesend->source.conn,
+				&res_lib_msg_messagesendasync,
+				sizeof (struct res_lib_msg_messagesendasync));
+
+			openais_conn_send_response (
+				openais_conn_partner_get (req_exec_msg_messagesend->source.conn),
+				&res_lib_msg_messagesendasync,
+				sizeof (struct res_lib_msg_messagesendasync));
+		} else {
+			res_lib_msg_messagesend.header.size =
+				sizeof (struct res_lib_msg_messagesend);
+			res_lib_msg_messagesend.header.id =
+				MESSAGE_RES_MSG_MESSAGESEND;
+			res_lib_msg_messagesend.header.error = error;
+
+			memcpy (&res_lib_msg_messagesend.source,
+				&req_exec_msg_messagesend->source,
+				sizeof (mar_message_source_t));
+
+			openais_conn_send_response (
+				req_exec_msg_messagesend->source.conn,
+				&res_lib_msg_messagesend,
+				sizeof (struct res_lib_msg_messagesend));
+		}
+	}
 }
 
 static void message_handler_req_exec_msg_messageget (
 	void *message,
 	unsigned int nodeid)
 {
-#if 0
 	struct req_exec_msg_messageget *req_exec_msg_messageget =
 		(struct req_exec_msg_messageget *)message;
-	struct res_lib_msg_queueclose res_lib_msg_messageget;
-#endif
+	struct res_lib_msg_messageget res_lib_msg_messageget;
+	struct message_queue *queue;
+	struct message_entry *entry;
+	SaAisErrorT error = SA_AIS_OK;
+
+	log_printf (LOG_LEVEL_NOTICE, "EXEC request: saMsgMessageGet %s\n",
+		getSaNameT (&req_exec_msg_messageget->queue_name));
+
+	queue = queue_find (&req_exec_msg_messageget->queue_name);
+	if (queue == NULL) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	if (list_empty (queue->message_list_head.next)) {
+		error = SA_AIS_ERR_TIMEOUT; /* FIX ME */
+		goto error_exit;
+	}
+
+	entry = list_entry (queue->message_list_head.next, struct message_entry, list);
+	if (entry == NULL) {
+		error = SA_AIS_ERR_LIBRARY; /* FIX ME */
+		goto error_exit;
+	}
+
+	list_del (queue->message_list_head.next);
+
+error_exit:
+
+	if (message_source_is_local(&req_exec_msg_messageget->source)) {
+		res_lib_msg_messageget.header.size =
+			sizeof (struct res_lib_msg_messageget);
+		res_lib_msg_messageget.header.id =
+			MESSAGE_RES_MSG_MESSAGEGET;
+		res_lib_msg_messageget.header.error = error;
+
+		memcpy (&res_lib_msg_messageget.message, &entry->message,
+			sizeof (SaMsgMessageT));
+		memcpy (&res_lib_msg_messageget.source,
+			&req_exec_msg_messageget->source,
+			sizeof (mar_message_source_t));
+
+		openais_conn_send_response (
+			req_exec_msg_messageget->source.conn,
+			&res_lib_msg_messageget,
+			sizeof (struct res_lib_msg_messageget));
+
+		openais_conn_send_response (
+			req_exec_msg_messageget->source.conn,
+			res_lib_msg_messageget.message.data,
+			res_lib_msg_messageget.message.size);
+	}
 }
 
 static void message_handler_req_exec_msg_messagecancel (
@@ -1114,7 +1323,7 @@ static void message_handler_req_exec_msg_messagecancel (
 #if 0
 	struct req_exec_msg_messagecancel *req_exec_msg_messagecancel =
 		(struct req_exec_msg_messagecancel *)message;
-	struct res_lib_msg_queueclose res_lib_msg_messagecancel;
+	struct res_lib_msg_messagecancel res_lib_msg_messagecancel;
 #endif
 }
 
@@ -1125,7 +1334,7 @@ static void message_handler_req_exec_msg_messagesendreceive (
 #if 0
 	struct req_exec_msg_messagesendreceive *req_exec_msg_messagesendreceive =
 		(struct req_exec_msg_messagesendreceive *)message;
-	struct res_lib_msg_queueclose res_lib_msg_messagesendreceive;
+	struct res_lib_msg_messagesendreceive res_lib_msg_messagesendreceive;
 #endif
 }
 
@@ -1136,16 +1345,16 @@ static void message_handler_req_exec_msg_messagereply (
 #if 0
 	struct req_exec_msg_messagereply *req_exec_msg_messagereply =
 		(struct req_exec_msg_messagereply *)message;
-	struct res_lib_msg_queueclose res_lib_msg_messagereply;
+	struct res_lib_msg_messagereply res_lib_msg_messagereply;
 #endif
 }
 
-
 static void message_handler_req_lib_msg_queueopen (
 	void *conn,
 	void *msg)
 {
-	struct req_lib_msg_queueopen *req_lib_msg_queueopen = (struct req_lib_msg_queueopen *)msg;
+	struct req_lib_msg_queueopen *req_lib_msg_queueopen =
+		(struct req_lib_msg_queueopen *)msg;
 	struct req_exec_msg_queueopen req_exec_msg_queueopen;
 	struct iovec iovec;
 
@@ -1171,6 +1380,7 @@ static void message_handler_req_lib_msg_queueopen (
 	req_exec_msg_queueopen.queue_handle = req_lib_msg_queueopen->queueHandle;
 	req_exec_msg_queueopen.openFlags = req_lib_msg_queueopen->openFlags;
 	req_exec_msg_queueopen.timeout = req_lib_msg_queueopen->timeout;
+
 	iovec.iov_base = (char *)&req_exec_msg_queueopen;
 	iovec.iov_len = sizeof (req_exec_msg_queueopen);
 
@@ -1182,7 +1392,8 @@ static void message_handler_req_lib_msg_queueopenasync (
 	void *conn,
 	void *msg)
 {
-	struct req_lib_msg_queueopen *req_lib_msg_queueopen = (struct req_lib_msg_queueopen *)msg;
+	struct req_lib_msg_queueopen *req_lib_msg_queueopen =
+		(struct req_lib_msg_queueopen *)msg;
 	struct req_exec_msg_queueopen req_exec_msg_queueopen;
 	struct iovec iovec;
 
@@ -1220,7 +1431,8 @@ static void message_handler_req_lib_msg_queueclose (
 	void *conn,
 	void *msg)
 {
-	struct req_lib_msg_queueclose *req_lib_msg_queueclose = (struct req_lib_msg_queueclose *)msg;
+	struct req_lib_msg_queueclose *req_lib_msg_queueclose =
+		(struct req_lib_msg_queueclose *)msg;
 	struct req_exec_msg_queueclose req_exec_msg_queueclose;
 	struct iovec iovec;
 
@@ -1487,7 +1699,7 @@ static void message_handler_req_lib_msg_messagesend (
 	struct req_lib_msg_messagesend *req_lib_msg_messagesend =
 		(struct req_lib_msg_messagesend *)msg;
 	struct req_exec_msg_messagesend req_exec_msg_messagesend;
-	struct iovec iovec;
+	struct iovec iovecs[2];
 
 	req_exec_msg_messagesend.header.size =
 		sizeof (struct req_exec_msg_messagesend);
@@ -1502,12 +1714,31 @@ static void message_handler_req_lib_msg_messagesend (
 
 	memcpy (&req_exec_msg_messagesend.destination,
 		&req_lib_msg_messagesend->destination, sizeof (SaNameT));
+	memcpy (&req_exec_msg_messagesend.message,
+		&req_lib_msg_messagesend->message, sizeof (SaMsgMessageT));
 
-	iovec.iov_base = (char *)&req_exec_msg_messagesend;
-	iovec.iov_len = sizeof (req_exec_msg_messagesend);
+	req_exec_msg_messagesend.async_call = 0;
+	req_exec_msg_messagesend.invocation = 0;
+	req_exec_msg_messagesend.ack_flags = req_lib_msg_messagesend->ackFlags;
+	req_exec_msg_messagesend.timeout = req_lib_msg_messagesend->timeout;
 
-	assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
-		TOTEMPG_AGREED) == 0);
+	iovecs[0].iov_base = (char *)&req_exec_msg_messagesend;
+	iovecs[0].iov_len = sizeof (req_exec_msg_messagesend);
+
+	iovecs[1].iov_base = ((char *)req_lib_msg_messagesend) +
+		sizeof (struct req_lib_msg_messagesend);
+	iovecs[1].iov_len = req_lib_msg_messagesend->header.size -
+		sizeof (struct req_lib_msg_messagesend);
+
+	req_exec_msg_messagesend.header.size += iovecs[1].iov_len;
+
+	if (iovecs[1].iov_len > 0) {
+		assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2,
+			TOTEMPG_AGREED) == 0);
+	} else {
+		assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 1,
+			TOTEMPG_AGREED) == 0);
+	}
 }
 
 static void message_handler_req_lib_msg_messagesendasync (
@@ -1517,7 +1748,7 @@ static void message_handler_req_lib_msg_messagesendasync (
 	struct req_lib_msg_messagesend *req_lib_msg_messagesend =
 		(struct req_lib_msg_messagesend *)msg;
 	struct req_exec_msg_messagesend req_exec_msg_messagesend;
-	struct iovec iovec;
+	struct iovec iovecs[2];
 
 	log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgMessageSendAsync %s\n",
 		getSaNameT (&req_lib_msg_messagesend->destination));
@@ -1526,18 +1757,36 @@ static void message_handler_req_lib_msg_messagesendasync (
 		sizeof (struct req_exec_msg_messagesend);
 	req_exec_msg_messagesend.header.id =
 		SERVICE_ID_MAKE (MSG_SERVICE, MESSAGE_REQ_EXEC_MSG_MESSAGESEND);
-	req_exec_msg_messagesend.async_call = 1;
 
 	message_source_set (&req_exec_msg_messagesend.source, conn);
 
 	memcpy (&req_exec_msg_messagesend.destination,
 		&req_lib_msg_messagesend->destination, sizeof (SaNameT));
+	memcpy (&req_exec_msg_messagesend.message,
+		&req_lib_msg_messagesend->message, sizeof (SaMsgMessageT));
 
-	iovec.iov_base = (char *)&req_exec_msg_messagesend;
-	iovec.iov_len = sizeof (req_exec_msg_messagesend);
+	req_exec_msg_messagesend.async_call = 1;
+	req_exec_msg_messagesend.invocation = req_lib_msg_messagesend->invocation;
+	req_exec_msg_messagesend.ack_flags = req_lib_msg_messagesend->ackFlags;
+	req_exec_msg_messagesend.timeout = SA_TIME_END;
 
-	assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
-		TOTEMPG_AGREED) == 0);
+	iovecs[0].iov_base = (char *)&req_exec_msg_messagesend;
+	iovecs[0].iov_len = sizeof (req_exec_msg_messagesend);
+
+	iovecs[1].iov_base = ((char *)req_lib_msg_messagesend) +
+		sizeof (struct req_lib_msg_messagesend);
+	iovecs[1].iov_len = req_lib_msg_messagesend->header.size -
+		sizeof (struct req_lib_msg_messagesend);
+
+	req_exec_msg_messagesend.header.size += iovecs[1].iov_len;
+
+	if (iovecs[1].iov_len > 0) {
+		assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2,
+			TOTEMPG_AGREED) == 0);
+	} else {
+		assert (totempg_groups_mcast_joined (openais_group_handle, iovecs, 2,
+			TOTEMPG_AGREED) == 0);
+	}
 }
 
 static void message_handler_req_lib_msg_messageget (

+ 30 - 23
include/ipc_msg.h

@@ -36,6 +36,7 @@
 
 #include "saAis.h"
 #include "saMsg.h"
+
 #include "ipc_gen.h"
 
 enum req_lib_msg_queue_types {
@@ -51,31 +52,33 @@ enum req_lib_msg_queue_types {
 	MESSAGE_REQ_MSG_QUEUEGROUPTRACK = 9,
 	MESSAGE_REQ_MSG_QUEUEGROUPTRACKSTOP = 10,
 	MESSAGE_REQ_MSG_MESSAGESEND = 11,
-	MESSAGE_REQ_MSG_MESSAGEGET = 12,
-	MESSAGE_REQ_MSG_MESSAGECANCEL = 13,
-	MESSAGE_REQ_MSG_MESSAGESENDRECEIVE = 14,
-	MESSAGE_REQ_MSG_MESSAGEREPLY = 15
+	MESSAGE_REQ_MSG_MESSAGESENDASYNC = 12,
+	MESSAGE_REQ_MSG_MESSAGEGET = 13,
+	MESSAGE_REQ_MSG_MESSAGECANCEL = 14,
+	MESSAGE_REQ_MSG_MESSAGESENDRECEIVE = 15,
+	MESSAGE_REQ_MSG_MESSAGEREPLY = 16,
+	MESSAGE_REQ_MSG_MESSAGEREPLYASYNC = 17
 };
 
 enum res_lib_msg_queue_types {
 	MESSAGE_RES_MSG_QUEUEOPEN = 0,
-	MESSAGE_RES_MSG_QUEUEOPENASYNC = 2,
-	MESSAGE_RES_MSG_QUEUECLOSE = 3,
-	MESSAGE_RES_MSG_QUEUESTATUSGET = 4,
-	MESSAGE_RES_MSG_QUEUEUNLINK = 5,
-	MESSAGE_RES_MSG_QUEUEGROUPCREATE = 6,
-	MESSAGE_RES_MSG_QUEUEGROUPINSERT = 7,
-	MESSAGE_RES_MSG_QUEUEGROUPREMOVE = 8,
-	MESSAGE_RES_MSG_QUEUEGROUPDELETE = 9,
-	MESSAGE_RES_MSG_QUEUEGROUPTRACK = 10,
-	MESSAGE_RES_MSG_QUEUEGROUPTRACKSTOP = 11,
-	MESSAGE_RES_MSG_MESSAGESEND = 12,
-	MESSAGE_RES_MSG_MESSAGESENDASYNC = 13,
-	MESSAGE_RES_MSG_MESSAGEGET = 14,
-	MESSAGE_RES_MSG_MESSAGECANCEL = 15,
-	MESSAGE_RES_MSG_MESSAGESENDRECEIVE = 16,
-	MESSAGE_RES_MSG_MESSAGEREPLY = 17,
-	MESSAGE_RES_MSG_MESSAGEREPLYASYNC = 18
+	MESSAGE_RES_MSG_QUEUEOPENASYNC = 1,
+	MESSAGE_RES_MSG_QUEUECLOSE = 2,
+	MESSAGE_RES_MSG_QUEUESTATUSGET = 3,
+	MESSAGE_RES_MSG_QUEUEUNLINK = 4,
+	MESSAGE_RES_MSG_QUEUEGROUPCREATE = 5,
+	MESSAGE_RES_MSG_QUEUEGROUPINSERT = 6,
+	MESSAGE_RES_MSG_QUEUEGROUPREMOVE = 7,
+	MESSAGE_RES_MSG_QUEUEGROUPDELETE = 8,
+	MESSAGE_RES_MSG_QUEUEGROUPTRACK = 9,
+	MESSAGE_RES_MSG_QUEUEGROUPTRACKSTOP = 10,
+	MESSAGE_RES_MSG_MESSAGESEND = 11,
+	MESSAGE_RES_MSG_MESSAGESENDASYNC = 12,
+	MESSAGE_RES_MSG_MESSAGEGET = 13,
+	MESSAGE_RES_MSG_MESSAGECANCEL = 14,
+	MESSAGE_RES_MSG_MESSAGESENDRECEIVE = 15,
+	MESSAGE_RES_MSG_MESSAGEREPLY = 16,
+	MESSAGE_RES_MSG_MESSAGEREPLYASYNC = 17
 };
 
 struct req_lib_msg_queueopen {
@@ -92,15 +95,15 @@ struct req_lib_msg_queueopen {
 
 struct res_lib_msg_queueopen {
 	mar_res_header_t header;
-	SaMsgQueueHandleT queueHandle;
 	mar_message_source_t source;
+	SaMsgQueueHandleT queueHandle;
 };
 
 struct res_lib_msg_queueopenasync {
 	mar_res_header_t header;
+	mar_message_source_t source;
 	SaInvocationT invocation;
 	SaMsgQueueHandleT queueHandle;
-	mar_message_source_t source;
 };
 
 struct req_lib_msg_queueclose {
@@ -202,10 +205,13 @@ struct req_lib_msg_messagesend {
 
 struct res_lib_msg_messagesend {
 	mar_res_header_t header;
+	mar_message_source_t source;
 };
 
 struct res_lib_msg_messagesendasync {
 	mar_res_header_t header;
+	mar_message_source_t source;
+	SaInvocationT invocation;
 };
 
 struct req_lib_msg_messageget {
@@ -216,6 +222,7 @@ struct req_lib_msg_messageget {
 
 struct res_lib_msg_messageget {
 	mar_res_header_t header;
+	mar_message_source_t source;
 	SaTimeT sendTime;
 	SaMsgSenderIdT senderId;
 	SaMsgMessageT message;

+ 11 - 2
include/saAis.h

@@ -66,7 +66,7 @@ typedef double SaDoubleT;
 typedef char * SaStringT;
 typedef SaInt64T SaTimeT;
 
-#define SA_TIME_END ((SaTimeT)0x7fffffffffffffffull)
+#define SA_TIME_END    ((SaTimeT)0x7FFFFFFFFFFFFFFFULL)
 #define SA_TIME_BEGIN            0x0ULL
 #define SA_TIME_UNKNOWN          0x8000000000000000ULL
 
@@ -79,6 +79,7 @@ typedef SaInt64T SaTimeT;
 #define SA_TIME_MAX             SA_TIME_END
 
 #define SA_MAX_NAME_LENGTH 256
+
 typedef struct {
 	SaUint16T length;
 	SaUint8T value[SA_MAX_NAME_LENGTH];
@@ -132,12 +133,20 @@ typedef enum {
 	SA_AIS_ERR_NO_SECTIONS = 27
 } SaAisErrorT;
 
+typedef union {
+	SaInt64T int64Value;
+	SaUint64T uint64Value;
+	SaTimeT timeValue;
+	SaFloatT floatValue;
+	SaDoubleT doubleValue;
+} SaLimitValueT;
+
 typedef SaUint64T SaSelectionObjectT;
 
 typedef SaUint64T SaInvocationT;
 
 typedef SaUint64T SaSizeT;
 
-#define SA_HANDLE_INVALID 0x0ull
+#define SA_HANDLE_INVALID 0x0ULL
 
 #endif /* AIS_TYPES_H_DEFINED */

+ 67 - 4
include/saMsg.h

@@ -144,13 +144,40 @@ typedef struct {
 	SaMsgMessageReceivedCallbackT saMsgMessageReceivedCallback;
 } SaMsgCallbacksT;
 
+typedef enum {
+	SA_MSG_QUEUE_CAPACITY_REACHED = 1,
+	SA_MSG_QUEUE_CAPACITY_AVAILABLE = 2,
+	SA_MSG_QUEUE_GROUP_CAPACITY_REACHED = 3,
+	SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE = 4
+} SaMsgMessageCapacityStatusT;
+
+typedef struct {
+	SaSizeT capacityReached[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1];
+	SaSizeT capacityAvailable[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1];
+} SaMsgQueueThresholdsT;
+
+typedef enum {
+	SA_MSG_DEST_CAPACITY_STATUS = 1
+} SaMsgStateT;
+
+typedef enum {
+	SA_MSG_MAX_PRIORITY_AREA_SIZE_ID = 1,
+	SA_MSG_MAX_QUEUE_SIZE_ID = 2,
+	SA_MSG_MAX_NUM_QUEUES_ID = 3,
+	SA_MSG_MAX_NUM_QUEUE_GROUPS_ID = 4,
+	SA_MSG_MAX_NUM_QUEUES_PER_GROUP_ID = 5,
+	SA_MSG_MAX_MESSAGE_SIZE_ID = 6,
+	SA_MSG_MAX_REPLY_SIZE_ID = 7
+} SaMsgLimitIdT;
+
 SaAisErrorT
 saMsgInitialize (
 	SaMsgHandleT *msgHandle,
 	const SaMsgCallbacksT *msgCallbacks,
 	SaVersionT *version);
 
-SaAisErrorT saMsgSelectionObjectGet (
+SaAisErrorT
+saMsgSelectionObjectGet (
 	SaMsgHandleT msgHandle,
 	SaSelectionObjectT *selectionObject);
 
@@ -186,10 +213,15 @@ saMsgQueueClose (
 
 SaAisErrorT
 saMsgQueueStatusGet (
-	SaMsgQueueHandleT msgHandle,
+	SaMsgHandleT msgHandle,
 	const SaNameT *queueName,
 	SaMsgQueueStatusT *queueStatus);
 
+SaAisErrorT
+saMsgQueueRetentionTimeSet (
+	SaMsgQueueHandleT queueHandle,
+	SaTimeT *retentionTime);
+
 SaAisErrorT
 saMsgQueueUnlink (
 	SaMsgQueueHandleT msgHandle,
@@ -230,6 +262,11 @@ saMsgQueueGroupTrackStop (
 	SaMsgHandleT msgHandle,
 	const SaNameT *queueGroupName);
 
+SaAisErrorT
+saMsgQueueGroupNotificationFree (
+	SaMsgHandleT msgHandle,
+	SaMsgQueueGroupNotificationT *notification);
+
 SaAisErrorT
 saMsgMessageSend (
 	SaMsgHandleT msgHandle,
@@ -253,6 +290,11 @@ saMsgMessageGet (
 	SaMsgSenderIdT *senderId,
 	SaTimeT timeout);
 
+SaAisErrorT
+saMsgMessageDataFree (
+	SaMsgHandleT msgHandle,
+	void *data);
+
 SaAisErrorT
 saMsgMessageCancel (
 	SaMsgQueueHandleT queueHandle);
@@ -273,12 +315,33 @@ saMsgMessageReply (
 	const SaMsgSenderIdT *senderId,
 	SaTimeT timeout);
 
-SaAisErrorT saMsgMessageReplyAsync (
+SaAisErrorT
+saMsgMessageReplyAsync (
 	SaMsgHandleT msgHandle,
 	SaInvocationT invocation,
 	const SaMsgMessageT *replyMessage,
 	const SaMsgSenderIdT *senderId,
 	SaMsgAckFlagsT ackFlags);
 
-#endif /* SAMSG_H_DEFINED */
+SaAisErrorT
+saMsgQueueCapacityThresholdSet (
+	SaMsgQueueHandleT queueHandle,
+	const SaMsgQueueThresholdsT *thresholds);
 
+SaAisErrorT
+saMsgQueueCapacityThresholdGet (
+	SaMsgQueueHandleT queueHandle,
+	SaMsgQueueThresholdsT *thresholds);
+
+SaAisErrorT
+saMsgMetadataSizeGet (
+	SaMsgHandleT msgHandle,
+	SaUint32T *metadataSize);
+
+SaAisErrorT
+saMsgLimitGet (
+	SaMsgHandleT msgHandle,
+	SaMsgLimitIdT limitId,
+	SaLimitValueT *limitValue);
+
+#endif /* SAMSG_H_DEFINED */

File diff suppressed because it is too large
+ 436 - 109
lib/msg.c


+ 99 - 9
test/testmsg.c

@@ -46,11 +46,19 @@
 #include "saAis.h"
 #include "saMsg.h"
 
+SaMsgQueueHandleT async_handle;
+
 void QueueOpenCallback (
 	SaInvocationT invocation,
 	SaMsgQueueHandleT queueHandle,
 	SaAisErrorT error)
 {
+	/* DEBUG */
+	printf ("[DEBUG]: testmsg (QueueOpenCallback)\n");
+	printf ("[DEBUG]: \t { queueHandle = %llx }\n",
+		(unsigned long long) queueHandle);
+
+	async_handle = queueHandle;
 }
 
 void QueueGroupTrackCallback (
@@ -59,17 +67,25 @@ void QueueGroupTrackCallback (
 	SaUint32T numberOfMembers,
 	SaAisErrorT error)
 {
+	/* DEBUG */
+	printf ("[DEBUG]: testmsg (QueueGroupTrackCallback)\n");
 }
 
 void MessageDeliveredCallback (
 	SaInvocationT invocation,
 	SaAisErrorT error)
 {
+	/* DEBUG */
+	printf ("[DEBUG]: testmsg (MessageDeliveredCallback)\n");
+	printf ("[DEBUG]: \t { invocation = %llx }\n",
+		(unsigned long long) invocation);
 }
 
 void MessageReceivedCallback (
 	SaMsgQueueHandleT queueHandle)
 {
+	/* DEBUG */
+	printf ("[DEBUG]: testmsg (MessageReceivedCallback)\n");
 }
 
 SaMsgCallbacksT callbacks = {
@@ -83,13 +99,22 @@ SaVersionT version = { 'B', 1, 1 };
 
 SaMsgQueueCreationAttributesT creation_attributes = {
 	SA_MSG_QUEUE_PERSISTENT,
-	{128000, 128000, 128000},
+	{ 128000, 128000, 128000 },
 	SA_TIME_END
 };
 
 void setSaNameT (SaNameT *name, char *str) {
 	name->length = strlen (str);
-	memcpy (name->value, str, name->length);
+	strcpy (name->value, str);
+}
+
+void setSaMsgMessageT (SaMsgMessageT *message, char *data) {
+	message->type = 1;
+	message->version = 2;
+	message->size = strlen (data) + 1;
+	message->senderName = NULL;
+	message->data = strdup (data);
+	message->priority = 0;
 }
 
 void sigintr_handler (int signum) {
@@ -98,12 +123,26 @@ void sigintr_handler (int signum) {
 
 int main (void) {
 	SaMsgHandleT handle;
+	SaMsgMessageT message;
 	SaMsgQueueHandleT queue_handle;
-	fd_set read_fds;
 	SaSelectionObjectT select_fd;
+	SaInvocationT invocation = 3;
+
+	fd_set read_fds;
 	int result;
+
+	SaNameT async_name;
 	SaNameT queue_name;
 	SaNameT queue_group_name;
+	SaTimeT time;
+	SaMsgSenderIdT id;
+	SaMsgMessageT msg_a;
+	SaMsgMessageT msg_b;
+	SaMsgMessageT msg_c;
+
+	memset (&msg_a, 0, sizeof (SaMsgMessageT));
+	memset (&msg_b, 0, sizeof (SaMsgMessageT));
+	memset (&msg_c, 0, sizeof (SaMsgMessageT));
 
 	signal (SIGINT, sigintr_handler);
 
@@ -115,6 +154,7 @@ int main (void) {
 
 	saMsgSelectionObjectGet (handle, &select_fd);
 
+	setSaNameT (&async_name, "async");
 	setSaNameT (&queue_name, "queue");
 
 	result = saMsgQueueOpen (handle,
@@ -124,6 +164,15 @@ int main (void) {
 		SA_TIME_END,
 		&queue_handle);
 	printf ("saMsgQueueOpen result is %d (should be 1)\n", result);
+	printf ("saMsgQueueOpen { queue_handle = %llx }\n", queue_handle);
+
+	result = saMsgQueueOpenAsync (handle,
+				      invocation,
+				      &async_name,
+				      &creation_attributes,
+				      SA_MSG_QUEUE_CREATE);
+	printf ("saMsgQueueOpenAsync result is %d (should be 1)\n", result);
+	printf ("saMsgQueueOpen { async_handle = %llx }\n", async_handle);
 
 	setSaNameT (&queue_group_name, "queue_group");
 
@@ -145,7 +194,9 @@ int main (void) {
 		&queue_name);
 	printf ("saMsgQueueGroupInsert result is %d (should be 1)\n", result);
 
+	saMsgDispatch (handle, SA_DISPATCH_ALL);
 
+	/*
 	FD_ZERO (&read_fds);
 	do {
 		FD_SET (select_fd, &read_fds);
@@ -159,20 +210,59 @@ int main (void) {
 		}
 		saMsgDispatch (handle, SA_DISPATCH_ALL);
 	} while (result);
+	*/
 
-	result = saMsgQueueGroupRemove (
-		handle,
-		&queue_group_name,
-		&queue_name);
+	setSaMsgMessageT (&message, "test_msg_01");
+	result = saMsgMessageSend (handle, &queue_name, &message, SA_TIME_ONE_SECOND);
+	printf ("saMsgMessageSend [1] result is %d (should be 1)\n", result);
+
+	setSaMsgMessageT (&message, "test_msg_02");
+	result = saMsgMessageSend (handle, &queue_name, &message, SA_TIME_ONE_SECOND);
+	printf ("saMsgMessageSend [2] result is %d (should be 1)\n", result);
+
+	setSaMsgMessageT (&message, "test_msg_03");
+	result = saMsgMessageSend (handle, &queue_name, &message, SA_TIME_ONE_SECOND);
+	printf ("saMsgMessageSend [3] result is %d (should be 1)\n", result);
+
+	setSaMsgMessageT (&message, "test_msg_04");
+	result = saMsgMessageSendAsync (handle, invocation, &queue_name, &message,
+		SA_MSG_MESSAGE_DELIVERED_ACK);
+	printf ("saMsgMessageSendAsync [4] result is %d (should be 1)\n", result);
+
+	setSaMsgMessageT (&message, "test_msg_05");
+	result = saMsgMessageSendAsync (handle, invocation, &queue_name, &message,
+		SA_MSG_MESSAGE_DELIVERED_ACK);
+	printf ("saMsgMessageSendAsync [5] result is %d (should be 1)\n", result);
+
+	saMsgDispatch (handle, SA_DISPATCH_ALL);
+
+	result = saMsgMessageGet (queue_handle, &msg_a, &time, &id, SA_TIME_ONE_MINUTE);
+	printf ("saMsgMessageGet [a] result is %d (should be 1)\n", result);
+
+	result = saMsgMessageGet (queue_handle, &msg_b, &time, &id, SA_TIME_ONE_MINUTE);
+	printf ("saMsgMessageGet [b] result is %d (should be 1)\n", result);
+
+	result = saMsgMessageGet (queue_handle, &msg_c, &time, &id, SA_TIME_ONE_MINUTE);
+	printf ("saMsgMessageGet [c] result is %d (should be 1)\n", result);
+
+	printf ("saMsgMessageGet { (a) data = %s }\n", (char *)(msg_a.data));
+	printf ("saMsgMessageGet { (b) data = %s }\n", (char *)(msg_b.data));
+	printf ("saMsgMessageGet { (c) data = %s }\n", (char *)(msg_c.data));
+
+	result = saMsgQueueGroupRemove (handle,	&queue_group_name, &queue_name);
 	printf ("saMsgQueueGroupRemove result is %d (should be 1)\n", result);
 
-	result = saMsgQueueGroupDelete (handle,
-		&queue_group_name);
+	result = saMsgQueueGroupDelete (handle,	&queue_group_name);
 	printf ("saMsgQueueGroupDelete result is %d (should be 1)\n", result);
 
+	printf ("saMsgQueueClose { queue_handle = %llx }\n", queue_handle);
 	result = saMsgQueueClose (queue_handle);
 	printf ("saMsgQueueClose result is %d (should be 1)\n", result);
 
+	printf ("saMsgQueueClose { async_handle = %llx }\n", async_handle);
+	result = saMsgQueueClose (async_handle);
+	printf ("saMsgQueueClose result is %d (should be 1)\n", result);
+
 	result = saMsgFinalize (handle);
 	printf ("Finalize result is %d (should be 1)\n", result);
 	return (0);

Some files were not shown because too many files changed in this diff