Browse Source

Add reserve/release functionality to totem to reserve message queue
space.


git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@1793 fd59a12c-fef9-0310-b244-a6a79926bd2f

Steven Dake 17 years ago
parent
commit
1f05ffd27d
8 changed files with 81 additions and 47 deletions
  1. 4 3
      exec/apidef.c
  2. 6 3
      exec/ipc.c
  3. 0 7
      exec/main.c
  4. 0 4
      exec/main.h
  5. 59 19
      exec/totempg.c
  6. 8 4
      include/corosync/engine/coroapi.h
  7. 4 1
      include/corosync/totem/totempg.h
  8. 0 6
      services/evs.c

+ 4 - 3
exec/apidef.c

@@ -86,7 +86,6 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
 	.totem_family_get = totempg_my_family_get,
 	.totem_family_get = totempg_my_family_get,
 	.totem_ring_reenable = totempg_ring_reenable,
 	.totem_ring_reenable = totempg_ring_reenable,
 	.totem_mcast = main_mcast,
 	.totem_mcast = main_mcast,
-	.totem_send_ok = main_send_ok,
 	.totem_ifaces_get = totempg_ifaces_get,
 	.totem_ifaces_get = totempg_ifaces_get,
 	.totem_ifaces_print = totempg_ifaces_print,
 	.totem_ifaces_print = totempg_ifaces_print,
 	.totem_ip_print = totemip_print,
 	.totem_ip_print = totemip_print,
@@ -96,9 +95,11 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
 	.tpg_join = (typedef_tpg_join)totempg_groups_join,
 	.tpg_join = (typedef_tpg_join)totempg_groups_join,
 	.tpg_leave = (typedef_tpg_leave)totempg_groups_leave,
 	.tpg_leave = (typedef_tpg_leave)totempg_groups_leave,
 	.tpg_joined_mcast = totempg_groups_mcast_joined,
 	.tpg_joined_mcast = totempg_groups_mcast_joined,
-	.tpg_joined_send_ok = totempg_groups_send_ok_joined,
+	.tpg_joined_reserve = totempg_groups_joined_reserve,
+	.tpg_joined_release = totempg_groups_joined_release,
 	.tpg_groups_mcast = (typedef_tpg_groups_mcast)totempg_groups_mcast_groups,
 	.tpg_groups_mcast = (typedef_tpg_groups_mcast)totempg_groups_mcast_groups,
-	.tpg_groups_send_ok = (typedef_tpg_groups_send_ok)totempg_groups_send_ok_groups,
+	.tpg_groups_reserve = NULL,
+	.tpg_groups_release = NULL,
 	.sync_request = sync_request,
 	.sync_request = sync_request,
 	.quorum_is_quorate = corosync_quorum_is_quorate,
 	.quorum_is_quorate = corosync_quorum_is_quorate,
 	.quorum_register_callback = corosync_quorum_register_callback,
 	.quorum_register_callback = corosync_quorum_register_callback,

+ 6 - 3
exec/ipc.c

@@ -270,7 +270,7 @@ static void *pthread_ipc_consumer (void *conn)
 	struct res_overlay res_overlay;
 	struct res_overlay res_overlay;
 	struct iovec send_ok_joined_iovec;
 	struct iovec send_ok_joined_iovec;
 	int send_ok = 0;
 	int send_ok = 0;
-	int send_ok_joined = 0;
+	int reserved_msgs = 0;
 
 
 	for (;;) {
 	for (;;) {
 		sop.sem_num = 0;
 		sop.sem_num = 0;
@@ -296,14 +296,16 @@ retry_semop:
 
 
 		send_ok_joined_iovec.iov_base = (char *)header;
 		send_ok_joined_iovec.iov_base = (char *)header;
 		send_ok_joined_iovec.iov_len = header->size;
 		send_ok_joined_iovec.iov_len = header->size;
-		send_ok_joined = totempg_groups_send_ok_joined (corosync_group_handle,
+
+		reserved_msgs = totempg_groups_joined_reserve (
+			corosync_group_handle,
 			&send_ok_joined_iovec, 1);
 			&send_ok_joined_iovec, 1);
 
 
 		send_ok =
 		send_ok =
 			(corosync_quorum_is_quorate() == 1 || ais_service[conn_info->service]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) && (
 			(corosync_quorum_is_quorate() == 1 || ais_service[conn_info->service]->allow_inquorate == CS_LIB_ALLOW_INQUORATE) && (
 			(ais_service[conn_info->service]->lib_engine[header->id].flow_control == CS_LIB_FLOW_CONTROL_NOT_REQUIRED) ||
 			(ais_service[conn_info->service]->lib_engine[header->id].flow_control == CS_LIB_FLOW_CONTROL_NOT_REQUIRED) ||
 			((ais_service[conn_info->service]->lib_engine[header->id].flow_control == CS_LIB_FLOW_CONTROL_REQUIRED) &&
 			((ais_service[conn_info->service]->lib_engine[header->id].flow_control == CS_LIB_FLOW_CONTROL_REQUIRED) &&
-			(send_ok_joined) &&
+			(reserved_msgs) &&
 			(sync_in_process() == 0)));
 			(sync_in_process() == 0)));
 
 
 		if (send_ok) {
 		if (send_ok) {
@@ -321,6 +323,7 @@ retry_semop:
 				res_overlay.header.size);
 				res_overlay.header.size);
 		}
 		}
 
 
+		totempg_groups_joined_release (reserved_msgs);
 		cs_conn_refcount_dec (conn);
 		cs_conn_refcount_dec (conn);
 	}
 	}
 	pthread_exit (0);
 	pthread_exit (0);

+ 0 - 7
exec/main.c

@@ -448,13 +448,6 @@ int main_mcast (
 	return (totempg_groups_mcast_joined (corosync_group_handle, iovec, iov_len, guarantee));
 	return (totempg_groups_mcast_joined (corosync_group_handle, iovec, iov_len, guarantee));
 }
 }
 
 
-extern int main_send_ok (
-        struct iovec *iovec,
-        int iov_len)
-{
-	return (totempg_groups_send_ok_joined (corosync_group_handle, iovec, iov_len));
-}
-
 int main (int argc, char **argv)
 int main (int argc, char **argv)
 {
 {
 	char *error_string;
 	char *error_string;

+ 0 - 4
exec/main.h

@@ -62,8 +62,4 @@ extern int main_mcast (
 	int iov_len,
 	int iov_len,
 	unsigned int guarantee);
 	unsigned int guarantee);
 
 
-extern int main_send_ok (
-        struct iovec *iovec,
-        int iov_len);
-
 #endif /* MAIN_H_DEFINED */
 #endif /* MAIN_H_DEFINED */

+ 59 - 19
exec/totempg.c

@@ -145,6 +145,8 @@ static unsigned short mcast_packed_msg_lens[FRAME_SIZE_MAX];
 
 
 static int mcast_packed_msg_count = 0;
 static int mcast_packed_msg_count = 0;
 
 
+static int totempg_reserved = 0;
+
 /*
 /*
  * Function and data used to log messages
  * Function and data used to log messages
  */
  */
@@ -225,8 +227,6 @@ static struct hdb_handle_database totempg_groups_instance_database = {
 	.mutex		= PTHREAD_MUTEX_INITIALIZER
 	.mutex		= PTHREAD_MUTEX_INITIALIZER
 };
 };
 
 
-static int send_ok (int msg_size);
-
 static unsigned char next_fragment = 1;
 static unsigned char next_fragment = 1;
 
 
 static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_mutex_t totempg_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -241,6 +241,10 @@ do {									\
 		__FILE__, __LINE__, level, format, ##args);		\
 		__FILE__, __LINE__, level, format, ##args);		\
 } while (0);
 } while (0);
 
 
+static int msg_count_send_ok (int msg_count);
+
+static int byte_count_send_ok (int byte_count);
+
 static struct assembly *assembly_ref (unsigned int nodeid)
 static struct assembly *assembly_ref (unsigned int nodeid)
 {
 {
 	struct assembly *assembly;
 	struct assembly *assembly;
@@ -765,7 +769,7 @@ static int mcast_msg (
 		total_size += iovec[i].iov_len;
 		total_size += iovec[i].iov_len;
 	}
 	}
 
 
-	if (send_ok (total_size + sizeof(unsigned short) *
+	if (byte_count_send_ok (total_size + sizeof(unsigned short) *
 		(mcast_packed_msg_count+1)) == 0) {
 		(mcast_packed_msg_count+1)) == 0) {
 
 
 		pthread_mutex_unlock (&mcast_msg_mutex);
 		pthread_mutex_unlock (&mcast_msg_mutex);
@@ -888,23 +892,44 @@ static int mcast_msg (
 /*
 /*
  * Determine if a message of msg_size could be queued
  * Determine if a message of msg_size could be queued
  */
  */
-#define FUZZY_AVAIL_SUBTRACT 5
-static int send_ok (
-	int msg_size)
+static int msg_count_send_ok (
+	int msg_count)
 {
 {
 	int avail = 0;
 	int avail = 0;
-	int total;
 
 
-	avail = totemmrp_avail () - FUZZY_AVAIL_SUBTRACT;
+	avail = totemmrp_avail () - totempg_reserved - 1;
 	
 	
-	/*
-	 * msg size less then totempg_totem_config->net_mtu - 25 will take up
-	 * a full message, so add +1
-	 * totempg_totem_config->net_mtu - 25 is for the totempg_mcast header
-	 */
-	total = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1; 
+	return (avail > msg_count);
+}
+
+static int byte_count_send_ok (
+	int byte_count)
+{
+	unsigned int msg_count = 0;
+	int avail = 0;
+
+	avail = totemmrp_avail () - 1;
+
+	msg_count = (byte_count / (totempg_totem_config->net_mtu - 25)) + 1; 
+
+	return (avail > msg_count);
+}
+
+static int send_reserve (
+	int msg_size)
+{
+	unsigned int msg_count = 0;
+
+	msg_count = (msg_size / (totempg_totem_config->net_mtu - 25)) + 1; 
+	totempg_reserved += msg_count;
+
+	return (msg_count);
+}
 
 
-	return (avail >= total);
+static void send_release (
+	int msg_count)
+{
+	totempg_reserved -= msg_count;
 }
 }
 
 
 int totempg_callback_token_create (
 int totempg_callback_token_create (
@@ -1091,7 +1116,7 @@ error_exit:
 	return (res);
 	return (res);
 }
 }
 
 
-int totempg_groups_send_ok_joined (
+int totempg_groups_joined_reserve (
 	hdb_handle_t handle,
 	hdb_handle_t handle,
 	struct iovec *iovec,
 	struct iovec *iovec,
 	int iov_len)
 	int iov_len)
@@ -1100,6 +1125,7 @@ int totempg_groups_send_ok_joined (
 	unsigned int size = 0;
 	unsigned int size = 0;
 	unsigned int i;
 	unsigned int i;
 	unsigned int res;
 	unsigned int res;
+	unsigned int reserved = 0;
 
 
 	pthread_mutex_lock (&totempg_mutex);
 	pthread_mutex_lock (&totempg_mutex);
 	pthread_mutex_lock (&mcast_msg_mutex);
 	pthread_mutex_lock (&mcast_msg_mutex);
@@ -1116,14 +1142,28 @@ int totempg_groups_send_ok_joined (
 		size += iovec[i].iov_len;
 		size += iovec[i].iov_len;
 	}
 	}
 
 
-	res = send_ok (size);
+	reserved = send_reserve (size);
+	if (msg_count_send_ok (reserved) == 0) {
+		send_release (reserved);
+		reserved = 0;
+	}
 
 
 	hdb_handle_put (&totempg_groups_instance_database, handle);
 	hdb_handle_put (&totempg_groups_instance_database, handle);
 
 
 error_exit:
 error_exit:
 	pthread_mutex_unlock (&mcast_msg_mutex);
 	pthread_mutex_unlock (&mcast_msg_mutex);
 	pthread_mutex_unlock (&totempg_mutex);
 	pthread_mutex_unlock (&totempg_mutex);
-	return (res);
+	return (reserved);
+}
+
+
+void totempg_groups_joined_release (int msg_count)
+{
+	pthread_mutex_lock (&totempg_mutex);
+	pthread_mutex_lock (&mcast_msg_mutex);
+	send_release (msg_count);
+	pthread_mutex_unlock (&mcast_msg_mutex);
+	pthread_mutex_unlock (&totempg_mutex);
 }
 }
 
 
 int totempg_groups_mcast_groups (
 int totempg_groups_mcast_groups (
@@ -1201,7 +1241,7 @@ int totempg_groups_send_ok_groups (
 		size += iovec[i].iov_len;
 		size += iovec[i].iov_len;
 	}
 	}
 
 
-	res = send_ok (size);
+	res = msg_count_send_ok (size);
 	 
 	 
 	hdb_handle_put (&totempg_groups_instance_database, handle);
 	hdb_handle_put (&totempg_groups_instance_database, handle);
 error_exit:
 error_exit:

+ 8 - 4
include/corosync/engine/coroapi.h

@@ -413,8 +413,6 @@ struct corosync_api_v1 {
 
 
 	int (*totem_mcast) (struct iovec *iovec, int iov_len, unsigned int guarantee);
 	int (*totem_mcast) (struct iovec *iovec, int iov_len, unsigned int guarantee);
 
 
-	int (*totem_send_ok) (struct iovec *iovec, int iov_len);
-
 	int (*totem_ifaces_get) (
 	int (*totem_ifaces_get) (
 		unsigned int nodeid,
 		unsigned int nodeid,
 		struct totem_ip_address *interfaces,
 		struct totem_ip_address *interfaces,
@@ -472,11 +470,14 @@ struct corosync_api_v1 {
 		int iov_len,
 		int iov_len,
 		int guarantee);
 		int guarantee);
 
 
-	int (*tpg_joined_send_ok) (
+	int (*tpg_joined_reserve) (
 		hdb_handle_t handle,
 		hdb_handle_t handle,
 		struct iovec *iovec,
 		struct iovec *iovec,
 		int iov_len);
 		int iov_len);
 
 
+	int (*tpg_joined_release) (
+		int reserved_msgs);
+
 	int (*tpg_groups_mcast) (
 	int (*tpg_groups_mcast) (
 		hdb_handle_t handle,
 		hdb_handle_t handle,
 		int guarantee,
 		int guarantee,
@@ -485,13 +486,16 @@ struct corosync_api_v1 {
 		struct iovec *iovec,
 		struct iovec *iovec,
 		int iov_len);
 		int iov_len);
 
 
-	int (*tpg_groups_send_ok) (
+	int (*tpg_groups_reserve) (
 		hdb_handle_t handle,
 		hdb_handle_t handle,
 		struct corosync_tpg_group *groups,
 		struct corosync_tpg_group *groups,
 		int groups_cnt,
 		int groups_cnt,
 		struct iovec *iovec,
 		struct iovec *iovec,
 		int iov_len);
 		int iov_len);
 
 
+	int (*tpg_groups_release) (
+		int reserved_msgs);
+
 	int (*sync_request) (
 	int (*sync_request) (
 		char *service_name);
 		char *service_name);
 
 

+ 4 - 1
include/corosync/totem/totempg.h

@@ -110,10 +110,13 @@ extern int totempg_groups_mcast_joined (
 	int iov_len,
 	int iov_len,
 	int guarantee);
 	int guarantee);
 
 
-extern int totempg_groups_send_ok_joined (
+extern int totempg_groups_joined_reserve (
 	hdb_handle_t handle,
 	hdb_handle_t handle,
 	struct iovec *iovec,
 	struct iovec *iovec,
 	int iov_len);
 	int iov_len);
+
+extern void totempg_groups_joined_release (
+	int msg_count);
 	
 	
 extern int totempg_groups_mcast_groups (
 extern int totempg_groups_mcast_groups (
 	hdb_handle_t handle,
 	hdb_handle_t handle,

+ 0 - 6
services/evs.c

@@ -363,7 +363,6 @@ static void message_handler_req_evs_mcast_joined (void *conn, void *msg)
 	struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined;
 	struct res_lib_evs_mcast_joined res_lib_evs_mcast_joined;
 	struct iovec req_exec_evs_mcast_iovec[3];
 	struct iovec req_exec_evs_mcast_iovec[3];
 	struct req_exec_evs_mcast req_exec_evs_mcast;
 	struct req_exec_evs_mcast req_exec_evs_mcast;
-	int send_ok = 0;
 	int res;
 	int res;
 	struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn);
 	struct evs_pd *evs_pd = (struct evs_pd *)api->ipc_private_data_get (conn);
 
 
@@ -382,8 +381,6 @@ static void message_handler_req_evs_mcast_joined (void *conn, void *msg)
 	req_exec_evs_mcast_iovec[1].iov_len = evs_pd->group_entries * sizeof (struct evs_group);
 	req_exec_evs_mcast_iovec[1].iov_len = evs_pd->group_entries * sizeof (struct evs_group);
 	req_exec_evs_mcast_iovec[2].iov_base = (char *)&req_lib_evs_mcast_joined->msg;
 	req_exec_evs_mcast_iovec[2].iov_base = (char *)&req_lib_evs_mcast_joined->msg;
 	req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_joined->msg_len;
 	req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_joined->msg_len;
-// TODO this doesn't seem to work for some reason	
-	send_ok = api->totem_send_ok (req_exec_evs_mcast_iovec, 3);
 
 
 	res = api->totem_mcast (req_exec_evs_mcast_iovec, 3, TOTEM_AGREED);
 	res = api->totem_mcast (req_exec_evs_mcast_iovec, 3, TOTEM_AGREED);
 		// TODO
 		// TODO
@@ -407,7 +404,6 @@ static void message_handler_req_evs_mcast_groups (void *conn, void *msg)
 	struct iovec req_exec_evs_mcast_iovec[3];
 	struct iovec req_exec_evs_mcast_iovec[3];
 	struct req_exec_evs_mcast req_exec_evs_mcast;
 	struct req_exec_evs_mcast req_exec_evs_mcast;
 	char *msg_addr;
 	char *msg_addr;
-	int send_ok = 0;
 	int res;
 	int res;
 
 
 	req_exec_evs_mcast.header.size = sizeof (struct req_exec_evs_mcast) +
 	req_exec_evs_mcast.header.size = sizeof (struct req_exec_evs_mcast) +
@@ -430,8 +426,6 @@ static void message_handler_req_evs_mcast_groups (void *conn, void *msg)
 	req_exec_evs_mcast_iovec[2].iov_base = msg_addr;
 	req_exec_evs_mcast_iovec[2].iov_base = msg_addr;
 	req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_groups->msg_len;
 	req_exec_evs_mcast_iovec[2].iov_len = req_lib_evs_mcast_groups->msg_len;
 	
 	
-// TODO this is wacky
-	send_ok = api->totem_send_ok (req_exec_evs_mcast_iovec, 3);
 	res = api->totem_mcast (req_exec_evs_mcast_iovec, 3, TOTEM_AGREED);
 	res = api->totem_mcast (req_exec_evs_mcast_iovec, 3, TOTEM_AGREED);
 	if (res == 0) {
 	if (res == 0) {
 		error = CS_OK;
 		error = CS_OK;