瀏覽代碼

Forward port of flow control work from whitetank branch.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1289 fd59a12c-fef9-0310-b244-a6a79926bd2f
Steven Dake 19 年之前
父節點
當前提交
336dc17daa
共有 25 個文件被更改,包括 1035 次插入101 次删除
  1. 2 2
      exec/Makefile
  2. 1 0
      exec/amf.c
  3. 1 0
      exec/amfcomp.c
  4. 1 0
      exec/cfg.c
  5. 1 0
      exec/ckpt.c
  6. 1 0
      exec/clm.c
  7. 60 22
      exec/cpg.c
  8. 1 0
      exec/evs.c
  9. 1 0
      exec/evt.c
  10. 434 0
      exec/flow.c
  11. 73 0
      exec/flow.h
  12. 184 49
      exec/ipc.c
  13. 21 0
      exec/ipc.h
  14. 1 0
      exec/lck.c
  15. 4 0
      exec/main.c
  16. 1 1
      exec/main.h
  17. 1 0
      exec/msg.c
  18. 1 0
      exec/service.h
  19. 9 1
      include/cpg.h
  20. 1 1
      include/hdb.h
  21. 8 2
      include/ipc_cpg.h
  22. 11 7
      include/queue.h
  23. 32 10
      lib/cpg.c
  24. 10 6
      test/Makefile
  25. 175 0
      test/cpgbench.c

+ 2 - 2
exec/Makefile

@@ -62,9 +62,9 @@ LCR_SRC = evs.c clm.c ckpt.c evt.c lck.c msg.c cfg.c cpg.c aisparser.c vsf_ykd.c
 LCR_OBJS = evs.o clm.o ckpt.o evt.o lck.o msg.o cfg.o cpg.o aisparser.o vsf_ykd.o $(AMF_OBJS)
 LCR_OBJS = evs.o clm.o ckpt.o evt.o lck.o msg.o cfg.o cpg.o aisparser.o vsf_ykd.o $(AMF_OBJS)
 
 
 # main executive objects
 # main executive objects
-MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c timer.c \
+MAIN_SRC = main.c print.c mempool.c util.c sync.c service.c ipc.c flow.c timer.c \
 	totemconfig.c mainconfig.c
 	totemconfig.c mainconfig.c
-MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o timer.o \
+MAIN_OBJS = main.o print.o mempool.o util.o sync.o service.o ipc.o flow.o timer.o \
 	totemconfig.o mainconfig.o ../lcr/lcr_ifact.o
 	totemconfig.o mainconfig.o ../lcr/lcr_ifact.o
 OTHER_OBJS = objdb.o
 OTHER_OBJS = objdb.o
 
 

+ 1 - 0
exec/amf.c

@@ -374,6 +374,7 @@ static struct openais_service_handler amf_service_handler = {
 	.name				= (unsigned char *)"openais availability management framework B.01.01",
 	.name				= (unsigned char *)"openais availability management framework B.01.01",
 	.id					= AMF_SERVICE,
 	.id					= AMF_SERVICE,
 	.private_data_size	= sizeof (struct amf_pd),
 	.private_data_size	= sizeof (struct amf_pd),
+	.flow_control		= OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
 	.lib_init_fn		= amf_lib_init_fn,
 	.lib_init_fn		= amf_lib_init_fn,
 	.lib_exit_fn		= amf_lib_exit_fn,
 	.lib_exit_fn		= amf_lib_exit_fn,
 	.lib_service		= amf_lib_service,
 	.lib_service		= amf_lib_service,

+ 1 - 0
exec/amfcomp.c

@@ -1394,6 +1394,7 @@ void amf_comp_instantiate (struct amf_comp *comp)
 				comp->saAmfCompPresenceState);
 				comp->saAmfCompPresenceState);
 			break;
 			break;
 	}
 	}
+	return 0;
 }
 }
 
 
 void amf_comp_instantiate_tmo_event (struct amf_comp *comp)
 void amf_comp_instantiate_tmo_event (struct amf_comp *comp)

+ 1 - 0
exec/cfg.c

@@ -167,6 +167,7 @@ struct openais_service_handler cfg_service_handler = {
 	.name					= (unsigned char*)"openais configuration service",
 	.name					= (unsigned char*)"openais configuration service",
 	.id					= CFG_SERVICE,
 	.id					= CFG_SERVICE,
 	.private_data_size			= 0,
 	.private_data_size			= 0,
+	.flow_control				= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn				= cfg_lib_init_fn,
 	.lib_init_fn				= cfg_lib_init_fn,
 	.lib_exit_fn				= cfg_lib_exit_fn,
 	.lib_exit_fn				= cfg_lib_exit_fn,
 	.lib_service				= cfg_lib_service,
 	.lib_service				= cfg_lib_service,

+ 1 - 0
exec/ckpt.c

@@ -548,6 +548,7 @@ struct openais_service_handler ckpt_service_handler = {
 	.name				= (unsigned char *)"openais checkpoint service B.01.01",
 	.name				= (unsigned char *)"openais checkpoint service B.01.01",
 	.id				= CKPT_SERVICE,
 	.id				= CKPT_SERVICE,
 	.private_data_size		= sizeof (struct ckpt_pd),
 	.private_data_size		= sizeof (struct ckpt_pd),
+	.flow_control			= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn			= ckpt_lib_init_fn,
 	.lib_init_fn			= ckpt_lib_init_fn,
 	.lib_exit_fn			= ckpt_lib_exit_fn,
 	.lib_exit_fn			= ckpt_lib_exit_fn,
 	.lib_service			= ckpt_lib_service,
 	.lib_service			= ckpt_lib_service,

+ 1 - 0
exec/clm.c

@@ -203,6 +203,7 @@ struct openais_service_handler clm_service_handler = {
 	.name			= (unsigned char*)"openais cluster membership service B.01.01",
 	.name			= (unsigned char*)"openais cluster membership service B.01.01",
 	.id			= CLM_SERVICE,
 	.id			= CLM_SERVICE,
 	.private_data_size	= sizeof (struct clm_pd),
 	.private_data_size	= sizeof (struct clm_pd),
+	.flow_control		= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn		= clm_lib_init_fn,
 	.lib_init_fn		= clm_lib_init_fn,
 	.lib_exit_fn		= clm_lib_exit_fn,
 	.lib_exit_fn		= clm_lib_exit_fn,
 	.lib_service		= clm_lib_service,
 	.lib_service		= clm_lib_service,

+ 60 - 22
exec/cpg.c

@@ -32,10 +32,6 @@
  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  * THE POSSIBILITY OF SUCH DAMAGE.
  * THE POSSIBILITY OF SUCH DAMAGE.
  */
  */
-
-#ifndef OPENAIS_BSD
-#include <alloca.h>
-#endif
 #include <sys/types.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 #include <sys/un.h>
@@ -70,6 +66,7 @@
 #include "jhash.h"
 #include "jhash.h"
 #include "swab.h"
 #include "swab.h"
 #include "ipc.h"
 #include "ipc.h"
+#include "flow.h"
 #include "print.h"
 #include "print.h"
 
 
 #define GROUP_HASH_SIZE 32
 #define GROUP_HASH_SIZE 32
@@ -106,6 +103,7 @@ struct process_info {
 	void *conn;
 	void *conn;
 	void *trackerconn;
 	void *trackerconn;
 	struct group_info *group;
 	struct group_info *group;
+	enum openais_flow_control_state flow_control_state;
 	struct list_head list; /* on the group_info members list */
 	struct list_head list; /* on the group_info members list */
 };
 };
 
 
@@ -193,7 +191,7 @@ static struct openais_lib_handler cpg_lib_service[] =
 	},
 	},
 	{ /* 2 */
 	{ /* 2 */
 		.lib_handler_fn				= message_handler_req_lib_cpg_mcast,
 		.lib_handler_fn				= message_handler_req_lib_cpg_mcast,
-		.response_size				= sizeof (mar_res_header_t),
+		.response_size				= sizeof (struct res_lib_cpg_mcast),
 		.response_id				= MESSAGE_RES_CPG_MCAST,
 		.response_id				= MESSAGE_RES_CPG_MCAST,
 		.flow_control				= OPENAIS_FLOW_CONTROL_REQUIRED
 		.flow_control				= OPENAIS_FLOW_CONTROL_REQUIRED
 	},
 	},
@@ -241,6 +239,7 @@ struct openais_service_handler cpg_service_handler = {
 	.name				        = (unsigned char*)"openais cluster closed process group service v1.01",
 	.name				        = (unsigned char*)"openais cluster closed process group service v1.01",
 	.id					= CPG_SERVICE,
 	.id					= CPG_SERVICE,
 	.private_data_size			= sizeof (struct process_info),
 	.private_data_size			= sizeof (struct process_info),
+	.flow_control				= OPENAIS_FLOW_CONTROL_REQUIRED,
 	.lib_init_fn				= cpg_lib_init_fn,
 	.lib_init_fn				= cpg_lib_init_fn,
 	.lib_exit_fn				= cpg_lib_exit_fn,
 	.lib_exit_fn				= cpg_lib_exit_fn,
 	.lib_service				= cpg_lib_service,
 	.lib_service				= cpg_lib_service,
@@ -308,6 +307,7 @@ struct req_exec_cpg_mcast {
 	mar_cpg_name_t group_name __attribute__((aligned(8)));
 	mar_cpg_name_t group_name __attribute__((aligned(8)));
 	mar_uint32_t msglen __attribute__((aligned(8)));
 	mar_uint32_t msglen __attribute__((aligned(8)));
 	mar_uint32_t pid __attribute__((aligned(8)));
 	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_message_source_t source __attribute__((aligned(8)));
 	mar_uint8_t message[] __attribute__((aligned(8)));
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 };
 
 
@@ -504,7 +504,7 @@ static int cpg_node_joinleave_send (struct group_info *gi, struct process_info *
 	req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
 	req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
 	req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
 	req_exec_cpg_procjoin.header.id = SERVICE_ID_MAKE(CPG_SERVICE, fn);
 
 
-	req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
+	req_exec_cpg_iovec.iov_base = &req_exec_cpg_procjoin;
 	req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
 	req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
 
 
 	result = totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED);
 	result = totempg_groups_mcast_joined (openais_group_handle, &req_exec_cpg_iovec, 1, TOTEMPG_AGREED);
@@ -553,9 +553,8 @@ static void remove_node_from_groups(
 
 
 						list_del(&gi->rg->list);
 						list_del(&gi->rg->list);
 						newsize = gi->rg->left_list_size * 2;
 						newsize = gi->rg->left_list_size * 2;
-						newrg = realloc (gi->rg,
-							sizeof(struct removed_group) + newsize * sizeof(mar_cpg_address_t));
-						if (newrg == NULL) {
+						newrg = realloc(gi->rg, sizeof(struct removed_group) + newsize*sizeof(mar_cpg_address_t));
+						if (!newrg) {
 							log_printf(LOG_LEVEL_CRIT, "Unable to realloc removed group struct. CPG callbacks will be junk.");
 							log_printf(LOG_LEVEL_CRIT, "Unable to realloc removed group struct. CPG callbacks will be junk.");
 							return;
 							return;
 						}
 						}
@@ -613,6 +612,15 @@ static void cpg_confchg_fn (
 	}
 	}
 }
 }
 
 
+/*
+ * Flow control callback registered by the CPG join handler: stores the
+ * reported state in the process_info that was supplied as context.
+ */
+static void cpg_flow_control_state_set_fn (
+	void *context,
+	enum openais_flow_control_state flow_control_state)
+{
+	struct process_info *pi = context;
+
+	pi->flow_control_state = flow_control_state;
+}
+
 /* Can byteswap join & leave messages */
 /* Can byteswap join & leave messages */
 static void exec_cpg_procjoin_endian_convert (void *msg)
 static void exec_cpg_procjoin_endian_convert (void *msg)
 {
 {
@@ -645,7 +653,7 @@ static void exec_cpg_mcast_endian_convert (void *msg)
 	swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
 	swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
 	req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
 	req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
 	req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
 	req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
-
+	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
 }
 }
 
 
 static void do_proc_join(
 static void do_proc_join(
@@ -787,11 +795,15 @@ static void message_handler_req_exec_cpg_mcast (
 {
 {
 	struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message;
 	struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct req_exec_cpg_mcast *)message;
 	struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
 	struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
+	struct process_info *process_info;
 	int msglen = req_exec_cpg_mcast->msglen;
 	int msglen = req_exec_cpg_mcast->msglen;
 	char buf[sizeof(*res_lib_cpg_mcast) + msglen];
 	char buf[sizeof(*res_lib_cpg_mcast) + msglen];
 	struct group_info *gi;
 	struct group_info *gi;
 	struct list_head *iter;
 	struct list_head *iter;
 
 
+	/*
+	 * Track local messages so that flow is controlled on the local node
+	 */
 	gi = get_group(&req_exec_cpg_mcast->group_name); /* this will always succeed ! */
 	gi = get_group(&req_exec_cpg_mcast->group_name); /* this will always succeed ! */
 	assert(gi);
 	assert(gi);
 
 
@@ -801,6 +813,12 @@ static void message_handler_req_exec_cpg_mcast (
 	res_lib_cpg_mcast->msglen = msglen;
 	res_lib_cpg_mcast->msglen = msglen;
 	res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid;
 	res_lib_cpg_mcast->pid = req_exec_cpg_mcast->pid;
 	res_lib_cpg_mcast->nodeid = nodeid;
 	res_lib_cpg_mcast->nodeid = nodeid;
+	res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
+	if (message_source_is_local (&req_exec_cpg_mcast->source)) {
+		openais_ipc_flow_control_local_decrement (req_exec_cpg_mcast->source.conn);
+		process_info = (struct process_info *)openais_conn_private_data_get (req_exec_cpg_mcast->source.conn);
+		res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state;
+	}
 	memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
 	memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
 		sizeof(mar_cpg_name_t));
 		sizeof(mar_cpg_name_t));
 	memcpy(&res_lib_cpg_mcast->message, (char*)message+sizeof(*req_exec_cpg_mcast),
 	memcpy(&res_lib_cpg_mcast->message, (char*)message+sizeof(*req_exec_cpg_mcast),
@@ -916,6 +934,14 @@ static void message_handler_req_lib_cpg_join (void *conn, void *message)
 		goto join_err;
 		goto join_err;
 	}
 	}
 
 
+	openais_ipc_flow_control_create (
+		conn,
+		CPG_SERVICE,
+		req_lib_cpg_join->group_name.value,
+		req_lib_cpg_join->group_name.length,
+		cpg_flow_control_state_set_fn,
+		pi);
+
 	/* Add a node entry for us */
 	/* Add a node entry for us */
 	pi->nodeid = this_ip->nodeid;
 	pi->nodeid = this_ip->nodeid;
 	pi->pid = req_lib_cpg_join->pid;
 	pi->pid = req_lib_cpg_join->pid;
@@ -953,6 +979,12 @@ static void message_handler_req_lib_cpg_leave (void *conn, void *message)
 	cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE);
 	cpg_node_joinleave_send(gi, pi, MESSAGE_REQ_EXEC_CPG_PROCLEAVE, CONFCHG_CPG_REASON_LEAVE);
 	pi->group = NULL;
 	pi->group = NULL;
 
 
+	openais_ipc_flow_control_destroy (
+		conn,
+		CPG_SERVICE,
+		(unsigned char *)gi->group_name.value,
+		(unsigned int)gi->group_name.length);
+
 leave_ret:
 leave_ret:
 	/* send return */
 	/* send return */
 	res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
 	res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
@@ -969,7 +1001,7 @@ static void message_handler_req_lib_cpg_mcast (void *conn, void *message)
 	struct group_info *gi = pi->group;
 	struct group_info *gi = pi->group;
 	struct iovec req_exec_cpg_iovec[2];
 	struct iovec req_exec_cpg_iovec[2];
 	struct req_exec_cpg_mcast req_exec_cpg_mcast;
 	struct req_exec_cpg_mcast req_exec_cpg_mcast;
-	mar_res_header_t res;
+	struct res_lib_cpg_mcast res_lib_cpg_mcast;
 	int msglen = req_lib_cpg_mcast->msglen;
 	int msglen = req_lib_cpg_mcast->msglen;
 	int result;
 	int result;
 
 
@@ -977,10 +1009,12 @@ static void message_handler_req_lib_cpg_mcast (void *conn, void *message)
 
 
 	/* Can't send if we're not joined */
 	/* Can't send if we're not joined */
 	if (!gi) {
 	if (!gi) {
-		res.size = sizeof(res);
-		res.id = MESSAGE_RES_CPG_MCAST;
-		res.error = SA_AIS_ERR_ACCESS; /* TODO Better error code ?? */
-		openais_conn_send_response(conn, &res, sizeof(res));
+		res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
+		res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
+		res_lib_cpg_mcast.header.error = SA_AIS_ERR_ACCESS; /* TODO Better error code ?? */
+		res_lib_cpg_mcast.flow_control_state = CPG_FLOW_CONTROL_DISABLED;
+		openais_conn_send_response(conn, &res_lib_cpg_mcast,
+			sizeof(res_lib_cpg_mcast));
 		return;
 		return;
 	}
 	}
 
 
@@ -989,21 +1023,25 @@ static void message_handler_req_lib_cpg_mcast (void *conn, void *message)
 		MESSAGE_REQ_EXEC_CPG_MCAST);
 		MESSAGE_REQ_EXEC_CPG_MCAST);
 	req_exec_cpg_mcast.pid = pi->pid;
 	req_exec_cpg_mcast.pid = pi->pid;
 	req_exec_cpg_mcast.msglen = msglen;
 	req_exec_cpg_mcast.msglen = msglen;
+	message_source_set (&req_exec_cpg_mcast.source, conn);
 	memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name,
 	memcpy(&req_exec_cpg_mcast.group_name, &gi->group_name,
 		sizeof(mar_cpg_name_t));
 		sizeof(mar_cpg_name_t));
 
 
-	req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
+	req_exec_cpg_iovec[0].iov_base = &req_exec_cpg_mcast;
 	req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
 	req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
-	req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
+	req_exec_cpg_iovec[1].iov_base = &req_lib_cpg_mcast->message;
 	req_exec_cpg_iovec[1].iov_len = msglen;
 	req_exec_cpg_iovec[1].iov_len = msglen;
 
 
 	// TODO: guarantee type...
 	// TODO: guarantee type...
 	result = totempg_groups_mcast_joined (openais_group_handle, req_exec_cpg_iovec, 2, TOTEMPG_AGREED);
 	result = totempg_groups_mcast_joined (openais_group_handle, req_exec_cpg_iovec, 2, TOTEMPG_AGREED);
-
-	res.size = sizeof(res);
-	res.id = MESSAGE_RES_CPG_MCAST;
-	res.error = SA_AIS_OK;
-	openais_conn_send_response(conn, &res, sizeof(res));
+	openais_ipc_flow_control_local_increment (conn);
+
+	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
+	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_MCAST;
+	res_lib_cpg_mcast.header.error = SA_AIS_OK;
+	res_lib_cpg_mcast.flow_control_state = pi->flow_control_state;
+	openais_conn_send_response(conn, &res_lib_cpg_mcast,
+		sizeof(res_lib_cpg_mcast));
 }
 }
 
 
 static void message_handler_req_lib_cpg_membership (void *conn, void *message)
 static void message_handler_req_lib_cpg_membership (void *conn, void *message)

+ 1 - 0
exec/evs.c

@@ -145,6 +145,7 @@ struct openais_service_handler evs_service_handler = {
 	.name			= (unsigned char*)"openais extended virtual synchrony service",
 	.name			= (unsigned char*)"openais extended virtual synchrony service",
 	.id			= EVS_SERVICE,
 	.id			= EVS_SERVICE,
 	.private_data_size	= sizeof (struct evs_pd),
 	.private_data_size	= sizeof (struct evs_pd),
+	.flow_control		= OPENAIS_FLOW_CONTROL_REQUIRED, 
 	.lib_init_fn		= evs_lib_init_fn,
 	.lib_init_fn		= evs_lib_init_fn,
 	.lib_exit_fn		= evs_lib_exit_fn,
 	.lib_exit_fn		= evs_lib_exit_fn,
 	.lib_service		= evs_lib_service,
 	.lib_service		= evs_lib_service,

+ 1 - 0
exec/evt.c

@@ -214,6 +214,7 @@ struct openais_service_handler evt_service_handler = {
 								(unsigned char*)"openais event service B.01.01",
 								(unsigned char*)"openais event service B.01.01",
 	.id							= EVT_SERVICE,
 	.id							= EVT_SERVICE,
 	.private_data_size			= sizeof (struct libevt_pd),
 	.private_data_size			= sizeof (struct libevt_pd),
+	.flow_control				= OPENAIS_FLOW_CONTROL_NOT_REQUIRED,
 	.lib_init_fn				= evt_lib_init,
 	.lib_init_fn				= evt_lib_init,
 	.lib_exit_fn				= evt_lib_exit,
 	.lib_exit_fn				= evt_lib_exit,
 	.lib_service				= evt_lib_service,
 	.lib_service				= evt_lib_service,

+ 434 - 0
exec/flow.c

@@ -0,0 +1,434 @@
+/*
+ * Copyright (c) 2006 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * New messages are allowed from the library ONLY when the processor has not
+ * received an OPENAIS_FLOW_CONTROL_STATE_ENABLED message from any processor.
+ * If an OPENAIS_FLOW_CONTROL_STATE_ENABLED message is sent, it must later be
+ * cancelled by an OPENAIS_FLOW_CONTROL_STATE_DISABLED message.
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "flow.h"
+#include "totem.h"
+#include "totempg.h"
+#include "print.h"
+#include "hdb.h"
+#include "../include/list.h"
+
+#define OPENAIS_FLOW_CONTROL_ENABLED_SERVICES_MAX 128
+
+struct flow_control_instance {	/* per-handle state stored in flow_control_hdb */
+	struct list_head list_head;	/* initialised by openais_flow_control_ipc_init(); linked with flow_control_service.list */
+	unsigned int service;	/* service number given to openais_flow_control_ipc_init() */
+};
+
+DECLARE_LIST_INIT (flow_control_service_list_head);
+
+struct flow_control_message {	/* payload multicast by flow_control_xmit() and read by flow_control_deliver_fn() */
+	unsigned int service __attribute__((aligned(8)));	/* service number of the sending flow_control_service */
+	char id[1024] __attribute__((aligned(8)));	/* id bytes of the sending flow_control_service */
+	unsigned int id_len __attribute__((aligned(8)));	/* number of bytes of id[] that were copied in */
+	enum openais_flow_control_state flow_control_state __attribute__((aligned(8)));	/* state being announced by the sender */
+};
+
+struct flow_control_node_state {	/* last state recorded for one member node */
+	unsigned int nodeid;	/* node id taken from the membership list */
+	enum openais_flow_control_state flow_control_state;	/* updated when a message from nodeid is delivered */
+};
+
+struct flow_control_service {	/* one entry allocated by openais_flow_control_create() */
+	struct flow_control_node_state flow_control_node_state[PROCESSOR_COUNT_MAX];	/* per-node state; the first processor_count entries are in use */
+	unsigned int service;	/* service number supplied at creation */
+	char id[1024];	/* identifier bytes supplied at creation */
+	unsigned int id_len;	/* number of valid bytes in id[] */
+	void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state);	/* callback told about state changes */
+	void *context;	/* opaque pointer handed back to flow_control_state_set_fn */
+	unsigned int processor_count;	/* number of member nodes tracked in flow_control_node_state[] */
+	enum openais_flow_control_state flow_control_state;	/* state most recently computed or set for this service */
+	struct list_head list;	/* links this service with its flow_control_instance.list_head */
+	struct list_head list_all;	/* links this service with flow_control_service_list_head */
+};
+
+static struct totempg_group flow_control_group = {	/* group joined by openais_flow_control_initialize() */
+	.group      = "flowcontrol",
+	.group_len  = 12	/* the 11 characters of "flowcontrol" plus the terminating NUL */
+};
+
+static totempg_groups_handle flow_control_handle;
+
+static struct hdb_handle_database flow_control_hdb = {	/* handle database holding the flow_control_instance objects */
+	.handle_count	= 0,
+	.handles	= NULL,
+	.iterator	= 0,
+	.mutex		= PTHREAD_MUTEX_INITIALIZER
+};
+
+static unsigned int flow_control_member_list[PROCESSOR_COUNT_MAX];	/* copy of the member list from the last configuration change */
+static unsigned int flow_control_member_list_entries;	/* number of valid entries in flow_control_member_list */
+
+/*
+ * Multicast the flow control state of one service to the flow control group,
+ * then report the state stored in the service through its callback.
+ *
+ * flow_control_service - service whose service number and id are sent
+ * flow_control_state   - state placed in the outgoing message
+ *
+ * Returns the result of totempg_groups_mcast_joined().
+ */
+static inline int flow_control_xmit (
+	struct flow_control_service *flow_control_service,
+	enum openais_flow_control_state flow_control_state)
+{
+	struct flow_control_message flow_control_message;
+	struct iovec iovec;
+	unsigned int id_len;
+	int res;
+
+	/*
+	 * The whole structure is sent, so clear it first; otherwise the unused
+	 * tail of id[] and any padding would carry stack contents to other nodes.
+	 */
+	memset (&flow_control_message, 0, sizeof (flow_control_message));
+
+	/*
+	 * Never copy more than the message id buffer can hold
+	 */
+	id_len = flow_control_service->id_len;
+	if (id_len > sizeof (flow_control_message.id)) {
+		id_len = sizeof (flow_control_message.id);
+	}
+
+	flow_control_message.service = flow_control_service->service;
+	flow_control_message.flow_control_state = flow_control_state;
+	memcpy (flow_control_message.id, flow_control_service->id, id_len);
+	flow_control_message.id_len = id_len;
+
+	iovec.iov_base = (char *)&flow_control_message;
+	iovec.iov_len = sizeof (flow_control_message);
+
+	res = totempg_groups_mcast_joined (flow_control_handle, &iovec, 1,
+		TOTEMPG_AGREED);
+
+	/*
+	 * The callback is given the state stored in the service rather than the
+	 * argument; the callers in this file store the new state before calling.
+	 */
+	flow_control_service->flow_control_state_set_fn (
+		flow_control_service->context,
+		flow_control_service->flow_control_state);
+
+	return (res);
+}
+
+/*
+ * Delivery callback registered with totempg for the flow control group.
+ *
+ * Records the state announced by nodeid in every known service, recomputes
+ * each service's aggregate state (ENABLED if any tracked node is ENABLED,
+ * otherwise DISABLED) and reports that aggregate once through the service's
+ * callback.
+ *
+ * NOTE(review): the service and id fields of the message are not compared
+ * against the local services, and endian_conversion_required is ignored -
+ * confirm both are intentional.
+ */
+static void flow_control_deliver_fn (
+	unsigned int nodeid,
+	struct iovec *iovec,
+	int iov_len,
+	int endian_conversion_required)
+{
+	struct flow_control_message *flow_control_message = (struct flow_control_message *)iovec[0].iov_base;
+	struct flow_control_service *flow_control_service;
+	enum openais_flow_control_state aggregate_state;
+	struct list_head *list;
+	unsigned int i;
+
+	for (list = flow_control_service_list_head.next;
+		list != &flow_control_service_list_head;
+		list = list->next) {
+
+		flow_control_service = list_entry (list, struct flow_control_service, list_all);
+		/*
+		 * Find this nodeid in the flow control service and set the message
+		 * enabled or disabled flag
+		 */
+		for (i = 0; i < flow_control_service->processor_count; i++) {
+			if (nodeid == flow_control_service->flow_control_node_state[i].nodeid) {
+				flow_control_service->flow_control_node_state[i].flow_control_state =
+					flow_control_message->flow_control_state;
+				break;
+			}
+		}
+
+		/*
+		 * Determine if flow control is enabled on any node.  The result is
+		 * computed first so the callback is invoked exactly once with the
+		 * final state instead of seeing a transient DISABLED.
+		 */
+		aggregate_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
+		for (i = 0; i < flow_control_service->processor_count; i++) {
+			if (flow_control_service->flow_control_node_state[i].flow_control_state == OPENAIS_FLOW_CONTROL_STATE_ENABLED) {
+				aggregate_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+				break;
+			}
+		}
+
+		flow_control_service->flow_control_state = aggregate_state;
+		flow_control_service->flow_control_state_set_fn (flow_control_service->context, aggregate_state);
+	} /* for list iteration */
+}
+
+/*
+ * Configuration change callback registered with totempg for the flow
+ * control group.
+ *
+ * Saves the new member list and, for every known service, resets the tracked
+ * nodes to the new membership with every node marked ENABLED.  Only
+ * member_list and member_list_entries are used.
+ */
+static void flow_control_confchg_fn (
+	enum totem_configuration_type configuration_type,
+	unsigned int *member_list, int member_list_entries,
+	unsigned int *left_list, int left_list_entries,
+	unsigned int *joined_list, int joined_list_entries,
+	struct memb_ring_id *ring_id)
+{
+	unsigned int i;
+	unsigned int entries;
+	struct flow_control_service *flow_control_service;
+	struct list_head *list;
+
+	/*
+	 * The destination arrays hold PROCESSOR_COUNT_MAX entries, so bound the
+	 * count before copying and treat a negative count as empty.
+	 */
+	entries = 0;
+	if (member_list_entries > 0) {
+		entries = (unsigned int)member_list_entries;
+	}
+	if (entries > PROCESSOR_COUNT_MAX) {
+		entries = PROCESSOR_COUNT_MAX;
+	}
+
+	memcpy (flow_control_member_list, member_list,
+		sizeof (unsigned int) * entries);
+	flow_control_member_list_entries = entries;
+
+	for (list = flow_control_service_list_head.next;
+		list != &flow_control_service_list_head;
+		list = list->next) {
+
+		flow_control_service = list_entry (list, struct flow_control_service, list_all);
+
+		/*
+		 * Set all of the node ids after a configuration change
+		 * Turn on all flow control after a configuration change
+		 */
+		flow_control_service->processor_count = entries;
+		flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+		for (i = 0; i < entries; i++) {
+			flow_control_service->flow_control_node_state[i].nodeid = member_list[i];
+			flow_control_service->flow_control_node_state[i].flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+		}
+	}
+}
+/*
+ * External API
+ */
+/*
+ * Register the flow control deliver and configuration change callbacks with
+ * totempg and join the flow control group.
+ *
+ * Returns 0 on success, or -1 (as an unsigned int) after logging an error
+ * when either step fails.
+ */
+unsigned int openais_flow_control_initialize (void)
+{
+	unsigned int res;
+
+	log_init ("FLOW");
+
+	res = totempg_groups_initialize (&flow_control_handle,
+		flow_control_deliver_fn, flow_control_confchg_fn);
+	if (res != -1) {
+		res = totempg_groups_join (flow_control_handle,
+			&flow_control_group, 1);
+		if (res != -1) {
+			return (0);
+		}
+		log_printf (LOG_LEVEL_ERROR, "Couldn't join flow control group.\n");
+		return (-1);
+	}
+
+	log_printf (LOG_LEVEL_ERROR,
+		"Couldn't initialize flow control interface.\n");
+	return (-1);
+}
+
+/*
+ * Create a flow control instance in flow_control_hdb and hand its handle
+ * back through *flow_control_handle.
+ *
+ * flow_control_handle - receives the new handle
+ * service             - service number stored in the instance
+ *
+ * Returns 0 on success or -1 (as an unsigned int) on failure.
+ *
+ * NOTE(review): the reference obtained by hdb_handle_get() is not released
+ * with hdb_handle_put() here, unlike the other functions in this file -
+ * confirm whether that is intentional.
+ */
+unsigned int openais_flow_control_ipc_init (
+	unsigned int *flow_control_handle,
+	unsigned int service)
+{
+	struct flow_control_instance *instance;
+
+	if (hdb_handle_create (&flow_control_hdb,
+		sizeof (struct flow_control_instance), flow_control_handle) != 0) {
+
+		return (-1);
+	}
+
+	if (hdb_handle_get (&flow_control_hdb, *flow_control_handle,
+		(void *)&instance) != 0) {
+
+		/* The handle exists but cannot be used; give it back */
+		hdb_handle_destroy (&flow_control_hdb, *flow_control_handle);
+		return (-1);
+	}
+
+	instance->service = service;
+	list_init (&instance->list_head);
+
+	return (0);
+}
+
+/*
+ * Destroy the flow control handle created by openais_flow_control_ipc_init().
+ * Always returns 0.
+ */
+unsigned int openais_flow_control_ipc_exit (
+	unsigned int flow_control_handle)
+{
+	hdb_handle_destroy (&flow_control_hdb, flow_control_handle);
+
+	return 0;
+}
+
+/*
+ * Create a flow controlled service identified by (service, id) and attach it
+ * to the instance behind flow_control_handle and to the global service list.
+ *
+ * flow_control_handle       - handle from openais_flow_control_ipc_init()
+ * service                   - service number stored in the new entry
+ * id, id_len                - identifier bytes; id_len must not exceed the
+ *                             size of the id buffer in the service entry
+ * flow_control_state_set_fn - callback told about state changes
+ * context                   - opaque pointer passed to the callback
+ *
+ * Returns 0 on success.  Returns -1 (as an unsigned int) when id_len is too
+ * large or memory cannot be allocated, or the hdb_handle_get() error when
+ * the handle is invalid.
+ */
+unsigned int openais_flow_control_create (
+	unsigned int flow_control_handle,
+	unsigned int service,
+	void *id,
+	unsigned int id_len,
+	void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state),
+	void *context)
+{
+	struct flow_control_service *flow_control_service;
+	struct flow_control_instance *instance;
+	unsigned int res;
+	unsigned int i;
+
+	/*
+	 * Refuse identifiers that would overflow the fixed size id buffer
+	 */
+	if (id_len > sizeof (flow_control_service->id)) {
+		return (-1);
+	}
+
+	res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+		(void *)&instance);
+	if (res != 0) {
+		goto error_exit;
+	}
+
+	flow_control_service = malloc (sizeof (struct flow_control_service));
+	if (flow_control_service == NULL) {
+		/* res is still 0 here; report the failure to the caller */
+		res = -1;
+		goto error_put;
+	}
+
+	/*
+	 * Add new service to flow control system
+	 */
+	memset (flow_control_service, 0, sizeof (struct flow_control_service));
+
+	flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
+	flow_control_service->service = service;
+	if (id_len > 0) {
+		memcpy (flow_control_service->id, id, id_len);
+	}
+	flow_control_service->id_len = id_len;
+	flow_control_service->flow_control_state_set_fn = flow_control_state_set_fn;
+	flow_control_service->context = context;
+
+	/*
+	 * NOTE(review): both list_add_tail calls pass the list head as the
+	 * first argument and the new entry as the second - verify this matches
+	 * the argument order declared in list.h.
+	 */
+	list_init (&flow_control_service->list);
+	list_add_tail (&instance->list_head,
+		&flow_control_service->list);
+
+	list_init (&flow_control_service->list_all);
+	list_add_tail (&flow_control_service_list_head,
+		&flow_control_service->list_all);
+
+	/*
+	 * Start tracking the current membership; node states stay DISABLED
+	 * because the structure was zeroed above.
+	 */
+	flow_control_service->processor_count = flow_control_member_list_entries;
+	for (i = 0; i < flow_control_member_list_entries; i++) {
+		flow_control_service->flow_control_node_state[i].nodeid = flow_control_member_list[i];
+	}
+
+error_put:
+	hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+	return (res);
+}
+
+/*
+ * Remove and free the first service in the global service list whose id
+ * matches (id, id_len).
+ *
+ * flow_control_identifier - handle from openais_flow_control_ipc_init()
+ * service                 - accepted but not used for matching
+ * id, id_len              - identifier given to openais_flow_control_create()
+ *
+ * Returns 0 when the handle is valid (whether or not a service matched), or
+ * the hdb_handle_get() error otherwise.
+ */
+unsigned int openais_flow_control_destroy (
+	unsigned int flow_control_identifier,
+	unsigned int service,
+	unsigned char *id,
+	unsigned int id_len)
+{
+	struct flow_control_service *flow_control_service;
+	struct flow_control_instance *instance;
+	struct list_head *list;
+	unsigned int res;
+
+	/*
+	 * Look up the handle passed by the caller; the file scope
+	 * flow_control_handle is the totempg group handle and must not be
+	 * used with flow_control_hdb.
+	 */
+	res = hdb_handle_get (&flow_control_hdb, flow_control_identifier,
+		(void *)&instance);
+	if (res != 0) {
+		goto error_exit;
+	}
+
+	for (list = flow_control_service_list_head.next;
+		list != &flow_control_service_list_head;
+		list = list->next) {
+
+		flow_control_service = list_entry (list, struct flow_control_service, list_all);
+
+		if ((flow_control_service->id_len == id_len) &&
+			(memcmp (flow_control_service->id, id, id_len) == 0)) {
+			list_del (&flow_control_service->list);
+			list_del (&flow_control_service->list_all);
+			free (flow_control_service);
+			break; /* done */
+		}
+	}
+	hdb_handle_put (&flow_control_hdb, flow_control_identifier);
+
+error_exit:
+	return (res);
+}
+
+/*
+ * Set every service attached to this handle to
+ * OPENAIS_FLOW_CONTROL_STATE_DISABLED and multicast that state to the flow
+ * control group.
+ *
+ * flow_control_handle - handle from openais_flow_control_ipc_init()
+ *
+ * Returns 0 when the handle is valid, or the hdb_handle_get() error
+ * otherwise.  The result of each transmission is not checked.
+ */
+unsigned int openais_flow_control_disable (
+	unsigned int flow_control_handle)
+{
+	struct flow_control_instance *instance;
+	struct flow_control_service *flow_control_service;
+	struct list_head *list;
+	unsigned int res;
+
+	res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+		(void *)&instance);
+	if (res != 0) {
+		goto error_exit;
+	}
+
+	for (list = instance->list_head.next;
+		list != &instance->list_head;
+		list = list->next) {
+
+		flow_control_service = list_entry (list, struct flow_control_service, list);
+		flow_control_service->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_DISABLED;
+		flow_control_xmit (flow_control_service, OPENAIS_FLOW_CONTROL_STATE_DISABLED);
+	}
+	hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+error_exit:
+	return (res);
+}
+
+/*
+ * Set every service attached to this handle to
+ * OPENAIS_FLOW_CONTROL_STATE_ENABLED and multicast that state to the flow
+ * control group.
+ *
+ * flow_control_handle - handle from openais_flow_control_ipc_init()
+ *
+ * Returns 0 when the handle is valid, or the hdb_handle_get() error
+ * otherwise.
+ */
+unsigned int openais_flow_control_enable (
+	unsigned int flow_control_handle)
+{
+	struct flow_control_instance *instance;
+	struct flow_control_service *svc;
+	struct list_head *pos;
+	unsigned int res;
+
+	res = hdb_handle_get (&flow_control_hdb, flow_control_handle,
+		(void *)&instance);
+	if (res != 0) {
+		return (res);
+	}
+
+	/* Walk the services linked to this instance */
+	pos = instance->list_head.next;
+	while (pos != &instance->list_head) {
+		svc = list_entry (pos, struct flow_control_service, list);
+		svc->flow_control_state = OPENAIS_FLOW_CONTROL_STATE_ENABLED;
+		flow_control_xmit (svc, OPENAIS_FLOW_CONTROL_STATE_ENABLED);
+		pos = pos->next;
+	}
+
+	hdb_handle_put (&flow_control_hdb, flow_control_handle);
+
+	return (res);
+}

+ 73 - 0
exec/flow.h

@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2006 Red Hat, Inc.
+ *
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef FLOW_H_DEFINED
+#define FLOW_H_DEFINED
+
+enum openais_flow_control_state { /* per-service state stored in flow_control_state and passed to flow_control_xmit */
+	OPENAIS_FLOW_CONTROL_STATE_DISABLED, /* applied to every service of a handle by openais_flow_control_disable */
+	OPENAIS_FLOW_CONTROL_STATE_ENABLED /* applied to every service of a handle by openais_flow_control_enable */
+};
+
+unsigned int openais_flow_control_initialize (void);
+
+unsigned int openais_flow_control_ipc_init (
+	unsigned int *flow_control_identifier,
+	unsigned int service);
+
+unsigned int openais_flow_control_ipc_exit (
+	unsigned int flow_control_identifier);
+
+unsigned int openais_flow_control_create (
+	unsigned int flow_control_handle,
+	unsigned int service,
+	void *id,
+	unsigned int id_len,
+	void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state),
+	void *context);
+
+unsigned int openais_flow_control_destroy (
+	unsigned int flow_control_identifier,
+	unsigned int service,
+	unsigned char *id,
+	unsigned int id_len);
+
+unsigned int openais_flow_control_disable (
+	unsigned int flow_control_identifier);
+
+unsigned int openais_flow_control_enable (
+	unsigned int flow_control_identifier);
+
+#endif /* FLOW_H_DEFINED */

+ 184 - 49
exec/ipc.c

@@ -1,7 +1,6 @@
 /*
 /*
  * Copyright (c) 2002-2006 MontaVista Software, Inc.
  * Copyright (c) 2002-2006 MontaVista Software, Inc.
  * Copyright (c) 2006 Red Hat, Inc.
  * Copyright (c) 2006 Red Hat, Inc.
- * Copyright (c) 2006 Sun Microsystems, Inc.
  *
  *
  * All rights reserved.
  * All rights reserved.
  *
  *
@@ -68,6 +67,7 @@
 #include "totemconfig.h"
 #include "totemconfig.h"
 #include "main.h"
 #include "main.h"
 #include "ipc.h"
 #include "ipc.h"
+#include "flow.h"
 #include "service.h"
 #include "service.h"
 #include "sync.h"
 #include "sync.h"
 #include "swab.h"
 #include "swab.h"
@@ -79,16 +79,27 @@
 
 
 #include "util.h"
 #include "util.h"
 
 
-#ifdef OPENAIS_SOLARIS
-#define MSG_NOSIGNAL 0
-#endif
-
 #define SERVER_BACKLOG 5
 #define SERVER_BACKLOG 5
 
 
+/*
+ * When fewer than this many free entries remain in a queue, turn on flow control
+ */
+#define FLOW_CONTROL_ENTRIES_ENABLE 400
+
+/*
+ * When this many entries or fewer are in use in a queue, turn off flow control
+ */
+#define FLOW_CONTROL_ENTRIES_DISABLE 64
+
+
 static unsigned int g_gid_valid = 0;
 static unsigned int g_gid_valid = 0;
 
 
 static struct totem_ip_address *my_ip;
 static struct totem_ip_address *my_ip;
 
 
+static totempg_groups_handle ipc_handle;
+
+DECLARE_LIST_INIT (conn_info_list_head);
+
 static void (*ipc_serialize_lock_fn) (void);
 static void (*ipc_serialize_lock_fn) (void);
 
 
 static void (*ipc_serialize_unlock_fn) (void);
 static void (*ipc_serialize_unlock_fn) (void);
@@ -122,16 +133,22 @@ struct conn_info {
 	int authenticated;	/* Is this connection authenticated? */
 	int authenticated;	/* Is this connection authenticated? */
 	void *private_data;	/* library connection private data */
 	void *private_data;	/* library connection private data */
 	struct conn_info *conn_info_partner;	/* partner connection dispatch<->response */
 	struct conn_info *conn_info_partner;	/* partner connection dispatch<->response */
+	unsigned int flow_control_handle;	/* flow control identifier */
+	unsigned int flow_control_enabled;	/* flow control enabled bit */
+	unsigned int flow_control_local_count;	/* flow control local count */
+	enum openais_flow_control flow_control;	/* Does this service use IPC flow control */
+	pthread_mutex_t flow_control_mutex;
         int (*lib_exit_fn) (void *conn);
         int (*lib_exit_fn) (void *conn);
 	struct timerlist timerlist;
 	struct timerlist timerlist;
 	pthread_mutex_t mutex;
 	pthread_mutex_t mutex;
 	pthread_mutex_t *shared_mutex;
 	pthread_mutex_t *shared_mutex;
-
+	struct list_head list;
 };
 };
 
 
 static void *prioritized_poll_thread (void *conn);
 static void *prioritized_poll_thread (void *conn);
 static int conn_info_outq_flush (struct conn_info *conn_info);
 static int conn_info_outq_flush (struct conn_info *conn_info);
 static void libais_deliver (struct conn_info *conn_info);
 static void libais_deliver (struct conn_info *conn_info);
+static void ipc_flow_control (struct conn_info *conn_info);
 
 
  /*
  /*
   * IPC Initializers
   * IPC Initializers
@@ -250,6 +267,15 @@ static int dispatch_init_send_response (
 	conn_info->conn_info_partner->state = CONN_STATE_ACTIVE;
 	conn_info->conn_info_partner->state = CONN_STATE_ACTIVE;
 	conn_info->lib_exit_fn = ais_service[conn_info->service]->lib_exit_fn;
 	conn_info->lib_exit_fn = ais_service[conn_info->service]->lib_exit_fn;
 	ais_service[conn_info->service]->lib_init_fn (conn_info);
 	ais_service[conn_info->service]->lib_init_fn (conn_info);
+
+	conn_info->flow_control = ais_service[conn_info->service]->flow_control;
+	conn_info->conn_info_partner->flow_control = ais_service[conn_info->service]->flow_control;
+	if (ais_service[conn_info->service]->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) {
+		openais_flow_control_ipc_init (
+			&conn_info->flow_control_handle,
+			conn_info->service);
+
+	}
 	return (0);
 	return (0);
 }
 }
 
 
@@ -288,6 +314,7 @@ static inline unsigned int conn_info_create (int fd) {
 	}
 	}
 
 
 	pthread_mutex_init (&conn_info->mutex, NULL);
 	pthread_mutex_init (&conn_info->mutex, NULL);
+	pthread_mutex_init (&conn_info->flow_control_mutex, NULL);
 	pthread_mutex_init (conn_info->shared_mutex, NULL);
 	pthread_mutex_init (conn_info->shared_mutex, NULL);
 
 
 	conn_info->state = CONN_STATE_ACTIVE;
 	conn_info->state = CONN_STATE_ACTIVE;
@@ -295,6 +322,9 @@ static inline unsigned int conn_info_create (int fd) {
 	conn_info->events = POLLIN|POLLNVAL;
 	conn_info->events = POLLIN|POLLNVAL;
 	conn_info->service = SOCKET_SERVICE_INIT;
 	conn_info->service = SOCKET_SERVICE_INIT;
 
 
+	list_init (&conn_info->list);
+	list_add (&conn_info_list_head, &conn_info->list);
+
 	pthread_attr_init (&conn_info->thread_attr);
 	pthread_attr_init (&conn_info->thread_attr);
 	pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
 	pthread_attr_setstacksize (&conn_info->thread_attr, 200000);
 	pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_DETACHED);
 	pthread_attr_setdetachstate (&conn_info->thread_attr, PTHREAD_CREATE_DETACHED);
@@ -321,6 +351,7 @@ static void conn_info_destroy (struct conn_info *conn_info)
 	if (conn_info->conn_info_partner) {
 	if (conn_info->conn_info_partner) {
 		conn_info->conn_info_partner->conn_info_partner = NULL;
 		conn_info->conn_info_partner->conn_info_partner = NULL;
 	}
 	}
+	list_del (&conn_info->list);
 	free (conn_info);
 	free (conn_info);
 }
 }
 
 
@@ -377,6 +408,9 @@ static int libais_disconnect (struct conn_info *conn_info)
 	}
 	}
 	conn_info->state = CONN_STATE_DISCONNECTED;
 	conn_info->state = CONN_STATE_DISCONNECTED;
 	conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTED;
 	conn_info->conn_info_partner->state = CONN_STATE_DISCONNECTED;
+	if (conn_info->flow_control_enabled == 1) {
+		openais_flow_control_disable (conn_info->flow_control_handle);
+	}
 	return (0);
 	return (0);
 }
 }
 
 
@@ -410,17 +444,14 @@ static void *prioritized_poll_thread (void *conn)
 	struct conn_info *conn_info = (struct conn_info *)conn;
 	struct conn_info *conn_info = (struct conn_info *)conn;
 	struct pollfd ufd;
 	struct pollfd ufd;
 	int fds;
 	int fds;
+	struct sched_param sched_param;
 	int res;
 	int res;
 	pthread_mutex_t *rel_mutex;
 	pthread_mutex_t *rel_mutex;
 	unsigned int service;
 	unsigned int service;
 	struct conn_info *cinfo_partner;
 	struct conn_info *cinfo_partner;
 
 
-#if ! defined(TS_CLASS) && (defined(OPENAIS_BSD) || defined(OPENAIS_LINUX) || defined(OPENAIS_SOLARIS))
-	struct sched_param sched_param;
-
 	sched_param.sched_priority = 1;
 	sched_param.sched_priority = 1;
 	res = pthread_setschedparam (conn_info->thread, SCHED_RR, &sched_param);
 	res = pthread_setschedparam (conn_info->thread, SCHED_RR, &sched_param);
-#endif
 
 
 	ufd.fd = conn_info->fd;
 	ufd.fd = conn_info->fd;
 	for (;;) {
 	for (;;) {
@@ -495,6 +526,9 @@ retry_poll:
 			if ((ufd.revents & POLLIN) == POLLIN) {
 			if ((ufd.revents & POLLIN) == POLLIN) {
 				libais_deliver (conn_info);
 				libais_deliver (conn_info);
 			}
 			}
+
+			ipc_flow_control (conn_info);
+
 		}
 		}
 
 
 		ipc_serialize_unlock_fn ();
 		ipc_serialize_unlock_fn ();
@@ -507,20 +541,55 @@ retry_poll:
 	return (0);
 	return (0);
 }
 }
 
 
-#if defined(OPENAIS_LINUX) || defined(OPENAIS_SOLARIS)
+#if defined(OPENAIS_LINUX)
 /* SUN_LEN is broken for abstract namespace
 /* SUN_LEN is broken for abstract namespace
  */
  */
 #define AIS_SUN_LEN(a) sizeof(*(a))
 #define AIS_SUN_LEN(a) sizeof(*(a))
-#else
-#define AIS_SUN_LEN(a) SUN_LEN(a)
-#endif
- 
-#if defined(OPENAIS_LINUX)
+
 char *socketname = "libais.socket";
 char *socketname = "libais.socket";
 #else
 #else
+#define AIS_SUN_LEN(a) SUN_LEN(a)
+
 char *socketname = "/var/run/libais.socket";
 char *socketname = "/var/run/libais.socket";
 #endif
 #endif
 
 
+
+/*
+ * Re-evaluate IPC flow control for a connection.
+ *
+ * The load figure is the larger of the number of entries used in the
+ * connection's outq and its flow_control_local_count.  Nothing is done
+ * unless the connection's flow_control is OPENAIS_FLOW_CONTROL_REQUIRED.
+ *
+ * - flow control is turned on when load + FLOW_CONTROL_ENTRIES_ENABLE
+ *   exceeds SIZEQUEUE
+ * - flow control is turned off when load has dropped to
+ *   FLOW_CONTROL_ENTRIES_DISABLE or below
+ *
+ * Each transition is logged and the flow_control_enabled flag is mirrored
+ * into the partner connection.
+ *
+ * NOTE(review): flow_control_local_count is read here without taking
+ * flow_control_mutex, although the local increment/decrement helpers
+ * modify it under that mutex - confirm this is intended.
+ */
+static void ipc_flow_control (struct conn_info *conn_info)
+{
+	unsigned int entries_used;
+	int entries_usedhw;	/* int to match queue_usedhw's return type and %d */
+
+	entries_used = queue_used (&conn_info->outq);
+	if (conn_info->flow_control_local_count > entries_used) {
+		entries_used = conn_info->flow_control_local_count;
+	}
+	/*
+	 * IPC group-wide flow control
+	 */
+	if (conn_info->flow_control == OPENAIS_FLOW_CONTROL_REQUIRED) {
+		if (conn_info->flow_control_enabled == 0 &&
+			((entries_used + FLOW_CONTROL_ENTRIES_ENABLE) > SIZEQUEUE)) {
+
+			entries_usedhw = queue_usedhw (&conn_info->outq);
+			/* %p requires a void pointer argument */
+			log_printf (LOG_LEVEL_NOTICE, "Enabling flow control - HW mark %d of %d %p.\n", entries_usedhw, SIZEQUEUE, (void *)&conn_info->outq);
+			openais_flow_control_enable (conn_info->flow_control_handle);
+			conn_info->flow_control_enabled = 1;
+			conn_info->conn_info_partner->flow_control_enabled = 1;
+		}
+		if (conn_info->flow_control_enabled == 1 &&
+			entries_used <= FLOW_CONTROL_ENTRIES_DISABLE) {
+
+			entries_usedhw = queue_usedhw (&conn_info->outq);
+			log_printf (LOG_LEVEL_NOTICE, "Disabling flow control - HW mark [%d/%d].\n",
+				entries_usedhw, SIZEQUEUE);
+			openais_flow_control_disable (conn_info->flow_control_handle);
+			conn_info->flow_control_enabled = 0;
+			conn_info->conn_info_partner->flow_control_enabled = 0;
+		}
+	}
+}
+
 static int conn_info_outq_flush (struct conn_info *conn_info) {
 static int conn_info_outq_flush (struct conn_info *conn_info) {
 	struct queue *outq;
 	struct queue *outq;
 	int res = 0;
 	int res = 0;
@@ -538,14 +607,9 @@ static int conn_info_outq_flush (struct conn_info *conn_info) {
 	msg_send.msg_name = 0;
 	msg_send.msg_name = 0;
 	msg_send.msg_namelen = 0;
 	msg_send.msg_namelen = 0;
 	msg_send.msg_iovlen = 1;
 	msg_send.msg_iovlen = 1;
-#ifndef OPENAIS_SOLARIS
 	msg_send.msg_control = 0;
 	msg_send.msg_control = 0;
 	msg_send.msg_controllen = 0;
 	msg_send.msg_controllen = 0;
 	msg_send.msg_flags = 0;
 	msg_send.msg_flags = 0;
-#else
-	msg_send.msg_accrights = NULL;
-	msg_send.msg_accrightslen = 0;
-#endif
 
 
 	while (!queue_is_empty (outq)) {
 	while (!queue_is_empty (outq)) {
 		queue_item = queue_item_get (outq);
 		queue_item = queue_item_get (outq);
@@ -588,6 +652,7 @@ retry_sendmsg:
 	if (queue_is_empty (outq)) {
 	if (queue_is_empty (outq)) {
 		conn_info->events = POLLIN|POLLNVAL;
 		conn_info->events = POLLIN|POLLNVAL;
 	}
 	}
+
 	return (0);
 	return (0);
 }
 }
 
 
@@ -610,8 +675,6 @@ static void libais_deliver (struct conn_info *conn_info)
 	char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
 	char cmsg_cred[CMSG_SPACE (sizeof (struct ucred))];
 	struct ucred *cred;
 	struct ucred *cred;
 	int on = 0;
 	int on = 0;
-#elif defined(OPENAIS_SOLARIS)
-	int fd;
 #else
 #else
 	uid_t euid;
 	uid_t euid;
 	gid_t egid;
 	gid_t egid;
@@ -625,25 +688,15 @@ static void libais_deliver (struct conn_info *conn_info)
 	msg_recv.msg_iovlen = 1;
 	msg_recv.msg_iovlen = 1;
 	msg_recv.msg_name = 0;
 	msg_recv.msg_name = 0;
 	msg_recv.msg_namelen = 0;
 	msg_recv.msg_namelen = 0;
-#ifndef OPENAIS_SOLARIS
 	msg_recv.msg_flags = 0;
 	msg_recv.msg_flags = 0;
-#endif
 
 
 	if (conn_info->authenticated) {
 	if (conn_info->authenticated) {
-#ifndef OPENAIS_SOLARIS
 		msg_recv.msg_control = 0;
 		msg_recv.msg_control = 0;
 		msg_recv.msg_controllen = 0;
 		msg_recv.msg_controllen = 0;
-#else
-		msg_recv.msg_accrights = NULL;
-		msg_recv.msg_accrightslen = 0;
-#endif
 	} else {
 	} else {
 #ifdef OPENAIS_LINUX
 #ifdef OPENAIS_LINUX
 		msg_recv.msg_control = (void *)cmsg_cred;
 		msg_recv.msg_control = (void *)cmsg_cred;
 		msg_recv.msg_controllen = sizeof (cmsg_cred);
 		msg_recv.msg_controllen = sizeof (cmsg_cred);
-#elif defined(OPENAIS_SOLARIS)
-		msg_recv.msg_accrights = (char *)&fd;
-		msg_recv.msg_accrightslen = sizeof (fd);
 #else
 #else
 		euid = -1; egid = -1;
 		euid = -1; egid = -1;
 		if (getpeereid(conn_info->fd, &euid, &egid) != -1 &&
 		if (getpeereid(conn_info->fd, &euid, &egid) != -1 &&
@@ -658,7 +711,9 @@ static void libais_deliver (struct conn_info *conn_info)
 
 
 	iov_recv.iov_base = &conn_info->inb[conn_info->inb_start];
 	iov_recv.iov_base = &conn_info->inb[conn_info->inb_start];
 	iov_recv.iov_len = (SIZEINB) - conn_info->inb_start;
 	iov_recv.iov_len = (SIZEINB) - conn_info->inb_start;
-	assert (iov_recv.iov_len != 0);
+	if (conn_info->inb_inuse == SIZEINB) {
+		return;
+	}
 
 
 retry_recv:
 retry_recv:
 	res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
 	res = recvmsg (conn_info->fd, &msg_recv, MSG_NOSIGNAL);
@@ -669,12 +724,6 @@ retry_recv:
 		return;
 		return;
 	} else
 	} else
 	if (res == 0) {
 	if (res == 0) {
-#if defined(OPENAIS_SOLARIS) || defined(OPENAIS_BSD) || defined(OPENAIS_DARWIN)
-		/* On many OS poll never return POLLHUP or POLLERR.
-		 * EOF is detected when recvmsg return 0.
-		 */
-		libais_disconnect_request (conn_info);
-#endif
 		return;
 		return;
 	}
 	}
 
 
@@ -696,9 +745,6 @@ retry_recv:
 			log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", cred->gid, g_gid_valid);
 			log_printf (LOG_LEVEL_SECURITY, "Connection not authenticated because gid is %d, expecting %d\n", cred->gid, g_gid_valid);
 		}
 		}
 	}
 	}
-#elif defined(OPENAIS_SOLARIS)
-	/* TODO Fix this. There is no authentication on Solaris yet. */
-	conn_info->authenticated = 1;
 #endif
 #endif
 	/*
 	/*
 	 * Dispatch all messages received in recvmsg that can be dispatched
 	 * Dispatch all messages received in recvmsg that can be dispatched
@@ -737,7 +783,7 @@ retry_recv:
 			 * to queue a message, otherwise tell the library we are busy and to
 			 * to queue a message, otherwise tell the library we are busy and to
 			 * try again later
 			 * try again later
 			 */
 			 */
-			send_ok_joined_iovec.iov_base = (char *)header;
+			send_ok_joined_iovec.iov_base = header;
 			send_ok_joined_iovec.iov_len = header->size;
 			send_ok_joined_iovec.iov_len = header->size;
 			send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle,
 			send_ok_joined = totempg_groups_send_ok_joined (openais_group_handle,
 				&send_ok_joined_iovec, 1);
 				&send_ok_joined_iovec, 1);
@@ -866,6 +912,29 @@ void message_source_set (
 	source->conn = conn;
 	source->conn = conn;
 }
 }
 
 
+/*
+ * Configuration change callback handed to totempg_groups_initialize by
+ * openais_ipc_init.
+ *
+ * Sets flow_control_enabled on every connection linked on
+ * conn_info_list_head and, when one is attached, on its partner
+ * connection.  None of the membership arguments are used.
+ */
+static void ipc_confchg_fn (
+	enum totem_configuration_type configuration_type,
+	unsigned int *member_list, int member_list_entries,
+	unsigned int *left_list, int left_list_entries,
+	unsigned int *joined_list, int joined_list_entries,
+	struct memb_ring_id *ring_id)
+{
+	struct conn_info *conn_info;
+	struct list_head *list;
+
+	/*
+	 * Turn on flow control enabled flag for all connections
+	 */
+	for (list = conn_info_list_head.next;
+		list != &conn_info_list_head;
+		list = list->next) {
+
+		conn_info = list_entry (list, struct conn_info, list);
+		conn_info->flow_control_enabled = 1;
+
+		/*
+		 * A connection is on this list from conn_info_create onward,
+		 * and conn_info_destroy sets the surviving side's partner
+		 * pointer to NULL, so the partner may be absent here.
+		 */
+		if (conn_info->conn_info_partner != NULL) {
+			conn_info->conn_info_partner->flow_control_enabled = 1;
+		}
+	}
+}
+
 void openais_ipc_init (
 void openais_ipc_init (
 	void (*serialize_lock_fn) (void),
 	void (*serialize_lock_fn) (void),
 	void (*serialize_unlock_fn) (void),
 	void (*serialize_unlock_fn) (void),
@@ -928,6 +997,15 @@ void openais_ipc_init (
 	g_gid_valid = gid_valid;
 	g_gid_valid = gid_valid;
 
 
 	my_ip = my_ip_in;
 	my_ip = my_ip_in;
+
+	/*
+	 * Reset internal state of flow control when
+	 * configuration change occurs
+	 */
+	res = totempg_groups_initialize (
+		&ipc_handle,
+		NULL,
+		ipc_confchg_fn);
 }
 }
 
 
 
 
@@ -982,20 +1060,18 @@ int openais_conn_send_response (
 	if (!libais_connection_active (conn_info)) {
 	if (!libais_connection_active (conn_info)) {
 		return (-1);
 		return (-1);
 	}
 	}
+
+	ipc_flow_control (conn_info);
+
 	outq = &conn_info->outq;
 	outq = &conn_info->outq;
 
 
 	msg_send.msg_iov = &iov_send;
 	msg_send.msg_iov = &iov_send;
 	msg_send.msg_name = 0;
 	msg_send.msg_name = 0;
 	msg_send.msg_namelen = 0;
 	msg_send.msg_namelen = 0;
 	msg_send.msg_iovlen = 1;
 	msg_send.msg_iovlen = 1;
-#ifndef OPENAIS_SOLARIS
 	msg_send.msg_control = 0;
 	msg_send.msg_control = 0;
 	msg_send.msg_controllen = 0;
 	msg_send.msg_controllen = 0;
 	msg_send.msg_flags = 0;
 	msg_send.msg_flags = 0;
-#else
-	msg_send.msg_accrights = NULL;
-	msg_send.msg_accrightslen = 0;
-#endif
 
 
 	if (queue_is_full (outq)) {
 	if (queue_is_full (outq)) {
 		/*
 		/*
@@ -1135,3 +1211,62 @@ void openais_ipc_timer_del_data (
 
 
 	timerlist_del (&conn_info->timerlist, timer_handle);
 	timerlist_del (&conn_info->timerlist, timer_handle);
 }
 }
+
+/*
+ * IPC-level wrapper around openais_flow_control_create.
+ *
+ * conn is the connection (a struct conn_info); its flow_control_handle is
+ * passed to openais_flow_control_create together with the remaining
+ * arguments, unchanged.  Afterwards the partner connection is given the
+ * same flow_control_handle.  The result of openais_flow_control_create is
+ * not inspected.
+ */
+void openais_ipc_flow_control_create (
+	void *conn,
+	unsigned int service,
+	char *id,
+	int id_len,
+	void (*flow_control_state_set_fn) (void *conn, enum openais_flow_control_state),
+	void *context)
+{
+	struct conn_info *ci = conn;
+	struct conn_info *partner;
+
+	openais_flow_control_create (ci->flow_control_handle, service, id, id_len,
+		flow_control_state_set_fn, context);
+
+	/* Share the handle with the other half of the connection pair */
+	partner = ci->conn_info_partner;
+	partner->flow_control_handle = ci->flow_control_handle;
+}
+
+/*
+ * IPC-level wrapper around openais_flow_control_destroy: forwards
+ * service, id and id_len using the flow_control_handle stored in the
+ * connection conn (a struct conn_info).  The result of
+ * openais_flow_control_destroy is not inspected.
+ */
+void openais_ipc_flow_control_destroy (
+	void *conn,
+	unsigned int service,
+	unsigned char *id,
+	int id_len)
+{
+	struct conn_info *ci = conn;
+
+	openais_flow_control_destroy (ci->flow_control_handle, service, id, id_len);
+}
+
+/*
+ * Add one to the flow_control_local_count of the connection conn
+ * (a struct conn_info) while holding its flow_control_mutex.
+ */
+void openais_ipc_flow_control_local_increment (
+        void *conn)
+{
+	struct conn_info *ci = conn;
+	pthread_mutex_t *guard = &ci->flow_control_mutex;
+
+	pthread_mutex_lock (guard);
+	ci->flow_control_local_count += 1;
+	pthread_mutex_unlock (guard);
+}
+
+/*
+ * Subtract one from the flow_control_local_count of the connection conn
+ * (a struct conn_info) while holding its flow_control_mutex.  The count
+ * is unsigned and is not checked for zero before the subtraction.
+ */
+void openais_ipc_flow_control_local_decrement (
+        void *conn)
+{
+	struct conn_info *ci = conn;
+	pthread_mutex_t *guard = &ci->flow_control_mutex;
+
+	pthread_mutex_lock (guard);
+	ci->flow_control_local_count -= 1;
+	pthread_mutex_unlock (guard);
+}

+ 21 - 0
exec/ipc.h

@@ -36,6 +36,7 @@
 #define IPC_H_DEFINED
 #define IPC_H_DEFINED
 
 
 #include "tlist.h"
 #include "tlist.h"
+#include "flow.h"
 
 
 extern void message_source_set (mar_message_source_t *source, void *conn);
 extern void message_source_set (mar_message_source_t *source, void *conn);
 
 
@@ -68,4 +69,24 @@ extern void openais_ipc_timer_del_data (
 	void *conn,
 	void *conn,
 	timer_handle timer_handle);
 	timer_handle timer_handle);
 
 
+extern void openais_ipc_flow_control_create (
+	void *conn,
+	unsigned int service,
+	char *id,
+	int id_len,
+	void (*flow_control_state_set_fn) (void *context, enum openais_flow_control_state flow_control_state_set),
+	void *context);
+	
+extern void openais_ipc_flow_control_destroy (
+	void *conn,
+	unsigned int service,
+	unsigned char *id,
+	int id_len);
+
+extern void openais_ipc_flow_control_local_increment (
+	void *conn);
+
+extern void openais_ipc_flow_control_local_decrement (
+	void *conn);
+
 #endif /* IPC_H_DEFINED */
 #endif /* IPC_H_DEFINED */

+ 1 - 0
exec/lck.c

@@ -303,6 +303,7 @@ struct openais_service_handler lck_service_handler = {
 	.name				= (unsigned char*)"openais distributed locking service B.01.01",
 	.name				= (unsigned char*)"openais distributed locking service B.01.01",
 	.id				= LCK_SERVICE,
 	.id				= LCK_SERVICE,
 	.private_data_size		= sizeof (struct lck_pd),
 	.private_data_size		= sizeof (struct lck_pd),
+	.flow_control			= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn			= lck_lib_init_fn,
 	.lib_init_fn			= lck_lib_init_fn,
 	.lib_exit_fn			= lck_lib_exit_fn,
 	.lib_exit_fn			= lck_lib_exit_fn,
 	.lib_service			= lck_lib_service,
 	.lib_service			= lck_lib_service,

+ 4 - 0
exec/main.c

@@ -76,6 +76,7 @@
 #include "timer.h"
 #include "timer.h"
 #include "print.h"
 #include "print.h"
 #include "util.h"
 #include "util.h"
+#include "flow.h"
 #include "version.h"
 #include "version.h"
 
 
 #define SERVER_BACKLOG 5
 #define SERVER_BACKLOG 5
@@ -575,6 +576,9 @@ int main (int argc, char **argv)
 	sync_register (openais_sync_callbacks_retrieve, openais_sync_completed,
 	sync_register (openais_sync_callbacks_retrieve, openais_sync_completed,
 		totem_config.vsf_type);
 		totem_config.vsf_type);
 
 
+
+	res = openais_flow_control_initialize ();
+
 	/*
 	/*
 	 * Drop root privleges to user 'ais'
 	 * Drop root privleges to user 'ais'
 	 * TODO: Don't really need full root capabilities;
 	 * TODO: Don't really need full root capabilities;

+ 1 - 1
exec/main.h

@@ -46,7 +46,7 @@
  * Size of the queue (entries) for I/O's to the API over socket IPC.
  * Size of the queue (entries) for I/O's to the API over socket IPC.
  */
  */
 
 
-#define SIZEQUEUE 256
+#define SIZEQUEUE 800
 
 
 #define SOCKET_SERVICE_INIT 254
 #define SOCKET_SERVICE_INIT 254
 
 

+ 1 - 0
exec/msg.c

@@ -436,6 +436,7 @@ struct openais_service_handler msg_service_handler = {
 	.name				= (unsigned char *)"openais message service B.01.01",
 	.name				= (unsigned char *)"openais message service B.01.01",
 	.id				= MSG_SERVICE,
 	.id				= MSG_SERVICE,
 	.private_data_size		= sizeof (struct msg_pd),
 	.private_data_size		= sizeof (struct msg_pd),
+	.flow_control			= OPENAIS_FLOW_CONTROL_NOT_REQUIRED, 
 	.lib_init_fn			= msg_lib_init_fn,
 	.lib_init_fn			= msg_lib_init_fn,
 	.lib_exit_fn			= msg_lib_exit_fn,
 	.lib_exit_fn			= msg_lib_exit_fn,
 	.lib_service			= msg_lib_service,
 	.lib_service			= msg_lib_service,

+ 1 - 0
exec/service.h

@@ -65,6 +65,7 @@ struct openais_service_handler {
 	unsigned char *name;
 	unsigned char *name;
 	unsigned short id;
 	unsigned short id;
 	unsigned int private_data_size;
 	unsigned int private_data_size;
+	enum openais_flow_control flow_control;
 	int (*lib_init_fn) (void *conn);
 	int (*lib_init_fn) (void *conn);
 	int (*lib_exit_fn) (void *conn);
 	int (*lib_exit_fn) (void *conn);
 	struct openais_lib_handler *lib_service;
 	struct openais_lib_handler *lib_service;

+ 9 - 1
include/cpg.h

@@ -56,6 +56,11 @@ typedef enum {
 	CPG_TYPE_SAFE		/* not implemented */
 	CPG_TYPE_SAFE		/* not implemented */
 } cpg_guarantee_t;
 } cpg_guarantee_t;
 
 
+typedef enum { /* type of cpg_inst's flow_control_state in lib/cpg.c and of the out-parameter of cpg_flow_control_state_get */
+	CPG_FLOW_CONTROL_DISABLED,	/* flow control is disabled - new messages may be sent */
+	CPG_FLOW_CONTROL_ENABLED	/* flow control is enabled - new messages should not be sent */
+} cpg_flow_control_state_t;
+
 typedef enum {
 typedef enum {
 	CPG_OK = 1,
 	CPG_OK = 1,
 	CPG_ERR_LIBRARY = 2,
 	CPG_ERR_LIBRARY = 2,
@@ -102,7 +107,6 @@ typedef void (*cpg_deliver_fn_t) (
 	void *msg,
 	void *msg,
 	int msg_len);
 	int msg_len);
 
 
-
 typedef void (*cpg_confchg_fn_t) (
 typedef void (*cpg_confchg_fn_t) (
 	cpg_handle_t handle,
 	cpg_handle_t handle,
 	struct cpg_name *group_name,
 	struct cpg_name *group_name,
@@ -183,4 +187,8 @@ cpg_error_t cpg_membership_get (
 	struct cpg_address *member_list,
 	struct cpg_address *member_list,
 	int *member_list_entries);
 	int *member_list_entries);
 
 
+cpg_error_t cpg_flow_control_state_get (
+	cpg_handle_t handle,
+	cpg_flow_control_state_t *flow_control_enabled);
+
 #endif /* OPENAIS_CPG_H_DEFINED */
 #endif /* OPENAIS_CPG_H_DEFINED */

+ 1 - 1
include/hdb.h

@@ -1,5 +1,6 @@
 /*
 /*
  * Copyright (c) 2002-2006 MontaVista Software, Inc.
  * Copyright (c) 2002-2006 MontaVista Software, Inc.
+ * Copyright (c) 2006 Red Hat, Inc.
  * Copyright (c) 2006 Sun Microsystems, Inc.
  * Copyright (c) 2006 Sun Microsystems, Inc.
  *
  *
  * All rights reserved.
  * All rights reserved.
@@ -197,7 +198,6 @@ static inline int hdb_iterator_next (
 			handle_database,
 			handle_database,
 			handle_database->iterator,
 			handle_database->iterator,
 			instance);
 			instance);
-		
 
 
 		handle_database->iterator += 1;
 		handle_database->iterator += 1;
 		if (res == 0) {
 		if (res == 0) {

+ 8 - 2
include/ipc_cpg.h

@@ -56,7 +56,8 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_CONFCHG_CALLBACK = 4,
 	MESSAGE_RES_CPG_CONFCHG_CALLBACK = 4,
 	MESSAGE_RES_CPG_DELIVER_CALLBACK = 5,
 	MESSAGE_RES_CPG_DELIVER_CALLBACK = 5,
 	MESSAGE_RES_CPG_TRACKSTART = 6,
 	MESSAGE_RES_CPG_TRACKSTART = 6,
-	MESSAGE_RES_CPG_TRACKSTOP = 7
+	MESSAGE_RES_CPG_TRACKSTOP = 7,
+	MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8
 };
 };
 
 
 enum lib_cpg_confchg_reason {
 enum lib_cpg_confchg_reason {
@@ -104,6 +105,11 @@ struct req_lib_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 };
 
 
+struct res_lib_cpg_mcast { /* reply buffer that cpg_mcast_joined (lib/cpg.c) hands to saSendMsgReceiveReply */
+	mar_res_header_t header __attribute__((aligned(8)));
+	mar_uint32_t flow_control_state __attribute__((aligned(8))); /* NOTE(review): presumably a cpg_flow_control_state_t value - confirm against the sender in exec/cpg.c */
+};
+
 /* Message from another node */
 /* Message from another node */
 struct res_lib_cpg_deliver_callback {
 struct res_lib_cpg_deliver_callback {
 	mar_res_header_t header __attribute__((aligned(8)));
 	mar_res_header_t header __attribute__((aligned(8)));
@@ -111,6 +117,7 @@ struct res_lib_cpg_deliver_callback {
 	mar_uint32_t msglen __attribute__((aligned(8)));
 	mar_uint32_t msglen __attribute__((aligned(8)));
 	mar_uint32_t nodeid __attribute__((aligned(8)));
 	mar_uint32_t nodeid __attribute__((aligned(8)));
 	mar_uint32_t pid __attribute__((aligned(8)));
 	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t flow_control_state __attribute__((aligned(8)));
 	mar_uint8_t message[] __attribute__((aligned(8)));
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 };
 
 
@@ -140,5 +147,4 @@ struct res_lib_cpg_leave {
 	mar_res_header_t header __attribute__((aligned(8)));
 	mar_res_header_t header __attribute__((aligned(8)));
 };
 };
 
 
-
 #endif /* IPC_CPG_H_DEFINED */
 #endif /* IPC_CPG_H_DEFINED */

+ 11 - 7
include/queue.h

@@ -1,6 +1,5 @@
 /*
 /*
  * Copyright (c) 2002-2004 MontaVista Software, Inc.
  * Copyright (c) 2002-2004 MontaVista Software, Inc.
- * Copyright (c) 2006 Sun Microsystems, Inc.
  *
  *
  * All rights reserved.
  * All rights reserved.
  *
  *
@@ -39,12 +38,7 @@
 #include <pthread.h>
 #include <pthread.h>
 #include "assert.h"
 #include "assert.h"
 
 
-#ifndef OPENAIS_SOLARIS
 struct queue {
 struct queue {
-#else
-struct _queue {
-#define	queue _queue
-#endif
 	int head;
 	int head;
 	int tail;
 	int tail;
 	int used;
 	int used;
@@ -103,7 +97,7 @@ static inline int queue_is_empty (struct queue *queue) {
 	int empty;
 	int empty;
 
 
 	pthread_mutex_lock (&queue->mutex);
 	pthread_mutex_lock (&queue->mutex);
-	empty = queue->used == 0;
+	empty = (queue->used == 0);
 	pthread_mutex_unlock (&queue->mutex);
 	pthread_mutex_unlock (&queue->mutex);
 	return (empty);
 	return (empty);
 }
 }
@@ -219,4 +213,14 @@ static inline int queue_used (struct queue *queue) {
 	return (used);
 	return (used);
 }
 }
 
 
+/*
+ * Return the queue's usedhw field, read while holding the queue mutex.
+ * NOTE(review): usedhw is presumably the high-water mark of used entries
+ * (exec/ipc.c logs it as "HW mark") - confirm against the enqueue path.
+ */
+static inline int queue_usedhw (struct queue *queue) {
+	int snapshot;
+
+	pthread_mutex_lock (&queue->mutex);
+	snapshot = queue->usedhw;
+	pthread_mutex_unlock (&queue->mutex);
+	return snapshot;
+}
+
 #endif /* QUEUE_H_DEFINED */
 #endif /* QUEUE_H_DEFINED */

+ 32 - 10
lib/cpg.c

@@ -2,7 +2,6 @@
  * vi: set autoindent tabstop=4 shiftwidth=4 :
  * vi: set autoindent tabstop=4 shiftwidth=4 :
  *
  *
  * Copyright (c) 2006 Red Hat, Inc.
  * Copyright (c) 2006 Red Hat, Inc.
- * Copyright (c) 2006 Sun Microsystems, Inc.
  *
  *
  * All rights reserved.
  * All rights reserved.
  *
  *
@@ -56,6 +55,7 @@ struct cpg_inst {
 	int response_fd;
 	int response_fd;
 	int dispatch_fd;
 	int dispatch_fd;
 	int finalize;
 	int finalize;
+	cpg_flow_control_state_t flow_control_state;
 	cpg_callbacks_t callbacks;
 	cpg_callbacks_t callbacks;
 	pthread_mutex_t response_mutex;
 	pthread_mutex_t response_mutex;
 	pthread_mutex_t dispatch_mutex;
 	pthread_mutex_t dispatch_mutex;
@@ -307,6 +307,7 @@ cpg_error_t cpg_dispatch (
 		case MESSAGE_RES_CPG_DELIVER_CALLBACK:
 		case MESSAGE_RES_CPG_DELIVER_CALLBACK:
 			res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data;
 			res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)&dispatch_data;
 
 
+			cpg_inst->flow_control_state = res_cpg_deliver_callback->flow_control_state;
 			marshall_from_mar_cpg_name_t (
 			marshall_from_mar_cpg_name_t (
 				&group_name,
 				&group_name,
 				&res_cpg_deliver_callback->group_name);
 				&res_cpg_deliver_callback->group_name);
@@ -353,7 +354,6 @@ cpg_error_t cpg_dispatch (
 				res_cpg_confchg_callback->joined_list_entries);
 				res_cpg_confchg_callback->joined_list_entries);
 			break;
 			break;
 
 
-
 		default:
 		default:
 			error = SA_AIS_ERR_LIBRARY;
 			error = SA_AIS_ERR_LIBRARY;
 			goto error_nounlock;
 			goto error_nounlock;
@@ -412,7 +412,7 @@ cpg_error_t cpg_join (
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_trackstart.group_name,
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_trackstart.group_name,
 		group);
 		group);
 
 
-	iov[0].iov_base = (char *)&req_lib_cpg_trackstart;
+	iov[0].iov_base = &req_lib_cpg_trackstart;
 	iov[0].iov_len = sizeof (struct req_lib_cpg_trackstart);
 	iov[0].iov_len = sizeof (struct req_lib_cpg_trackstart);
 
 
 	error = saSendMsgReceiveReply (cpg_inst->dispatch_fd, iov, 1,
 	error = saSendMsgReceiveReply (cpg_inst->dispatch_fd, iov, 1,
@@ -430,7 +430,7 @@ cpg_error_t cpg_join (
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
 		group);
 		group);
 
 
-	iov[0].iov_base = (char *)&req_lib_cpg_join;
+	iov[0].iov_base = &req_lib_cpg_join;
 	iov[0].iov_len = sizeof (struct req_lib_cpg_join);
 	iov[0].iov_len = sizeof (struct req_lib_cpg_join);
 
 
 	error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1,
 	error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, 1,
@@ -471,7 +471,7 @@ cpg_error_t cpg_leave (
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
 		group);
 		group);
 
 
-	iov[0].iov_base = (char *)&req_lib_cpg_leave;
+	iov[0].iov_base = &req_lib_cpg_leave;
 	iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
 	iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
 
 
 	pthread_mutex_lock (&cpg_inst->response_mutex);
 	pthread_mutex_lock (&cpg_inst->response_mutex);
@@ -503,7 +503,7 @@ cpg_error_t cpg_mcast_joined (
 	struct cpg_inst *cpg_inst;
 	struct cpg_inst *cpg_inst;
 	struct iovec iov[64];
 	struct iovec iov[64];
 	struct req_lib_cpg_mcast req_lib_cpg_mcast;
 	struct req_lib_cpg_mcast req_lib_cpg_mcast;
-	mar_res_header_t res_lib_cpg_mcast;
+	struct res_lib_cpg_mcast res_lib_cpg_mcast;
 	int msg_len = 0;
 	int msg_len = 0;
 
 
 	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
 	error = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&cpg_inst);
@@ -522,14 +522,14 @@ cpg_error_t cpg_mcast_joined (
 	req_lib_cpg_mcast.guarantee = guarantee;
 	req_lib_cpg_mcast.guarantee = guarantee;
 	req_lib_cpg_mcast.msglen = msg_len;
 	req_lib_cpg_mcast.msglen = msg_len;
 
 
-	iov[0].iov_base = (char *)&req_lib_cpg_mcast;
+	iov[0].iov_base = &req_lib_cpg_mcast;
 	iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
 	iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
 	memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
 	memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
 
 
 	pthread_mutex_lock (&cpg_inst->response_mutex);
 	pthread_mutex_lock (&cpg_inst->response_mutex);
 
 
 	error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, iov_len + 1,
 	error = saSendMsgReceiveReply (cpg_inst->response_fd, iov, iov_len + 1,
-		&res_lib_cpg_mcast, sizeof (mar_res_header_t));
+		&res_lib_cpg_mcast, sizeof (res_lib_cpg_mcast));
 
 
 	pthread_mutex_unlock (&cpg_inst->response_mutex);
 	pthread_mutex_unlock (&cpg_inst->response_mutex);
 
 
@@ -537,7 +537,11 @@ cpg_error_t cpg_mcast_joined (
 		goto error_exit;
 		goto error_exit;
 	}
 	}
 
 
-	error = res_lib_cpg_mcast.error;
+	cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
+	if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) {
+		cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
+	}
+	error = res_lib_cpg_mcast.header.error;
 
 
 error_exit:
 error_exit:
 	saHandleInstancePut (&cpg_handle_t_db, handle);
 	saHandleInstancePut (&cpg_handle_t_db, handle);
@@ -568,7 +572,7 @@ cpg_error_t cpg_membership_get (
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
 		group_name);
 		group_name);
 
 
-	iov.iov_base = (char *)&req_lib_cpg_membership_get;
+	iov.iov_base = &req_lib_cpg_membership_get;
 	iov.iov_len = sizeof (mar_req_header_t);
 	iov.iov_len = sizeof (mar_req_header_t);
 
 
 	pthread_mutex_lock (&cpg_inst->response_mutex);
 	pthread_mutex_lock (&cpg_inst->response_mutex);
@@ -601,4 +605,22 @@ error_exit:
 	return (error);
 	return (error);
 }
 }
 
 
+/*
+ * Copy the flow control state cached on the cpg instance into
+ * *flow_control_state.  The cached value is the one stored by
+ * cpg_dispatch (deliver callback) and cpg_mcast_joined.
+ *
+ * Returns the result of the handle lookup; when the lookup fails
+ * *flow_control_state is left unwritten.
+ */
+cpg_error_t cpg_flow_control_state_get (
+	cpg_handle_t handle,
+	cpg_flow_control_state_t *flow_control_state)
+{
+	struct cpg_inst *inst;
+	cpg_error_t result;
+
+	result = saHandleInstanceGet (&cpg_handle_t_db, handle, (void *)&inst);
+	if (result == SA_AIS_OK) {
+		*flow_control_state = inst->flow_control_state;
+		saHandleInstancePut (&cpg_handle_t_db, handle);
+	}
+
+	return (result);
+}
 /** @} */
 /** @} */

+ 10 - 6
test/Makefile

@@ -1,4 +1,5 @@
 # Copyright (c) 2002-2004 MontaVista Software, Inc.
 # Copyright (c) 2002-2004 MontaVista Software, Inc.
+# Copyright (c) 2006 Red Hat, Inc.
 # Copyright (c) 2006 Sun Microsystems, Inc.
 # Copyright (c) 2006 Sun Microsystems, Inc.
 # 
 # 
 # All rights reserved.
 # All rights reserved.
@@ -45,17 +46,17 @@ LDFLAGS += -L../lib
 
 
 EXTRA_CFLAGS = -I../include
 EXTRA_CFLAGS = -I../include
 TEST_SRC =  testclm.c testamf1.c \
 TEST_SRC =  testclm.c testamf1.c \
-		testamf4.c testamf5.c testamf6.c testamfth.c  \
-		testckpt.c ckptstress.c ckptbench.c  \
-		ckptbenchth.c testevt.c testevs.c evsbench.c \
-		subscription.c publish.c evtbench.c \
-		sa_error.c unlink.c testclm2.c testlck.c testmsg.c
+	testamf4.c testamf5.c testamf6.c testamfth.c  \
+	testckpt.c ckptstress.c ckptbench.c  \
+	ckptbenchth.c testevt.c testevs.c evsbench.c \
+	subscription.c publish.c evtbench.c \
+	sa_error.c unlink.c testclm2.c testlck.c testmsg.c
 
 
 all: testclm testamf1 \
 all: testclm testamf1 \
 	testckpt ckptstress ckptbench \
 	testckpt ckptstress ckptbench \
 	ckptbenchth ckpt-rd ckpt-wr testevt testevs \
 	ckptbenchth ckpt-rd ckpt-wr testevt testevs \
 	evsbench subscription publish evtbench unlink testclm2 testlck \
 	evsbench subscription publish evtbench unlink testclm2 testlck \
-	testmsg testcpg openais-cfgtool
+	testmsg testcpg cpgbench openais-cfgtool
 
 
 testtimer: testtimer.o $(LIBRARIES)
 testtimer: testtimer.o $(LIBRARIES)
 	$(CC) $(LDFLAGS) -o testtimer testtimer.o ../exec/timer.o
 	$(CC) $(LDFLAGS) -o testtimer testtimer.o ../exec/timer.o
@@ -144,6 +145,9 @@ testmsg: testmsg.o $(LIBRARIES)
 testcpg: testcpg.o $(LIBRARIES)
 testcpg: testcpg.o $(LIBRARIES)
 	$(CC) $(LDFLAGS) -o testcpg testcpg.o $(LIBS)
 	$(CC) $(LDFLAGS) -o testcpg testcpg.o $(LIBS)
 
 
+cpgbench: cpgbench.o $(LIBRARIES)
+	$(CC) $(LDFLAGS) -o cpgbench cpgbench.o $(LIBS)
+
 openais-cfgtool: openais-cfgtool.o $(LIBRARIES)
 openais-cfgtool: openais-cfgtool.o $(LIBRARIES)
 	$(CC) $(LDFLAGS) -o openais-cfgtool openais-cfgtool.o $(LIBS)
 	$(CC) $(LDFLAGS) -o openais-cfgtool openais-cfgtool.o $(LIBS)
 
 

+ 175 - 0
test/cpgbench.c

@@ -0,0 +1,175 @@
+#define _BSD_SOURCE
+/*
+ * Copyright (c) 2006 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Steven Dake (sdake@mvista.com)
+ *
+ * This software licensed under BSD license, the text of which follows:
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ *   this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ *   contributors may be used to endorse or promote products derived from this
+ *   software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/un.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "saAis.h"
+#include "cpg.h"
+
+/*
+ * Set to 1 by the SIGALRM handler to end the current benchmark pass.
+ * It is written from a signal handler, so it is declared
+ * volatile sig_atomic_t rather than plain int.
+ */
+volatile sig_atomic_t alarm_notice;
+
+/*
+ * Configuration change callback installed through the callbacks table.
+ * The body is empty: the benchmark takes no action on membership changes.
+ */
+void cpg_bm_confchg_fn (
+	cpg_handle_t handle,
+	struct cpg_name *group_name,
+	struct cpg_address *member_list, int member_list_entries,
+	struct cpg_address *left_list, int left_list_entries,
+	struct cpg_address *joined_list, int joined_list_entries)
+{
+}
+
+/* Number of messages delivered during the current benchmark pass. */
+unsigned int write_count;
+
+/*
+ * Deliver callback: counts one delivered message.  The message contents
+ * and sender identity are not examined.
+ */
+void cpg_bm_deliver_fn (
+	cpg_handle_t handle,
+	struct cpg_name *group_name,
+	uint32_t nodeid,
+	uint32_t pid,
+	void *msg,
+	int msg_len)
+{
+	write_count += 1;
+}
+
+/* Callback table handed to cpg_initialize by main. */
+cpg_callbacks_t callbacks = {
+	.cpg_confchg_fn = cpg_bm_confchg_fn,
+	.cpg_deliver_fn = cpg_bm_deliver_fn
+};
+
+/* Payload buffer; each multicast sends the first write_size bytes of it. */
+char data[500000];
+
+/*
+ * Run one benchmark pass and print its throughput.
+ *
+ * A 10 second alarm is armed; until the SIGALRM handler sets alarm_notice
+ * the loop multicasts write_size bytes to the joined group (only while
+ * flow control is disabled) and dispatches pending callbacks, which
+ * increment write_count for every delivered message.
+ *
+ * handle      cpg handle that has already joined a group
+ * write_size  number of bytes of data[] sent per multicast
+ *
+ * Exits the process if the flow control query or the dispatch fails.
+ */
+void cpg_benchmark (
+	cpg_handle_t handle,
+	int write_size)
+{
+	struct timeval tv1, tv2, tv_elapsed;
+	struct iovec iov;
+	unsigned int res;
+	cpg_flow_control_state_t flow_control_state;
+	double elapsed_sec;
+
+	alarm_notice = 0;
+	iov.iov_base = data;
+	iov.iov_len = write_size;
+
+	write_count = 0;
+	alarm (10);
+
+	gettimeofday (&tv1, NULL);
+	do {
+		/*
+		 * Check the query result so flow_control_state is never
+		 * read uninitialized.
+		 */
+		res = cpg_flow_control_state_get (handle, &flow_control_state);
+		if (res != CPG_OK) {
+			printf ("cpg flow control state get returned error %u\n", res);
+			exit (1);
+		}
+
+		/*
+		 * Multicast only while flow control is disabled, repeating
+		 * the send for as long as the library answers TRY_AGAIN.
+		 */
+		if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
+			do {
+				res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
+			} while (res == CPG_ERR_TRY_AGAIN);
+		}
+
+		res = cpg_dispatch (handle, CPG_DISPATCH_ALL);
+		if (res != CPG_OK) {
+			printf ("cpg dispatch returned error %u\n", res);
+			exit (1);
+		}
+	} while (alarm_notice == 0);
+	gettimeofday (&tv2, NULL);
+	timersub (&tv2, &tv1, &tv_elapsed);
+
+	/* Compute the elapsed time once and do the rate math in double. */
+	elapsed_sec = tv_elapsed.tv_sec + (tv_elapsed.tv_usec / 1000000.0);
+
+	printf ("%5u messages received ", write_count);
+	printf ("%5d bytes per write ", write_size);
+	printf ("%7.3f Seconds runtime ", elapsed_sec);
+	printf ("%9.3f TP/s ",
+		((double)write_count) / elapsed_sec);
+	printf ("%7.3f MB/s.\n",
+		((double)write_count) * ((double)write_size) / (elapsed_sec * 1000000.0));
+}
+
+/*
+ * SIGALRM handler: raises alarm_notice so the benchmark loop stops.
+ * The signal number is not used.
+ */
+void sigalrm_handler (int num)
+{
+	(void)num;
+
+	alarm_notice = 1;
+}
+
+/* Name of the group joined by the benchmark; length counts the 6 characters of "cpg_bm". */
+static struct cpg_name group_name = {
+	.length = 6,
+	.value = "cpg_bm"
+};
+
+/*
+ * Benchmark driver: installs the SIGALRM handler, initializes cpg, joins
+ * the benchmark group and runs 50 passes with write sizes 1, 1001, ...,
+ * 49001 bytes, then finalizes the handle.
+ *
+ * Returns 0 on success; exits with status 1 if initialize, join or
+ * finalize fails.
+ */
+int main (void) {
+	cpg_handle_t handle;
+	unsigned int size = 1;
+	int i;
+	unsigned int res;
+	
+	signal (SIGALRM, sigalrm_handler);
+	res = cpg_initialize (&handle, &callbacks);
+	if (res != CPG_OK) {
+		printf ("cpg_initialize failed with result %d\n", res);
+		exit (1);
+	}
+	
+	res = cpg_join (handle, &group_name);
+	if (res != CPG_OK) {
+		printf ("cpg_join failed with result %d\n", res);
+		exit (1);
+	}
+
+	for (i = 0; i < 50; i++) { /* 50 passes; write size grows by 1000 bytes per pass */
+		cpg_benchmark (handle, size);
+		size += 1000;
+	}
+
+	res = cpg_finalize (handle);
+	if (res != CPG_OK) {
+		/* Report the call that actually failed (was mislabelled cpg_join). */
+		printf ("cpg_finalize failed with result %d\n", res);
+		exit (1);
+	}
+	return (0);
+}