Răsfoiți Sursa

libqb: remove worker thread - keep to one thread.

Signed-off-by: Angus Salkeld <asalkeld@redhat.com>
Reviewed-by: Steven Dake <sdake@redhat.com>
Angus Salkeld 15 ani în urmă
părinte
comite
78e06739b7
5 a modificat fișierele cu 5 adăugiri și 398 ștergeri
  1. 3 3
      exec/Makefile.am
  2. 0 1
      exec/totemsrp.c
  3. 2 112
      exec/totemudp.c
  4. 0 215
      exec/wthread.c
  5. 0 67
      exec/wthread.h

+ 3 - 3
exec/Makefile.am

@@ -37,12 +37,12 @@ INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include $(nss_CFLAGS) $(rd
 
 
 TOTEM_SRC		= totemip.c totemnet.c totemudp.c \
 TOTEM_SRC		= totemip.c totemnet.c totemudp.c \
 			  totemudpu.c totemrrp.c totemsrp.c totemmrp.c \
 			  totemudpu.c totemrrp.c totemsrp.c totemmrp.c \
-			  totempg.c crypto.c wthread.c tsafe.c
+			  totempg.c crypto.c tsafe.c
 if BUILD_RDMA
 if BUILD_RDMA
 TOTEM_SRC		+= totemiba.c
 TOTEM_SRC		+= totemiba.c
 endif
 endif
 
 
-LOGSYS_SRC		= wthread.c logsys.c
+LOGSYS_SRC		= logsys.c
 LCRSO_SRC		= objdb.c vsf_ykd.c coroparse.c vsf_quorum.c
 LCRSO_SRC		= objdb.c vsf_ykd.c coroparse.c vsf_quorum.c
 LCRSO_OBJS		= $(LCRSO_SRC:%.c=%.o)
 LCRSO_OBJS		= $(LCRSO_SRC:%.c=%.o)
 LCRSO			= $(LCRSO_SRC:%.c=%.lcrso)
 LCRSO			= $(LCRSO_SRC:%.c=%.lcrso)
@@ -70,7 +70,7 @@ SHARED_LIBS_SO_TWO	= $(SHARED_LIBS:%.so.$(SONAME)=%.so.$(SOMAJOR))
 noinst_HEADERS		= apidef.h crypto.h mainconfig.h main.h tsafe.h \
 noinst_HEADERS		= apidef.h crypto.h mainconfig.h main.h tsafe.h \
 			  quorum.h service.h sync.h timer.h totemconfig.h \
 			  quorum.h service.h sync.h timer.h totemconfig.h \
 			  totemmrp.h totemnet.h totemudp.h totemiba.h totemrrp.h \
 			  totemmrp.h totemnet.h totemudp.h totemiba.h totemrrp.h \
-			  totemudpu.h totemsrp.h util.h vsf.h wthread.h schedwrk.h \
+			  totemudpu.h totemsrp.h util.h vsf.h schedwrk.h \
 			  evil.h syncv2.h fsm.h
 			  evil.h syncv2.h fsm.h
 
 
 EXTRA_DIST		= $(LCRSO_SRC)
 EXTRA_DIST		= $(LCRSO_SRC)

+ 0 - 1
exec/totemsrp.c

@@ -90,7 +90,6 @@
 #include "totemsrp.h"
 #include "totemsrp.h"
 #include "totemrrp.h"
 #include "totemrrp.h"
 #include "totemnet.h"
 #include "totemnet.h"
-#include "wthread.h"
 
 
 #include "crypto.h"
 #include "crypto.h"
 
 

+ 2 - 112
exec/totemudp.c

@@ -67,7 +67,6 @@
 #define LOGSYS_UTILS_ONLY 1
 #define LOGSYS_UTILS_ONLY 1
 #include <corosync/engine/logsys.h>
 #include <corosync/engine/logsys.h>
 #include "totemudp.h"
 #include "totemudp.h"
-#include "wthread.h"
 
 
 #include "crypto.h"
 #include "crypto.h"
 #include "util.h"
 #include "util.h"
@@ -131,8 +130,6 @@ struct totemudp_instance {
 
 
 	int netif_bind_state;
 	int netif_bind_state;
 
 
-	struct worker_thread_group worker_thread_group;
-
 	void *context;
 	void *context;
 
 
 	void (*totemudp_deliver_fn) (
 	void (*totemudp_deliver_fn) (
@@ -1051,83 +1048,6 @@ static inline void mcast_sendmsg (
 	}
 	}
 }
 }
 
 
-static void totemudp_mcast_thread_state_constructor (
-	void *totemudp_mcast_thread_state_in)
-{
-	struct totemudp_mcast_thread_state *totemudp_mcast_thread_state =
-		(struct totemudp_mcast_thread_state *)totemudp_mcast_thread_state_in;
-	memset (totemudp_mcast_thread_state, 0,
-		sizeof (*totemudp_mcast_thread_state));
-
-	rng_make_prng (128, PRNG_SOBER,
-		&totemudp_mcast_thread_state->prng_state, NULL);
-}
-
-
-static void totemudp_mcast_worker_fn (void *thread_state, void *work_item_in)
-{
-	struct work_item *work_item = (struct work_item *)work_item_in;
-	struct totemudp_mcast_thread_state *totemudp_mcast_thread_state =
-		(struct totemudp_mcast_thread_state *)thread_state;
-	struct totemudp_instance *instance = work_item->instance;
-	struct msghdr msg_mcast;
-	unsigned char sheader[sizeof (struct security_header)];
-	int res = 0;
-	size_t buf_len;
-	struct iovec iovec_enc[2];
-	struct iovec iovec;
-	struct sockaddr_storage sockaddr;
-	int addrlen;
-
-	if (instance->totem_config->secauth == 1) {
-		iovec_enc[0].iov_base = (void *)sheader;
-		iovec_enc[0].iov_len = sizeof (struct security_header);
-		iovec_enc[1].iov_base = (void *)work_item->msg;
-		iovec_enc[1].iov_len = work_item->msg_len;
-
-		/*
-		 * Encrypt and digest the message
-		 */
-		encrypt_and_sign_worker (
-			instance,
-			totemudp_mcast_thread_state->iobuf,
-			&buf_len,
-			iovec_enc, 2);
-
-		iovec.iov_base = (void *)totemudp_mcast_thread_state->iobuf;
-		iovec.iov_len = buf_len;
-	} else {
-		iovec.iov_base = (void *)work_item->msg;
-		iovec.iov_len = work_item->msg_len;
-	}
-
-	totemip_totemip_to_sockaddr_convert(&instance->mcast_address,
-		instance->totem_interface->ip_port, &sockaddr, &addrlen);
-
-	msg_mcast.msg_name = &sockaddr;
-	msg_mcast.msg_namelen = addrlen;
-	msg_mcast.msg_iov = &iovec;
-	msg_mcast.msg_iovlen = 1;
-#if !defined(COROSYNC_SOLARIS)
-	msg_mcast.msg_control = 0;
-	msg_mcast.msg_controllen = 0;
-	msg_mcast.msg_flags = 0;
-#else
-	msg_mcast.msg_accrights = NULL;
-	msg_mcast.msg_accrightslen = 0;
-#endif
-
-	/*
-	 * Transmit multicast message
-	 * An error here is recovered by totemudp
-	 */
-	res = sendmsg (instance->totemudp_sockets.mcast_send, &msg_mcast,
-		MSG_NOSIGNAL);
-	if (res < 0) {
-		LOGSYS_PERROR (errno, instance->totemudp_log_level_debug,
-			"sendmsg(mcast) failed (non-critical)");
-	}
-}
 
 
 int totemudp_finalize (
 int totemudp_finalize (
 	void *udp_context)
 	void *udp_context)
@@ -1135,8 +1055,6 @@ int totemudp_finalize (
 	struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
 	struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
 	int res = 0;
 	int res = 0;
 
 
-	worker_thread_group_exit (&instance->worker_thread_group);
-
 	if (instance->totemudp_sockets.mcast_recv > 0) {
 	if (instance->totemudp_sockets.mcast_recv > 0) {
 		close (instance->totemudp_sockets.mcast_recv);
 		close (instance->totemudp_sockets.mcast_recv);
 	 	qb_loop_poll_del (instance->totemudp_poll_handle,
 	 	qb_loop_poll_del (instance->totemudp_poll_handle,
@@ -1798,19 +1716,6 @@ int totemudp_initialize (
 	totemip_copy (&instance->mcast_address, &instance->totem_interface->mcast_addr);
 	totemip_copy (&instance->mcast_address, &instance->totem_interface->mcast_addr);
 	memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);
 	memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);
 
 
-	/*
-	* If threaded send requested, initialize thread group data structure
-	*/
-	if (totem_config->threads) {
-		worker_thread_group_init (
-			&instance->worker_thread_group,
-			totem_config->threads, 128,
-			sizeof (struct work_item),
-			sizeof (struct totemudp_mcast_thread_state),
-			totemudp_mcast_thread_state_constructor,
-			totemudp_mcast_worker_fn);
-	}
-
 	instance->totemudp_poll_handle = poll_handle;
 	instance->totemudp_poll_handle = poll_handle;
 
 
 	instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id;
 	instance->totem_interface->bindnet.nodeid = instance->totem_config->node_id;
@@ -1898,12 +1803,7 @@ int totemudp_recv_flush (void *udp_context)
 
 
 int totemudp_send_flush (void *udp_context)
 int totemudp_send_flush (void *udp_context)
 {
 {
-	struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
-	int res = 0;
-
-	worker_thread_group_wait (&instance->worker_thread_group);
-
-	return (res);
+	return 0;
 }
 }
 
 
 int totemudp_token_send (
 int totemudp_token_send (
@@ -1937,19 +1837,9 @@ int totemudp_mcast_noflush_send (
 	unsigned int msg_len)
 	unsigned int msg_len)
 {
 {
 	struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
 	struct totemudp_instance *instance = (struct totemudp_instance *)udp_context;
-	struct work_item work_item;
 	int res = 0;
 	int res = 0;
 
 
-	if (instance->totem_config->threads) {
-		work_item.msg = msg;
-		work_item.msg_len = msg_len;
-		work_item.instance = instance;
-
-		worker_thread_group_work_add (&instance->worker_thread_group,
-			&work_item);
-	} else {
-		mcast_sendmsg (instance, msg, msg_len);
-	}
+	mcast_sendmsg (instance, msg, msg_len);
 
 
 	return (res);
 	return (res);
 }
 }

+ 0 - 215
exec/wthread.c

@@ -1,215 +0,0 @@
-/*
- * Copyright (c) 2005 MontaVista Software, Inc.
- * Copyright (c) 2006, 2009 Red Hat, Inc.
- *
- * All rights reserved.
- *
- * Author: Steven Dake (sdake@redhat.com)
- *
- * This software licensed under BSD license, the text of which follows:
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice,
- *   this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice,
- *   this list of conditions and the following disclaimer in the documentation
- *   and/or other materials provided with the distribution.
- * - Neither the name of the MontaVista Software, Inc. nor the names of its
- *   contributors may be used to endorse or promote products derived from this
- *   software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- * THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-/*
- * Add work to a work group and have threads process the work
- * Provide blocking for all work to complete
- */
-
-#include <config.h>
-
-#include <stdlib.h>
-#include <pthread.h>
-#include <errno.h>
-#include <corosync/cs_queue.h>
-
-#include "wthread.h"
-
-struct thread_data {
-	void *thread_state;
-	void *data;
-};
-
-struct worker_thread_t {
-	struct worker_thread_group *worker_thread_group;
-	pthread_mutex_t new_work_mutex;
-	pthread_cond_t new_work_cond;
-	pthread_cond_t cond;
-	pthread_mutex_t done_work_mutex;
-	pthread_cond_t done_work_cond;
-	pthread_t thread_id;
-	struct cs_queue queue;
-	void *thread_state;
-	struct thread_data thread_data;
-};
-
-static void *start_worker_thread (void *thread_data_in) {
-	struct thread_data *thread_data = (struct thread_data *)thread_data_in;
-	struct worker_thread_t *worker_thread =
-		(struct worker_thread_t *)thread_data->data;
-	void *data_for_worker_fn;
-
-	for (;;) {
-		pthread_mutex_lock (&worker_thread->new_work_mutex);
-		if (cs_queue_is_empty (&worker_thread->queue) == 1) {
-		pthread_cond_wait (&worker_thread->new_work_cond,
-			&worker_thread->new_work_mutex);
-		}
-
-		/*
-		 * We unlock then relock the new_work_mutex to allow the
-		 * worker function to execute and also allow new work to be
-		 * added to the work queue
-	  	 */
-		data_for_worker_fn = cs_queue_item_get (&worker_thread->queue);
-		pthread_mutex_unlock (&worker_thread->new_work_mutex);
-		worker_thread->worker_thread_group->worker_fn (worker_thread->thread_state, data_for_worker_fn);
-		pthread_mutex_lock (&worker_thread->new_work_mutex);
-		cs_queue_item_remove (&worker_thread->queue);
-		pthread_mutex_unlock (&worker_thread->new_work_mutex);
-		pthread_mutex_lock (&worker_thread->done_work_mutex);
-		if (cs_queue_is_empty (&worker_thread->queue) == 1) {
-			pthread_cond_signal (&worker_thread->done_work_cond);
-		}
-		pthread_mutex_unlock (&worker_thread->done_work_mutex);
-	}
-	return (NULL);
-}
-
-int worker_thread_group_init (
-	struct worker_thread_group *worker_thread_group,
-	int threads,
-	int items_max,
-	int item_size,
-	int thread_state_size,
-	void (*thread_state_constructor)(void *),
-	void (*worker_fn)(void *thread_state, void *work_item))
-{
-	int i;
-
-	worker_thread_group->threadcount = threads;
-	worker_thread_group->last_scheduled = 0;
-	worker_thread_group->worker_fn = worker_fn;
-	worker_thread_group->threads = malloc (sizeof (struct worker_thread_t) *
-		threads);
-	if (worker_thread_group->threads == 0) {
-		return (-1);
-	}
-
-	for (i = 0; i < threads; i++) {
-		if (thread_state_size) {
-			worker_thread_group->threads[i].thread_state = malloc (thread_state_size);
-		} else {
-			worker_thread_group->threads[i].thread_state = NULL;
-		}
-		if (thread_state_constructor) {
-			thread_state_constructor (worker_thread_group->threads[i].thread_state);
-		}
-		worker_thread_group->threads[i].worker_thread_group = worker_thread_group;
-		pthread_mutex_init (&worker_thread_group->threads[i].new_work_mutex, NULL);
-		pthread_cond_init (&worker_thread_group->threads[i].new_work_cond, NULL);
-		pthread_mutex_init (&worker_thread_group->threads[i].done_work_mutex, NULL);
-		pthread_cond_init (&worker_thread_group->threads[i].done_work_cond, NULL);
-		cs_queue_init (&worker_thread_group->threads[i].queue, items_max,
-			item_size);
-
-		worker_thread_group->threads[i].thread_data.thread_state =
-			worker_thread_group->threads[i].thread_state;
-		worker_thread_group->threads[i].thread_data.data = &worker_thread_group->threads[i];
-		pthread_create (&worker_thread_group->threads[i].thread_id,
-			NULL, start_worker_thread, &worker_thread_group->threads[i].thread_data);
-	}
-	return (0);
-}
-
-int worker_thread_group_work_add (
-	struct worker_thread_group *worker_thread_group,
-	void *item)
-{
-	int schedule;
-
-	schedule = (worker_thread_group->last_scheduled + 1) % (worker_thread_group->threadcount);
-	worker_thread_group->last_scheduled = schedule;
-
-	pthread_mutex_lock (&worker_thread_group->threads[schedule].new_work_mutex);
-	if (cs_queue_is_full (&worker_thread_group->threads[schedule].queue)) {
-		pthread_mutex_unlock (&worker_thread_group->threads[schedule].new_work_mutex);
-		return (-1);
-	}
-	cs_queue_item_add (&worker_thread_group->threads[schedule].queue, item);
-	pthread_cond_signal (&worker_thread_group->threads[schedule].new_work_cond);
-	pthread_mutex_unlock (&worker_thread_group->threads[schedule].new_work_mutex);
-	return (0);
-}
-
-void worker_thread_group_wait (
-	struct worker_thread_group *worker_thread_group)
-{
-	int i;
-
-	for (i = 0; i < worker_thread_group->threadcount; i++) {
-		pthread_mutex_lock (&worker_thread_group->threads[i].done_work_mutex);
-		if (cs_queue_is_empty (&worker_thread_group->threads[i].queue) == 0) {
-			pthread_cond_wait (&worker_thread_group->threads[i].done_work_cond,
-				&worker_thread_group->threads[i].done_work_mutex);
-		}
-		pthread_mutex_unlock (&worker_thread_group->threads[i].done_work_mutex);
-	}
-}
-
-void worker_thread_group_exit (
-	struct worker_thread_group *worker_thread_group)
-{
-	int i;
-
-	for (i = 0; i < worker_thread_group->threadcount; i++) {
-		pthread_cancel (worker_thread_group->threads[i].thread_id);
-
-		/* Wait for worker thread to exit gracefully before destroying
-		 * mutexes and processing items in the queue etc.
-		 */
-		pthread_join (worker_thread_group->threads[i].thread_id, NULL);
-		pthread_mutex_destroy (&worker_thread_group->threads[i].new_work_mutex);
-		pthread_cond_destroy (&worker_thread_group->threads[i].new_work_cond);
-		pthread_mutex_destroy (&worker_thread_group->threads[i].done_work_mutex);
-		pthread_cond_destroy (&worker_thread_group->threads[i].done_work_cond);
-	}
-}
-void worker_thread_group_atsegv (
-	struct worker_thread_group *worker_thread_group)
-{
-	void *data_for_worker_fn;
-	struct worker_thread_t *worker_thread;
-	unsigned int i;
-
-	for (i = 0; i < worker_thread_group->threadcount; i++) {
-		worker_thread = &worker_thread_group->threads[i];
-		while (cs_queue_is_empty (&worker_thread->queue) == 0) {
-			data_for_worker_fn = cs_queue_item_get (&worker_thread->queue);
-			worker_thread->worker_thread_group->worker_fn (worker_thread->thread_state, data_for_worker_fn);
-			cs_queue_item_remove (&worker_thread->queue);
-		}
-	}
-}

+ 0 - 67
exec/wthread.h

@@ -1,67 +0,0 @@
-/*
- * Copyright (c) 2005 MontaVista Software, Inc.
- *
- * All rights reserved.
- *
- * Author: Steven Dake (sdake@redhat.com)
- *
- * This software licensed under BSD license, the text of which follows:
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice,
- *   this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice,
- *   this list of conditions and the following disclaimer in the documentation
- *   and/or other materials provided with the distribution.
- * - Neither the name of the MontaVista Software, Inc. nor the names of its
- *   contributors may be used to endorse or promote products derived from this
- *   software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- * THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-#ifndef CONFIG_WTHREAD_H_DEFINED
-#define CONFIG_WTHREAD_H_DEFINED
-
-struct worker_thread_group {
-	int threadcount;
-	int last_scheduled;
-	struct worker_thread_t *threads;
-	void (*worker_fn) (void *thread_state, void *work_item);
-};
-
-extern int worker_thread_group_init (
-	struct worker_thread_group *worker_thread_group,
-	int threads,
-	int items_max,
-	int item_size,
-	int thread_state_size,
-	void (*thread_state_constructor)(void *),
-	void (*worker_fn)(void *thread_state, void *work_item));
-
-extern int worker_thread_group_work_add (
-	struct worker_thread_group *worker_thread_group,
-	void *item);
-
-extern void worker_thread_group_wait (
-	struct worker_thread_group *worker_thread_group);
-
-extern void worker_thread_group_exit (
-	struct worker_thread_group *worker_thread_group);
-
-extern void worker_thread_group_atsegv (
-	struct worker_thread_group *worker_thread_group);
-
-#endif /* WTHREAD_H_DEFINED */