Browse Source

qdevice: Improve socket based IPC

Signed-off-by: Jan Friesse <jfriesse@redhat.com>
Jan Friesse 10 years ago
parent
commit
40ca891974

+ 1 - 1
qdevices/Makefile.am

@@ -55,7 +55,7 @@ corosync_qdevice_SOURCES	= corosync-qdevice.c qdevice-cmap.c qdevice-instance.c
 				  qdevice-net-instance.c dynar.c send-buffer-list.c timer-list.c \
 				  qdevice-net-instance.c dynar.c send-buffer-list.c timer-list.c \
 				  msg.c msgio.c nss-sock.c tlv.c \
 				  msg.c msgio.c nss-sock.c tlv.c \
 				  unix-socket.c unix-socket-client.c unix-socket-client-list.c \
 				  unix-socket.c unix-socket-client.c unix-socket-client-list.c \
-				  unix-socket-ipc.c qdevice-ipc.c pr-poll-array.c \
+				  unix-socket-ipc.c qdevice-ipc.c pr-poll-array.c dynar-simple-lex.c \
 				  qdevice-net-poll.c qdevice-net-send.c qdevice-net-votequorum.c \
 				  qdevice-net-poll.c qdevice-net-send.c qdevice-net-votequorum.c \
 				  qdevice-net-socket.c qdevice-net-nss.c qdevice-net-msg-received.c \
 				  qdevice-net-socket.c qdevice-net-nss.c qdevice-net-msg-received.c \
 				  qdevice-net-cast-vote-timer.c qdevice-net-echo-request-timer.c \
 				  qdevice-net-cast-vote-timer.c qdevice-net-echo-request-timer.c \

+ 1 - 3
qdevices/corosync-qnetd.c

@@ -97,10 +97,8 @@ qnetd_pr_poll_array_create(struct qnetd_instance *instance)
 
 
 	poll_desc->fd = instance->server.socket;
 	poll_desc->fd = instance->server.socket;
 	poll_desc->in_flags = PR_POLL_READ;
 	poll_desc->in_flags = PR_POLL_READ;
-	poll_desc->out_flags = 0;
 
 
 	user_data->type = QNETD_POLL_ARRAY_USER_DATA_TYPE_SOCKET;
 	user_data->type = QNETD_POLL_ARRAY_USER_DATA_TYPE_SOCKET;
-	user_data->client = NULL;
 
 
 	TAILQ_FOREACH(client, client_list, entries) {
 	TAILQ_FOREACH(client, client_list, entries) {
 		if (pr_poll_array_add(poll_array, &poll_desc, (void **)&user_data) < 0) {
 		if (pr_poll_array_add(poll_array, &poll_desc, (void **)&user_data) < 0) {
@@ -108,10 +106,10 @@ qnetd_pr_poll_array_create(struct qnetd_instance *instance)
 		}
 		}
 		poll_desc->fd = client->socket;
 		poll_desc->fd = client->socket;
 		poll_desc->in_flags = PR_POLL_READ;
 		poll_desc->in_flags = PR_POLL_READ;
+
 		if (!send_buffer_list_empty(&client->send_buffer_list)) {
 		if (!send_buffer_list_empty(&client->send_buffer_list)) {
 			poll_desc->in_flags |= PR_POLL_WRITE;
 			poll_desc->in_flags |= PR_POLL_WRITE;
 		}
 		}
-		poll_desc->out_flags = 0;
 
 
 		user_data->type = QNETD_POLL_ARRAY_USER_DATA_TYPE_CLIENT;
 		user_data->type = QNETD_POLL_ARRAY_USER_DATA_TYPE_CLIENT;
 		user_data->client = client;
 		user_data->client = client;

+ 84 - 0
qdevices/dynar-simple-lex.c

@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2015-2016 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Jan Friesse (jfriesse@redhat.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the Red Hat, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <string.h>
+#include <ctype.h>
+
+#include "dynar-simple-lex.h"
+
+void
+dynar_simple_lex_init(struct dynar_simple_lex *lex, struct dynar *input)
+{
+
+	memset(lex, 0, sizeof(*lex));
+	lex->input = input;
+	dynar_init(&lex->token, dynar_max_size(input));
+}
+
+void
+dynar_simple_lex_destroy(struct dynar_simple_lex *lex)
+{
+
+	dynar_destroy(&lex->token);
+	memset(lex, 0, sizeof(*lex));
+}
+
+struct dynar *
+dynar_simple_lex_token_next(struct dynar_simple_lex *lex)
+{
+	size_t pos;
+	size_t size;
+	char *str;
+	char ch;
+
+	dynar_clean(&lex->token);
+
+	size = dynar_size(lex->input);
+	str = dynar_data(lex->input);
+
+	for (pos = lex->pos; pos < size && isspace(str[pos]) && str[pos] != '\n'; pos++) ;
+
+	for (; pos < size && !isspace(str[pos]); pos++) {
+		if (dynar_cat(&lex->token, &str[pos], sizeof(*str)) != 0) {
+			return (NULL);
+		}
+	}
+
+	ch = '\0';
+	dynar_cat(&lex->token, &ch, sizeof(ch));
+
+	lex->pos = pos;
+
+	return (&lex->token);
+}

+ 60 - 0
qdevices/dynar-simple-lex.h

@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2015-2016 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Jan Friesse (jfriesse@redhat.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the Red Hat, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _DYNAR_SIMPLE_LEX_H_
+#define _DYNAR_SIMPLE_LEX_H_
+
+#include "dynar.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct dynar_simple_lex {
+	struct dynar token;
+	struct dynar *input;
+	size_t pos;
+};
+
+extern void	 	 dynar_simple_lex_init(struct dynar_simple_lex *lex, struct dynar *input);
+
+extern void	 	 dynar_simple_lex_destroy(struct dynar_simple_lex *lex);
+
+extern struct dynar	*dynar_simple_lex_token_next(struct dynar_simple_lex *lex);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _DYNAR_SIMPLE_LEX_H_ */

+ 7 - 7
qdevices/nss-sock.c

@@ -149,14 +149,14 @@ nss_sock_create_listen_socket(const char *hostname, uint16_t port, PRIntn af)
 			if (af == PR_AF_UNSPEC || addr.raw.family == af) {
 			if (af == PR_AF_UNSPEC || addr.raw.family == af) {
 				sock = nss_sock_create_socket(addr.raw.family, 1);
 				sock = nss_sock_create_socket(addr.raw.family, 1);
 				if (sock == NULL) {
 				if (sock == NULL) {
-					continue ;
+					continue;
 				}
 				}
 
 
 				if (PR_Bind(sock, &addr) != PR_SUCCESS) {
 				if (PR_Bind(sock, &addr) != PR_SUCCESS) {
 					PR_Close(sock);
 					PR_Close(sock);
 					sock = NULL;
 					sock = NULL;
 
 
-					continue ;
+					continue;
 				}
 				}
 
 
 				/*
 				/*
@@ -210,12 +210,12 @@ nss_sock_create_client_socket(const char *hostname, uint16_t port, PRIntn af,
 
 
 	while ((addr_iter = PR_EnumerateAddrInfo(addr_iter, addr_info, port, &addr)) != NULL) {
 	while ((addr_iter = PR_EnumerateAddrInfo(addr_iter, addr_info, port, &addr)) != NULL) {
 		if (af != PR_AF_UNSPEC && addr.raw.family != af) {
 		if (af != PR_AF_UNSPEC && addr.raw.family != af) {
-			continue ;
+			continue;
 		}
 		}
 
 
 		sock = nss_sock_create_socket(addr.raw.family, 0);
 		sock = nss_sock_create_socket(addr.raw.family, 0);
 		if (sock == NULL) {
 		if (sock == NULL) {
-			continue ;
+			continue;
 		}
 		}
 
 
 		if ((res = PR_Connect(sock, &addr, timeout)) != PR_SUCCESS) {
 		if ((res = PR_Connect(sock, &addr, timeout)) != PR_SUCCESS) {
@@ -294,18 +294,18 @@ nss_sock_non_blocking_client_try_next(struct nss_sock_non_blocking_client *clien
 	while ((client->addr_iter = PR_EnumerateAddrInfo(client->addr_iter, client->addr_info,
 	while ((client->addr_iter = PR_EnumerateAddrInfo(client->addr_iter, client->addr_info,
 	    client->port, &addr)) != NULL) {
 	    client->port, &addr)) != NULL) {
 		if (client->af != PR_AF_UNSPEC && addr.raw.family != client->af) {
 		if (client->af != PR_AF_UNSPEC && addr.raw.family != client->af) {
-			continue ;
+			continue;
 		}
 		}
 
 
 		client->socket = nss_sock_create_socket(addr.raw.family, 0);
 		client->socket = nss_sock_create_socket(addr.raw.family, 0);
 		if (client->socket == NULL) {
 		if (client->socket == NULL) {
-			continue ;
+			continue;
 		}
 		}
 
 
 		if (nss_sock_set_non_blocking(client->socket) == -1) {
 		if (nss_sock_set_non_blocking(client->socket) == -1) {
 			PR_Close(client->socket);
 			PR_Close(client->socket);
 			client->socket = NULL;
 			client->socket = NULL;
-			continue ;
+			continue;
 		}
 		}
 
 
 		res = PR_Connect(client->socket, &addr, PR_INTERVAL_NO_TIMEOUT);
 		res = PR_Connect(client->socket, &addr, PR_INTERVAL_NO_TIMEOUT);

+ 3 - 0
qdevices/pr-poll-array.c

@@ -114,7 +114,10 @@ pr_poll_array_add(struct pr_poll_array *poll_array, PRPollDesc **pfds, void **us
 	}
 	}
 
 
 	*pfds = &poll_array->array[pr_poll_array_size(poll_array)];
 	*pfds = &poll_array->array[pr_poll_array_size(poll_array)];
+	memset(*pfds, 0, sizeof(**pfds));
+
 	*user_data = poll_array->user_data_array + (poll_array->items * poll_array->user_data_size);
 	*user_data = poll_array->user_data_array + (poll_array->items * poll_array->user_data_size);
+	memset(*user_data, 0, poll_array->user_data_size);
 
 
 	poll_array->items++;
 	poll_array->items++;
 
 

+ 74 - 0
qdevices/qdevice-ipc.c

@@ -36,6 +36,7 @@
 #include "qdevice-ipc.h"
 #include "qdevice-ipc.h"
 #include "qdevice-log.h"
 #include "qdevice-log.h"
 #include "unix-socket-ipc.h"
 #include "unix-socket-ipc.h"
+#include "dynar-simple-lex.h"
 
 
 int
 int
 qdevice_ipc_init(struct qdevice_instance *instance)
 qdevice_ipc_init(struct qdevice_instance *instance)
@@ -86,9 +87,82 @@ qdevice_ipc_accept(struct qdevice_instance *instance, struct unix_socket_client
 		res = -1;
 		res = -1;
 		break;
 		break;
 	default:
 	default:
+		unix_socket_client_read_line(*res_client, 1);
 		res = 0;
 		res = 0;
 		break;
 		break;
 	}
 	}
 
 
+
 	return (res);
 	return (res);
 }
 }
+
+void
+qdevice_ipc_client_disconnect(struct qdevice_instance *instance, struct unix_socket_client *client)
+{
+
+	unix_socket_ipc_client_disconnect(&instance->local_ipc, client);
+}
+
+static void
+qdevice_ipc_parse_line(struct qdevice_instance *instance, struct unix_socket_client *client)
+{
+	struct dynar_simple_lex lex;
+	struct dynar *token;
+	char *str;
+
+	dynar_simple_lex_init(&lex, &client->receive_buffer);
+	token = dynar_simple_lex_token_next(&lex);
+
+	if (token == NULL) {
+		qdevice_log(LOG_ERR, "Can't alloc memory for simple lex");
+
+		return;
+	}
+
+	str = dynar_data(token);
+	if (strcasecmp(str, "") == 0) {
+		qdevice_log(LOG_DEBUG, "IPC client error: No command specified");
+		// SEND ERROR
+	} else if (strcasecmp(str, "shutdown") == 0) {
+		qdevice_log(LOG_DEBUG, "IPC client requested shutdown");
+		// Send output?
+	} else if (strcasecmp(str, "status") == 0) {
+		qdevice_log(LOG_DEBUG, "IPC client requested status display");
+		// Send output
+	} else {
+		qdevice_log(LOG_DEBUG, "IPC client sent unknown command");
+		// Send output
+	}
+
+	dynar_simple_lex_destroy(&lex);
+}
+
+void
+qdevice_ipc_io_read(struct qdevice_instance *instance, struct unix_socket_client *client)
+{
+	int res;
+
+	res = unix_socket_client_io_read(client);
+
+	switch (res) {
+	case 0:
+		/*
+		 * Partial read
+		 */
+		break;
+	case -1:
+		qdevice_log(LOG_DEBUG, "IPC client closed connection");
+		client->schedule_disconnect = 1;
+		break;
+	case -2:
+		qdevice_log(LOG_ERR, "Can't store message from IPC client. Disconnecting client");
+		client->schedule_disconnect = 1;
+		break;
+	case 1:
+		/*
+		 * Full message received
+		 */
+		qdevice_ipc_parse_line(instance, client);
+		break;
+	}
+}

+ 6 - 0
qdevices/qdevice-ipc.h

@@ -48,6 +48,12 @@ extern int		qdevice_ipc_destroy(struct qdevice_instance *instance);
 extern int		qdevice_ipc_accept(struct qdevice_instance *instance,
 extern int		qdevice_ipc_accept(struct qdevice_instance *instance,
     struct unix_socket_client **res_client);
     struct unix_socket_client **res_client);
 
 
+extern void		qdevice_ipc_client_disconnect(struct qdevice_instance *instance,
+    struct unix_socket_client *client);
+
+extern void		qdevice_ipc_io_read(struct qdevice_instance *instance,
+    struct unix_socket_client *client);
+
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }
 #endif
 #endif

+ 1 - 1
qdevices/qdevice-model-net.c

@@ -172,7 +172,7 @@ qdevice_model_net_run(struct qdevice_instance *instance)
 			qdevice_log(LOG_CRIT, "Can't schedule connect timer");
 			qdevice_log(LOG_CRIT, "Can't schedule connect timer");
 
 
 			try_connect = 0;
 			try_connect = 0;
-			break ;
+			break;
 		}
 		}
 
 
 		qdevice_log(LOG_DEBUG, "Trying connect to qnetd server %s:%u (timeout = %ums)",
 		qdevice_log(LOG_DEBUG, "Trying connect to qnetd server %s:%u (timeout = %ums)",

+ 8 - 3
qdevices/qdevice-net-instance.c

@@ -36,6 +36,7 @@
 #include "qdevice-net-instance.h"
 #include "qdevice-net-instance.h"
 #include "qnet-config.h"
 #include "qnet-config.h"
 #include "utils.h"
 #include "utils.h"
+#include "qdevice-net-poll-array-user-data.h"
 
 
 /*
 /*
  * Needed for creating nspr handle from unix fd
  * Needed for creating nspr handle from unix fd
@@ -82,6 +83,8 @@ qdevice_net_instance_init(struct qdevice_net_instance *instance, size_t initial_
 
 
 	timer_list_init(&instance->main_timer_list);
 	timer_list_init(&instance->main_timer_list);
 
 
+	pr_poll_array_init(&instance->poll_array, sizeof(struct qdevice_net_poll_array_user_data));
+
 	instance->tls_supported = tls_supported;
 	instance->tls_supported = tls_supported;
 
 
 	if ((instance->cmap_poll_fd = PR_CreateSocketPollFd(cmap_fd)) == NULL) {
 	if ((instance->cmap_poll_fd = PR_CreateSocketPollFd(cmap_fd)) == NULL) {
@@ -94,8 +97,8 @@ qdevice_net_instance_init(struct qdevice_net_instance *instance, size_t initial_
 		return (-1);
 		return (-1);
 	}
 	}
 
 
-	if ((instance->local_socket_poll_fd = PR_CreateSocketPollFd(local_socket_fd)) == NULL) {
-		qdevice_log_nss(LOG_CRIT, "Can't create NSPR local socket poll fd");
+	if ((instance->ipc_socket_poll_fd = PR_CreateSocketPollFd(local_socket_fd)) == NULL) {
+		qdevice_log_nss(LOG_CRIT, "Can't create NSPR IPC socket poll fd");
 		return (-1);
 		return (-1);
 	}
 	}
 
 
@@ -128,6 +131,8 @@ qdevice_net_instance_destroy(struct qdevice_net_instance *instance)
 
 
 	send_buffer_list_free(&instance->send_buffer_list);
 	send_buffer_list_free(&instance->send_buffer_list);
 
 
+	pr_poll_array_destroy(&instance->poll_array);
+
 	timer_list_free(&instance->main_timer_list);
 	timer_list_free(&instance->main_timer_list);
 
 
 	free((void *)instance->cluster_name);
 	free((void *)instance->cluster_name);
@@ -141,7 +146,7 @@ qdevice_net_instance_destroy(struct qdevice_net_instance *instance)
 		qdevice_log_nss(LOG_WARNING, "Unable to close votequorum connection fd");
 		qdevice_log_nss(LOG_WARNING, "Unable to close votequorum connection fd");
 	}
 	}
 
 
-	if (PR_DestroySocketPollFd(instance->local_socket_poll_fd) != PR_SUCCESS) {
+	if (PR_DestroySocketPollFd(instance->ipc_socket_poll_fd) != PR_SUCCESS) {
 		qdevice_log_nss(LOG_WARNING, "Unable to close local socket poll fd");
 		qdevice_log_nss(LOG_WARNING, "Unable to close local socket poll fd");
 	}
 	}
 
 

+ 5 - 2
qdevices/qdevice-net-instance.h

@@ -46,10 +46,11 @@
 
 
 #include "dynar.h"
 #include "dynar.h"
 #include "node-list.h"
 #include "node-list.h"
+#include "pr-poll-array.h"
+#include "qdevice-net-disconnect-reason.h"
 #include "send-buffer-list.h"
 #include "send-buffer-list.h"
 #include "tlv.h"
 #include "tlv.h"
 #include "timer-list.h"
 #include "timer-list.h"
-#include "qdevice-net-disconnect-reason.h"
 
 
 #ifdef __cplusplus
 #ifdef __cplusplus
 extern "C" {
 extern "C" {
@@ -57,6 +58,7 @@ extern "C" {
 
 
 enum qdevice_net_instance_state {
 enum qdevice_net_instance_state {
 	QDEVICE_NET_INSTANCE_STATE_WAITING_CONNECT,
 	QDEVICE_NET_INSTANCE_STATE_WAITING_CONNECT,
+	QDEVICE_NET_INSTANCE_STATE_SENDING_PREINIT_REPLY,
 	QDEVICE_NET_INSTANCE_STATE_WAITING_PREINIT_REPLY,
 	QDEVICE_NET_INSTANCE_STATE_WAITING_PREINIT_REPLY,
 	QDEVICE_NET_INSTANCE_STATE_WAITING_STARTTLS_BEING_SENT,
 	QDEVICE_NET_INSTANCE_STATE_WAITING_STARTTLS_BEING_SENT,
 	QDEVICE_NET_INSTANCE_STATE_WAITING_INIT_REPLY,
 	QDEVICE_NET_INSTANCE_STATE_WAITING_INIT_REPLY,
@@ -94,7 +96,7 @@ struct qdevice_net_instance {
 	int schedule_disconnect;
 	int schedule_disconnect;
 	PRFileDesc *votequorum_poll_fd;
 	PRFileDesc *votequorum_poll_fd;
 	PRFileDesc *cmap_poll_fd;
 	PRFileDesc *cmap_poll_fd;
-	PRFileDesc *local_socket_poll_fd;
+	PRFileDesc *ipc_socket_poll_fd;
 	struct tlv_ring_id last_sent_ring_id;
 	struct tlv_ring_id last_sent_ring_id;
 	struct tlv_tie_breaker tie_breaker;
 	struct tlv_tie_breaker tie_breaker;
 	void *algorithm_data;
 	void *algorithm_data;
@@ -103,6 +105,7 @@ struct qdevice_net_instance {
 	struct nss_sock_non_blocking_client non_blocking_client;
 	struct nss_sock_non_blocking_client non_blocking_client;
 	struct timer_list_entry *connect_timer;
 	struct timer_list_entry *connect_timer;
 	int force_ip_version;
 	int force_ip_version;
+	struct pr_poll_array poll_array;
 };
 };
 
 
 extern int		qdevice_net_instance_init(struct qdevice_net_instance *instance,
 extern int		qdevice_net_instance_init(struct qdevice_net_instance *instance,

+ 61 - 0
qdevices/qdevice-net-poll-array-user-data.h

@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2015-2016 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Jan Friesse (jfriesse@redhat.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the Red Hat, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _QDEVICE_NET_POLL_ARRAY_USER_DATA_H_
+#define _QDEVICE_NET_POLL_ARRAY_USER_DATA_H_
+
+#include "unix-socket-client.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum qdevice_net_poll_array_user_data_type {
+	QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_VOTEQUORUM,
+	QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_CMAP,
+	QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_SOCKET,
+	QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_SOCKET,
+	QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_CLIENT,
+};
+
+struct qdevice_net_poll_array_user_data {
+	enum qdevice_net_poll_array_user_data_type type;
+	struct unix_socket_client *ipc_client;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _QDEVICE_NET_POLL_ARRAY_USER_DATA_H_ */

+ 133 - 49
qdevices/qdevice-net-poll.c

@@ -39,20 +39,13 @@
 #include "qdevice-net-socket.h"
 #include "qdevice-net-socket.h"
 #include "qdevice-votequorum.h"
 #include "qdevice-votequorum.h"
 #include "qdevice-ipc.h"
 #include "qdevice-ipc.h"
+#include "qdevice-net-poll-array-user-data.h"
 
 
 /*
 /*
  * Needed for creating nspr handle from unix fd
  * Needed for creating nspr handle from unix fd
  */
  */
 #include <private/pprio.h>
 #include <private/pprio.h>
 
 
-enum qdevice_net_poll_pfd {
-	QDEVICE_NET_POLL_VOTEQUORUM,
-	QDEVICE_NET_POLL_CMAP,
-	QDEVICE_NET_POLL_LOCAL_SOCKET,
-	QDEVICE_NET_POLL_SOCKET,
-	QDEVICE_NET_POLL_MAX_PFDS
-};
-
 static void
 static void
 qdevice_net_poll_read_socket(struct qdevice_net_instance *instance)
 qdevice_net_poll_read_socket(struct qdevice_net_instance *instance)
 {
 {
@@ -110,6 +103,8 @@ qdevice_net_poll_write_socket(struct qdevice_net_instance *instance, const PRPol
 			nss_sock_non_blocking_client_destroy(&instance->non_blocking_client);
 			nss_sock_non_blocking_client_destroy(&instance->non_blocking_client);
 			instance->non_blocking_client.socket = NULL;
 			instance->non_blocking_client.socket = NULL;
 
 
+			instance->state = QDEVICE_NET_INSTANCE_STATE_SENDING_PREINIT_REPLY;
+
 			qdevice_log(LOG_DEBUG, "Sending preinit msg to qnetd");
 			qdevice_log(LOG_DEBUG, "Sending preinit msg to qnetd");
 			if (qdevice_net_send_preinit(instance) != 0) {
 			if (qdevice_net_send_preinit(instance) != 0) {
 				instance->disconnect_reason = QDEVICE_NET_DISCONNECT_REASON_CANT_ALLOCATE_MSG_BUFFER;
 				instance->disconnect_reason = QDEVICE_NET_DISCONNECT_REASON_CANT_ALLOCATE_MSG_BUFFER;
@@ -150,7 +145,7 @@ qdevice_net_poll_err_socket(struct qdevice_net_instance *instance, const PRPollD
 }
 }
 
 
 static void
 static void
-qdevice_net_poll_read_local_socket(struct qdevice_net_instance *instance)
+qdevice_net_poll_read_ipc_socket(struct qdevice_net_instance *instance)
 {
 {
 	struct unix_socket_client *client;
 	struct unix_socket_client *client;
 	PRFileDesc *prfd;
 	PRFileDesc *prfd;
@@ -161,7 +156,9 @@ qdevice_net_poll_read_local_socket(struct qdevice_net_instance *instance)
 
 
 	prfd = PR_CreateSocketPollFd(client->socket);
 	prfd = PR_CreateSocketPollFd(client->socket);
 	if (prfd == NULL) {
 	if (prfd == NULL) {
-		qdevice_log_nss(LOG_CRIT, "Can't create NSPR poll fd for IPC client");
+		qdevice_log_nss(LOG_CRIT, "Can't create NSPR poll fd for IPC client. "
+		    "Disconnecting client");
+		qdevice_ipc_client_disconnect(instance->qdevice_instance_ptr, client);
 
 
 		return ;
 		return ;
 	}
 	}
@@ -169,60 +166,134 @@ qdevice_net_poll_read_local_socket(struct qdevice_net_instance *instance)
 	client->user_data = (void *)prfd;
 	client->user_data = (void *)prfd;
 }
 }
 
 
-int
-qdevice_net_poll(struct qdevice_net_instance *instance)
+static void
+qdevice_net_poll_write_ipc_client(struct qdevice_net_instance *instance, struct unix_socket_client *client)
 {
 {
-	PRPollDesc pfds[QDEVICE_NET_POLL_MAX_PFDS];
-	PRInt32 poll_res;
-	PRIntn no_pfds;
-	int i;
+}
 
 
-	no_pfds = 0;
 
 
-	pfds[QDEVICE_NET_POLL_VOTEQUORUM].fd = instance->votequorum_poll_fd;
-	pfds[QDEVICE_NET_POLL_VOTEQUORUM].in_flags = PR_POLL_READ;
-	no_pfds++;
+static PRPollDesc *
+qdevice_net_pr_poll_array_create(struct qdevice_net_instance *instance)
+{
+	struct pr_poll_array *poll_array;
+	PRPollDesc *poll_desc;
+	struct qdevice_net_poll_array_user_data *user_data;
+	struct unix_socket_client *ipc_client;
+	const struct unix_socket_client_list *ipc_client_list;
+
+	poll_array = &instance->poll_array;
+	ipc_client_list = &instance->qdevice_instance_ptr->local_ipc.clients;
+
+	pr_poll_array_clean(poll_array);
 
 
-	pfds[QDEVICE_NET_POLL_CMAP].fd = instance->cmap_poll_fd;
-	pfds[QDEVICE_NET_POLL_CMAP].in_flags = PR_POLL_READ;
-	no_pfds++;
+	if (pr_poll_array_add(poll_array, &poll_desc, (void **)&user_data) < 0) {
+		return (NULL);
+	}
+	poll_desc->fd = instance->votequorum_poll_fd;
+	poll_desc->in_flags = PR_POLL_READ;
+	user_data->type = QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_VOTEQUORUM;
 
 
-	pfds[QDEVICE_NET_POLL_LOCAL_SOCKET].fd = instance->local_socket_poll_fd;
-	pfds[QDEVICE_NET_POLL_LOCAL_SOCKET].in_flags = PR_POLL_READ;
-	no_pfds++;
+	if (pr_poll_array_add(poll_array, &poll_desc, (void **)&user_data) < 0) {
+		return (NULL);
+	}
+	poll_desc->fd = instance->cmap_poll_fd;
+	poll_desc->in_flags = PR_POLL_READ;
+	user_data->type = QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_CMAP;
 
 
-	if (instance->state == QDEVICE_NET_INSTANCE_STATE_WAITING_CONNECT &&
+	if (pr_poll_array_add(poll_array, &poll_desc, (void **)&user_data) < 0) {
+		return (NULL);
+	}
+	poll_desc->fd = instance->ipc_socket_poll_fd;
+	poll_desc->in_flags = PR_POLL_READ;
+	user_data->type = QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_SOCKET;
+
+	if (instance->state != QDEVICE_NET_INSTANCE_STATE_WAITING_CONNECT ||
 	    !instance->non_blocking_client.destroyed) {
 	    !instance->non_blocking_client.destroyed) {
-		pfds[QDEVICE_NET_POLL_SOCKET].fd = instance->non_blocking_client.socket;
-		pfds[QDEVICE_NET_POLL_SOCKET].in_flags = PR_POLL_WRITE | PR_POLL_EXCEPT;
-		no_pfds++;
-	} else {
-		pfds[QDEVICE_NET_POLL_SOCKET].fd = instance->socket;
-		pfds[QDEVICE_NET_POLL_SOCKET].in_flags = PR_POLL_READ;
-		if (!send_buffer_list_empty(&instance->send_buffer_list)) {
-			pfds[QDEVICE_NET_POLL_SOCKET].in_flags |= PR_POLL_WRITE;
+		if (pr_poll_array_add(poll_array, &poll_desc, (void **)&user_data) < 0) {
+			return (NULL);
+		}
+
+		user_data->type = QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_SOCKET;
+
+		if (instance->state == QDEVICE_NET_INSTANCE_STATE_WAITING_CONNECT) {
+			poll_desc->fd = instance->non_blocking_client.socket;
+			poll_desc->in_flags = PR_POLL_WRITE | PR_POLL_EXCEPT;
+		} else {
+			poll_desc->fd = instance->socket;
+			poll_desc->in_flags = PR_POLL_READ;
+
+			if (!send_buffer_list_empty(&instance->send_buffer_list)) {
+				poll_desc->in_flags |= PR_POLL_WRITE;
+			}
 		}
 		}
-		no_pfds++;
+	}
+
+	TAILQ_FOREACH(ipc_client, ipc_client_list, entries) {
+		if (!ipc_client->reading_line && !ipc_client->writing_buffer) {
+			continue;
+		}
+
+		if (pr_poll_array_add(poll_array, &poll_desc, (void **)&user_data) < 0) {
+			return (NULL);
+		}
+
+		poll_desc->fd = ipc_client->user_data;
+		if (ipc_client->reading_line) {
+			poll_desc->in_flags |= PR_POLL_READ;
+		}
+
+		if (ipc_client->writing_buffer) {
+			poll_desc->in_flags |= PR_POLL_WRITE;
+		}
+
+		user_data->type = QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_CLIENT;
+		user_data->ipc_client = ipc_client;
+	}
+
+	pr_poll_array_gc(poll_array);
+
+	return (poll_array->array);
+}
+
+int
+qdevice_net_poll(struct qdevice_net_instance *instance)
+{
+	PRPollDesc *pfds;
+	PRInt32 poll_res;
+	ssize_t i;
+	struct qdevice_net_poll_array_user_data *user_data;
+	struct unix_socket_client *ipc_client;
+
+	pfds = qdevice_net_pr_poll_array_create(instance);
+	if (pfds == NULL) {
+		return (-1);
 	}
 	}
 
 
 	instance->schedule_disconnect = 0;
 	instance->schedule_disconnect = 0;
 
 
-	if ((poll_res = PR_Poll(pfds, no_pfds,
+	if ((poll_res = PR_Poll(pfds, pr_poll_array_size(&instance->poll_array),
 	    timer_list_time_to_expire(&instance->main_timer_list))) > 0) {
 	    timer_list_time_to_expire(&instance->main_timer_list))) > 0) {
-		for (i = 0; i < no_pfds; i++) {
+		for (i = 0; i < pr_poll_array_size(&instance->poll_array); i++) {
+			user_data = pr_poll_array_get_user_data(&instance->poll_array, i);
+
+			ipc_client = user_data->ipc_client;
+
 			if (pfds[i].out_flags & PR_POLL_READ) {
 			if (pfds[i].out_flags & PR_POLL_READ) {
-				switch (i) {
-				case QDEVICE_NET_POLL_SOCKET:
+				switch (user_data->type) {
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_SOCKET:
 					qdevice_net_poll_read_socket(instance);
 					qdevice_net_poll_read_socket(instance);
 					break;
 					break;
-				case QDEVICE_NET_POLL_VOTEQUORUM:
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_VOTEQUORUM:
 					qdevice_net_poll_read_votequorum(instance);
 					qdevice_net_poll_read_votequorum(instance);
 					break;
 					break;
-				case QDEVICE_NET_POLL_CMAP:
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_CMAP:
 					qdevice_net_poll_read_cmap(instance);
 					qdevice_net_poll_read_cmap(instance);
 					break;
 					break;
-				case QDEVICE_NET_POLL_LOCAL_SOCKET:
-					qdevice_net_poll_read_local_socket(instance);
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_SOCKET:
+					qdevice_net_poll_read_ipc_socket(instance);
+					break;
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_CLIENT:
+					qdevice_ipc_io_read(instance->qdevice_instance_ptr, ipc_client);
 					break;
 					break;
 				default:
 				default:
 					qdevice_log(LOG_CRIT, "Unhandled read on poll descriptor %u", i);
 					qdevice_log(LOG_CRIT, "Unhandled read on poll descriptor %u", i);
@@ -232,10 +303,13 @@ qdevice_net_poll(struct qdevice_net_instance *instance)
 			}
 			}
 
 
 			if (!instance->schedule_disconnect && pfds[i].out_flags & PR_POLL_WRITE) {
 			if (!instance->schedule_disconnect && pfds[i].out_flags & PR_POLL_WRITE) {
-				switch (i) {
-				case QDEVICE_NET_POLL_SOCKET:
+				switch (user_data->type) {
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_SOCKET:
 					qdevice_net_poll_write_socket(instance, &pfds[i]);
 					qdevice_net_poll_write_socket(instance, &pfds[i]);
 					break;
 					break;
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_CLIENT:
+					qdevice_net_poll_write_ipc_client(instance, ipc_client);
+					break;
 				default:
 				default:
 					qdevice_log(LOG_CRIT, "Unhandled write on poll descriptor %u", i);
 					qdevice_log(LOG_CRIT, "Unhandled write on poll descriptor %u", i);
 					exit(1);
 					exit(1);
@@ -246,11 +320,11 @@ qdevice_net_poll(struct qdevice_net_instance *instance)
 			if (!instance->schedule_disconnect &&
 			if (!instance->schedule_disconnect &&
 			    (pfds[i].out_flags & (PR_POLL_ERR|PR_POLL_NVAL|PR_POLL_HUP|PR_POLL_EXCEPT)) &&
 			    (pfds[i].out_flags & (PR_POLL_ERR|PR_POLL_NVAL|PR_POLL_HUP|PR_POLL_EXCEPT)) &&
 			    !(pfds[i].out_flags & (PR_POLL_READ|PR_POLL_WRITE))) {
 			    !(pfds[i].out_flags & (PR_POLL_READ|PR_POLL_WRITE))) {
-				switch (i) {
-				case QDEVICE_NET_POLL_SOCKET:
+				switch (user_data->type) {
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_SOCKET:
 					qdevice_net_poll_err_socket(instance, &pfds[i]);
 					qdevice_net_poll_err_socket(instance, &pfds[i]);
 					break;
 					break;
-				case QDEVICE_NET_POLL_LOCAL_SOCKET:
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_SOCKET:
 					if (pfds[i].out_flags != PR_POLL_NVAL) {
 					if (pfds[i].out_flags != PR_POLL_NVAL) {
 						qdevice_log(LOG_CRIT, "POLLERR (%u) on local socket",
 						qdevice_log(LOG_CRIT, "POLLERR (%u) on local socket",
 						    pfds[i].out_flags);
 						    pfds[i].out_flags);
@@ -262,12 +336,22 @@ qdevice_net_poll(struct qdevice_net_instance *instance)
 						    QDEVICE_NET_DISCONNECT_REASON_LOCAL_SOCKET_CLOSED;
 						    QDEVICE_NET_DISCONNECT_REASON_LOCAL_SOCKET_CLOSED;
 					}
 					}
 					break;
 					break;
+				case QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_CLIENT:
+					qdevice_log(LOG_DEBUG, "POLL_ERR (%u) on ipc client socket. "
+					    "Disconnecting.",  pfds[i].out_flags);
+					ipc_client->schedule_disconnect = 1;
+					break;
 				default:
 				default:
 					qdevice_log(LOG_CRIT, "Unhandled error on poll descriptor %u", i);
 					qdevice_log(LOG_CRIT, "Unhandled error on poll descriptor %u", i);
 					exit(1);
 					exit(1);
 					break;
 					break;
 				}
 				}
 			}
 			}
+
+			if (user_data->type == QDEVICE_NET_POLL_ARRAY_USER_DATA_TYPE_IPC_CLIENT &&
+			    ipc_client->schedule_disconnect) {
+				qdevice_ipc_client_disconnect(instance->qdevice_instance_ptr, ipc_client);
+			}
 		}
 		}
 	}
 	}
 
 

+ 1 - 1
qdevices/qnetd-dpd-timer.c

@@ -45,7 +45,7 @@ qnetd_dpd_timer_cb(void *data1, void *data2)
 
 
 	TAILQ_FOREACH(client, &instance->clients, entries) {
 	TAILQ_FOREACH(client, &instance->clients, entries) {
 		if (!client->init_received) {
 		if (!client->init_received) {
-			continue ;
+			continue;
 		}
 		}
 
 
 		client->dpd_time_since_last_check += QNETD_DPD_INTERVAL;
 		client->dpd_time_since_last_check += QNETD_DPD_INTERVAL;

+ 1 - 1
qdevices/timer-list.c

@@ -98,7 +98,7 @@ timer_list_insert_into_list(struct timer_list *tlist, struct timer_list_entry *n
 			 */
 			 */
 			TAILQ_INSERT_BEFORE(entry, new_entry, entries);
 			TAILQ_INSERT_BEFORE(entry, new_entry, entries);
 
 
-			break ;
+			break;
 		}
 		}
 
 
 		entry = TAILQ_NEXT(entry, entries);
 		entry = TAILQ_NEXT(entry, entries);

+ 55 - 0
qdevices/unix-socket-client.c

@@ -35,6 +35,9 @@
 #include <string.h>
 #include <string.h>
 
 
 #include "unix-socket-client.h"
 #include "unix-socket-client.h"
+#include "unix-socket.h"
+
+#define UNIX_SOCKET_CLIENT_BUFFER	2
 
 
 void
 void
 unix_socket_client_init(struct unix_socket_client *client, int sock, size_t max_receive_size,
 unix_socket_client_init(struct unix_socket_client *client, int sock, size_t max_receive_size,
@@ -55,3 +58,55 @@ unix_socket_client_destroy(struct unix_socket_client *client)
 	dynar_destroy(&client->send_buffer);
 	dynar_destroy(&client->send_buffer);
 	dynar_destroy(&client->receive_buffer);
 	dynar_destroy(&client->receive_buffer);
 }
 }
+
+void
+unix_socket_client_read_line(struct unix_socket_client *client, int enabled)
+{
+
+	client->reading_line = enabled;
+}
+
+void
+unix_socket_client_write_buffer(struct unix_socket_client *client, int enabled)
+{
+
+	client->writing_buffer = enabled;
+}
+
+/*
+ *  1 Full line readed
+ *  0 Partial read (no error)
+ * -1 End of connection
+ * -2 Buffer too long
+ */
+int
+unix_socket_client_io_read(struct unix_socket_client *client)
+{
+	char buf[UNIX_SOCKET_CLIENT_BUFFER];
+	ssize_t readed;
+	int res;
+	size_t zi;
+
+	res = 0;
+	readed = unix_socket_read(client->socket, buf, sizeof(buf));
+	if (readed > 0) {
+		client->msg_already_received_bytes += readed;
+		if (dynar_cat(&client->receive_buffer, buf, readed) == -1) {
+			res = -2;
+			goto exit_err;
+		}
+
+		for (zi = 0; zi < readed; zi++) {
+			if (buf[zi] == '\n') {
+				res = 1;
+			}
+		}
+	}
+
+	if (readed == 0) {
+		res = -1;
+	}
+
+exit_err:
+	return (res);
+}

+ 8 - 0
qdevices/unix-socket-client.h

@@ -52,6 +52,8 @@ struct unix_socket_client {
 	struct dynar send_buffer;
 	struct dynar send_buffer;
 	size_t msg_already_received_bytes;
 	size_t msg_already_received_bytes;
 	size_t msg_already_sent_bytes;
 	size_t msg_already_sent_bytes;
+	int reading_line;
+	int writing_buffer;
 	int schedule_disconnect;
 	int schedule_disconnect;
 	void *user_data;
 	void *user_data;
 	TAILQ_ENTRY(unix_socket_client) entries;
 	TAILQ_ENTRY(unix_socket_client) entries;
@@ -62,6 +64,12 @@ extern void		unix_socket_client_init(struct unix_socket_client *client, int sock
 
 
 extern void		unix_socket_client_destroy(struct unix_socket_client *client);
 extern void		unix_socket_client_destroy(struct unix_socket_client *client);
 
 
+extern void		unix_socket_client_read_line(struct unix_socket_client *client, int enabled);
+
+extern void		unix_socket_client_write_buffer(struct unix_socket_client *client, int enabled);
+
+extern int		unix_socket_client_io_read(struct unix_socket_client *client);
+
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }
 #endif
 #endif

+ 8 - 0
qdevices/unix-socket-ipc.c

@@ -125,3 +125,11 @@ unix_socket_ipc_accept(struct unix_socket_ipc *ipc, struct unix_socket_client **
 
 
 	return (0);
 	return (0);
 }
 }
+
+void
+unix_socket_ipc_client_disconnect(struct unix_socket_ipc *ipc, struct unix_socket_client *client)
+{
+
+	unix_socket_close(client->socket);
+	unix_socket_client_list_del(&ipc->clients, client);
+}

+ 3 - 0
qdevices/unix-socket-ipc.h

@@ -61,6 +61,9 @@ extern int		unix_socket_ipc_destroy(struct unix_socket_ipc *ipc);
 extern int		unix_socket_ipc_accept(struct unix_socket_ipc *ipc,
 extern int		unix_socket_ipc_accept(struct unix_socket_ipc *ipc,
     struct unix_socket_client **res_client);
     struct unix_socket_client **res_client);
 
 
+void			unix_socket_ipc_client_disconnect(struct unix_socket_ipc *ipc,
+    struct unix_socket_client *client);
+
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }
 #endif
 #endif

+ 7 - 0
qdevices/unix-socket.c

@@ -150,3 +150,10 @@ unix_socket_close(int sock)
 
 
 	return (close(sock));
 	return (close(sock));
 }
 }
+
+ssize_t
+unix_socket_read(int sock, void *buf, size_t len)
+{
+
+	return (recv(sock, buf, len, 0));
+}

+ 2 - 0
qdevices/unix-socket.h

@@ -50,6 +50,8 @@ extern int		unix_socket_server_accept(int sock, int non_blocking);
 
 
 extern int		unix_socket_close(int sock);
 extern int		unix_socket_close(int sock);
 
 
+extern ssize_t		unix_socket_read(int sock, void *buf, size_t len);
+
 #ifdef __cplusplus
 #ifdef __cplusplus
 }
 }
 #endif
 #endif