Просмотр исходного кода

major rewrite of the IPC code for sending messages
to the library. What was there previously was very
broken.

(Logical change 1.62)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@214 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake 21 лет назад
Родитель
Сommit
50a4099c23
1 измененных файлов с 106 добавлено и 14 удалено
  1. 106 14
      exec/main.c

+ 106 - 14
exec/main.c

@@ -62,6 +62,7 @@
 #include "parse.h"
 #include "main.h"
 #include "handlers.h"
+#include "evs.h"
 #include "clm.h"
 #include "amf.h"
 #include "ckpt.h"
@@ -79,6 +80,7 @@ struct gmi_groupname aisexec_groupname = { "0123" };
  * All service handlers in the AIS
  */
 struct service_handler *ais_service_handlers[] = {
+    &evs_service_handler,
     &clm_service_handler,
     &amf_service_handler,
     &ckpt_service_handler,
@@ -87,7 +89,7 @@ struct service_handler *ais_service_handlers[] = {
     &evt_service_handler
 };
 
-#define AIS_SERVICE_HANDLERS_COUNT 6
+#define AIS_SERVICE_HANDLERS_COUNT 7
 #define AIS_SERVICE_HANDLER_AISEXEC_FUNCTIONS_MAX 40
 
 static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, void *data, unsigned int *prio);
@@ -178,7 +180,7 @@ static int libais_disconnect (struct conn_info *conn_info)
 		 */
 		while (!queue_is_empty (&conn_info->outq)) {
 			outq_item = queue_item_get (&conn_info->outq);
-			mempool_free (outq_item->msg);
+			free (outq_item->msg);
 			queue_item_remove (&conn_info->outq);
 		}
 
@@ -200,6 +202,66 @@ static int libais_disconnect (struct conn_info *conn_info)
 	return (res != -1 ? -1 : 0);
 }
 
+static int cleanup_send_response (struct conn_info *conn_info) {
+	struct queue *outq;
+	int res = 0;
+	struct outq_item *queue_item;
+	struct msghdr msg_send;
+	struct iovec iov_send;
+	char *msg_addr;
+
+	if (!libais_connection_active (conn_info)) {
+		return (-1);
+	}
+	outq = &conn_info->outq;
+
+	msg_send.msg_iov = &iov_send;
+	msg_send.msg_name = 0;
+	msg_send.msg_namelen = 0;
+	msg_send.msg_iovlen = 1;
+	msg_send.msg_control = 0;
+	msg_send.msg_controllen = 0;
+	msg_send.msg_flags = 0;
+
+	while (!queue_is_empty (outq)) {
+		queue_item = queue_item_get (outq);
+		msg_addr = (char *)queue_item->msg;
+		msg_addr = &msg_addr[conn_info->byte_start];
+
+		iov_send.iov_base = msg_addr;
+		iov_send.iov_len = queue_item->mlen - conn_info->byte_start;
+
+retry_sendmsg:
+		res = sendmsg (conn_info->fd, &msg_send, MSG_DONTWAIT | MSG_NOSIGNAL);
+		if (res == -1 && errno == EINTR) {
+			goto retry_sendmsg;
+		}
+		if (res == -1 && errno == EAGAIN) {
+			break; /* outgoing kernel queue full */
+		}
+		if (res == -1) {
+			return (-1); /* message couldn't be sent */
+		}
+		if (res + conn_info->byte_start != queue_item->mlen) {
+			conn_info->byte_start += res;
+			break;
+		}
+
+		/*
+		 * Message sent, try sending another message
+		 */
+		queue_item_remove (outq);
+		conn_info->byte_start = 0;
+		free (queue_item->msg);
+	} /* while queue not empty */
+
+	if (queue_is_empty (outq)) {
+		poll_dispatch_modify (aisexec_poll_handle, conn_info->fd,
+			POLLIN|POLLNVAL, poll_handler_libais_deliver, 0);
+	}
+	return (0);
+}
+
 extern int libais_send_response (struct conn_info *conn_info,
 	void *msg, int mlen)
 {
@@ -211,6 +273,7 @@ extern int libais_send_response (struct conn_info *conn_info,
 	struct outq_item queue_item_out;
 	struct msghdr msg_send;
 	struct iovec iov_send;
+	char *msg_addr;
 
 	if (!libais_connection_active (conn_info)) {
 		return (-1);
@@ -236,8 +299,11 @@ extern int libais_send_response (struct conn_info *conn_info,
 	}
 	while (!queue_is_empty (outq)) {
 		queue_item = queue_item_get (outq);
-		iov_send.iov_base = (void *)conn_info->byte_start;
-		iov_send.iov_len = queue_item->mlen;
+		msg_addr = (char *)queue_item->msg;
+		msg_addr = &msg_addr[conn_info->byte_start];
+
+		iov_send.iov_base = msg_addr;
+		iov_send.iov_len = queue_item->mlen - conn_info->byte_start;
 
 retry_sendmsg:
 		res = sendmsg (conn_info->fd, &msg_send, MSG_DONTWAIT | MSG_NOSIGNAL);
@@ -248,7 +314,11 @@ retry_sendmsg:
 			break; /* outgoing kernel queue full */
 		}
 		if (res == -1) {
-			return (-1); /* message couldn't be sent */
+			break; /* some other error, stop trying to send message */
+		}
+		if (res + conn_info->byte_start != queue_item->mlen) {
+			conn_info->byte_start += res;
+			break;
 		}
 
 		/*
@@ -256,23 +326,39 @@ retry_sendmsg:
 		 */
 		queue_item_remove (outq);
 		conn_info->byte_start = 0;
-		mempool_free (queue_item->msg);
+		free (queue_item->msg);
 	} /* while queue not empty */
 
+	res = -1;
+
 	queue_empty = queue_is_empty (outq);
 	/*
 	 * Send requested message
 	 */
 	if (queue_empty) {
+		
 		iov_send.iov_base = msg;
 		iov_send.iov_len = mlen;
 retry_sendmsg_two:
 		res = sendmsg (conn_info->fd, &msg_send, MSG_DONTWAIT | MSG_NOSIGNAL);
+
 		if (res == -1 && errno == EINTR) {
 			goto retry_sendmsg_two;
 		}
-		if (res == -1 && errno != EAGAIN) {
-			return (-1);
+		if (res == -1 && errno == EAGAIN) {
+			conn_info->byte_start = 0;
+			poll_dispatch_modify (aisexec_poll_handle, conn_info->fd,
+				POLLIN|POLLNVAL, poll_handler_libais_deliver, 0);
+		}
+		if (res != -1) {
+			if (res + conn_info->byte_start != mlen) {
+				conn_info->byte_start += res;
+				res = -1;
+			} else {
+				conn_info->byte_start = 0;
+				poll_dispatch_modify (aisexec_poll_handle, conn_info->fd,
+					POLLIN|POLLNVAL, poll_handler_libais_deliver, 0);
+			}
 		}
 	}
 
@@ -280,7 +366,7 @@ retry_sendmsg_two:
 	 * If res == -1 , errrno == EAGAIN which means kernel queue full
 	 */
 	if (res == -1)  {
-		cmsg = mempool_malloc (mlen);
+		cmsg = malloc (mlen);
 		if (cmsg == 0) {
 			log_printf (LOG_LEVEL_ERROR, "Library queue couldn't allocate a message, disconnecting library connection.\n");
 			libais_disconnect_delayed (conn_info);
@@ -290,6 +376,9 @@ retry_sendmsg_two:
 		queue_item_out.mlen = mlen;
 		memcpy (cmsg, msg, mlen);
 		queue_item_add (outq, &queue_item_out);
+
+		poll_dispatch_modify (aisexec_poll_handle, conn_info->fd,
+			POLLOUT|POLLIN|POLLNVAL, poll_handler_libais_deliver, 0);
 	}
 	return (0);
 }
@@ -371,6 +460,13 @@ static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent,
 	msg_recv.msg_namelen = 0;
 	msg_recv.msg_flags = 0;
 
+	if (revent & POLLOUT) {
+		cleanup_send_response (conn_info);
+	}
+	if ((revent & POLLIN) == 0) {
+		return (0);
+	}
+
 	/*
 	 * Handle delayed disconnections
 	 */
@@ -390,20 +486,16 @@ static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent,
 	iov_recv.iov_base = &conn_info->inb[conn_info->inb_start];
 	iov_recv.iov_len = (SIZEINB) - conn_info->inb_start;
 	assert (iov_recv.iov_len != 0);
-//printf ("inb start inb inuse %d %d\n", conn_info->inb_start, conn_info->inb_inuse);
 
 retry_recv:
 	res = recvmsg (fd, &msg_recv, MSG_DONTWAIT | MSG_NOSIGNAL);
-//printf ("received %d bytes\n", res);
 	if (res == -1 && errno == EINTR) {
 		goto retry_recv;
 	} else
-	if (res == -1) {
-printf ("res-1 errno = %d\n", errno);
+	if (res == -1 && errno != EAGAIN) {
 		goto error_disconnect;
 	} else
 	if (res == 0) {
-printf ("Res0 errno = %d\n", errno);
 		goto error_disconnect;
 		return (-1);
 	}