Преглед изворни кода

non-blocking syslog and file logging support

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1176 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake пре 19 година
родитељ
комит
fa98ebf4c8
3 измењених фајлова са 124 додато и 24 уклоњено
  1. 101 20
      exec/print.c
  2. 22 3
      exec/wthread.c
  3. 1 1
      exec/wthread.h

+ 101 - 20
exec/print.c

@@ -2,8 +2,12 @@
  * Copyright (c) 2002-2004 MontaVista Software, Inc.
  * Copyright (c) 2006 Ericsson AB.
  *
- * Author: Steven Dake (sdake@mvista.com)
- *         Hans Feldt
+ * Author: Steven Dake (sdake@redhat.com)
+ *    original work, Add worker thread to avoid blocking syslog
+ *
+ * Author: Hans Feldt
+ *      Added support for runtime installed loggers, tags tracing,
+ *    and file & line printing.
  *
  * All rights reserved.
  *
@@ -50,16 +54,26 @@
 #endif
 #include <syslog.h>
 #include <stdlib.h>
+#include <pthread.h>
 
 #include "print.h"
 #include "totemip.h"
 #include "../include/saAis.h"
 #include "mainconfig.h"
+#include "wthread.h"
 
 static unsigned int logmode = LOG_MODE_BUFFER | LOG_MODE_STDERR | LOG_MODE_SYSLOG;
+
 static char *logfile = 0;
+
 static int log_setup_called;
 
+static pthread_mutex_t log_mode_mutex;
+
+static struct worker_thread_group log_thread_group;
+
+static unsigned int dropped_log_entries = 0;
+
 #ifndef MAX_LOGGERS
 #define MAX_LOGGERS 32
 #endif
@@ -67,8 +81,7 @@ struct logger loggers[MAX_LOGGERS];
 
 static FILE *log_file_fp = 0;
 
-struct log_entry
-{
+struct log_entry {
 	char *file;
 	int line;
 	int level;
@@ -77,8 +90,15 @@ struct log_entry
 };
 
 static struct log_entry *head;
+
 static struct log_entry *tail;
 
+struct log_data {
+	unsigned int syslog_pos;
+	unsigned int level;
+	char *log_string;
+};
+
 static int logger_init (const char *ident, int tags, int level, int mode)
 {
 	int i;
@@ -128,9 +148,31 @@ static void buffered_log_printf (char *file, int line, int level,
 	vsnprintf(entry->str, sizeof(entry->str), format, ap);
 }
 
+static void log_printf_worker_fn (void *thread_data, void *work_item)
+{
+	struct log_data *log_data = (struct log_data *)work_item;
+
+	/*
+	 * Output the log data
+	 */
+	if (logmode & LOG_MODE_FILE && log_file_fp != 0) {
+		fprintf (log_file_fp, "%s", log_data->log_string);
+		fflush (log_file_fp);
+	}
+	if (logmode & LOG_MODE_STDERR) {
+		fprintf (stderr, "%s", log_data->log_string);
+		fflush (stdout);
+	}
+
+	if (logmode & LOG_MODE_SYSLOG) {
+		syslog (log_data->level, &log_data->log_string[log_data->syslog_pos]);
+	}
+	free (log_data->log_string);
+}
+
 static void _log_printf (char *file, int line,
-						 int level, int id,
-						 char *format, va_list ap)
+	int level, int id,
+	char *format, va_list ap)
 {
 	char newstring[4096];
 	char log_string[4096];
@@ -138,14 +180,18 @@ static void _log_printf (char *file, int line,
 	struct timeval tv;
 	int i = 0;
 	int len;
+	struct log_data log_data;
+	unsigned int res = 0;
 
 	assert (id < MAX_LOGGERS);
 
+	pthread_mutex_lock (&log_mode_mutex);
 	/*
 	** Buffer before log_setup() has been called.
 	*/
 	if (logmode & LOG_MODE_BUFFER) {
 		buffered_log_printf(file, line, level, format, ap);
+		pthread_mutex_unlock (&log_mode_mutex);
 		return;
 	}
 
@@ -162,7 +208,18 @@ static void _log_printf (char *file, int line,
 	} else {	
 		sprintf (&newstring[i], "[%-5s] %s", loggers[id].ident, format);
 	}
-	len = vsprintf (log_string, newstring, ap);
+	if (dropped_log_entries) {
+		/*
+		 * Get rid of \n if there is one
+		 */
+		if (newstring[strlen (newstring) - 1] == '\n') {
+			newstring[strlen (newstring) - 1] = '\0';
+		}
+		len = sprintf (log_string,
+			"%s - prior to this log entry, openais logger dropped '%d' messages because of overflow.", newstring, dropped_log_entries + 1);
+	} else {
+		len = vsprintf (log_string, newstring, ap);
+	}
 
 	/*
 	** add line feed if not done yet
@@ -173,20 +230,32 @@ static void _log_printf (char *file, int line,
 	}
 
 	/*
-	 * Output the log data
+	 * Create work thread data
 	 */
-	if (logmode & LOG_MODE_FILE && log_file_fp != 0) {
-		fprintf (log_file_fp, "%s", log_string);
-		fflush (log_file_fp);
+	log_data.syslog_pos = i;
+	log_data.level = level;
+	log_data.log_string = strdup (log_string);
+	if (log_data.log_string == NULL) {
+		goto drop_log_msg;
 	}
-	if (logmode & LOG_MODE_STDERR) {
-		fprintf (stderr, "%s", log_string);
+	
+	if (log_setup_called) {
+		res = worker_thread_group_work_add (&log_thread_group, &log_data);
+		if (res == 0) {
+			dropped_log_entries = 0;
+		} else {
+			dropped_log_entries += 1;
+		}
+	} else {
+		log_printf_worker_fn (NULL, &log_data);	
 	}
-	fflush (stdout);
 
-	if (logmode & LOG_MODE_SYSLOG) {
-		syslog (level, &log_string[i]);
-	}
+	pthread_mutex_unlock (&log_mode_mutex);
+	return;
+
+drop_log_msg:
+	dropped_log_entries++;
+	pthread_mutex_unlock (&log_mode_mutex);
 }
 
 int _log_init (const char *ident)
@@ -219,7 +288,9 @@ int log_setup (char **error_string, struct main_config *config)
 		}
 	}
 
+	pthread_mutex_lock (&log_mode_mutex);
 	logmode = config->logmode;
+	pthread_mutex_unlock (&log_mode_mutex);
 	logfile = config->logfile;
 
 	if (config->logmode & LOG_MODE_SYSLOG) {
@@ -246,13 +317,23 @@ int log_setup (char **error_string, struct main_config *config)
 			config->logger[i].level = LOG_LEVEL_INFO;
 		config->logger[i].tags |= TAG_LOG;
 		logger_init (config->logger[i].ident,
-					 config->logger[i].tags,
-					 config->logger[i].level,
-					 config->logger[i].mode);
+			config->logger[i].tags,
+			config->logger[i].level,
+			config->logger[i].mode);
 	}
 
 	log_setup_called = 1;
 
+	worker_thread_group_init (
+		&log_thread_group,
+		1,
+		1024,
+		sizeof (struct log_data),
+		0,
+		NULL,
+		log_printf_worker_fn);
+
+
 	/*
 	** Flush what we have buffered
 	*/

+ 22 - 3
exec/wthread.c

@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2005 MontaVista Software, Inc.
+ * Copyright (c) 2006 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -75,8 +76,15 @@ void *worker_thread (void *thread_data_in) {
 			&worker_thread->new_work_mutex);
 		}
 
+		/*
+		 * We unlock then relock the new_work_mutex to allow the
+		 * worker function to execute and also allow new work to be
+		 * added to the work queue
+	  	 */
 		data_for_worker_fn = queue_item_get (&worker_thread->queue);
+		pthread_mutex_unlock (&worker_thread->new_work_mutex);
 		worker_thread->worker_thread_group->worker_fn (orf_token_mcast_thread_state, data_for_worker_fn);
+		pthread_mutex_lock (&worker_thread->new_work_mutex);
 		queue_item_remove (&worker_thread->queue);
 		pthread_mutex_unlock (&worker_thread->new_work_mutex);
 		pthread_mutex_lock (&worker_thread->done_work_mutex);
@@ -109,8 +117,14 @@ int worker_thread_group_init (
 	}
 
 	for (i = 0; i < threads; i++) {
-		worker_thread_group->threads[i].thread_state = malloc (thread_state_size);
-		thread_state_constructor (worker_thread_group->threads[i].thread_state);
+		if (thread_state_size) {
+			worker_thread_group->threads[i].thread_state = malloc (thread_state_size);
+		} else {
+			worker_thread_group->threads[i].thread_state = NULL;
+		}
+		if (thread_state_constructor) {
+			thread_state_constructor (worker_thread_group->threads[i].thread_state);
+		}
 		worker_thread_group->threads[i].worker_thread_group = worker_thread_group;
 		pthread_mutex_init (&worker_thread_group->threads[i].new_work_mutex, NULL);
 		pthread_cond_init (&worker_thread_group->threads[i].new_work_cond, NULL);
@@ -128,7 +142,7 @@ int worker_thread_group_init (
 	return (0);
 }
 
-void worker_thread_group_work_add (
+int worker_thread_group_work_add (
 	struct worker_thread_group *worker_thread_group,
 	void *item)
 {
@@ -138,9 +152,14 @@ void worker_thread_group_work_add (
 	worker_thread_group->last_scheduled = schedule;
 
 	pthread_mutex_lock (&worker_thread_group->threads[schedule].new_work_mutex);
+	if (queue_is_full (&worker_thread_group->threads[schedule].queue)) {
+		pthread_mutex_unlock (&worker_thread_group->threads[schedule].new_work_mutex);
+		return (-1);
+	}
 	queue_item_add (&worker_thread_group->threads[schedule].queue, item);
 	pthread_cond_signal (&worker_thread_group->threads[schedule].new_work_cond);
 	pthread_mutex_unlock (&worker_thread_group->threads[schedule].new_work_mutex);
+	return (0);
 }
 
 void worker_thread_group_wait (

+ 1 - 1
exec/wthread.h

@@ -51,7 +51,7 @@ extern int worker_thread_group_init (
 	void (*thread_state_constructor)(void *),
 	void (*worker_fn)(void *thread_state, void *work_item));
 
-extern void worker_thread_group_work_add (
+extern int worker_thread_group_work_add (
 	struct worker_thread_group *worker_thread_group,
 	void *item);