Procházet zdrojové kódy

implement gmi_token_callback in amf.c

(Logical change 1.101)


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@374 fd59a12c-fef9-0310-b244-a6a79926bd2f
Miyotaka Sakai před 21 roky
rodič
revize
9781fe3332
1 změnil soubory, kde provedl 135 přidání a 12 odebrání
  1. 135 12
      exec/amf.c

+ 135 - 12
exec/amf.c

@@ -59,6 +59,9 @@
 #define LOG_SERVICE LOG_SERVICE_AMF
 #include "print.h"
 
+#define MCAST_DATA_NUM 2
+#define MCAST_DATA_LEN 256+256+128
+
 struct invocation {
 	struct conn_info *conn_info;
 	int interface;
@@ -69,6 +72,20 @@ struct invocation *invocation_entries = 0;
 
 int invocation_entries_size = 0;
 
+static DECLARE_LIST_INIT (mcast_list);
+
+struct mcast_data {
+	struct list_head	mlist;
+	char 			mcast[MCAST_DATA_NUM][MCAST_DATA_LEN];
+	struct iovec 		iovec[MCAST_DATA_NUM];
+	int			iovec_num;
+	int			priority;
+};
+
+static void *tok_call_handle = NULL;
+
+static int recovery = 0;
+
 #ifdef INPARSEDOTH
 enum amfOperationalState {
 	AMF_OPER_DISABLED,
@@ -88,7 +105,6 @@ enum amfOperationalAdministrativeState {
 	AMF_ENABLED_STOPPING
 };
 
-
 /*
  * State machines for states in AMF
  */
@@ -252,6 +268,10 @@ static int amf_exec_init_fn (void);
 
 static void amf_synchronize (void *message, struct in_addr source_addr);
 
+static void amf_mcast (struct iovec *iovec, int iov_len, int priority);
+
+static int amf_mcast_retain ();
+
 static int message_handler_req_exec_amf_componentregister (void *message, struct in_addr source_addr);
 
 static int message_handler_req_exec_amf_componentunregister (void *message, struct in_addr source_addr);
@@ -561,7 +581,7 @@ static void component_registerpriority (
 	iovecs[0].iov_base = (char *)&req_exec_amf_componentregister;
 	iovecs[0].iov_len = sizeof (req_exec_amf_componentregister);
 
-	assert (gmi_mcast (&aisexec_groupname, iovecs, 1, priority) == 0);
+	amf_mcast (iovecs, 1, priority);
 }
 
 /***
@@ -769,9 +789,9 @@ static void haStateSetCluster (
 	struct saAmfComponent *component,
 	SaAmfHAStateT haState)
 {
-
 	struct req_exec_amf_hastateset req_exec_amf_hastateset;
 	struct iovec iovecs[2];
+	int priority;
 
 	req_exec_amf_hastateset.header.id = MESSAGE_REQ_EXEC_AMF_HASTATESET;
 	req_exec_amf_hastateset.header.size = sizeof (struct req_exec_amf_hastateset);
@@ -784,7 +804,13 @@ static void haStateSetCluster (
 	iovecs[0].iov_base = (char *)&req_exec_amf_hastateset;
 	iovecs[0].iov_len = sizeof (req_exec_amf_hastateset);
 
-	assert (gmi_mcast (&aisexec_groupname, iovecs, 1, GMI_PRIO_HIGH) == 0);
+	if (recovery == 1) {
+		priority = GMI_PRIO_RECOVERY;
+	} else {
+		priority = GMI_PRIO_HIGH;
+	}
+
+	amf_mcast (iovecs, 1, priority);
 }
 
 void readinessStateSetApi (struct saAmfComponent *component,
@@ -859,9 +885,9 @@ static void readinessStateSetCluster (
 	struct saAmfComponent *component,
 	SaAmfReadinessStateT readinessState)
 {
-
 	struct req_exec_amf_readinessstateset req_exec_amf_readinessstateset;
 	struct iovec iovecs[2];
+	int priority;
 
 	req_exec_amf_readinessstateset.header.id = MESSAGE_REQ_EXEC_AMF_READINESSSTATESET;
 	req_exec_amf_readinessstateset.header.size = sizeof (struct req_exec_amf_readinessstateset);
@@ -875,7 +901,13 @@ static void readinessStateSetCluster (
 	iovecs[0].iov_base = (char *)&req_exec_amf_readinessstateset;
 	iovecs[0].iov_len = sizeof (req_exec_amf_readinessstateset);
 
-	assert (gmi_mcast (&aisexec_groupname, iovecs, 1, GMI_PRIO_HIGH) == 0);
+	if (recovery == 1) {
+		priority = GMI_PRIO_RECOVERY;
+	} else {
+		priority = GMI_PRIO_HIGH;
+	}
+
+	amf_mcast (iovecs, 1, priority);
 }
 
 #ifdef CMOPILE_OUT
@@ -1885,10 +1917,7 @@ static int amf_confchg_fn (
 {
 	int i;
 
-	if (configuration_type == GMI_CONFIGURATION_REGULAR) {
-		gmi_recovery_plug_unplug (amf_recovery_plug_handle);
-	}
-
+	recovery = 1;
 	/*
 	 * If node join, component register
 	 */
@@ -1903,6 +1932,11 @@ static int amf_confchg_fn (
 		enumerate_components (amf_confchg_nleave, (void *)&(left_list[i].sin_addr));
 	}
 
+	if (configuration_type == GMI_CONFIGURATION_REGULAR) {
+		gmi_recovery_plug_unplug (amf_recovery_plug_handle);
+		recovery = 0;
+	}
+
 	return (0);
 }
 
@@ -2349,7 +2383,6 @@ static int message_handler_req_amf_componentregister (struct conn_info *conn_inf
 	iovecs[0].iov_len = sizeof (req_exec_amf_componentregister);
 
 	assert (gmi_mcast (&aisexec_groupname, iovecs, 1, GMI_PRIO_MED) == 0);
-
 	return (0);
 }
 
@@ -2377,7 +2410,6 @@ static int message_handler_req_amf_componentunregister (struct conn_info *conn_i
 	iovecs[0].iov_len = sizeof (req_exec_amf_componentunregister);
 
 	assert (gmi_mcast (&aisexec_groupname, iovecs, 1, GMI_PRIO_MED) == 0);
-
 	return (0);
 }
 
@@ -2676,6 +2708,97 @@ static int message_handler_req_amf_componentcapabilitymodelget (struct conn_info
 	return (0);
 }
 
+#ifdef COMPILE_OUT
+int gmi_mcast2 (
+	struct gmi_groupname *groupname,
+	struct iovec *iovec,
+	int iov_len,
+	int priority)
+{
+	static int i = 0;
+
+	if (i%2==0) {
+		return (-1);
+	}
+
+	amf_mcast_retain ();
+	gmi_mcast (groupname, iovec, iov_len, priority);
+}
+#endif
+
+static void amf_mcast (struct iovec *iovec, int iovec_num, int priority)
+{
+	int i;
+	int ret;
+	struct mcast_data *mcast;
+
+	if (mcast_list.next == &mcast_list) {
+		ret = gmi_mcast (&aisexec_groupname, iovec, iovec_num, priority);
+		if (ret == 0) {
+			return;
+		}
+		assert (gmi_token_callback_create (&tok_call_handle, amf_mcast_retain, NULL) == 0);
+	}
+
+	mcast = (struct mcast_data *) malloc (sizeof(*mcast));
+	assert (mcast != NULL);
+	if (mcast == NULL) {
+		log_printf (LOG_LEVEL_ERROR, "Allocation Error in AMF_MCAST\n");
+		return;
+	}
+
+	mcast->iovec[0].iov_base = (void *)(mcast->mcast[0]);
+	mcast->iovec[1].iov_base = (void *)(mcast->mcast[1]);
+
+	assert (iovec_num <= MCAST_DATA_NUM);
+	for (i=0; i<iovec_num; i++) {
+
+		mcast[i].iovec[i].iov_len = iovec[i].iov_len;
+		assert (iovec[i].iov_len <= MCAST_DATA_LEN);
+		log_printf (LOG_LEVEL_ERROR, "Length over in AMF_MCAST\n");
+		memcpy (mcast[i].iovec[i].iov_base, iovec[i].iov_base, iovec[i].iov_len);
+	}
+
+        mcast->iovec_num = iovec_num;
+        mcast->priority = priority;
+        list_add_tail (&mcast->mlist, &mcast_list);
+
+	return;
+}
+
+static int amf_mcast_retain ()
+{
+	struct list_head *list;
+	struct list_head *list_next;
+	struct mcast_data *mdata;
+	int priority;
+	int ret;
+
+	for (priority=GMI_PRIO_RECOVERY; priority<GMI_PRIO_LOW; priority++) {
+	for (list=mcast_list.next; list != &mcast_list; list=list_next) {
+
+		mdata = list_entry (list, struct mcast_data, mlist);
+		list_next = list->next;
+		if (mdata->priority != priority) {
+			continue;
+		}
+
+		list_del (list);
+
+		ret = gmi_mcast (&aisexec_groupname, mdata->iovec, mdata->iovec_num, mdata->priority);
+		if (ret == -1) {
+			list_add (list ,&mcast_list);
+		assert (gmi_token_callback_create (&tok_call_handle, amf_mcast_retain, NULL) == 0);
+			break;
+		}
+
+		free (mdata);
+	}
+	}
+
+	return (0);
+}
+
 static char disabled_unlocked_state_text[6][64] = {
 	"AMF_DISABLED_UNLOCKED_REGISTEREDORERRORCANCEL",
 	"AMF_DISABLED_UNLOCKED_FAILED",