Browse Source

libcpg: Fix issue with partial big packet assembly

Packet assembly is done seperately for each nodeid, pid pair,
therefore multiple packets are not mixed into single buffer.

(backported from master c9dd11772cd6660d7651b6781df963efa914652e)

Signed-off-by: Rytis Karpuška <rytisk@neurotechnology.com>
Reviewed-by: Jan Friesse <jfriesse@redhat.com>
Reviewed-by: Christine Caulfield <ccaulfie@redhat.com>
Rytis Karpuška 8 years ago
parent
commit
86579ff5f8
1 changed files with 76 additions and 20 deletions
  1. 76 20
      lib/cpg.c

+ 76 - 20
lib/cpg.c

@@ -80,6 +80,15 @@
  */
 #define CPG_MEMORY_MAP_UMASK		077
 
+struct cpg_assembly_data
+{
+	struct list_head list;
+	uint32_t nodeid;
+	uint32_t pid;
+	char *assembly_buf;
+	uint32_t assembly_buf_ptr;
+};
+
 struct cpg_inst {
 	qb_ipcc_connection_t *c;
 	int finalize;
@@ -89,14 +98,8 @@ struct cpg_inst {
 		cpg_model_v1_data_t model_v1_data;
 	};
 	struct list_head iteration_list_head;
-    uint32_t max_msg_size;
-    char *assembly_buf;
-    uint32_t assembly_buf_ptr;
-    int assembling; /* Flag that says we have started assembling a message.
-					 * It's here to catch the situation where a node joins
-					 * the cluster/group in the middle of a CPG message send
-					 * so we don't pass on a partial message to the client.
-					 */
+	uint32_t max_msg_size;
+	struct list_head assembly_list_head;
 };
 static void cpg_inst_free (void *inst);
 
@@ -231,6 +234,8 @@ cs_error_t cpg_model_initialize (
 
 	list_init(&cpg_inst->iteration_list_head);
 
+	list_init(&cpg_inst->assembly_list_head);
+
 	hdb_handle_put (&cpg_handle_t_db, *handle);
 
 	return (CS_OK);
@@ -382,6 +387,8 @@ cs_error_t cpg_dispatch (
 	struct cpg_address left_list[CPG_MEMBERS_MAX];
 	struct cpg_address joined_list[CPG_MEMBERS_MAX];
 	struct cpg_name group_name;
+	struct cpg_assembly_data *assembly_data;
+	struct list_head *iter, *tmp_iter;
 	mar_cpg_address_t *left_list_start;
 	mar_cpg_address_t *joined_list_start;
 	unsigned int i;
@@ -471,32 +478,63 @@ cs_error_t cpg_dispatch (
 					&group_name,
 					&res_cpg_partial_deliver_callback->group_name);
 
+				/*
+				 * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
+				 */
+				assembly_data = NULL;
+				for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head; iter = iter->next) {
+					struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
+					if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
+						assembly_data = current_assembly_data;
+						break;
+					}
+				}
+
 				if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
+
 					/*
-					 * Allocate a buffer to contain a full message.
+					 * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly
 					 */
-					cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
-					if (!cpg_inst->assembly_buf) {
+					if (assembly_data) {
+						error = CS_ERR_MESSAGE_ERROR;
+						goto error_put;
+					}
+
+					assembly_data = malloc(sizeof(struct cpg_assembly_data));
+					if (!assembly_data) {
 						error = CS_ERR_NO_MEMORY;
 						goto error_put;
 					}
-					cpg_inst->assembling = 1;
-					cpg_inst->assembly_buf_ptr = 0;
+
+					assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
+					assembly_data->pid = res_cpg_partial_deliver_callback->pid;
+					assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
+					if (!assembly_data->assembly_buf) {
+						free(assembly_data);
+						error = CS_ERR_NO_MEMORY;
+						goto error_put;
+					}
+					assembly_data->assembly_buf_ptr = 0;
+					list_init (&assembly_data->list);
+
+					list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
 				}
-				if (cpg_inst->assembling) {
-					memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
-					       res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
-					cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
+				if (assembly_data) {
+					memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
+						res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
+					assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
 
 					if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
 						cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
 							&group_name,
 							res_cpg_partial_deliver_callback->nodeid,
 							res_cpg_partial_deliver_callback->pid,
-							cpg_inst->assembly_buf,
+							assembly_data->assembly_buf,
 							res_cpg_partial_deliver_callback->msglen);
-						free(cpg_inst->assembly_buf);
-						cpg_inst->assembling = 0;
+
+						list_del (&assembly_data->list);
+						free(assembly_data->assembly_buf);
+						free(assembly_data);
 					}
 				}
 				break;
@@ -538,6 +576,24 @@ cs_error_t cpg_dispatch (
 					joined_list,
 					res_cpg_confchg_callback->joined_list_entries);
 
+				/*
+				 * If member left while his partial packet was being assembled, assembly data must be removed from list
+				 */
+				for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
+					for (iter = cpg_inst->assembly_list_head.next; iter != &cpg_inst->assembly_list_head;iter = tmp_iter) {
+						struct cpg_assembly_data *current_assembly_data = list_entry (iter, struct cpg_assembly_data, list);
+
+						tmp_iter = iter->next;
+
+						if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
+							continue;
+
+						list_del (&current_assembly_data->list);
+						free(current_assembly_data->assembly_buf);
+						free(current_assembly_data);
+					}
+				}
+
 				break;
 			case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
 				if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {