[corosync] [PATCH] Merge downlist and leftlist into one confchg event

Yunkai Zhang qiushu.zyk at taobao.com
Sun Nov 6 08:05:14 GMT 2011


I found that corosync sends the downlist (alias leftlist in CPG) and the
joinlist separately to the CPG client after a new ring is established.

When a node leaves or joins a ring, only the downlist or the joinlist
has content, so this behavior does not seem strange.

But suppose there are two rings, Ring(A,B) and Ring(C,D,E,F), that are
merging into a new ring, and nodes B, E and F drop out in the GATHER
state. Then only nodes A, C and D will form the new Ring(A,C,D), and
corosync will send downlist(B) and joinlist(C,D) as two separate confchg
events to the CPG client of node A. This will confuse the CPG client.

The correct behavior is to send _only_ one confchg event, containing both
the downlist and the joinlist, to the CPG client. In this case:
1) The CPG client of A should receive only one confchg, as follows:
memb:A,C,D       left:B         join:C,D

2) The CPG clients of C and D should receive only one confchg, as follows:
memb:A,C,D       left:E,F       join:A

This patch will fix this issue.

Signed-off-by: Yunkai Zhang <qiushu.zyk at taobao.com>
---
 services/cpg.c |  528 ++++++++++++++++++++++++++------------------------------
 1 files changed, 242 insertions(+), 286 deletions(-)

diff --git a/services/cpg.c b/services/cpg.c
index a86b1d7..b6e28d1 100644
--- a/services/cpg.c
+++ b/services/cpg.c
@@ -77,9 +77,7 @@ enum cpg_message_req_types {
 	MESSAGE_REQ_EXEC_CPG_PROCJOIN = 0,
 	MESSAGE_REQ_EXEC_CPG_PROCLEAVE = 1,
 	MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
-	MESSAGE_REQ_EXEC_CPG_MCAST = 3,
-	MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
-	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
+	MESSAGE_REQ_EXEC_CPG_MCAST = 3
 };
 
 struct zcb_mapped {
@@ -131,22 +129,9 @@ enum cpd_state {
 	CPD_STATE_JOIN_COMPLETED
 };
 
-enum cpg_sync_state {
-	CPGSYNC_DOWNLIST,
-	CPGSYNC_JOINLIST
-};
-
-enum cpg_downlist_state_e {
-       CPG_DOWNLIST_NONE,
-       CPG_DOWNLIST_WAITING_FOR_MESSAGES,
-       CPG_DOWNLIST_APPLYING,
-};
-static enum cpg_downlist_state_e downlist_state;
-static struct list_head downlist_messages_head;
-
 struct cpg_pd {
 	void *conn;
- 	mar_cpg_name_t group_name;
+	mar_cpg_name_t group_name;
 	uint32_t pid;
 	enum cpd_state cpd_state;
 	unsigned int flags;
@@ -177,8 +162,6 @@ static unsigned int my_old_member_list_entries = 0;
 
 static struct corosync_api_v1 *api = NULL;
 
-static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
-
 static mar_cpg_ring_id_t last_sync_ring_id;
 
 struct process_info {
@@ -219,10 +202,6 @@ static void message_handler_req_exec_cpg_mcast (
 	const void *message,
 	unsigned int nodeid);
 
-static void message_handler_req_exec_cpg_downlist_old (
-	const void *message,
-	unsigned int nodeid);
-
 static void message_handler_req_exec_cpg_downlist (
 	const void *message,
 	unsigned int nodeid);
@@ -233,10 +212,6 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
 
 static void exec_cpg_mcast_endian_convert (void *msg);
 
-static void exec_cpg_downlist_endian_convert_old (void *msg);
-
-static void exec_cpg_downlist_endian_convert (void *msg);
-
 static void message_handler_req_lib_cpg_join (void *conn, const void *message);
 
 static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
@@ -277,13 +252,13 @@ static void message_handler_req_lib_cpg_zc_execute (
 
 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
 
-static int cpg_exec_send_downlist(void);
-
 static int cpg_exec_send_joinlist(void);
 
-static void downlist_messages_delete (void);
+static void cpg_leftlist_collect (void);
 
-static void downlist_master_choose_and_send (void);
+static void cpg_joinlist_collect (void);
+
+static void cpg_confchg_send (void);
 
 static void cpg_sync_init_v2 (
 	const unsigned int *trans_list,
@@ -380,15 +355,7 @@ static struct corosync_exec_handler cpg_exec_engine[] =
 	{ /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */
 		.exec_handler_fn	= message_handler_req_exec_cpg_mcast,
 		.exec_endian_convert_fn	= exec_cpg_mcast_endian_convert
-	},
-	{ /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */
-		.exec_handler_fn	= message_handler_req_exec_cpg_downlist_old,
-		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert_old
-	},
-	{ /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */
-		.exec_handler_fn	= message_handler_req_exec_cpg_downlist,
-		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert
-	},
+	}
 };
 
 struct corosync_service_engine cpg_service_engine = {
@@ -454,7 +421,7 @@ void corosync_lcr_component_register (void) {
 #else
 __attribute__ ((constructor)) static void corosync_lcr_component_register (void) {
 #endif
-        lcr_interfaces_set (&corosync_cpg_ver0[0], &cpg_service_engine_iface);
+	lcr_interfaces_set (&corosync_cpg_ver0[0], &cpg_service_engine_iface);
 
 	lcr_component_register (&cpg_comp_ver0);
 }
@@ -498,7 +465,26 @@ struct downlist_msg {
 	struct list_head list;
 };
 
-static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
+struct confchg_nodes{
+	mar_uint32_t left_nodes __attribute__((aligned(8)));
+	mar_uint32_t left_nodeids[PROCESSOR_COUNT_MAX]  __attribute__((aligned(8)));
+	mar_uint32_t join_nodes __attribute__((aligned(8)));
+	mar_uint32_t join_nodeids[PROCESSOR_COUNT_MAX]  __attribute__((aligned(8)));
+};
+
+struct confchg_cpg_group{
+	struct cpg_name cpg_group;
+	mar_cpg_address_t left_list[CPG_MEMBERS_MAX];
+	int left_list_entries;
+	mar_cpg_address_t join_list[CPG_MEMBERS_MAX];
+	int join_list_entries;
+	struct list_head  list;
+};
+
+static struct confchg_nodes g_confchg_nodes;
+static qb_map_t *g_cpg_group_map;
+static int g_recv_msg_nodes;
+static int g_recv_msg_nodeids[PROCESSOR_COUNT_MAX];
 
 static void cpg_sync_init_v2 (
 	const unsigned int *trans_list,
@@ -511,7 +497,9 @@ static void cpg_sync_init_v2 (
 	int i, j;
 	int found;
 
-	my_sync_state = CPGSYNC_DOWNLIST;
+	log_printf(LOG_DEBUG, "cpg_sync_init_v2 ...");
+
+	g_recv_msg_nodes = 0;
 
 	memcpy (my_member_list, member_list, member_list_entries *
 		sizeof (unsigned int));
@@ -520,13 +508,8 @@ static void cpg_sync_init_v2 (
 	last_sync_ring_id.nodeid = ring_id->rep.nodeid;
 	last_sync_ring_id.seq = ring_id->seq;
 
-	downlist_state = CPG_DOWNLIST_WAITING_FOR_MESSAGES;
-
-	entries = 0;
-	/*
-	 * Determine list of nodeids for downlist message
-	 */
-	for (i = 0; i < my_old_member_list_entries; i++) {
+	/* Determine list of left_nodes for confchg event */
+	for (i = 0, entries = 0; i < my_old_member_list_entries; i++) {
 		found = 0;
 		for (j = 0; j < trans_list_entries; j++) {
 			if (my_old_member_list[i] == trans_list[j]) {
@@ -535,50 +518,55 @@ static void cpg_sync_init_v2 (
 			}
 		}
 		if (found == 0) {
-			g_req_exec_cpg_downlist.nodeids[entries++] =
+			g_confchg_nodes.left_nodeids[entries++] =
 				my_old_member_list[i];
 		}
 	}
-	g_req_exec_cpg_downlist.left_nodes = entries;
+	g_confchg_nodes.left_nodes = entries;
+
+	/* Determine list of join_nodes for confchg event */
+	for (i = 0, entries = 0; i < my_member_list_entries; i++) {
+		found = 0;
+		for (j = 0; j < trans_list_entries; j++) {
+			if (my_member_list[i] == trans_list[j]) {
+				found = 1;
+				break;
+			}
+		}
+		if (found == 0) {
+			g_confchg_nodes.join_nodeids[entries++] =
+				my_member_list[i];
+		}
+	}
+	g_confchg_nodes.join_nodes = entries;
+
+	cpg_leftlist_collect();
 }
 
 static int cpg_sync_process (void)
 {
 	int res = -1;
 
-	if (my_sync_state == CPGSYNC_DOWNLIST) {
-		res = cpg_exec_send_downlist();
-		if (res == -1) {
-			return (-1);
-		}
-		my_sync_state = CPGSYNC_JOINLIST;
-	}
-	if (my_sync_state == CPGSYNC_JOINLIST) {
-		res = cpg_exec_send_joinlist();
-	}
+	log_printf (LOGSYS_LEVEL_DEBUG, "cpg_sync_process ...");
+	res = cpg_exec_send_joinlist();
+
 	return (res);
 }
 
 static void cpg_sync_activate (void)
 {
+	log_printf (LOGSYS_LEVEL_DEBUG, "cpg_sync_activate ...");
+
 	memcpy (my_old_member_list, my_member_list,
 		my_member_list_entries * sizeof (unsigned int));
 	my_old_member_list_entries = my_member_list_entries;
 
-	if (downlist_state == CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
-		downlist_master_choose_and_send ();
-	}
-
-	downlist_messages_delete ();
-	downlist_state = CPG_DOWNLIST_NONE;
-
 	notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
 }
 
 static void cpg_sync_abort (void)
 {
-	downlist_state = CPG_DOWNLIST_NONE;
-	downlist_messages_delete ();
+	log_printf (LOGSYS_LEVEL_DEBUG, "cpg_sync_abort ...");
 }
 
 static int notify_lib_totem_membership (
@@ -634,8 +622,9 @@ static int notify_lib_joinlist(
 	struct res_lib_cpg_confchg_callback *res;
 	mar_cpg_address_t *retgi;
 
-	count = 0;
+	log_printf (LOGSYS_LEVEL_DEBUG, "notify_lib_joinlist ...");
 
+	count = 0;
 	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
 		struct process_info *pi = list_entry (iter, struct process_info, list);
 		if (mar_name_compare (&pi->group, group_name) == 0) {
@@ -706,7 +695,6 @@ static int notify_lib_joinlist(
 		for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
 			struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
 			if (mar_name_compare (&cpd->group_name, group_name) == 0) {
-				assert (joined_list_entries <= 1);
 				if (joined_list_entries) {
 					if (joined_list[0].pid == cpd->pid &&
 						joined_list[0].nodeid == api->totem_nodeid_get()) {
@@ -716,6 +704,13 @@ static int notify_lib_joinlist(
 				if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
 					cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
 
+					log_printf (LOGSYS_LEVEL_DEBUG,
+						"send confchg(memb:%d, left:%d, join:%d) to CPG client(group:%s, pid:%d)",
+						res->member_list_entries,
+						res->left_list_entries,
+						res->joined_list_entries,
+						cpd->group_name.value,
+						cpd->pid);
 					api->ipc_dispatch_send (cpd->conn, buf, size);
 				}
 				if (left_list_entries) {
@@ -759,72 +754,68 @@ static void downlist_log(const char *msg, struct downlist_msg* dl)
 		    dl->left_nodes);
 }
 
-static struct downlist_msg* downlist_master_choose (void)
+static void cpg_confchg_send (void)
 {
-	struct downlist_msg *cmp;
-	struct downlist_msg *best = NULL;
-	struct list_head *iter;
-	uint32_t cmp_members;
-	uint32_t best_members;
-
-	for (iter = downlist_messages_head.next;
-		iter != &downlist_messages_head;
-		iter = iter->next) {
+	struct confchg_cpg_group *pccg;
+	mar_cpg_name_t group;
+	qb_map_iter_t *miter;
+	const char *p;
+	int i;
 
-		cmp = list_entry(iter, struct downlist_msg, list);
-		downlist_log("comparing", cmp);
-		if (best == NULL) {
-			best = cmp;
-			continue;
-		}
-		best_members = best->old_members - best->left_nodes;
-		cmp_members = cmp->old_members - cmp->left_nodes;
+	log_printf (LOGSYS_LEVEL_DEBUG, "cpg_confchg_send ...");
 
-		if (cmp_members < best_members) {
-			continue;
-		}
-		else if (cmp_members > best_members) {
-			best = cmp;
+	/* send only one confchg event per cpg group */
+	miter = qb_map_iter_create(g_cpg_group_map);
+	while ((p = qb_map_iter_next(miter, (void **)&pccg))) {
+		marshall_to_mar_cpg_name_t(&group, &pccg->cpg_group);
+
+		log_printf (LOG_DEBUG, "left_list_entries:%d", pccg->left_list_entries);
+		for (i=0; i<pccg->left_list_entries; i++) {
+			log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d",
+				i, pccg->cpg_group.value,
+				(char*)api->totem_ifaces_print(pccg->left_list[i].nodeid),
+				pccg->left_list[i].pid);
 		}
-		else if (cmp->sender_nodeid < best->sender_nodeid) {
-			best = cmp;
+		log_printf (LOG_DEBUG, "join_list_entries:%d", pccg->join_list_entries);
+		for (i=0; i<pccg->join_list_entries; i++) {
+			log_printf (LOG_DEBUG, "join_list[%d] group:%s, ip:%s, pid:%d",
+				i, pccg->cpg_group.value,
+				(char*)api->totem_ifaces_print(pccg->join_list[i].nodeid),
+				pccg->join_list[i].pid);
 		}
 
+		/* send confchg event */
+		notify_lib_joinlist(&group, NULL,
+			pccg->join_list_entries,
+			pccg->join_list,
+			pccg->left_list_entries,
+			pccg->left_list,
+			MESSAGE_RES_CPG_CONFCHG_CALLBACK);
+
+		free(pccg);
 	}
-	return best;
+	qb_map_iter_free(miter);
+	qb_map_destroy(g_cpg_group_map);
+	g_cpg_group_map = NULL;
 }
 
-static void downlist_master_choose_and_send (void)
+static void cpg_leftlist_collect(void)
 {
-	struct downlist_msg *stored_msg;
 	struct list_head *iter;
 	struct process_info *left_pi;
-	qb_map_t *group_map;
 	struct cpg_name cpg_group;
-	mar_cpg_name_t group;
-	struct confchg_data{
-		struct cpg_name cpg_group;
-		mar_cpg_address_t left_list[CPG_MEMBERS_MAX];
-		int left_list_entries;
-		struct list_head  list;
-	} *pcd;
-	qb_map_iter_t *miter;
+	struct confchg_cpg_group *pccg;
 	int i, size;
-	const char *p;
 
-	downlist_state = CPG_DOWNLIST_APPLYING;
+	log_printf (LOGSYS_LEVEL_DEBUG, "cpg_leftlist_collect ...");
 
-	stored_msg = downlist_master_choose ();
-	if (!stored_msg) {
-		log_printf (LOGSYS_LEVEL_DEBUG, "NO chosen downlist");
-		return;
+	if (g_cpg_group_map) {
+		qb_map_destroy(g_cpg_group_map);
 	}
-	downlist_log("chosen downlist", stored_msg);
-
-	group_map = qb_skiplist_create();
+	g_cpg_group_map = qb_skiplist_create();
 
 	/*
-	 * only the cpg groups included in left nodes should receive
+	 * only the cpg groups included in _left nodes_ should receive
 	 * confchg event, so we will collect these cpg groups and
 	 * relative left_lists here.
 	 */
@@ -833,9 +824,8 @@ static void downlist_master_choose_and_send (void)
 		iter = iter->next;
 
 		left_pi = NULL;
-		for (i = 0; i < stored_msg->left_nodes; i++) {
-
-			if (pi->nodeid == stored_msg->nodeids[i]) {
+		for (i = 0; i < g_confchg_nodes.left_nodes; i++) {
+			if (pi->nodeid == g_confchg_nodes.left_nodeids[i]) {
 				left_pi = pi;
 				break;
 			}
@@ -845,72 +835,75 @@ static void downlist_master_choose_and_send (void)
 			marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group);
 			cpg_group.value[cpg_group.length] = 0;
 
-			pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value);
-			if (pcd == NULL) {
-				pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data));
-				memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name));
-				qb_map_put(group_map, pcd->cpg_group.value, pcd);
+			pccg = (struct confchg_cpg_group *)qb_map_get(g_cpg_group_map, cpg_group.value);
+			if (pccg == NULL) {
+				pccg = (struct confchg_cpg_group *)calloc(1, sizeof(struct confchg_cpg_group));
+				memcpy(&pccg->cpg_group, &cpg_group, sizeof(struct cpg_name));
+				qb_map_put(g_cpg_group_map, pccg->cpg_group.value, pccg);
 			}
-			size = pcd->left_list_entries;
-			pcd->left_list[size].nodeid = left_pi->nodeid;
-			pcd->left_list[size].pid = left_pi->pid;
-			pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
-			pcd->left_list_entries++;
+			size = pccg->left_list_entries;
+			pccg->left_list[size].nodeid = left_pi->nodeid;
+			pccg->left_list[size].pid = left_pi->pid;
+			pccg->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
+			pccg->left_list_entries++;
+
 			list_del (&left_pi->list);
 			free (left_pi);
 		}
 	}
-
-	/* send only one confchg event per cpg group */
-	miter = qb_map_iter_create(group_map);
-	while ((p = qb_map_iter_next(miter, (void **)&pcd))) {
-		marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
-
-		log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries);
-		for (i=0; i<pcd->left_list_entries; i++) {
-			log_printf (LOG_DEBUG, "left_list[%d] group:%d, ip:%s, pid:%d",
-				i, pcd->cpg_group.value,
-				(char*)api->totem_ifaces_print(pcd->left_list[i].nodeid),
-				pcd->left_list[i].pid);
-		}
-
-		/* send confchg event */
-		notify_lib_joinlist(&group, NULL,
-			0, NULL,
-			pcd->left_list_entries,
-			pcd->left_list,
-			MESSAGE_RES_CPG_CONFCHG_CALLBACK);
-
-		free(pcd);
-	}
-	qb_map_iter_free(miter);
-	qb_map_destroy(group_map);
 }
 
-static void downlist_messages_delete (void)
+static void cpg_joinlist_collect(void)
 {
-	struct downlist_msg *stored_msg;
-	struct list_head *iter, *iter_next;
+	struct list_head *iter;
+	struct process_info *join_pi;
+	struct cpg_name cpg_group;
+	struct confchg_cpg_group *pccg;
+	int i, size;
 
-	for (iter = downlist_messages_head.next;
-		iter != &downlist_messages_head;
-		iter = iter_next) {
+	log_printf (LOGSYS_LEVEL_DEBUG, "cpg_joinlist_collect ...");
 
-		iter_next = iter->next;
+	/*
+	 * only the cpg groups included in _join nodes_ should receive
+	 * confchg event, so we will collect these cpg groups and
+	 * relative join_lists here.
+	 */
+	for (iter = process_info_list_head.next; iter != &process_info_list_head; ) {
+		struct process_info *pi = list_entry(iter, struct process_info, list);
+		iter = iter->next;
+
+		join_pi = NULL;
+		for (i = 0; i < g_confchg_nodes.join_nodes; i++) {
+			if (pi->nodeid == g_confchg_nodes.join_nodeids[i]) {
+				join_pi = pi;
+				break;
+			}
+		}
+
+		if (join_pi) {
+			marshall_from_mar_cpg_name_t(&cpg_group, &join_pi->group);
+			cpg_group.value[cpg_group.length] = 0;
 
-		stored_msg = list_entry(iter, struct downlist_msg, list);
-		list_del (&stored_msg->list);
-		free (stored_msg);
+			pccg = (struct confchg_cpg_group *)qb_map_get(g_cpg_group_map, cpg_group.value);
+			if (pccg == NULL) {
+				pccg = (struct confchg_cpg_group *)calloc(1, sizeof(struct confchg_cpg_group));
+				memcpy(&pccg->cpg_group, &cpg_group, sizeof(struct cpg_name));
+				qb_map_put(g_cpg_group_map, pccg->cpg_group.value, pccg);
+			}
+			size = pccg->join_list_entries;
+			pccg->join_list[size].nodeid = join_pi->nodeid;
+			pccg->join_list[size].pid = join_pi->pid;
+			pccg->join_list[size].reason = CONFCHG_CPG_REASON_NODEUP;
+			pccg->join_list_entries++;
+		}
 	}
 }
 
-
 static int cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
 {
 #ifdef COROSYNC_SOLARIS
 	logsys_subsys_init();
 #endif
-	list_init (&downlist_messages_head);
 	api = corosync_api;
 	return (0);
 }
@@ -1018,24 +1011,6 @@ static void exec_cpg_joinlist_endian_convert (void *msg_v)
 	}
 }
 
-static void exec_cpg_downlist_endian_convert_old (void *msg)
-{
-}
-
-static void exec_cpg_downlist_endian_convert (void *msg)
-{
-	struct req_exec_cpg_downlist *req_exec_cpg_downlist = msg;
-	unsigned int i;
-
-	req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
-	req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members);
-
-	for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
-		req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
-	}
-}
-
-
 static void exec_cpg_mcast_endian_convert (void *msg)
 {
 	struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
@@ -1077,7 +1052,7 @@ static void do_proc_join(
 
 	if (process_info_find (name, pid, nodeid) != NULL) {
 		return ;
- 	}
+	}
 	pi = malloc (sizeof (struct process_info));
 	if (!pi) {
 		log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
@@ -1114,66 +1089,14 @@ static void do_proc_join(
 			    MESSAGE_RES_CPG_CONFCHG_CALLBACK);
 }
 
-static void message_handler_req_exec_cpg_downlist_old (
-	const void *message,
-	unsigned int nodeid)
-{
-	log_printf (LOGSYS_LEVEL_WARNING, "downlist OLD from node %d",
-		nodeid);
-}
-
-static void message_handler_req_exec_cpg_downlist(
-	const void *message,
-	unsigned int nodeid)
-{
-	const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
-	int i;
-	struct list_head *iter;
-	struct downlist_msg *stored_msg;
-	int found;
-
-	if (downlist_state != CPG_DOWNLIST_WAITING_FOR_MESSAGES) {
-		log_printf (LOGSYS_LEVEL_WARNING, "downlist left_list: %d received in state %d",
-			req_exec_cpg_downlist->left_nodes, downlist_state);
-		return;
-	}
-
-	stored_msg = malloc (sizeof (struct downlist_msg));
-	stored_msg->sender_nodeid = nodeid;
-	stored_msg->old_members = req_exec_cpg_downlist->old_members;
-	stored_msg->left_nodes = req_exec_cpg_downlist->left_nodes;
-	memcpy (stored_msg->nodeids, req_exec_cpg_downlist->nodeids,
-		req_exec_cpg_downlist->left_nodes * sizeof (mar_uint32_t));
-	list_init (&stored_msg->list);
-	list_add (&stored_msg->list, &downlist_messages_head);
-
-	for (i = 0; i < my_member_list_entries; i++) {
-		found = 0;
-		for (iter = downlist_messages_head.next;
-			iter != &downlist_messages_head;
-			iter = iter->next) {
-
-			stored_msg = list_entry(iter, struct downlist_msg, list);
-			if (my_member_list[i] == stored_msg->sender_nodeid) {
-				found = 1;
-			}
-		}
-		if (!found) {
-			return;
-		}
-	}
-
-	downlist_master_choose_and_send ();
-}
-
-
 static void message_handler_req_exec_cpg_procjoin (
 	const void *message,
 	unsigned int nodeid)
 {
 	const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
 
-	log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node %d\n", nodeid);
+	log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node %s",
+			api->totem_ifaces_print(nodeid));
 
 	do_proc_join (&req_exec_cpg_procjoin->group_name,
 		req_exec_cpg_procjoin->pid, nodeid,
@@ -1189,7 +1112,8 @@ static void message_handler_req_exec_cpg_procleave (
 	struct list_head *iter;
 	mar_cpg_address_t notify_info;
 
-	log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node %d\n", nodeid);
+	log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node %s",
+			api->totem_ifaces_print(nodeid));
 
 	notify_info.pid = req_exec_cpg_procjoin->pid;
 	notify_info.nodeid = nodeid;
@@ -1218,22 +1142,70 @@ static void message_handler_req_exec_cpg_joinlist (
 	const void *message_v,
 	unsigned int nodeid)
 {
+	int i;
 	const char *message = message_v;
+	struct process_info *pi, *pi_entry;
+	struct list_head *list;
+	struct list_head *list_to_add = NULL;
 	const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message;
 	const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header));
 
-	log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node %x\n",
-		nodeid);
+	log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node:%s",
+			api->totem_ifaces_print(nodeid));
+
+	for (i=0; i<g_recv_msg_nodes; i++) {
+		if (g_recv_msg_nodeids[i] == nodeid) {
+			break;
+		}
+	}
+	if (i == g_recv_msg_nodes) {
+		g_recv_msg_nodeids[i] = nodeid;
+		g_recv_msg_nodes++;
+	}
 
-	/* Ignore our own messages */
 	if (nodeid == api->totem_nodeid_get()) {
-		return;
+		goto send_event;
 	}
 
-	while ((const char*)jle < message + res->size) {
-		do_proc_join (&jle->group_name, jle->pid, nodeid,
-			CONFCHG_CPG_REASON_NODEUP);
-		jle++;
+	for (; (const char*)jle < message + res->size; jle++) {
+		if (process_info_find(&jle->group_name, jle->pid, nodeid) != NULL) {
+			continue;
+		}
+
+		pi = malloc (sizeof (struct process_info));
+		if (!pi) {
+			log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
+			return;
+		}
+
+		pi->nodeid = nodeid;
+		pi->pid = jle->pid;
+		memcpy(&pi->group, &jle->group_name, sizeof(jle->group_name));
+		list_init(&pi->list);
+
+		/*
+		 * Insert new process in sorted order so synchronization works properly
+		 */
+		list_to_add = &process_info_list_head;
+		for (list = process_info_list_head.next; list != &process_info_list_head; list = list->next) {
+
+			pi_entry = list_entry(list, struct process_info, list);
+			if (pi_entry->nodeid > pi->nodeid ||
+				(pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {
+
+				break;
+			}
+			list_to_add = list;
+		}
+		list_add (&pi->list, list_to_add);
+	}
+
+send_event:
+	if (g_recv_msg_nodes == my_member_list_entries) {
+		log_printf(LOGSYS_LEVEL_DEBUG, "We have got all joinlist messages in this cluster");
+		g_recv_msg_nodes = 0;
+		cpg_joinlist_collect();
+		cpg_confchg_send();
 	}
 }
 
@@ -1295,43 +1267,25 @@ static void message_handler_req_exec_cpg_mcast (
 	}
 }
 
-
-static int cpg_exec_send_downlist(void)
-{
-	struct iovec iov;
-
-	g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
-	g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
-
-	g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
-
-	iov.iov_base = (void *)&g_req_exec_cpg_downlist;
-	iov.iov_len = g_req_exec_cpg_downlist.header.size;
-
-	return (api->totem_mcast (&iov, 1, TOTEM_AGREED));
-}
-
 static int cpg_exec_send_joinlist(void)
 {
 	int count = 0;
 	struct list_head *iter;
 	struct qb_ipc_response_header *res;
- 	char *buf;
+	char *buf;
 	struct join_list_entry *jle;
 	struct iovec req_exec_cpg_iovec;
 
- 	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
- 		struct process_info *pi = list_entry (iter, struct process_info, list);
+	log_printf (LOG_DEBUG, "cpg_exec_send_joinlist ...");
 
- 		if (pi->nodeid == api->totem_nodeid_get ()) {
- 			count++;
+	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
+		struct process_info *pi = list_entry (iter, struct process_info, list);
+
+		if (pi->nodeid == api->totem_nodeid_get ()) {
+			count++;
 		}
 	}
 
-	/* Nothing to send */
-	if (!count)
-		return 0;
-
 	buf = alloca(sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count);
 	if (!buf) {
 		log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
@@ -1341,13 +1295,15 @@ static int cpg_exec_send_joinlist(void)
 	jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header));
 	res = (struct qb_ipc_response_header *)buf;
 
- 	for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
- 		struct process_info *pi = list_entry (iter, struct process_info, list);
+	if (count > 0) {
+		for (iter = process_info_list_head.next; iter != &process_info_list_head; iter = iter->next) {
+			struct process_info *pi = list_entry (iter, struct process_info, list);
 
-		if (pi->nodeid == api->totem_nodeid_get ()) {
-			memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
-			jle->pid = pi->pid;
-			jle++;
+			if (pi->nodeid == api->totem_nodeid_get ()) {
+				memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
+				jle->pid = pi->pid;
+				jle++;
+			}
 		}
 	}
 
@@ -1438,9 +1394,9 @@ static void message_handler_req_lib_cpg_join (void *conn, const void *message)
 
 response_send:
 	res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
-        res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN;
-        res_lib_cpg_join.header.error = error;
-        api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
+	res_lib_cpg_join.header.id = MESSAGE_RES_CPG_JOIN;
+	res_lib_cpg_join.header.error = error;
+	api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
 }
 
 /* Leave message from the library */
-- 
1.7.1



More information about the discuss mailing list