Просмотр исходного кода

libcpg: Fix issue with partial big packet assembly

Packet assembly is done separately for each (nodeid, pid) pair, so that
fragments of different messages are not mixed into a single buffer.

Signed-off-by: Rytis Karpuška <rytisk@neurotechnology.com>
Reviewed-by: Christine Caulfield <ccaulfie@redhat.com>
Reviewed-by: Jan Friesse <jfriesse@redhat.com>
Rytis Karpuška 8 лет назад
Родитель
Коммит
c9dd11772c
1 изменённый файл: 73 добавлено и 20 удалено
  1. 73 20
      lib/cpg.c

+ 73 - 20
lib/cpg.c

@@ -80,6 +80,15 @@
  */
 #define CPG_MEMORY_MAP_UMASK		077
 
+struct cpg_assembly_data	/* Reassembly state for one partially received message, looked up by its (nodeid, pid) pair */
+{
+	struct qb_list_head list;	/* links this entry into cpg_inst->assembly_list_head */
+	uint32_t nodeid;	/* nodeid copied from the LIBCPG_PARTIAL_FIRST callback; first half of the lookup key */
+	uint32_t pid;	/* pid copied from the LIBCPG_PARTIAL_FIRST callback; second half of the lookup key */
+	char *assembly_buf;	/* malloc'd with the callback's msglen; freed when the entry is removed from the list */
+	uint32_t assembly_buf_ptr;	/* byte offset into assembly_buf where the next fragment is copied; grows by fraglen */
+};
+
 struct cpg_inst {
 	qb_ipcc_connection_t *c;
 	int finalize;
@@ -89,14 +98,8 @@ struct cpg_inst {
 		cpg_model_v1_data_t model_v1_data;
 	};
 	struct qb_list_head iteration_list_head;
-    uint32_t max_msg_size;
-    char *assembly_buf;
-    uint32_t assembly_buf_ptr;
-    int assembling; /* Flag that says we have started assembling a message.
-					 * It's here to catch the situation where a node joins
-					 * the cluster/group in the middle of a CPG message send
-					 * so we don't pass on a partial message to the client.
-					 */
+	uint32_t max_msg_size;
+	struct qb_list_head assembly_list_head;
 };
 static void cpg_inst_free (void *inst);
 
@@ -229,6 +232,8 @@ cs_error_t cpg_model_initialize (
 
 	qb_list_init(&cpg_inst->iteration_list_head);
 
+	qb_list_init(&cpg_inst->assembly_list_head);
+
 	hdb_handle_put (&cpg_handle_t_db, *handle);
 
 	return (CS_OK);
@@ -380,6 +385,8 @@ cs_error_t cpg_dispatch (
 	struct cpg_address left_list[CPG_MEMBERS_MAX];
 	struct cpg_address joined_list[CPG_MEMBERS_MAX];
 	struct cpg_name group_name;
+	struct cpg_assembly_data *assembly_data;
+	struct qb_list_head *iter, *tmp_iter;
 	mar_cpg_address_t *left_list_start;
 	mar_cpg_address_t *joined_list_start;
 	unsigned int i;
@@ -469,32 +476,63 @@ cs_error_t cpg_dispatch (
 					&group_name,
 					&res_cpg_partial_deliver_callback->group_name);
 
+				/*
+				 * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
+				 */
+				assembly_data = NULL;
+				qb_list_for_each(iter, &(cpg_inst->assembly_list_head)) {
+					struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
+					if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
+						assembly_data = current_assembly_data;
+						break;
+					}
+				}
+
 				if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
+
 					/*
-					 * Allocate a buffer to contain a full message.
+					 * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly
 					 */
-					cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
-					if (!cpg_inst->assembly_buf) {
+					if (assembly_data) {
+						error = CS_ERR_MESSAGE_ERROR;
+						goto error_put;
+					}
+
+					assembly_data = malloc(sizeof(struct cpg_assembly_data));
+					if (!assembly_data) {
 						error = CS_ERR_NO_MEMORY;
 						goto error_put;
 					}
-					cpg_inst->assembling = 1;
-					cpg_inst->assembly_buf_ptr = 0;
+
+					assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
+					assembly_data->pid = res_cpg_partial_deliver_callback->pid;
+					assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
+					if (!assembly_data->assembly_buf) {
+						free(assembly_data);
+						error = CS_ERR_NO_MEMORY;
+						goto error_put;
+					}
+					assembly_data->assembly_buf_ptr = 0;
+					qb_list_init (&assembly_data->list);
+
+					qb_list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
 				}
-				if (cpg_inst->assembling) {
-					memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
-					       res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
-					cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
+				if (assembly_data) {
+					memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
+						res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
+					assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
 
 					if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
 						cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
 							&group_name,
 							res_cpg_partial_deliver_callback->nodeid,
 							res_cpg_partial_deliver_callback->pid,
-							cpg_inst->assembly_buf,
+							assembly_data->assembly_buf,
 							res_cpg_partial_deliver_callback->msglen);
-						free(cpg_inst->assembly_buf);
-						cpg_inst->assembling = 0;
+
+						qb_list_del (&assembly_data->list);
+						free(assembly_data->assembly_buf);
+						free(assembly_data);
 					}
 				}
 				break;
@@ -536,6 +574,21 @@ cs_error_t cpg_dispatch (
 					joined_list,
 					res_cpg_confchg_callback->joined_list_entries);
 
+				/*
+				 * If member left while his partial packet was being assembled, assembly data must be removed from list
+				 */
+				for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
+					qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->assembly_list_head)) {
+						struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
+						if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
+							continue;
+
+						qb_list_del (&current_assembly_data->list);
+						free(current_assembly_data->assembly_buf);
+						free(current_assembly_data);
+					}
+				}
+
 				break;
 			case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
 				if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {