|
|
@@ -542,7 +542,7 @@ struct event_svr_channel_subscr {
|
|
|
/*
|
|
|
* Member node data
|
|
|
* mn_node_info: cluster node info from membership
|
|
|
- * mn_last_evt_id: last seen event ID for this node
|
|
|
+ * mn_last_msg_id: last seen message ID for this node
|
|
|
* mn_started: Indicates that event service has started
|
|
|
* on this node.
|
|
|
* mn_next: pointer to the next node in the hash chain.
|
|
|
@@ -551,7 +551,7 @@ struct event_svr_channel_subscr {
|
|
|
struct member_node_data {
|
|
|
struct in_addr mn_node_addr;
|
|
|
SaClmClusterNodeT mn_node_info;
|
|
|
- SaEvtEventIdT mn_last_evt_id;
|
|
|
+ SaEvtEventIdT mn_last_msg_id;
|
|
|
SaClmNodeIdT mn_started;
|
|
|
struct member_node_data *mn_next;
|
|
|
struct list_head mn_entry;
|
|
|
@@ -1300,8 +1300,8 @@ static int check_last_event(struct lib_event_data *evtpkt,
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- if ((nd->mn_last_evt_id < evtpkt->led_event_id)) {
|
|
|
- nd->mn_last_evt_id = evtpkt->led_event_id;
|
|
|
+ if ((nd->mn_last_msg_id < evtpkt->led_msg_id)) {
|
|
|
+ nd->mn_last_msg_id = evtpkt->led_msg_id;
|
|
|
return 0;
|
|
|
}
|
|
|
return 1;
|
|
|
@@ -1322,10 +1322,37 @@ SaErrorT set_event_id(SaClmNodeIdT node_id)
|
|
|
return err;
|
|
|
}
|
|
|
|
|
|
-static SaErrorT get_event_id(uint64_t *event_id)
|
|
|
+/*
|
|
|
+ * See if an event Id is still in use in the retained event
|
|
|
+ * list.
|
|
|
+ */
|
|
|
+static int id_in_use(uint64_t id, uint64_t base)
|
|
|
+{
|
|
|
+ struct list_head *l;
|
|
|
+ struct event_data *edp;
|
|
|
+ SaEvtEventIdT evtid = (id << 32) | (base & BASE_ID_MASK);
|
|
|
+
|
|
|
+ for (l = retained_list.next; l != &retained_list; l = l->next) {
|
|
|
+ edp = list_entry(l, struct event_data, ed_retained);
|
|
|
+ if (edp->ed_event.led_event_id == evtid) {
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+}
|
|
|
+
|
|
|
+static SaErrorT get_event_id(uint64_t *event_id, uint64_t *msg_id)
|
|
|
{
|
|
|
- *event_id = base_id_top | base_id ;
|
|
|
- base_id = (base_id + 1) & BASE_ID_MASK;
|
|
|
+ /*
|
|
|
+ * Don't reuse an event ID if it is still valid because of
|
|
|
+ * a retained event.
|
|
|
+ */
|
|
|
+ while (id_in_use(base_id_top, base_id)) {
|
|
|
+ base_id++;
|
|
|
+ }
|
|
|
+
|
|
|
+ *event_id = base_id_top | (base_id & BASE_ID_MASK) ;
|
|
|
+ *msg_id = base_id++;
|
|
|
return SA_AIS_OK;
|
|
|
}
|
|
|
|
|
|
@@ -2543,6 +2570,7 @@ static int lib_evt_event_publish(struct conn_info *conn_info, void *message)
|
|
|
struct event_svr_channel_open *eco;
|
|
|
struct event_svr_channel_instance *eci;
|
|
|
SaEvtEventIdT event_id = 0;
|
|
|
+ uint64_t msg_id = 0;
|
|
|
SaErrorT error = SA_AIS_OK;
|
|
|
struct iovec pub_iovec;
|
|
|
void *ptr;
|
|
|
@@ -2571,10 +2599,11 @@ static int lib_evt_event_publish(struct conn_info *conn_info, void *message)
|
|
|
* modify the request structure for sending event data to subscribed
|
|
|
* processes.
|
|
|
*/
|
|
|
- get_event_id(&event_id);
|
|
|
+ get_event_id(&event_id, &msg_id);
|
|
|
req->led_head.id = MESSAGE_REQ_EXEC_EVT_EVENTDATA;
|
|
|
req->led_chan_name = eci->esc_channel_name;
|
|
|
req->led_event_id = event_id;
|
|
|
+ req->led_msg_id = msg_id;
|
|
|
req->led_chan_unlink_id = eci->esc_unlink_id;
|
|
|
|
|
|
/*
|
|
|
@@ -3919,23 +3948,22 @@ static int evt_sync_process(void)
|
|
|
log_printf(RECOVERY_DEBUG, "Send max event ID updates\n");
|
|
|
while (add_count) {
|
|
|
/*
|
|
|
- * If we've seen this node before, send out the last event ID
|
|
|
- * that we've seen from him. He will set his base event ID to
|
|
|
- * the highest one seen.
|
|
|
+ * If we've seen this node before, send out the last msg ID
|
|
|
+ * that we've seen from him. He will set his base ID for
|
|
|
+ * generating event and message IDs to the highest one seen.
|
|
|
*/
|
|
|
md = evt_find_node(*add_list);
|
|
|
if (md != NULL) {
|
|
|
log_printf(RECOVERY_DEBUG,
|
|
|
"Send set evt ID %llx to %s\n",
|
|
|
- md->mn_last_evt_id, inet_ntoa(*add_list));
|
|
|
+ md->mn_last_msg_id, inet_ntoa(*add_list));
|
|
|
md->mn_started = 1;
|
|
|
memset(&cpkt, 0, sizeof(cpkt));
|
|
|
cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
|
|
|
cpkt.chc_head.size = sizeof(cpkt);
|
|
|
cpkt.chc_op = EVT_SET_ID_OP;
|
|
|
cpkt.u.chc_set_id.chc_addr = *add_list;
|
|
|
- cpkt.u.chc_set_id.chc_last_id =
|
|
|
- md->mn_last_evt_id & BASE_ID_MASK;
|
|
|
+ cpkt.u.chc_set_id.chc_last_id = md->mn_last_msg_id;
|
|
|
chn_iovec.iov_base = &cpkt;
|
|
|
chn_iovec.iov_len = cpkt.chc_head.size;
|
|
|
res = totempg_mcast (&chn_iovec, 1,TOTEMPG_AGREED);
|