Просмотр исходного кода

Add ability to track changes to queue groups in the messaqge service.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1580 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 17 лет назад
Родитель
Сommit
ece8efaec9
3 измененных файлов с 525 добавлено и 26 удалено
  1. 434 24
      exec/msg.c
  2. 4 0
      include/ipc_msg.h
  3. 87 2
      lib/msg.c

+ 434 - 24
exec/msg.c

@@ -101,16 +101,18 @@ struct message_queue {
 
 struct queue_group {
 	SaNameT name;
+	SaUint8T track_flags;
+	SaMsgQueueGroupPolicyT policy;
 	struct list_head list;
 	struct list_head message_queue_head;
-};	
+};
 
 struct queue_group_entry {
+	SaMsgQueueGroupChangesT change;
 	struct message_queue *message_queue;
 	struct list_head list;
 };
 
-
 /*
 struct queue_cleanup {
 	struct message_queue *queue;
@@ -540,6 +542,7 @@ struct req_exec_msg_queuegroupcreate {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_group_name;
+	SaMsgQueueGroupPolicyT policy;
 };
 
 struct req_exec_msg_queuegroupinsert {
@@ -566,6 +569,8 @@ struct req_exec_msg_queuegrouptrack {
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_group_name;
+	SaUint8T track_flags;
+	SaUint8T buffer_flag;
 };
 
 struct req_exec_msg_queuegrouptrackstop {
@@ -654,7 +659,25 @@ static void print_message_list (struct message_queue *queue)
 		entry = list_entry (list, struct message_entry, list);
 
 		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_message_list (%s) (%llu)\n",
-			    (char *)(entry->message.data), (unsigned long long)(entry->time));
+			    (char *)(entry->message.data),
+			    (unsigned long long)(entry->time));
+	}
+}
+
+static void print_queue_group_list (struct queue_group *group)
+{
+	struct list_head *list;
+	struct queue_group_entry *entry;
+
+	for (list = group->message_queue_head.next;
+	     list != &group->message_queue_head;
+	     list = list->next)
+	{
+		entry = list_entry (list, struct queue_group_entry, list);
+
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_queue_group_list (%s) (%u)\n",
+			    (char *)(entry->message_queue->name.value),
+			    (unsigned int)(entry->change));
 	}
 }
 
@@ -679,41 +702,130 @@ static struct message_queue *queue_find (SaNameT *name)
 static struct queue_group *queue_group_find (SaNameT *name)
 {
 	struct list_head *list;
-	struct queue_group *queue_group;
+	struct queue_group *group;
 
 	for (list = queue_group_list_head.next;
 	     list != &queue_group_list_head;
 	     list = list->next)
 	{
-	        queue_group = list_entry (list, struct queue_group, list);
+	        group = list_entry (list, struct queue_group, list);
 
-		if (name_match (name, &queue_group->name)) {
-			return (queue_group);
+		if (name_match (name, &group->name)) {
+			return (group);
 		}
 	}
 	return (0);
 }
 
-static struct queue_group_entry *queue_group_entry_find (
-	struct queue_group *queue_group,
-	struct message_queue *queue)
+static struct queue_group_entry *queue_group_entry_find (struct queue_group *group, struct message_queue *queue)
 {
 	struct list_head *list;
-	struct queue_group_entry *queue_group_entry;
+	struct queue_group_entry *entry;
 
-	for (list = queue_group->message_queue_head.next;
-	     list != &queue_group->message_queue_head;
+	for (list = group->message_queue_head.next;
+	     list != &group->message_queue_head;
 	     list = list->next)
 	{
-	        queue_group_entry = list_entry (list, struct queue_group_entry, list);
-	
-		if (queue_group_entry->message_queue == queue) {
-			return (queue_group_entry);
+	        entry = list_entry (list, struct queue_group_entry, list);
+
+		if (entry->message_queue == queue) {
+			return (entry);
 		}
 	}
 	return (0);
 }
 
+static unsigned int queue_group_member_count (struct queue_group *group)
+{
+	struct list_head *list;
+
+	unsigned int count = 0;
+
+	for (list = group->message_queue_head.next;
+	     list != &group->message_queue_head;
+	     list = list->next)
+	{
+		count++;
+	}
+	return (count);
+}
+
+static unsigned int queue_group_change_count (struct queue_group *group)
+{
+	struct list_head *list;
+	struct queue_group_entry *entry;
+
+	unsigned int count = 0;
+
+	for (list = group->message_queue_head.next;
+	     list != &group->message_queue_head;
+	     list = list->next)
+	{
+		entry = list_entry (list, struct queue_group_entry, list);
+
+		if (entry->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) {
+			count++;
+		}
+	}
+	return (count);
+}
+
+static unsigned int queue_group_track (
+	struct queue_group *group,
+	unsigned int flags,
+	void *buffer)
+{
+	struct list_head *list;
+	struct queue_group_entry *entry;
+
+	unsigned int i = 0;
+
+	SaMsgQueueGroupNotificationT *notification =
+		(SaMsgQueueGroupNotificationT *) buffer;
+
+
+	switch (flags) {
+
+	case SA_TRACK_CURRENT:
+	case SA_TRACK_CHANGES:
+
+		for (list = group->message_queue_head.next;
+		     list != &group->message_queue_head;
+		     list = list->next)
+		{
+			entry = list_entry (list, struct queue_group_entry, list);
+			memcpy (&notification[i].member.queueName,
+				&entry->message_queue->name,
+				sizeof (SaNameT));
+			notification[i].change = entry->change;
+			i++;
+		}
+		break;
+
+	case SA_TRACK_CHANGES_ONLY:
+
+		for (list = group->message_queue_head.next;
+		     list != &group->message_queue_head;
+		     list = list->next)
+		{
+			entry = list_entry (list, struct queue_group_entry, list);
+			if (entry->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) {
+				memcpy (&notification[i].member.queueName,
+					&entry->message_queue->name,
+					sizeof (SaNameT));
+				notification[i].change = entry->change;
+				i++;
+			}
+		}
+		break;
+
+	default:
+		break;
+	}
+
+	return (i);
+}
+
 static int msg_exec_init_fn (struct objdb_iface_ver0 *objdb)
 {
 	/*
@@ -1016,10 +1128,16 @@ static void message_handler_req_exec_msg_queuegroupinsert (
 	struct req_exec_msg_queuegroupinsert *req_exec_msg_queuegroupinsert =
 		(struct req_exec_msg_queuegroupinsert *)message;
 	struct res_lib_msg_queuegroupinsert res_lib_msg_queuegroupinsert;
+	struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
 	struct message_queue *queue;
 	struct queue_group *queue_group;
 	struct queue_group_entry *queue_group_entry;
+	SaMsgQueueGroupNotificationT *notification;
 	SaAisErrorT error = SA_AIS_OK;
+	SaAisErrorT error_cb = SA_AIS_OK;
+
+	unsigned int change_count = 0;
+	unsigned int member_count = 0;
 
 	queue_group = queue_group_find (&req_exec_msg_queuegroupinsert->queue_group_name);
 
@@ -1043,7 +1161,50 @@ static void message_handler_req_exec_msg_queuegroupinsert (
 	list_init (&queue_group_entry->list);
 	list_add (&queue_group_entry->list, &queue_group->message_queue_head);
 	list_add (&queue->list, &queue_list_head);
+
 	queue_group_entry->message_queue = queue;
+	queue_group_entry->change = SA_MSG_QUEUE_GROUP_ADDED;
+
+	if (queue_group->track_flags & SA_TRACK_CHANGES) {
+		member_count = queue_group_member_count (queue_group);
+		change_count = queue_group_change_count (queue_group);
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		if (notification == NULL) {
+			error_cb = SA_AIS_ERR_NO_MEMORY;
+			goto error_track;
+		}
+
+		memset (notification, 0, sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group,
+					   SA_TRACK_CHANGES,
+					   (void *)(notification));
+	}
+
+	if (queue_group->track_flags & SA_TRACK_CHANGES_ONLY) {
+		member_count = queue_group_member_count (queue_group);
+		change_count = queue_group_change_count (queue_group);
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * change_count);
+
+		if (notification == NULL) {
+			error_cb = SA_AIS_ERR_NO_MEMORY;
+			goto error_track;
+		}
+
+		memset (notification, 0, sizeof (SaMsgQueueGroupNotificationT) * change_count);
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group,
+					   SA_TRACK_CHANGES_ONLY,
+					   (void *)(notification));
+	}
+
+error_track:
+	queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
 
 error_exit:
 	if (message_source_is_local(&req_exec_msg_queuegroupinsert->source)) {
@@ -1057,6 +1218,38 @@ error_exit:
 			req_exec_msg_queuegroupinsert->source.conn,
 			&res_lib_msg_queuegroupinsert,
 			sizeof (struct res_lib_msg_queuegroupinsert));
+
+		/*
+		 * Track changes (callback) if tracking is enabled
+		 */
+
+		if ((queue_group->track_flags & SA_TRACK_CHANGES) ||
+		    (queue_group->track_flags & SA_TRACK_CHANGES_ONLY))
+		{
+			res_lib_msg_queuegrouptrack.header.size =
+				(sizeof (struct res_lib_msg_queuegrouptrack) +
+				 (sizeof (SaMsgQueueGroupNotificationT) *
+				  res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+			res_lib_msg_queuegrouptrack.header.id =
+				MESSAGE_RES_MSG_QUEUEGROUPTRACK;
+			res_lib_msg_queuegrouptrack.header.error = error_cb;
+			res_lib_msg_queuegrouptrack.numberOfMembers = member_count;
+
+			memcpy (&res_lib_msg_queuegrouptrack.queueGroupName,
+				&req_exec_msg_queuegroupinsert->queue_group_name,
+				sizeof (SaNameT));
+
+			openais_conn_send_response (
+				openais_conn_partner_get (req_exec_msg_queuegroupinsert->source.conn),
+				&res_lib_msg_queuegrouptrack,
+				sizeof (struct res_lib_msg_queuegrouptrack));
+
+			openais_conn_send_response (
+				openais_conn_partner_get (req_exec_msg_queuegroupinsert->source.conn),
+				notification,
+				(sizeof (SaMsgQueueGroupNotificationT) *
+				 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+		}
 	}
 }
 
@@ -1067,10 +1260,16 @@ static void message_handler_req_exec_msg_queuegroupremove (
 	struct req_exec_msg_queuegroupremove *req_exec_msg_queuegroupremove =
 		(struct req_exec_msg_queuegroupremove *)message;
 	struct res_lib_msg_queuegroupremove res_lib_msg_queuegroupremove;
+	struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
 	struct queue_group *queue_group;
 	struct message_queue *queue;
 	struct queue_group_entry *queue_group_entry;
+	SaMsgQueueGroupNotificationT *notification;
 	SaAisErrorT error = SA_AIS_OK;
+	SaAisErrorT error_cb = SA_AIS_OK;
+
+	unsigned int change_count = 0;
+	unsigned int member_count = 0;
 
 	queue_group = queue_group_find (&req_exec_msg_queuegroupremove->queue_group_name);
 	if (queue_group == 0) {
@@ -1090,6 +1289,49 @@ static void message_handler_req_exec_msg_queuegroupremove (
 		goto error_exit;
 	}
 
+	queue_group_entry->change = SA_MSG_QUEUE_GROUP_REMOVED;
+
+	if (queue_group->track_flags & SA_TRACK_CHANGES) {
+		member_count = queue_group_member_count (queue_group);
+		change_count = queue_group_change_count (queue_group);
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		if (notification == NULL) {
+			error_cb = SA_AIS_ERR_NO_MEMORY;
+			goto error_track;
+		}
+
+		memset (notification, 0, (sizeof (SaMsgQueueGroupNotificationT) * member_count));
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group,
+					   SA_TRACK_CHANGES,
+					   (void *)(notification));
+	}
+
+	if (queue_group->track_flags & SA_TRACK_CHANGES_ONLY) {
+		member_count = queue_group_member_count (queue_group);
+		change_count = queue_group_change_count (queue_group);
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * change_count);
+
+		if (notification == NULL) {
+			error_cb = SA_AIS_ERR_NO_MEMORY;
+			goto error_track;
+		}
+
+		memset (notification, 0, (sizeof (SaMsgQueueGroupNotificationT) * change_count));
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group,
+					   SA_TRACK_CHANGES_ONLY,
+					   (void *)(notification));
+	}
+
+error_track:
+	queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
+
 	list_del (&queue_group_entry->list);
 
 error_exit:
@@ -1104,6 +1346,38 @@ error_exit:
 			req_exec_msg_queuegroupremove->source.conn,
 			&res_lib_msg_queuegroupremove,
 			sizeof (struct res_lib_msg_queuegroupremove));
+
+		/*
+		 * Track changes (callback) if tracking is enabled
+		 */
+
+		if ((queue_group->track_flags & SA_TRACK_CHANGES) ||
+		    (queue_group->track_flags & SA_TRACK_CHANGES_ONLY))
+		{
+			res_lib_msg_queuegrouptrack.header.size =
+				(sizeof (struct res_lib_msg_queuegrouptrack) +
+				 (sizeof (SaMsgQueueGroupNotificationT) *
+				  res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+			res_lib_msg_queuegrouptrack.header.id =
+				MESSAGE_RES_MSG_QUEUEGROUPTRACK;
+			res_lib_msg_queuegrouptrack.header.error = error_cb;
+			res_lib_msg_queuegrouptrack.numberOfMembers = member_count;
+
+			memcpy (&res_lib_msg_queuegrouptrack.queueGroupName,
+				&req_exec_msg_queuegroupremove->queue_group_name,
+				sizeof (SaNameT));
+
+			openais_conn_send_response (
+				openais_conn_partner_get (req_exec_msg_queuegroupremove->source.conn),
+				&res_lib_msg_queuegrouptrack,
+				sizeof (struct res_lib_msg_queuegrouptrack));
+
+			openais_conn_send_response (
+				openais_conn_partner_get (req_exec_msg_queuegroupremove->source.conn),
+				notification,
+				(sizeof (SaMsgQueueGroupNotificationT) *
+				 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+		}
 	}
 }
 
@@ -1144,22 +1418,153 @@ static void message_handler_req_exec_msg_queuegrouptrack (
 	void *message,
 	unsigned int nodeid)
 {
-#if 0
 	struct req_exec_msg_queuegrouptrack *req_exec_msg_queuegrouptrack =
 		(struct req_exec_msg_queuegrouptrack *)message;
 	struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
-#endif
+	struct queue_group *queue_group;
+	SaAisErrorT error = SA_AIS_OK;
+
+	unsigned int change_count = 0;
+	unsigned int member_count = 0;
+
+	SaMsgQueueGroupNotificationT *notification;
+
+	queue_group = queue_group_find (&req_exec_msg_queuegrouptrack->queue_group_name);
+
+	if (queue_group == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	member_count = queue_group_member_count (queue_group);
+	change_count = queue_group_change_count (queue_group);
+
+	if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CURRENT) {
+		/* DEBUG */
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CURRENT\n");
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		if (notification == NULL) {
+			error = SA_AIS_ERR_NO_MEMORY;
+			goto error_exit;
+		}
+
+		memset (notification, 0, sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group, SA_TRACK_CURRENT, (void *)(notification));
+	}
+
+	if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CHANGES) {
+		/* DEBUG */
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CHANGES\n");
+		queue_group->track_flags = req_exec_msg_queuegrouptrack->track_flags;
+	}
+
+	if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CHANGES_ONLY) {
+		/* DEBUG */
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CHANGES_ONLY\n");
+		queue_group->track_flags = req_exec_msg_queuegrouptrack->track_flags;
+	}
+
+error_exit:
+	if (message_source_is_local(&req_exec_msg_queuegrouptrack->source)) {
+		res_lib_msg_queuegrouptrack.header.size =
+			sizeof (struct res_lib_msg_queuegrouptrack);
+		res_lib_msg_queuegrouptrack.header.id =
+			MESSAGE_RES_MSG_QUEUEGROUPTRACK;
+		res_lib_msg_queuegrouptrack.header.error = error;
+		res_lib_msg_queuegrouptrack.numberOfMembers = member_count;
+
+		memcpy (&res_lib_msg_queuegrouptrack.queueGroupName,
+			&req_exec_msg_queuegrouptrack->queue_group_name,
+			sizeof (SaNameT));
+
+		if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CURRENT) {
+			if (req_exec_msg_queuegrouptrack->buffer_flag) {
+				res_lib_msg_queuegrouptrack.header.size +=
+					(sizeof (SaMsgQueueGroupNotificationT) *
+					 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems);
+
+				openais_conn_send_response (
+					req_exec_msg_queuegrouptrack->source.conn,
+					&res_lib_msg_queuegrouptrack,
+					sizeof (struct res_lib_msg_queuegrouptrack));
+
+				openais_conn_send_response (
+					req_exec_msg_queuegrouptrack->source.conn,
+					notification,
+					(sizeof (SaMsgQueueGroupNotificationT) *
+					 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+			} else {
+				openais_conn_send_response (
+					req_exec_msg_queuegrouptrack->source.conn,
+					&res_lib_msg_queuegrouptrack,
+					sizeof (struct res_lib_msg_queuegrouptrack));
+
+				res_lib_msg_queuegrouptrack.header.size +=
+					(sizeof (SaMsgQueueGroupNotificationT) *
+					 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems);
+
+				openais_conn_send_response (
+					openais_conn_partner_get (req_exec_msg_queuegrouptrack->source.conn),
+					&res_lib_msg_queuegrouptrack,
+					sizeof (struct res_lib_msg_queuegrouptrack));
+
+				openais_conn_send_response (
+					openais_conn_partner_get (req_exec_msg_queuegrouptrack->source.conn),
+					notification,
+					(sizeof (SaMsgQueueGroupNotificationT) *
+					 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+			}
+		} else {
+			openais_conn_send_response (
+				req_exec_msg_queuegrouptrack->source.conn,
+				&res_lib_msg_queuegrouptrack,
+				sizeof (struct res_lib_msg_queuegrouptrack));
+		}
+	}
 }
 
 static void message_handler_req_exec_msg_queuegrouptrackstop (
 	void *message,
 	unsigned int nodeid)
 {
-#if 0
 	struct req_exec_msg_queuegrouptrackstop *req_exec_msg_queuegrouptrackstop =
 		(struct req_exec_msg_queuegrouptrackstop *)message;
 	struct res_lib_msg_queuegrouptrackstop res_lib_msg_queuegrouptrackstop;
-#endif
+	struct queue_group *queue_group;
+	SaAisErrorT error = SA_AIS_OK;
+
+	queue_group = queue_group_find (&req_exec_msg_queuegrouptrackstop->queue_group_name);
+
+	if (queue_group == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	if ((queue_group->track_flags != SA_TRACK_CHANGES) &&
+	    (queue_group->track_flags != SA_TRACK_CHANGES_ONLY)) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	queue_group->track_flags = 0;
+
+error_exit:
+	if (message_source_is_local(&req_exec_msg_queuegrouptrackstop->source)) {
+		res_lib_msg_queuegrouptrackstop.header.size =
+			sizeof (struct res_lib_msg_queuegrouptrackstop);
+		res_lib_msg_queuegrouptrackstop.header.id =
+			MESSAGE_RES_MSG_QUEUEGROUPTRACKSTOP;
+		res_lib_msg_queuegrouptrackstop.header.error = error;
+
+		openais_conn_send_response (
+			req_exec_msg_queuegrouptrackstop->source.conn,
+			&res_lib_msg_queuegrouptrackstop,
+			sizeof (struct res_lib_msg_queuegrouptrackstop));
+	}
 }
 
 static void message_handler_req_exec_msg_messagesend (
@@ -1209,9 +1614,6 @@ static void message_handler_req_exec_msg_messagesend (
 
 	list_add_tail (&entry->list, &queue->message_list_head);
 
-	/* DEBUG */
-	print_message_list (queue);
-
 error_exit:
 
 	if (message_source_is_local(&req_exec_msg_messagesend->source)) {
@@ -1536,6 +1938,9 @@ static void message_handler_req_lib_msg_queuegroupcreate (
 	memcpy (&req_exec_msg_queuegroupcreate.queue_group_name,
 		&req_lib_msg_queuegroupcreate->queueGroupName, sizeof (SaNameT));
 
+	req_exec_msg_queuegroupcreate.policy =
+		req_lib_msg_queuegroupcreate->queueGroupPolicy;
+
 	iovec.iov_base = (char *)&req_exec_msg_queuegroupcreate;
 	iovec.iov_len = sizeof (req_exec_msg_queuegroupcreate);
 
@@ -1656,6 +2061,11 @@ static void message_handler_req_lib_msg_queuegrouptrack (
 	memcpy (&req_exec_msg_queuegrouptrack.queue_group_name,
 		&req_lib_msg_queuegrouptrack->queueGroupName, sizeof (SaNameT));
 
+	req_exec_msg_queuegrouptrack.track_flags =
+		req_lib_msg_queuegrouptrack->trackFlags;
+	req_exec_msg_queuegrouptrack.buffer_flag =
+		req_lib_msg_queuegrouptrack->bufferFlag;
+
 	iovec.iov_base = (char *)&req_exec_msg_queuegrouptrack;
 	iovec.iov_len = sizeof (req_exec_msg_queuegrouptrack);
 

+ 4 - 0
include/ipc_msg.h

@@ -178,10 +178,14 @@ struct req_lib_msg_queuegrouptrack {
 	mar_req_header_t header;
 	SaNameT queueGroupName;
 	SaUint8T trackFlags;
+	SaUint8T bufferFlag;
 };
 
 struct res_lib_msg_queuegrouptrack {
 	mar_res_header_t header;
+	SaNameT queueGroupName;
+	SaUint32T numberOfMembers;
+	SaMsgQueueGroupNotificationBufferT notificationBuffer;
 };
 
 struct req_lib_msg_queuegrouptrackstop {

+ 87 - 2
lib/msg.c

@@ -301,7 +301,7 @@ saMsgDispatch (
 
 	struct res_lib_msg_queueopenasync *res_lib_msg_queueopenasync;
 	struct res_lib_msg_messagesendasync *res_lib_msg_messagesendasync;
-
+	struct res_lib_msg_queuegrouptrack *res_lib_msg_queuegrouptrack;
 
 	if (dispatchFlags != SA_DISPATCH_ONE &&
 	    dispatchFlags != SA_DISPATCH_ALL &&
@@ -354,6 +354,7 @@ saMsgDispatch (
 			pthread_mutex_unlock(&msgInstance->dispatch_mutex);
 			break; /* exit do while cont is 1 loop */
 		} else
+
 		if (dispatch_avail == 0) {
 			pthread_mutex_unlock(&msgInstance->dispatch_mutex);
 			continue;
@@ -366,6 +367,7 @@ saMsgDispatch (
 		if (error != SA_AIS_OK) {
 			goto error_unlock;
 		}
+
 		if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
 			error = saRecvRetry (msgInstance->dispatch_fd, &dispatch_data.data,
 				dispatch_data.header.size - sizeof (mar_res_header_t));
@@ -453,6 +455,26 @@ saMsgDispatch (
 
 			break;
 
+		case MESSAGE_RES_MSG_QUEUEGROUPTRACK:
+
+			if (callbacks.saMsgQueueGroupTrackCallback == NULL) {
+				continue;
+			}
+			res_lib_msg_queuegrouptrack =
+				(struct res_lib_msg_queuegrouptrack *) &dispatch_data;
+
+			res_lib_msg_queuegrouptrack->notificationBuffer.notification =
+				(SaMsgQueueGroupNotificationT *)
+				(((char *) &dispatch_data) + sizeof (struct res_lib_msg_queuegrouptrack));
+
+			callbacks.saMsgQueueGroupTrackCallback (
+				&res_lib_msg_queuegrouptrack->queueGroupName,
+				&res_lib_msg_queuegrouptrack->notificationBuffer,
+				res_lib_msg_queuegrouptrack->numberOfMembers,
+				res_lib_msg_queuegrouptrack->header.error);
+
+			break;
+
 		default:
 			/* TODO */
 			break;
@@ -1117,6 +1139,22 @@ saMsgQueueGroupTrack (
 		return (SA_AIS_ERR_INVALID_PARAM);
 	}
 
+	if ((notificationBuffer != NULL) &&
+	    (notificationBuffer->notification != NULL) &&
+	    (notificationBuffer->numberOfItems == 0)) {
+		return (SA_AIS_ERR_INVALID_PARAM);
+	}
+
+	if ((notificationBuffer != NULL) &&
+	    (notificationBuffer->notification == NULL)) {
+		notificationBuffer->numberOfItems = 0;
+	}
+
+	if ((trackFlags & SA_TRACK_CHANGES) &&
+	    (trackFlags & SA_TRACK_CHANGES_ONLY)) {
+		return (SA_AIS_ERR_BAD_FLAGS);
+	}
+
 	/* DEBUG */
 	printf ("[DEBUG]: saMsgQueueGroupTrack { queueGroupName = %s }\n",
 		(char *) queueGroupName->value);
@@ -1133,20 +1171,67 @@ saMsgQueueGroupTrack (
 		MESSAGE_REQ_MSG_QUEUEGROUPTRACK;
 
 	req_lib_msg_queuegrouptrack.trackFlags = trackFlags;
+	req_lib_msg_queuegrouptrack.bufferFlag = (notificationBuffer != NULL);
+
+	/* DEBUG */
+	printf ("[DEBUG]: saMsgQueueGroupTrack { bufferFlag = %d }\n",
+		(int)(req_lib_msg_queuegrouptrack.bufferFlag));
 
 	memcpy (&req_lib_msg_queuegrouptrack.queueGroupName, queueGroupName,
 		sizeof (SaNameT));
 
 	pthread_mutex_lock (&msgInstance->response_mutex);
 
+	/*
 	error = saSendReceiveReply (msgInstance->response_fd,
 		&req_lib_msg_queuegrouptrack,
 		sizeof (struct req_lib_msg_queuegrouptrack),
 		&res_lib_msg_queuegrouptrack,
 		sizeof (struct res_lib_msg_queuegrouptrack));
+	*/
 
-	pthread_mutex_unlock (&msgInstance->response_mutex);
+	error = saSendRetry (msgInstance->response_fd, &req_lib_msg_queuegrouptrack,
+		sizeof (struct req_lib_msg_queuegrouptrack));
+	if (error != SA_AIS_OK) {
+		goto error_exit;
+	}
+
+	error = saRecvRetry (msgInstance->response_fd, &res_lib_msg_queuegrouptrack,
+		sizeof (struct res_lib_msg_queuegrouptrack));
+	if (error != SA_AIS_OK) {
+		goto error_exit;
+	}
 
+	if ((trackFlags & SA_TRACK_CURRENT) && (notificationBuffer != NULL)) {
+		if (notificationBuffer->notification != NULL) {
+			if (notificationBuffer->numberOfItems < res_lib_msg_queuegrouptrack.numberOfMembers) {
+				error = SA_AIS_ERR_NO_SPACE;
+				goto error_exit;
+			}
+		} else {
+			notificationBuffer->notification =
+				malloc (sizeof (SaMsgQueueGroupNotificationT) *
+					res_lib_msg_queuegrouptrack.numberOfMembers);
+
+			if (notificationBuffer->notification == NULL) {
+				error = SA_AIS_ERR_NO_MEMORY;
+				goto error_exit;
+			}
+
+			memset (notificationBuffer->notification, 0,
+				(sizeof (SaMsgQueueGroupNotificationT) *
+				 res_lib_msg_queuegrouptrack.numberOfMembers));
+		}
+
+		error = saRecvRetry (msgInstance->response_fd,
+				     notificationBuffer->notification,
+				     (sizeof (SaMsgQueueGroupNotificationT) *
+				      res_lib_msg_queuegrouptrack.numberOfMembers));
+	}
+
+error_exit:
+	pthread_mutex_unlock (&msgInstance->response_mutex);
+error_put_msg:
 	saHandleInstancePut (&msgHandleDatabase, msgHandle);
 
 	return (error == SA_AIS_OK ? res_lib_msg_queuegrouptrack.header.error : error);