Просмотр исходного кода

CPG model_initialize and ringid + members callback

Patch adds new function to initialize cpg, cpg_model_initialize. Model
is set of callbacks. With this function, future addions of models
should  be possible without changing the ABI.

Patch also contains callback in CPG_MODEL_V1 for notification about
Totem membership changes.

git-svn-id: http://svn.fedorahosted.org/svn/corosync/trunk@2770 fd59a12c-fef9-0310-b244-a6a79926bd2f
Jan Friesse 16 лет назад
Родитель
Сommit
8d93705e8d
10 измененных файлов с 529 добавлено и 72 удалено
  1. 39 0
      include/corosync/cpg.h
  2. 22 0
      include/corosync/ipc_cpg.h
  3. 143 68
      lib/cpg.c
  4. 1 1
      lib/libcpg.verso
  5. 1 0
      man/Makefile.am
  6. 5 1
      man/cpg_initialize.3
  7. 231 0
      man/cpg_model_initialize.3
  8. 1 0
      man/cpg_overview.8
  9. 64 0
      services/cpg.c
  10. 22 2
      test/testcpg.c

+ 39 - 0
include/corosync/cpg.h

@@ -78,6 +78,10 @@ typedef enum {
 	CPG_ITERATION_ALL = 3,
 	CPG_ITERATION_ALL = 3,
 } cpg_iteration_type_t;
 } cpg_iteration_type_t;
 
 
+typedef enum {
+	CPG_MODEL_V1 = 1,
+} cpg_model_t;
+
 struct cpg_address {
 struct cpg_address {
 	uint32_t nodeid;
 	uint32_t nodeid;
 	uint32_t pid;
 	uint32_t pid;
@@ -98,6 +102,11 @@ struct cpg_iteration_description_t {
 	uint32_t pid;
 	uint32_t pid;
 };
 };
 
 
+struct cpg_ring_id {
+	uint32_t nodeid;
+	uint64_t seq;
+};
+
 typedef void (*cpg_deliver_fn_t) (
 typedef void (*cpg_deliver_fn_t) (
 	cpg_handle_t handle,
 	cpg_handle_t handle,
 	const struct cpg_name *group_name,
 	const struct cpg_name *group_name,
@@ -117,11 +126,32 @@ typedef void (*cpg_confchg_fn_t) (
 	const struct cpg_address *left_list, size_t left_list_entries,
 	const struct cpg_address *left_list, size_t left_list_entries,
 	const struct cpg_address *joined_list, size_t joined_list_entries);
 	const struct cpg_address *joined_list, size_t joined_list_entries);
 
 
+typedef void (*cpg_totem_confchg_fn_t) (
+	cpg_handle_t handle,
+	struct cpg_ring_id ring_id,
+	uint32_t member_list_entries,
+	const uint32_t *member_list);
+
 typedef struct {
 typedef struct {
 	cpg_deliver_fn_t cpg_deliver_fn;
 	cpg_deliver_fn_t cpg_deliver_fn;
 	cpg_confchg_fn_t cpg_confchg_fn;
 	cpg_confchg_fn_t cpg_confchg_fn;
 } cpg_callbacks_t;
 } cpg_callbacks_t;
 
 
+typedef struct {
+	cpg_model_t model;
+} cpg_model_data_t;
+
+#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF 0x01
+
+typedef struct {
+	cpg_model_t model;
+	cpg_deliver_fn_t cpg_deliver_fn;
+	cpg_confchg_fn_t cpg_confchg_fn;
+	cpg_totem_confchg_fn_t cpg_totem_confchg_fn;
+	unsigned int flags;
+} cpg_model_v1_data_t;
+
+
 /** @} */
 /** @} */
 
 
 /*
 /*
@@ -131,6 +161,15 @@ cs_error_t cpg_initialize (
 	cpg_handle_t *handle,
 	cpg_handle_t *handle,
 	cpg_callbacks_t *callbacks);
 	cpg_callbacks_t *callbacks);
 
 
+/*
+ * Create a new cpg connection, initialize with model
+ */
+cs_error_t cpg_model_initialize (
+	cpg_handle_t *handle,
+	cpg_model_t model,
+	cpg_model_data_t *model_data,
+	void *context);
+
 /*
 /*
  * Close the cpg handle
  * Close the cpg handle
  */
  */

+ 22 - 0
include/corosync/ipc_cpg.h

@@ -65,6 +65,7 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_ITERATIONNEXT = 10,
 	MESSAGE_RES_CPG_ITERATIONNEXT = 10,
 	MESSAGE_RES_CPG_ITERATIONFINALIZE = 11,
 	MESSAGE_RES_CPG_ITERATIONFINALIZE = 11,
 	MESSAGE_RES_CPG_FINALIZE = 12,
 	MESSAGE_RES_CPG_FINALIZE = 12,
+	MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK = 13,
 };
 };
 
 
 enum lib_cpg_confchg_reason {
 enum lib_cpg_confchg_reason {
@@ -149,10 +150,24 @@ static inline void marshall_from_mar_cpg_iteration_description_t(
 	marshall_from_mar_cpg_name_t (&dest->group, &src->group);
 	marshall_from_mar_cpg_name_t (&dest->group, &src->group);
 };
 };
 
 
+typedef struct {
+        mar_uint32_t nodeid __attribute__((aligned(8)));
+        mar_uint64_t seq __attribute__((aligned(8)));
+} mar_cpg_ring_id_t;
+
+static inline void marshall_from_mar_cpg_ring_id_t (
+	struct cpg_ring_id *dest,
+	const mar_cpg_ring_id_t *src)
+{
+	dest->nodeid = src->nodeid;
+	dest->seq = src->seq;
+}
+
 struct req_lib_cpg_join {
 struct req_lib_cpg_join {
 	coroipc_request_header_t header __attribute__((aligned(8)));
 	coroipc_request_header_t header __attribute__((aligned(8)));
 	mar_cpg_name_t group_name __attribute__((aligned(8)));
 	mar_cpg_name_t group_name __attribute__((aligned(8)));
 	mar_uint32_t pid __attribute__((aligned(8)));
 	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t flags __attribute__((aligned(8)));
 };
 };
 
 
 struct res_lib_cpg_join {
 struct res_lib_cpg_join {
@@ -244,6 +259,13 @@ struct res_lib_cpg_confchg_callback {
 //	struct cpg_address joined_list[];
 //	struct cpg_address joined_list[];
 };
 };
 
 
+struct res_lib_cpg_totem_confchg_callback {
+	coroipc_response_header_t header __attribute__((aligned(8)));
+	mar_cpg_ring_id_t ring_id __attribute__((aligned(8)));
+	mar_uint32_t member_list_entries __attribute__((aligned(8)));
+	mar_uint32_t member_list[];
+};
+
 struct req_lib_cpg_leave {
 struct req_lib_cpg_leave {
 	coroipc_request_header_t header __attribute__((aligned(8)));
 	coroipc_request_header_t header __attribute__((aligned(8)));
 	mar_cpg_name_t group_name __attribute__((aligned(8)));
 	mar_cpg_name_t group_name __attribute__((aligned(8)));

+ 143 - 68
lib/cpg.c

@@ -62,8 +62,11 @@
 struct cpg_inst {
 struct cpg_inst {
 	hdb_handle_t handle;
 	hdb_handle_t handle;
 	int finalize;
 	int finalize;
-	cpg_callbacks_t callbacks;
 	void *context;
 	void *context;
+	union {
+		cpg_model_data_t model_data;
+		cpg_model_v1_data_t model_v1_data;
+	};
 	struct list_head iteration_list_head;
 	struct list_head iteration_list_head;
 };
 };
 
 
@@ -117,10 +120,33 @@ static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
 cs_error_t cpg_initialize (
 cs_error_t cpg_initialize (
 	cpg_handle_t *handle,
 	cpg_handle_t *handle,
 	cpg_callbacks_t *callbacks)
 	cpg_callbacks_t *callbacks)
+{
+	cpg_model_v1_data_t model_v1_data;
+
+	memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
+
+	if (callbacks) {
+		model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
+		model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
+	}
+
+	return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
+}
+
+cs_error_t cpg_model_initialize (
+	cpg_handle_t *handle,
+	cpg_model_t model,
+	cpg_model_data_t *model_data,
+	void *context)
 {
 {
 	cs_error_t error;
 	cs_error_t error;
 	struct cpg_inst *cpg_inst;
 	struct cpg_inst *cpg_inst;
 
 
+	if (model != CPG_MODEL_V1) {
+		error = CPG_ERR_INVALID_PARAM;
+		goto error_no_destroy;
+	}
+
 	error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
 	error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
 	if (error != CS_OK) {
 	if (error != CS_OK) {
 		goto error_no_destroy;
 		goto error_no_destroy;
@@ -142,10 +168,26 @@ cs_error_t cpg_initialize (
 		goto error_put_destroy;
 		goto error_put_destroy;
 	}
 	}
 
 
-	if (callbacks) {
-		memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t));
+	if (model_data != NULL) {
+		switch (model) {
+		case CPG_MODEL_V1:
+			memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
+			if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
+				error = CS_ERR_INVALID_PARAM;
+
+				goto error_destroy;
+			}
+			break;
+		default:
+			error = CS_ERR_LIBRARY;
+			goto error_destroy;
+			break;
+		}
 	}
 	}
 
 
+	cpg_inst->model_data.model = model;
+	cpg_inst->context = context;
+
 	list_init(&cpg_inst->iteration_list_head);
 	list_init(&cpg_inst->iteration_list_head);
 
 
 	hdb_handle_put (&cpg_handle_t_db, *handle);
 	hdb_handle_put (&cpg_handle_t_db, *handle);
@@ -283,7 +325,8 @@ cs_error_t cpg_dispatch (
 	struct cpg_inst *cpg_inst;
 	struct cpg_inst *cpg_inst;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
-	cpg_callbacks_t callbacks;
+	struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
+	struct cpg_inst cpg_inst_copy;
 	coroipc_response_header_t *dispatch_data;
 	coroipc_response_header_t *dispatch_data;
 	struct cpg_address member_list[CPG_MEMBERS_MAX];
 	struct cpg_address member_list[CPG_MEMBERS_MAX];
 	struct cpg_address left_list[CPG_MEMBERS_MAX];
 	struct cpg_address left_list[CPG_MEMBERS_MAX];
@@ -292,6 +335,8 @@ cs_error_t cpg_dispatch (
 	mar_cpg_address_t *left_list_start;
 	mar_cpg_address_t *left_list_start;
 	mar_cpg_address_t *joined_list_start;
 	mar_cpg_address_t *joined_list_start;
 	unsigned int i;
 	unsigned int i;
+	struct cpg_ring_id ring_id;
+	uint32_t totem_member_list[CPG_MEMBERS_MAX];
 
 
 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
 	if (error != CS_OK) {
 	if (error != CS_OK) {
@@ -332,74 +377,96 @@ cs_error_t cpg_dispatch (
 		 * A risk of this dispatch method is that the callback routines may
 		 * A risk of this dispatch method is that the callback routines may
 		 * operate at the same time that cpgFinalize has been called.
 		 * operate at the same time that cpgFinalize has been called.
 		 */
 		 */
-		memcpy (&callbacks, &cpg_inst->callbacks, sizeof (cpg_callbacks_t));
-		/*
-		 * Dispatch incoming message
-		 */
-		switch (dispatch_data->id) {
-		case MESSAGE_RES_CPG_DELIVER_CALLBACK:
-			if (callbacks.cpg_deliver_fn == NULL) {
+		memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
+
+		switch (cpg_inst_copy.model_data.model) {
+		case CPG_MODEL_V1:
+			/*
+			 * Dispatch incoming message
+			 */
+			switch (dispatch_data->id) {
+			case MESSAGE_RES_CPG_DELIVER_CALLBACK:
+				if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
+					break;
+				}
+
+				res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
+
+				marshall_from_mar_cpg_name_t (
+					&group_name,
+					&res_cpg_deliver_callback->group_name);
+
+				cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
+					&group_name,
+					res_cpg_deliver_callback->nodeid,
+					res_cpg_deliver_callback->pid,
+					&res_cpg_deliver_callback->message,
+					res_cpg_deliver_callback->msglen);
 				break;
 				break;
-			}
-
-			res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
 
 
-			marshall_from_mar_cpg_name_t (
-				&group_name,
-				&res_cpg_deliver_callback->group_name);
+			case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
+				if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
+					break;
+				}
+
+				res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
+
+				for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
+					marshall_from_mar_cpg_address_t (&member_list[i],
+						&res_cpg_confchg_callback->member_list[i]);
+				}
+				left_list_start = res_cpg_confchg_callback->member_list +
+					res_cpg_confchg_callback->member_list_entries;
+				for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
+					marshall_from_mar_cpg_address_t (&left_list[i],
+						&left_list_start[i]);
+				}
+				joined_list_start = res_cpg_confchg_callback->member_list +
+					res_cpg_confchg_callback->member_list_entries +
+					res_cpg_confchg_callback->left_list_entries;
+				for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
+					marshall_from_mar_cpg_address_t (&joined_list[i],
+						&joined_list_start[i]);
+				}
+				marshall_from_mar_cpg_name_t (
+					&group_name,
+					&res_cpg_confchg_callback->group_name);
+
+				cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
+					&group_name,
+					member_list,
+					res_cpg_confchg_callback->member_list_entries,
+					left_list,
+					res_cpg_confchg_callback->left_list_entries,
+					joined_list,
+					res_cpg_confchg_callback->joined_list_entries);
 
 
-			callbacks.cpg_deliver_fn (handle,
-				&group_name,
-				res_cpg_deliver_callback->nodeid,
-				res_cpg_deliver_callback->pid,
-				&res_cpg_deliver_callback->message,
-				res_cpg_deliver_callback->msglen);
-			break;
-
-		case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
-			if (callbacks.cpg_confchg_fn == NULL) {
 				break;
 				break;
-			}
-
-			res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
-
-			for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
-				marshall_from_mar_cpg_address_t (&member_list[i],
-					&res_cpg_confchg_callback->member_list[i]);
-			}
-			left_list_start = res_cpg_confchg_callback->member_list +
-				res_cpg_confchg_callback->member_list_entries;
-			for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
-				marshall_from_mar_cpg_address_t (&left_list[i],
-					&left_list_start[i]);
-			}
-			joined_list_start = res_cpg_confchg_callback->member_list +
-				res_cpg_confchg_callback->member_list_entries +
-				res_cpg_confchg_callback->left_list_entries;
-			for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
-				marshall_from_mar_cpg_address_t (&joined_list[i],
-					&joined_list_start[i]);
-			}
-			marshall_from_mar_cpg_name_t (
-				&group_name,
-				&res_cpg_confchg_callback->group_name);
-
-			callbacks.cpg_confchg_fn (handle,
-				&group_name,
-				member_list,
-				res_cpg_confchg_callback->member_list_entries,
-				left_list,
-				res_cpg_confchg_callback->left_list_entries,
-				joined_list,
-				res_cpg_confchg_callback->joined_list_entries);
-			break;
-
-		default:
-			coroipcc_dispatch_put (cpg_inst->handle);
-			error = CS_ERR_LIBRARY;
-			goto error_put;
-			break;
-		}
+			case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
+				if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
+					break;
+				}
+
+				res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
+
+				marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
+				for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
+					totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
+				}
+
+				cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
+					ring_id,
+					res_cpg_totem_confchg_callback->member_list_entries,
+					totem_member_list);
+				break;
+			default:
+				coroipcc_dispatch_put (cpg_inst->handle);
+				error = CS_ERR_LIBRARY;
+				goto error_put;
+				break;
+			} /* - switch (dispatch_data->id) */
+			break; /* case CPG_MODEL_V1 */
+		} /* - switch (cpg_inst_copy.model_data.model) */
 		coroipcc_dispatch_put (cpg_inst->handle);
 		coroipcc_dispatch_put (cpg_inst->handle);
 
 
 		/*
 		/*
@@ -434,6 +501,14 @@ cs_error_t cpg_join (
 	req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
 	req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
 	req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
 	req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
 	req_lib_cpg_join.pid = getpid();
 	req_lib_cpg_join.pid = getpid();
+	req_lib_cpg_join.flags = 0;
+
+	switch (cpg_inst->model_data.model) {
+	case CPG_MODEL_V1:
+		req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
+		break;
+	}
+
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
 	marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
 		group);
 		group);
 
 

+ 1 - 1
lib/libcpg.verso

@@ -1 +1 @@
-4.0.1
+4.1.0

+ 1 - 0
man/Makefile.am

@@ -79,6 +79,7 @@ dist_man_MANS = \
 	cpg_leave.3 \
 	cpg_leave.3 \
 	cpg_local_get.3 \
 	cpg_local_get.3 \
 	cpg_mcast_joined.3 \
 	cpg_mcast_joined.3 \
+	cpg_model_initialize.3 \
 	cpg_zcb_mcast_joined.3 \
 	cpg_zcb_mcast_joined.3 \
 	cpg_zcb_alloc.3 \
 	cpg_zcb_alloc.3 \
 	cpg_zcb_free.3 \
 	cpg_zcb_free.3 \

+ 5 - 1
man/cpg_initialize.3

@@ -41,7 +41,10 @@ cpg_initialize \- Create a new connection to the CPG service
 .SH DESCRIPTION
 .SH DESCRIPTION
 The
 The
 .B cpg_initialize
 .B cpg_initialize
-function is used to initialize a connection to the closed process groups API.
+function is used to initialize a connection to the closed process groups API. This function is deprecated
+and
+.B cpg_model_initialize
+should be used in newly written code.
 .PP
 .PP
 Each application may have several connections to the CPG API.  Each  application
 Each application may have several connections to the CPG API.  Each  application
 uses the
 uses the
@@ -167,5 +170,6 @@ The errors are undocumented.
 .BR cpg_context_get (3)
 .BR cpg_context_get (3)
 .BR cpg_context_set (3)
 .BR cpg_context_set (3)
 .BR cpg_local_get (3)
 .BR cpg_local_get (3)
+.BR cpg_model_initialize (3)
 
 
 .PP
 .PP

+ 231 - 0
man/cpg_model_initialize.3

@@ -0,0 +1,231 @@
+.\"/*
+.\" * Copyright (c) 2010 Red Hat, Inc.
+.\" *
+.\" * All rights reserved.
+.\" *
+.\" * Author: Jan Friesse <jfriesse@redhat.com>
+.\" * Author: Christine Caulfield <ccaulfie@redhat.com>
+.\" *
+.\" * This software licensed under BSD license, the text of which follows:
+.\" *
+.\" * Redistribution and use in source and binary forms, with or without
+.\" * modification, are permitted provided that the following conditions are met:
+.\" *
+.\" * - Redistributions of source code must retain the above copyright notice,
+.\" *   this list of conditions and the following disclaimer.
+.\" * - Redistributions in binary form must reproduce the above copyright notice,
+.\" *   this list of conditions and the following disclaimer in the documentation
+.\" *   and/or other materials provided with the distribution.
+.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its
+.\" *   contributors may be used to endorse or promote products derived from this
+.\" *   software without specific prior written permission.
+.\" *
+.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+.\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+.\" * THE POSSIBILITY OF SUCH DAMAGE.
+.\" */
+.TH CPG_MODEL_INITIALIZE 3 2010-04-07 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
+.SH NAME
+cpg_model_initialize \- Create a new connection to the CPG service
+.SH SYNOPSIS
+.B #include <corosync/cpg.h>
+.sp
+.BI "cs_error_t cpg_model_initialize(cpg_handle_t *" handle ", cpg_model_t " model ", cpg_model_data_t *" model_data ", void *" context ");
+
+.SH DESCRIPTION
+The
+.B cpg_model_initialize
+function is used to initialize a connection to the closed process groups API.
+.PP
+Each application may have several connections to the CPG API.  Each  application
+uses the
+.I handle
+argument to uniquely identify the connection.  The
+.I handle
+argument is then used in other function calls to identify the connection to be used
+for communication with the CPG service.
+.PP
+Argument
+.I model
+is used to explicitly choose set of callbacks and internal parameters. Currently only model
+.I CPG_MODEL_V1
+is defined.
+.PP
+Callbacks and internal parameters are passed by
+.I model_data
+argument. This is casted pointer (idea is similar as in sockaddr function) to one of structures
+corresponding to chosen model. Currently only
+.I cpg_model_v1_data_t
+is needed.
+.SH MODEL_V1
+The
+.I MODEL_V1
+is backwards compatible with original callbacks initialized by
+.I cpg_initialize
+but new callback
+.I cpg_totem_confchg_fn
+is defined.
+.PP
+Every time an CPG event occurs within the joined group, one of the callbacks specified by the argument
+.I callbacks
+is called.  The callback functions are described by the following type definitions:
+.PP
+.PP
+.IP
+.RS
+.ne 18
+.nf
+.ta 4n 20n 32n
+
+typedef void (*cpg_deliver_fn_t) (
+        cpg_handle_t handle,
+        const struct cpg_name *group_name,
+        uint32_t nodeid,
+        uint32_t pid,
+        const void *msg,
+        size_t msg_len);
+
+
+typedef void (*cpg_confchg_fn_t) (
+        cpg_handle_t handle,
+        const struct cpg_name *group_name,
+        const struct cpg_address *member_list, size_t member_list_entries,
+        const struct cpg_address *left_list, size_t left_list_entries,
+        const struct cpg_address *joined_list, size_t joined_list_entries);
+
+
+typedef void (*cpg_totem_confchg_fn_t) (
+        cpg_handle_t handle,
+        struct cpg_ring_id ring_id,
+        uint32_t member_list_entries,
+        const uint32_t *member_list);
+.ta
+.fi
+.RE
+.IP
+.PP
+.PP
+The
+.I cpg_model_v1_data_t
+structure is defined as:
+.IP
+.RS
+.ne 18
+.nf
+.PP
+typedef struct {
+        cpg_model_t model;
+        cpg_deliver_fn_t cpg_deliver_fn;
+        cpg_confchg_fn_t cpg_confchg_fn;
+        cpg_totem_confchg_fn_t cpg_totem_confchg_fn;
+	unsigned int flags;
+} cpg_model_v1_data_t;
+.ta
+.fi
+.RE
+.IP
+.PP
+When a configuration change occurs or a message is to be delivered one of the callbacks
+is called from the
+.B cpg_dispatch()
+function.  If a configuration change occurs,
+.I cpg_confchg_fn
+is called.  If a delivery of a message occurs,
+.I cpg_deliver_fn
+is called.
+When totem membership change occurs,
+.I cpg_totem_confchg_fn
+is called. You can OR
+.I CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
+constant to flags to get callback after first confchg event.
+
+The
+.I cpg_address
+structure is defined
+.IP
+.RS
+.ne 18
+.nf
+.PP
+struct cpg_address {
+        unsigned int nodeid;
+        unsigned int pid;
+        unsigned int reason;
+};
+.ta
+.fi
+.RE
+.IP
+.PP
+where nodeid is a 32 bit unique node identifier, pid is the process ID of the process that has joined/left the group
+or sent the message, and reason is an integer code indicating why the node joined/left the group.
+.PP
+.IP
+.RS
+.ne 18
+.nf
+.PP
+CPG_REASON_JOIN     - the process joined a group using cpg_join().
+CPG_REASON_LEAVE    - the process left a group using cpg_leave()
+CPG_REASON_NODEDOWN - the process left a group because the node left the cluster.
+CPG_REASON_NODEUP   - the process joined a group because it was already a member of a group on a node that has just joined the cluster
+CPG_REASON_PROCDOWN - the process left a group without calling cpg_leave()
+.ta
+.fi
+.RE
+.IP
+.PP
+The
+.I cpg_ring_id
+structure is defined
+.IP
+.RS
+.ne 18
+.nf
+.PP
+struct cpg_ring_id {
+        uint32_t nodeid;
+        uint64_t seq;
+};
+.ta
+.fi
+.RE
+.IP
+.PP
+where
+.I nodeid
+is if of node of current Totem leader and seq is increasing number.
+
+.PP
+.SH RETURN VALUE
+This call returns the CPG_OK value if successful, otherwise an error is returned.
+.PP
+.SH ERRORS
+The errors are undocumented.
+.SH "SEE ALSO"
+.BR cpg_overview (8),
+.BR cpg_initialize (3),
+.BR cpg_finalize (3),
+.BR cpg_fd_get (3),
+.BR cpg_dispatch (3),
+.BR cpg_join (3),
+.BR cpg_leave (3),
+.BR cpg_mcast_joined (3),
+.BR cpg_membership_get (3)
+.BR cpg_zcb_alloc (3)
+.BR cpg_zcb_free (3)
+.BR cpg_zcb_mcast_joined (3)
+.BR cpg_context_get (3)
+.BR cpg_context_set (3)
+.BR cpg_local_get (3)
+.BR cpg_model_initialize (3)
+
+.PP

+ 1 - 0
man/cpg_overview.8

@@ -61,6 +61,7 @@ access the corosync services.
 .BR cpg_join (3),
 .BR cpg_join (3),
 .BR cpg_leave (3),
 .BR cpg_leave (3),
 .BR cpg_mcast_joined (3),
 .BR cpg_mcast_joined (3),
+.BR cpg_model_initialize (3),
 .BR cpg_membership_get (3)
 .BR cpg_membership_get (3)
 .BR cpg_zcb_alloc (3)
 .BR cpg_zcb_alloc (3)
 .BR cpg_zcb_free (3)
 .BR cpg_zcb_free (3)

+ 64 - 0
services/cpg.c

@@ -133,6 +133,8 @@ struct cpg_pd {
  	mar_cpg_name_t group_name;
  	mar_cpg_name_t group_name;
 	uint32_t pid;
 	uint32_t pid;
 	enum cpd_state cpd_state;
 	enum cpd_state cpd_state;
+	unsigned int flags;
+	int initial_totem_conf_sent;
 	struct list_head list;
 	struct list_head list;
 	struct list_head iteration_instance_list_head;
 	struct list_head iteration_instance_list_head;
 };
 };
@@ -160,6 +162,8 @@ static struct corosync_api_v1 *api = NULL;
 
 
 static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
 static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
 
 
+static mar_cpg_ring_id_t last_sync_ring_id;
+
 struct process_info {
 struct process_info {
 	unsigned int nodeid;
 	unsigned int nodeid;
 	uint32_t pid;
 	uint32_t pid;
@@ -255,6 +259,11 @@ static void cpg_sync_activate (void);
 
 
 static void cpg_sync_abort (void);
 static void cpg_sync_abort (void);
 
 
+static int notify_lib_totem_membership (
+	void *conn,
+	int member_list_entries,
+	const unsigned int *member_list);
+
 /*
 /*
  * Library Handler Definition
  * Library Handler Definition
  */
  */
@@ -432,6 +441,9 @@ static void cpg_sync_init_v2 (
 		sizeof (unsigned int));
 		sizeof (unsigned int));
 	my_member_list_entries = member_list_entries;
 	my_member_list_entries = member_list_entries;
 
 
+	last_sync_ring_id.nodeid = ring_id->rep.nodeid;
+	last_sync_ring_id.seq = ring_id->seq;
+
 	for (i = 0; i < my_member_list_entries; i++) {
 	for (i = 0; i < my_member_list_entries; i++) {
 		if (my_member_list[i] < lowest_nodeid) {
 		if (my_member_list[i] < lowest_nodeid) {
 			lowest_nodeid = my_member_list[i];
 			lowest_nodeid = my_member_list[i];
@@ -482,13 +494,50 @@ static void cpg_sync_activate (void)
 	memcpy (my_old_member_list, my_member_list,
 	memcpy (my_old_member_list, my_member_list,
 		my_member_list_entries * sizeof (unsigned int));
 		my_member_list_entries * sizeof (unsigned int));
 	my_old_member_list_entries = my_member_list_entries;
 	my_old_member_list_entries = my_member_list_entries;
+
+	notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
 }
 }
 
 
 static void cpg_sync_abort (void)
 static void cpg_sync_abort (void)
 {
 {
 }
 }
 
 
+static int notify_lib_totem_membership (
+	void *conn,
+	int member_list_entries,
+	const unsigned int *member_list)
+{
+	struct list_head *iter;
+	char *buf;
+	int size;
+	struct res_lib_cpg_totem_confchg_callback *res;
+
+	size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
+		sizeof(mar_uint32_t) * (member_list_entries);
+	buf = alloca(size);
+	if (!buf)
+		return CPG_ERR_LIBRARY;
 
 
+	res = (struct res_lib_cpg_totem_confchg_callback *)buf;
+	res->member_list_entries = member_list_entries;
+	res->header.size = size;
+	res->header.id = MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK;
+	res->header.error = CS_OK;
+
+	memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
+	memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
+
+	if (conn == NULL) {
+		for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
+			struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
+			api->ipc_dispatch_send (cpg_pd->conn, buf, size);
+		}
+	} else {
+		api->ipc_dispatch_send (conn, buf, size);
+	}
+
+	return CPG_OK;
+}
 
 
 static int notify_lib_joinlist(
 static int notify_lib_joinlist(
 	const mar_cpg_name_t *group_name,
 	const mar_cpg_name_t *group_name,
@@ -604,6 +653,20 @@ static int notify_lib_joinlist(
 		}
 		}
 	}
 	}
 
 
+
+	/*
+	 * Traverse thru cpds and send totem membership for cpd, where it is not send yet
+	 */
+	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
+		struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
+
+		if ((cpd->flags & CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF) && (cpd->initial_totem_conf_sent == 0)) {
+			cpd->initial_totem_conf_sent = 1;
+
+			notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
+		}
+	}
+
 	return CPG_OK;
 	return CPG_OK;
 }
 }
 
 
@@ -1093,6 +1156,7 @@ static void message_handler_req_lib_cpg_join (void *conn, const void *message)
 		error = CPG_OK;
 		error = CPG_OK;
 		cpd->cpd_state = CPD_STATE_JOIN_STARTED;
 		cpd->cpd_state = CPD_STATE_JOIN_STARTED;
 		cpd->pid = req_lib_cpg_join->pid;
 		cpd->pid = req_lib_cpg_join->pid;
+		cpd->flags = req_lib_cpg_join->flags;
 		memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
 		memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
 			sizeof (cpd->group_name));
 			sizeof (cpd->group_name));
 
 

+ 22 - 2
test/testcpg.c

@@ -132,9 +132,29 @@ static void ConfchgCallback (
 	}
 	}
 }
 }
 
 
-static cpg_callbacks_t callbacks = {
+static void TotemConfchgCallback (
+	cpg_handle_t handle,
+        struct cpg_ring_id ring_id,
+        uint32_t member_list_entries,
+        const uint32_t *member_list)
+{
+	int i;
+
+	printf("\nTotemConfchgCallback: ringid (%u.%llu)\n", ring_id.nodeid, ring_id.seq);
+
+	printf("active processors %lu: ",
+	       (unsigned long int) member_list_entries);
+	for (i=0; i<member_list_entries; i++) {
+		printf("%d ", member_list[i]);
+	}
+	printf ("\n");
+}
+
+static cpg_model_v1_data_t model_data = {
 	.cpg_deliver_fn =            DeliverCallback,
 	.cpg_deliver_fn =            DeliverCallback,
 	.cpg_confchg_fn =            ConfchgCallback,
 	.cpg_confchg_fn =            ConfchgCallback,
+	.cpg_totem_confchg_fn =      TotemConfchgCallback,
+	.flags =                     CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF,
 };
 };
 
 
 static void sigintr_handler (int signum) __attribute__((noreturn));
 static void sigintr_handler (int signum) __attribute__((noreturn));
@@ -173,7 +193,7 @@ int main (int argc, char *argv[]) {
 		group_name.length = 6;
 		group_name.length = 6;
 	}
 	}
 
 
-	result = cpg_initialize (&handle, &callbacks);
+	result = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_data, NULL);
 	if (result != CS_OK) {
 	if (result != CS_OK) {
 		printf ("Could not initialize Cluster Process Group API instance error %d\n", result);
 		printf ("Could not initialize Cluster Process Group API instance error %d\n", result);
 		exit (1);
 		exit (1);