Просмотр исходного кода

Add passive monitoring support to AMF.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1386 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 18 лет назад
Родитель
Commit
6377455cd7
7 измененных файлов с 418 добавлено и 10 удалено
  1. 1 2
      README.amf
  2. 95 0
      exec/amf.c
  3. 30 0
      exec/amf.h
  4. 272 8
      exec/amfcomp.c
  5. 1 0
      include/saAmf.h
  6. 1 0
      lib/amf.c
  7. 18 0
      test/testamf1.c

+ 1 - 2
README.amf

@@ -379,8 +379,7 @@ Chapter:				Deviation:
 3.7.6 No Redundancy Model		Not supported.
 3.7.7 The Effect of Administrative...	Not supported.
 3.9 Dependencies Among SIs, Compone.. 	Not supported.
-3.11 Component Monitoring		• Passive Monitoring: Not supported.
-					• External Active Monitoring:
+3.11 Component Monitoring		• External Active Monitoring:
 					  Not supported.
 3.12.1.1 Error Detection		AMF does not support that a component
 					reports an error for another component.

+ 95 - 0
exec/amf.c

@@ -1925,12 +1925,77 @@ static void message_handler_req_lib_amf_pmstart (
 	void *conn,
 	void *msg)
 {
+	struct req_lib_amf_pmstart *req_lib = msg;
+	struct res_lib_amf_pmstart  res_lib;
+	struct amf_comp *comp;
+	SaAisErrorT error = SA_AIS_OK;
+
+	TRACE2("PmStart msg: '%s', %llu %d %d %d",
+				req_lib->compName.value, 
+				req_lib->processId,
+				req_lib->descendentsTreeDepth,
+				req_lib->pmErrors,
+				req_lib->recommendedRecovery);
+
+	comp = amf_comp_find (amf_cluster, &req_lib->compName);
+
+	if (comp != NULL) {
+		comp->conn = conn;
+
+		error = amf_comp_pm_start (comp, req_lib->processId,
+								   req_lib->descendentsTreeDepth,
+								   req_lib->pmErrors,
+								   req_lib->recommendedRecovery);
+	} else {
+		log_printf (LOG_ERR, "PmStart: Component '%s' not found",
+					req_lib->compName.value);
+		error = SA_AIS_ERR_NOT_EXIST;
+	}
+
+	res_lib.header.id = MESSAGE_RES_AMF_PMSTART;
+	res_lib.header.size = sizeof (res_lib);
+	res_lib.header.error = error;
+	openais_conn_send_response (conn, &res_lib,
+								sizeof (struct res_lib_amf_pmstart));
+
 }
 
 static void message_handler_req_lib_amf_pmstop (
 	void *conn,
 	void *msg)
 {
+	struct req_lib_amf_pmstop *req_lib = msg;
+	struct res_lib_amf_pmstop  res_lib;
+	struct amf_comp *comp;
+	SaAisErrorT error = SA_AIS_OK;
+
+	TRACE2 ("PmStop msg: '%s', %llu %d %d %d",
+			req_lib->compName.value, 
+			req_lib->processId,
+			req_lib->stopQualifier,
+			req_lib->pmErrors);
+
+	comp = amf_comp_find (amf_cluster, &req_lib->compName);
+
+	if (comp != NULL) {
+		comp->conn = conn;
+
+		error = amf_comp_pm_stop (comp,
+								  req_lib->stopQualifier,
+								  req_lib->processId,
+								  req_lib->pmErrors);
+	} else {
+		log_printf (LOG_ERR, "PmStop: Component '%s' not found",
+					req_lib->compName.value);
+		error = SA_AIS_ERR_NOT_EXIST;
+	}
+
+	res_lib.header.id = MESSAGE_RES_AMF_PMSTOP;
+	res_lib.header.size = sizeof (res_lib);
+	res_lib.header.error = error;
+	openais_conn_send_response (conn, &res_lib,
+								sizeof (struct res_lib_amf_pmstop));
+
 }
 
 static void message_handler_req_lib_amf_healthcheckstart (
@@ -2153,6 +2218,36 @@ static void message_handler_req_lib_amf_protectiongrouptrackstop (
 #endif
 }
 
+/**
+ * Multicast a component error report on behalf of passive monitoring.
+ *
+ * Builds a MESSAGE_REQ_EXEC_AMF_COMPONENT_ERROR_REPORT request naming the
+ * given component (using the name produced by amf_comp_dn_make()) and
+ * sends it to the joined totem group with TOTEMPG_AGREED ordering.
+ *
+ * @param comp                the component found to be in error
+ * @param recommendedRecovery recovery recommendation carried in the report
+ */
+void mcast_error_report_from_pm (
+    struct amf_comp *comp,
+    SaAmfRecommendedRecoveryT recommendedRecovery)
+{
+	SaNameT comp_dn;
+	struct req_exec_amf_comp_error_report report;
+	struct iovec iov;
+
+	/* the report identifies the component by the name built here */
+	amf_comp_dn_make(comp, &comp_dn);
+
+	TRACE2("%s %s",comp->name.value, comp_dn.value);
+
+	report.header.id = SERVICE_ID_MAKE (AMF_SERVICE,
+			MESSAGE_REQ_EXEC_AMF_COMPONENT_ERROR_REPORT);
+	report.header.size = sizeof (struct req_exec_amf_comp_error_report);
+
+	memcpy (&report.recommendedRecovery, &recommendedRecovery,
+			sizeof (SaAmfRecommendedRecoveryT));
+	memcpy (&report.erroneousComponent, &comp_dn, sizeof (SaNameT));
+
+	/* the whole request travels as a single I/O vector */
+	iov.iov_len = sizeof (report);
+	iov.iov_base = (char *)&report;
+
+	totempg_groups_mcast_joined (openais_group_handle,
+								 &iov, 1, TOTEMPG_AGREED);
+
+}
 
 static void message_handler_req_lib_amf_componenterrorreport (
 	void *conn,

+ 30 - 0
exec/amf.h

@@ -206,6 +206,7 @@ typedef enum {
 struct amf_si_assignment;
 struct amf_csi_assignment;
 struct amf_healthcheck;
+struct amf_pm;
 
 typedef enum {
 	CLUSTER_AC_UNINSTANTIATED = 1,
@@ -453,6 +454,7 @@ typedef struct amf_comp {
 	void *conn;
 	enum clc_component_types comptype;
 	struct amf_healthcheck *healthcheck_head;
+	struct list_head pm_head;
 	poll_timer_handle instantiate_timeout_handle;
 	poll_timer_handle cleanup_timeout_handle;
 	/*
@@ -481,6 +483,19 @@ typedef struct amf_healthcheck {
 
 } amf_healthcheck_t;
 
+
+/*
+ * One passively monitored process of a component. Instances are linked
+ * into the owning component's pm_head list through 'entry'.
+ */
+typedef struct amf_pm {
+       /* Configuration Attributes */
+       /* id of the process whose existence is checked */
+       SaUint64T pid;
+       /* bit mask of SA_AMF_PM_* errors still monitored; 0 = stopped */
+       SaAmfPmErrorsT errors;
+       /* recovery recommendation sent along with the error report */
+       SaAmfRecommendedRecoveryT recovery;
+
+       /* Implementation */
+       /* list linkage (head is amf_comp.pm_head) */
+	   struct list_head entry;
+       /* receives the handle of the periodic check timer when it is armed */
+       poll_timer_handle timer_handle_period;
+} amf_pm_t;
+
+
 typedef struct amf_si {
 	/* Configuration Attributes */
 	SaNameT name;
@@ -977,6 +992,17 @@ extern SaAisErrorT amf_comp_healthcheck_start (
 extern SaAisErrorT amf_comp_healthcheck_stop (
 	struct amf_comp *comp,
 	SaAmfHealthcheckKeyT *healthcheckKey);
+/*
+ * Start passive monitoring of process 'pid' (and descendants, as selected
+ * by 'depth') on behalf of 'comp'. Implemented in amfcomp.c.
+ */
+extern SaAisErrorT amf_comp_pm_start (
+	struct amf_comp *comp,
+	SaUint64T pid,
+	SaInt32T depth,
+	SaAmfPmErrorsT pmErrors,
+	SaAmfRecommendedRecoveryT recommendedRecovery);
+/*
+ * Stop monitoring the errors in 'pmErrors' for process 'pid' of 'comp',
+ * or for every monitored process when 'stopQualifier' is
+ * SA_AMF_PM_ALL_PROCESSES. Implemented in amfcomp.c.
+ */
+extern SaAisErrorT amf_comp_pm_stop (
+	struct amf_comp *comp,
+	SaAmfPmStopQualifierT stopQualifier,
+	SaInt64T pid,
+	SaAmfPmErrorsT pmErrors);
 extern SaAisErrorT amf_comp_register (struct amf_comp *comp);
 extern void amf_comp_unregister (struct amf_comp *comp);
 extern void amf_comp_error_report (
@@ -1009,6 +1035,10 @@ extern void amf_comp_error_suspected_clear (amf_comp_t *comp);
 extern void amf_comp_error_suspected_set (amf_comp_t *comp);
 extern int amf_comp_is_error_suspected (amf_comp_t *comp);
 
+/*
+ * Multicast a component error report originating from passive
+ * monitoring. Implemented in amf.c.
+ */
+extern void mcast_error_report_from_pm (
+    struct amf_comp *comp,
+    SaAmfRecommendedRecoveryT recommendedRecovery);
+
 /*===========================================================================*/
 /* amfsi.c */
 

+ 272 - 8
exec/amfcomp.c

@@ -132,6 +132,7 @@
 #include <stdlib.h>
 #include <errno.h>
 #include <assert.h>
+#include <dirent.h>
 
 #include "../include/saAis.h"
 #include "../include/saAmf.h"
@@ -785,6 +786,7 @@ struct amf_comp *amf_comp_new(struct amf_su *su, char *name)
 	setSaNameT (&comp->name, name);
 	comp->instantiate_timeout_handle = 0;
 	comp->cleanup_timeout_handle = 0;
+	list_init(&comp->pm_head);
 	return comp;
 }
 
@@ -1297,14 +1299,19 @@ void amf_comp_error_report (struct amf_comp *comp, amf_comp_t* reporting_comp,
 	SaAmfRecommendedRecoveryT recommendedRecovery)
 {
 	struct res_lib_amf_componenterrorreport res_lib;
-	TRACE2("Exec comp error report on comp'%s' from %s", comp->name.value, 
-		reporting_comp->name.value );
-	 
-	if (amf_su_is_local (reporting_comp->su)) {
-		res_lib.header.size = sizeof (struct res_lib_amf_componenterrorreport);
-		res_lib.header.id = MESSAGE_RES_AMF_COMPONENTERRORREPORT;
-		res_lib.header.error = SA_AIS_OK;
-		openais_conn_send_response (reporting_comp->conn, &res_lib, sizeof (res_lib));
+
+	if (reporting_comp != NULL) {
+		TRACE2("Exec comp error report on comp'%s' from %s", comp->name.value, 
+			   reporting_comp->name.value );
+
+		if (amf_su_is_local (reporting_comp->su)) {
+			res_lib.header.size = sizeof (struct res_lib_amf_componenterrorreport);
+			res_lib.header.id = MESSAGE_RES_AMF_COMPONENTERRORREPORT;
+			res_lib.header.error = SA_AIS_OK;
+			openais_conn_send_response (reporting_comp->conn, &res_lib, sizeof (res_lib));
+		}
+	} else {
+		TRACE2("Exec comp error report on comp'%s' from AMF", comp->name.value);
 	}
 
     /* Report to SU and let it handle the problem */
@@ -1463,6 +1470,261 @@ void amf_comp_cleanup_completed (struct amf_comp *comp)
 	}
 }
 
+/**
+ * Periodic passive-monitoring check for one component.
+ *
+ * Walks the component's list of monitored processes:
+ *  - entries whose error mask has been cleared (monitoring stopped) are
+ *    unlinked and freed;
+ *  - for every other entry the existence of /proc/<pid>/stat is probed.
+ *    The first process found missing is reported through
+ *    mcast_error_report_from_pm() - unless the SU is currently being
+ *    deactivated or terminated as part of an SU restart - then unlinked
+ *    and freed, and the walk ends for this tick.
+ *
+ * The pmErrors mask is otherwise ignored: only process existence is
+ * checked. The timer is re-armed for another 500 ms while entries remain.
+ *
+ * @param data the struct amf_comp being monitored (must not be NULL)
+ */
+static void timer_function_pm_fn (void *data)
+{
+	struct amf_comp *comp = (struct amf_comp *)data;
+	struct amf_pm    *pm = NULL;
+	struct list_head *pmlist = NULL;
+	struct list_head *next = NULL;
+	/* "/proc/" + up to 20 digits of a 64-bit pid + "/stat" + NUL = 32 */
+	char f[32];
+
+	assert (comp);
+
+	for (pmlist = comp->pm_head.next;
+		 pmlist != &comp->pm_head;
+		 pmlist = next) {
+
+		pm = list_entry(pmlist,struct amf_pm,entry);
+		/* fetch the successor first: the current node may be freed */
+		next = pmlist->next;
+
+		if (pm->errors == 0) {
+			/* monitoring of this process was stopped; reap the entry */
+			list_del(pmlist);
+			free(pm);
+			continue;
+		}
+
+		/* bounded formatting: pid comes from the client and is 64 bit */
+		snprintf(f, sizeof (f), "/proc/%llu/stat",
+				 (unsigned long long)pm->pid);
+
+		if (access( f, R_OK) != 0) {
+			if ((comp->su->restart_control_state != SU_RC_RESTART_SU_DEACTIVATING) &&
+				(comp->su->restart_control_state != SU_RC_RESTART_SU_TERMINATING)) {
+
+				/* don't report it as an error if we are busy
+				 * shutting down
+				 */
+				syslog(LOG_ALERT, "component %s:%s exited",
+					   comp->su->saAmfSUHostedByNode.value, comp->name.value);
+				mcast_error_report_from_pm (comp, pm->recovery);
+			}
+			list_del(pmlist);
+			free(pm);
+			/* at most one missing process is handled per tick */
+			break;
+		}
+	}
+
+	if (!list_empty(&comp->pm_head)) {
+		/* the timer handle is kept in whichever entry is first */
+		pm = list_entry(comp->pm_head.next,struct amf_pm,entry);
+		poll_timer_add (aisexec_poll_handle,
+						500,
+						(void *)comp,
+						timer_function_pm_fn,
+						&pm->timer_handle_period);
+	}
+}
+
+/**
+ * Find and add all children of a given PID
+ *
+ * Reads /proc/<pid>/stat for every process entry in dirList and adds a
+ * monitoring entry to comp->pm_head for each process whose parent is
+ * ppid, recursing into grandchildren as allowed by depth.
+ *
+ * @param comp the component
+ * @param pmErrors the errors to monitor
+ * @param recommendedRecovery recovery stored in every entry added
+ * @param dirList list of files in proc filesystem
+ * @param numProcEntriesFound number of file entries in proc filesystem
+ * @param ppid the process id to find children of
+ * @param depth the descendents tree depth to monitor: 1 adds direct
+ *              children only, N > 1 descends N levels, a negative value
+ *              descends without limit
+ */
+void amf_comp_find_and_add_child_pids(
+	struct amf_comp *comp,
+	SaAmfPmErrorsT pmErrors,
+	SaAmfRecommendedRecoveryT recommendedRecovery,
+	struct dirent **dirList,
+	SaInt32T numProcEntriesFound,
+	SaUint64T ppid,
+	SaInt32T depth)
+{
+	SaUint64T p_id;
+	SaInt32T res;
+	SaInt32T n = numProcEntriesFound;
+	unsigned long long stat_pid;
+	unsigned long long stat_ppid;
+	char f[64];
+	char line[256];
+	char *c;
+	char *comm_end;
+	const char *name;
+	FILE *p;
+	struct amf_pm *pm = NULL;
+
+	while (n-- > 0) {
+
+		name = dirList[n]->d_name;
+
+		/* only numeric entries are processes; this also keeps
+		 * /proc/self from adding the scanning process a second time
+		 */
+		if ((name[0] < '0') || (name[0] > '9')) {
+			continue;
+		}
+
+		/* d_name can be far longer than a pid; never overrun f */
+		res = snprintf(f, sizeof (f), "/proc/%s/stat", name);
+		if ((res < 0) || (res >= (SaInt32T)sizeof (f))) {
+			continue;
+		}
+
+		p = fopen(f, "r");
+		if (p == NULL)
+			continue;
+
+		/* pid, comm, state and ppid all sit at the start of the line;
+		 * close the file right away so no path below can leak it
+		 */
+		c = fgets(line, sizeof (line), p);
+		fclose(p);
+		if (c == NULL) {
+			continue;
+		}
+
+		/* comm is "(name)" and the name may itself contain spaces or
+		 * parentheses, so parse the fields after the LAST ')'
+		 */
+		comm_end = NULL;
+		for (c = line; *c != '\0'; c++) {
+			if (*c == ')') {
+				comm_end = c;
+			}
+		}
+		if (comm_end == NULL) {
+			continue;
+		}
+		if ((sscanf(line, "%llu", &stat_pid) != 1) ||
+			(sscanf(comm_end + 1, " %*c %llu", &stat_ppid) != 1)) {
+			continue;
+		}
+		if (stat_ppid != ppid) {
+			continue;
+		}
+		p_id = stat_pid;
+
+		pm = amf_calloc(1, sizeof(struct amf_pm));
+		if ( pm == NULL ) {
+			return;
+		}
+
+		TRACE2 ("add child (pid=%llu) for comp pid=%llu (%s)\n", p_id, ppid, comp->name.value);
+
+		pm->pid = p_id;
+		pm->errors = pmErrors;
+		pm->recovery = recommendedRecovery;
+		pm->timer_handle_period = 0;
+
+		list_add(&pm->entry, &comp->pm_head);
+
+		if ((depth > 1) || (depth < 0)) {
+			/* a negative depth means "no limit" and is passed on as is */
+			amf_comp_find_and_add_child_pids(comp,
+											 pmErrors,
+											 recommendedRecovery,
+											 dirList,
+											 numProcEntriesFound,
+											 p_id,
+											 (depth > 1) ? depth - 1 : depth);
+		}
+	}
+}
+
+/**
+ * Handle the request to start passive monitoring
+ *
+ * If pid is already monitored for this component the request only
+ * escalates the existing entry (error mask is OR-ed, the higher recovery
+ * value wins). Otherwise a new entry is created, the periodic check timer
+ * is armed if this is the component's first entry, and - unless depth is
+ * 0 - /proc is scanned so that descendants of pid are added as well.
+ *
+ * @param comp the component
+ * @param pid the process id to monitor
+ * @param depth the descendents tree depth to monitor; 0 monitors pid
+ *              only, any other value is handed to
+ *              amf_comp_find_and_add_child_pids()
+ * @param pmErrors the errors to monitor
+ * @param recommendedRecovery recovery to recommend when an error is seen
+ *
+ * @return SA_AIS_OK on success,
+ *         SA_AIS_ERR_FAILED_OPERATION if the component is in the wrong
+ *         presence state or /proc cannot be scanned (nothing is added),
+ *         SA_AIS_ERR_NO_MEMORY if the entry cannot be allocated
+ */
+SaAisErrorT amf_comp_pm_start (
+	struct amf_comp *comp,
+	SaUint64T pid,
+	SaInt32T depth,
+	SaAmfPmErrorsT pmErrors,
+	SaAmfRecommendedRecoveryT recommendedRecovery)
+{
+	struct amf_pm *pm = NULL;
+	struct amf_pm *candidate = NULL;
+	struct list_head *pmlist = NULL;
+	struct dirent **dirList = NULL;
+	SaInt32T numProcEntriesFound = 0;
+	SaAisErrorT error = SA_AIS_OK;
+
+	if (is_not_instantiating_or_instantiated_or_restarting (comp)) {
+		log_printf (LOG_ERR, "PmStart: ignored due to wrong state = %d, comp = %s",
+					comp->saAmfCompPresenceState, comp->name.value);
+		return SA_AIS_ERR_FAILED_OPERATION;
+	}
+
+	/* try to find an entry that is already there; pm stays NULL unless
+	 * a pid really matches (the loop cursor must not leak out as a hit)
+	 */
+	for (pmlist = comp->pm_head.next;
+		 pmlist != &comp->pm_head;
+		 pmlist = pmlist->next) {
+
+		candidate = list_entry(pmlist,struct amf_pm,entry);
+
+		if (candidate->pid == pid) {
+			pm = candidate;
+			break;
+		}
+	}
+
+	if ( pm != NULL ) {
+		/* only escalate the checking */
+		pm->errors |= pmErrors;
+
+		if (pm->recovery < recommendedRecovery) {
+			pm->recovery = recommendedRecovery;
+		}
+		return SA_AIS_OK;
+	}
+
+	/* not found, create it. Scan /proc first so that a scan failure
+	 * leaves the component's monitoring state untouched.
+	 */
+	if (depth != 0) {
+		numProcEntriesFound = scandir("/proc/", &dirList, 0, alphasort);
+		if (numProcEntriesFound < 0) {
+			perror("scandir");
+			return SA_AIS_ERR_FAILED_OPERATION;
+		}
+	}
+
+	pm = amf_calloc(1, sizeof(struct amf_pm));
+	if ( pm == NULL ) {
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto out;
+	}
+
+	pm->pid = pid;
+	pm->errors = pmErrors;
+	pm->recovery = recommendedRecovery;
+	pm->timer_handle_period = 0;
+
+	if ( list_empty(&comp->pm_head)) {
+		/* only add a timer per comp */
+		/* TODO: should this timer period be a define or a config option?
+		*/
+		poll_timer_add (aisexec_poll_handle,
+						500,
+						(void *)comp,
+						timer_function_pm_fn,
+						&pm->timer_handle_period);
+
+	}
+	list_add(&pm->entry, &comp->pm_head);
+
+	if (depth != 0) {
+		amf_comp_find_and_add_child_pids(comp,
+										 pmErrors,
+										 recommendedRecovery,
+										 dirList,
+										 numProcEntriesFound,
+										 pid,
+										 depth);
+	}
+
+out:
+	/* scandir() allocates every entry as well as the array itself */
+	while (numProcEntriesFound > 0) {
+		free(dirList[--numProcEntriesFound]);
+	}
+	free(dirList);
+
+	return error;
+}
+
+/**
+ * Handle the request to stop passive monitoring on
+ * a component (or part of it)
+ *
+ * Clears the given error bits in the matching monitoring entries. The
+ * entries themselves are not unlinked here; an entry whose mask has
+ * become zero is simply no longer monitored.
+ *
+ * @param comp the component
+ * @param stopQualifier SA_AMF_PM_ALL_PROCESSES selects every monitored
+ *                      process of the component, any other value selects
+ *                      only the entry whose pid equals 'pid'
+ * @param pid the process id to stop monitoring
+ * @param pmErrors the errors that shall no longer be monitored
+ *
+ * @return SaAisErrorT - return value to component (always SA_AIS_OK)
+ */
+SaAisErrorT amf_comp_pm_stop (
+	struct amf_comp *comp,
+	SaAmfPmStopQualifierT stopQualifier,
+	SaInt64T pid,
+	SaAmfPmErrorsT pmErrors)
+{
+	const int stop_all = (stopQualifier == SA_AMF_PM_ALL_PROCESSES);
+	struct list_head *node;
+	struct amf_pm *monitor;
+
+	for (node = comp->pm_head.next; node != &comp->pm_head; node = node->next) {
+
+		monitor = list_entry(node,struct amf_pm,entry);
+
+		/* skip entries the request does not address */
+		if (!stop_all && (monitor->pid != pid)) {
+			continue;
+		}
+
+		/* remove the error to check */
+		monitor->errors &= ~pmErrors;
+	}
+
+	return SA_AIS_OK;
+}
+
+
 /**
  * Handle the request from a component to start a healthcheck
  * 
@@ -1924,6 +2186,7 @@ void amf_comp_terminate (struct amf_comp *comp)
 
 	if (amf_su_is_local (comp->su)) {
 		amf_comp_healthcheck_stop (comp, NULL);
+		amf_comp_pm_stop(comp, SA_AMF_PM_ALL_PROCESSES, 0, SA_AMF_PM_ALL_ERRORS);
 		if (amf_comp_is_error_suspected(comp)) {
 			clc_interfaces[comp->comptype]->cleanup (comp);
 		} else {
@@ -1946,6 +2209,7 @@ void amf_comp_restart (struct amf_comp *comp)
 
 	if (amf_su_is_local (comp->su)) {
 		amf_comp_healthcheck_stop (comp, NULL);
+		amf_comp_pm_stop(comp, SA_AMF_PM_ALL_PROCESSES, 0, SA_AMF_PM_ALL_ERRORS);
 		clc_interfaces[comp->comptype]->cleanup (comp);
 	}
 }

+ 1 - 0
include/saAmf.h

@@ -42,6 +42,7 @@ typedef SaUint64T SaAmfHandleT;
 #define SA_AMF_PM_ZERO_EXIT 0x1
 #define SA_AMF_PM_NON_ZERO_EXIT 0x2
 #define SA_AMF_PM_ABNORMAL_END 0x4
+#define SA_AMF_PM_ALL_ERRORS (SA_AMF_PM_ZERO_EXIT | SA_AMF_PM_NON_ZERO_EXIT | SA_AMF_PM_ABNORMAL_END)
 
 typedef SaUint32T SaAmfPmErrorsT;
 

+ 1 - 0
lib/amf.c

@@ -611,6 +611,7 @@ saAmfPmStart (
 	req_lib_amf_pmstart.processId = processId;
 	req_lib_amf_pmstart.descendentsTreeDepth = descendentsTreeDepth;
 	req_lib_amf_pmstart.pmErrors = pmErrors;
+	req_lib_amf_pmstart.recommendedRecovery = recommendedRecovery;
 
 	pthread_mutex_lock (&amfInstance->response_mutex);
 

+ 18 - 0
test/testamf1.c

@@ -344,6 +344,9 @@ static SaSelectionObjectT comp_init ()
 	char *env;
 	int result;
 	SaSelectionObjectT select_fd;
+	SaAmfPmErrorsT pmErrors = (SA_AMF_PM_ZERO_EXIT | 
+							   SA_AMF_PM_NON_ZERO_EXIT | 
+							   SA_AMF_PM_ABNORMAL_END);
 
 	name = getenv ("SA_AMF_COMPONENT_NAME");
 	if (name == NULL) {
@@ -485,6 +488,21 @@ static SaSelectionObjectT comp_init ()
 		die ("saAmfComponentRegister failed %d", result);
 	}
 
+	/*
+	 * startup passive monitoring
+	 */
+	do {
+		result = saAmfPmStart (handle,
+			&compNameGlobal, getpid(), 1,
+			pmErrors, 
+			SA_AMF_COMPONENT_FAILOVER);
+
+		if (result == SA_AIS_ERR_TRY_AGAIN) {
+			printf("%d: TRY_AGAIN received\n", (int)getpid());
+			usleep (100000);
+		}
+	} while (result == SA_AIS_ERR_TRY_AGAIN);
+
 	/*
 	 * Test already started healthcheck
 	 */