Просмотр исходного кода

Add support to disconnect and dispatch to utilize flow control.

(Logical change 1.55)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@179 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 21 лет назад
Родитель
Сommit
69fd2d8dcd
1 измененных файлов с 66 добавлено и 29 удалено
  1. 66 29
      exec/main.c

+ 66 - 29
exec/main.c

@@ -133,23 +133,37 @@ struct sockaddr_in this_ip;
 
 char *socketname = "libais.socket";
 
-static void libais_disconnect (struct conn_info *conn_info)
+static int libais_disconnect (struct conn_info *conn_info)
 {
-	int fd;
+	int res = 0;
 
 	if (ais_service_handlers[conn_info->service - 1]->libais_exit_fn) {
-		ais_service_handlers[conn_info->service - 1]->libais_exit_fn (conn_info);
-	} else {
-		printf ("exit function not defined\n");
+		res = ais_service_handlers[conn_info->service - 1]->libais_exit_fn (conn_info);
 	}
 
-	fd = conn_info->fd;
+	/*
+	 * Close the library connection and free its
+	 * data if it hasn't already been freed
+	 */
+	if (conn_info->inb) {
+		close (conn_info->fd);
+		queue_free (&conn_info->outq);
+		free (conn_info->inb);
+		conn_info->inb = 0;
+	}
 
-	close (fd);
-	queue_free (&conn_info->outq);
-	free (conn_info->inb);
+	/*
+	 * If exit_fn didn't request a retry,
+	 * free the conn_info structure
+	 */
+	if (res != -1) {
+		free (conn_info);
+	}
 
-	poll_dispatch_delete (aisexec_poll_handle, fd);
+	/*
+	 * Inverse res from libais exit fn handler
+	 */
+	return (res != -1 ? -1 : 0);
 }
 
 extern int libais_send_response (struct conn_info *conn_info,
@@ -280,7 +294,7 @@ retry_accept:
 		return (0); /* This is an error, but -1 would indicate disconnect from poll */
 	}
 
-	poll_dispatch_add (aisexec_poll_handle, new_fd, POLLIN, conn_info,
+	poll_dispatch_add (aisexec_poll_handle, new_fd, POLLIN|POLLNVAL, conn_info,
 		poll_handler_libais_deliver, 0);
 
 // TODO is this needed, or shouldn't it be in conn_info_create ?
@@ -288,11 +302,16 @@ retry_accept:
 	return (0);
 }
 
+struct message_overlay {
+	struct res_header header;
+	char buf[4096];
+};
+
 static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent, void *data, unsigned int *prio)
 {
 	int res;
 	struct conn_info *conn_info = (struct conn_info *)data;
-	struct message_header *header;
+	struct req_header *header;
 	int service;
 	struct msghdr msg_recv;
 	struct iovec iov_recv;
@@ -300,6 +319,8 @@ static int poll_handler_libais_deliver (poll_handle handle, int fd, int revent,
 	char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
 	struct ucred *cred;
 	int on = 0;
+	int send_ok = 0;
+	struct message_overlay msg_overlay;
 
 	msg_recv.msg_iov = &iov_recv;
 	msg_recv.msg_iovlen = 1;
@@ -352,19 +373,13 @@ retry_recv:
 	}
 	/*
 	 * Dispatch all messages received in recvmsg that can be dispatched
-	 * sizeof (struct message_header) needed at minimum to do any processing
+	 * sizeof (struct req_header) needed at minimum to do any processing
 	 */
 	conn_info->inb_inuse += res;
 	conn_info->inb_start += res;
 
-	while (conn_info->inb_inuse >= sizeof (struct message_header) && res != -1) {
-		header = (struct message_header *)&conn_info->inb[conn_info->inb_start - conn_info->inb_inuse];
-
-		if (header->magic != MESSAGE_MAGIC) {
-			log_printf (LOG_LEVEL_SECURITY, "Invalid magic is %x should be %x\n", header->magic, MESSAGE_MAGIC);
-			res = -1;
-			goto error_exit;
-		}
+	while (conn_info->inb_inuse >= sizeof (struct req_header) && res != -1) {
+		header = (struct req_header *)&conn_info->inb[conn_info->inb_start - conn_info->inb_inuse];
 
 		if (header->size > conn_info->inb_inuse) {
 			break;
@@ -384,14 +399,36 @@ retry_recv:
 			/*
 			 * Not an init service, but a standard service
 			 */
-			if (header->id < 0 || header->id > ais_service_handlers[service - 1]->libais_handler_fns_count) {
+			if (header->id < 0 || header->id > ais_service_handlers[service - 1]->libais_handlers_count) {
 				log_printf (LOG_LEVEL_SECURITY, "Invalid header id is %d min 0 max %d\n",
-				header->id, ais_service_handlers[service - 1]->libais_handler_fns_count);
+				header->id, ais_service_handlers[service - 1]->libais_handlers_count);
 				res = -1;
 				goto error_exit;
 			}
+
+			/*
+			 * Determine if a message can be queued with gmi and if so
+			 * deliver it, otherwise tell the library we are too busy
+			 */
 	
-			res = ais_service_handlers[service - 1]->libais_handler_fns[header->id](conn_info, header);
+			send_ok = gmi_send_ok (ais_service_handlers[service - 1]->libais_handlers[header->id].gmi_prio, 1000 + header->size);
+			if (send_ok) {
+				*prio = 0;
+				res = ais_service_handlers[service - 1]->libais_handlers[header->id].libais_handler_fn(conn_info, header);
+			} else {
+				*prio = (*prio) + 1;
+
+				/*
+				 * Overload, tell library to retry
+				 */
+				msg_overlay.header.size = 
+					ais_service_handlers[service - 1]->libais_handlers[header->id].response_size;
+				msg_overlay.header.id = 
+					ais_service_handlers[service - 1]->libais_handlers[header->id].response_id;
+				msg_overlay.header.error = SA_ERR_TRY_AGAIN;
+				libais_send_response (conn_info, &msg_overlay,
+					msg_overlay.header.size);
+			}
 		}
 		conn_info->inb_inuse -= header->size;
 	} /* while */
@@ -413,8 +450,8 @@ retry_recv:
 	return (res);
 
 error_exit:
-	libais_disconnect (conn_info);
-	return (-1); /* remove entry from poll list */
+	res = libais_disconnect (conn_info);
+	return (res);
 }
 
 extern void print_stats (void);
@@ -475,7 +512,7 @@ static void deliver_fn (
 	struct iovec *iovec,
 	int iov_len)
 {
-	struct message_header *header;
+	struct req_header *header;
 	int res;
 	int pos = 0;
 	int i;
@@ -491,9 +528,9 @@ static void deliver_fn (
 			pos += iovec[i].iov_len;
 			assert (pos < MESSAGE_SIZE_MAX);
 		}
-		header = (struct message_header *)delivery_data;
+		header = (struct req_header *)delivery_data;
 	} else {
-		header = (struct message_header *)iovec[0].iov_base;
+		header = (struct req_header *)iovec[0].iov_base;
 	}
 	res = aisexec_handler_fns[header->id](header, source_addr);
 }