|
@@ -71,7 +71,7 @@
|
|
|
#include <sys/poll.h>
|
|
#include <sys/poll.h>
|
|
|
|
|
|
|
|
#include <corosync/swab.h>
|
|
#include <corosync/swab.h>
|
|
|
-#include <corosync/queue.h>
|
|
|
|
|
|
|
+#include <corosync/cs_queue.h>
|
|
|
#include <corosync/sq.h>
|
|
#include <corosync/sq.h>
|
|
|
#include <corosync/list.h>
|
|
#include <corosync/list.h>
|
|
|
#include <corosync/hdb.h>
|
|
#include <corosync/hdb.h>
|
|
@@ -374,9 +374,9 @@ struct totemsrp_instance {
|
|
|
/*
|
|
/*
|
|
|
* Queues used to order, deliver, and recover messages
|
|
* Queues used to order, deliver, and recover messages
|
|
|
*/
|
|
*/
|
|
|
- struct queue new_message_queue;
|
|
|
|
|
|
|
+ struct cs_queue new_message_queue;
|
|
|
|
|
|
|
|
- struct queue retrans_message_queue;
|
|
|
|
|
|
|
+ struct cs_queue retrans_message_queue;
|
|
|
|
|
|
|
|
struct sq regular_sort_queue;
|
|
struct sq regular_sort_queue;
|
|
|
|
|
|
|
@@ -788,7 +788,7 @@ int totemsrp_initialize (
|
|
|
"max_network_delay (%d ms)\n", totem_config->max_network_delay);
|
|
"max_network_delay (%d ms)\n", totem_config->max_network_delay);
|
|
|
|
|
|
|
|
|
|
|
|
|
- queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
|
|
|
|
|
|
|
+ cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
|
|
|
sizeof (struct message_item));
|
|
sizeof (struct message_item));
|
|
|
|
|
|
|
|
sq_init (&instance->regular_sort_queue,
|
|
sq_init (&instance->regular_sort_queue,
|
|
@@ -845,7 +845,7 @@ int totemsrp_initialize (
|
|
|
/*
|
|
/*
|
|
|
* Must have net_mtu adjusted by totemrrp_initialize first
|
|
* Must have net_mtu adjusted by totemrrp_initialize first
|
|
|
*/
|
|
*/
|
|
|
- queue_init (&instance->new_message_queue,
|
|
|
|
|
|
|
+ cs_queue_init (&instance->new_message_queue,
|
|
|
MESSAGE_QUEUE_MAX,
|
|
MESSAGE_QUEUE_MAX,
|
|
|
sizeof (struct message_item));
|
|
sizeof (struct message_item));
|
|
|
|
|
|
|
@@ -1853,7 +1853,7 @@ static void memb_state_recovery_enter (
|
|
|
instance->my_high_ring_delivered = 0;
|
|
instance->my_high_ring_delivered = 0;
|
|
|
|
|
|
|
|
sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
|
|
sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
|
|
|
- queue_reinit (&instance->retrans_message_queue);
|
|
|
|
|
|
|
+ cs_queue_reinit (&instance->retrans_message_queue);
|
|
|
|
|
|
|
|
low_ring_aru = instance->old_ring_state_high_seq_received;
|
|
low_ring_aru = instance->old_ring_state_high_seq_received;
|
|
|
|
|
|
|
@@ -1982,7 +1982,7 @@ static void memb_state_recovery_enter (
|
|
|
memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
|
|
memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
|
|
|
sort_queue_item->mcast,
|
|
sort_queue_item->mcast,
|
|
|
sort_queue_item->msg_len);
|
|
sort_queue_item->msg_len);
|
|
|
- queue_item_add (&instance->retrans_message_queue, &message_item);
|
|
|
|
|
|
|
+ cs_queue_item_add (&instance->retrans_message_queue, &message_item);
|
|
|
}
|
|
}
|
|
|
log_printf (instance->totemsrp_log_level_notice,
|
|
log_printf (instance->totemsrp_log_level_notice,
|
|
|
"Originated %d messages in RECOVERY.\n", messages_originated);
|
|
"Originated %d messages in RECOVERY.\n", messages_originated);
|
|
@@ -2049,7 +2049,7 @@ int totemsrp_mcast (
|
|
|
goto error_exit;
|
|
goto error_exit;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if (queue_is_full (&instance->new_message_queue)) {
|
|
|
|
|
|
|
+ if (cs_queue_is_full (&instance->new_message_queue)) {
|
|
|
log_printf (instance->totemsrp_log_level_warning, "queue full\n");
|
|
log_printf (instance->totemsrp_log_level_warning, "queue full\n");
|
|
|
return (-1);
|
|
return (-1);
|
|
|
}
|
|
}
|
|
@@ -2086,7 +2086,7 @@ int totemsrp_mcast (
|
|
|
message_item.msg_len = addr_idx;
|
|
message_item.msg_len = addr_idx;
|
|
|
|
|
|
|
|
log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue\n");
|
|
log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue\n");
|
|
|
- queue_item_add (&instance->new_message_queue, &message_item);
|
|
|
|
|
|
|
+ cs_queue_item_add (&instance->new_message_queue, &message_item);
|
|
|
|
|
|
|
|
hdb_handle_put (&totemsrp_instance_database, handle);
|
|
hdb_handle_put (&totemsrp_instance_database, handle);
|
|
|
return (0);
|
|
return (0);
|
|
@@ -2113,7 +2113,7 @@ int totemsrp_avail (hdb_handle_t handle)
|
|
|
goto error_exit;
|
|
goto error_exit;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- queue_avail (&instance->new_message_queue, &avail);
|
|
|
|
|
|
|
+ cs_queue_avail (&instance->new_message_queue, &avail);
|
|
|
|
|
|
|
|
hdb_handle_put (&totemsrp_instance_database, handle);
|
|
hdb_handle_put (&totemsrp_instance_database, handle);
|
|
|
|
|
|
|
@@ -2272,7 +2272,7 @@ static int orf_token_mcast (
|
|
|
int fcc_mcasts_allowed)
|
|
int fcc_mcasts_allowed)
|
|
|
{
|
|
{
|
|
|
struct message_item *message_item = 0;
|
|
struct message_item *message_item = 0;
|
|
|
- struct queue *mcast_queue;
|
|
|
|
|
|
|
+ struct cs_queue *mcast_queue;
|
|
|
struct sq *sort_queue;
|
|
struct sq *sort_queue;
|
|
|
struct sort_queue_item sort_queue_item;
|
|
struct sort_queue_item sort_queue_item;
|
|
|
struct sort_queue_item *sort_queue_item_ptr;
|
|
struct sort_queue_item *sort_queue_item_ptr;
|
|
@@ -2289,10 +2289,10 @@ static int orf_token_mcast (
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
|
|
for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
|
|
|
- if (queue_is_empty (mcast_queue)) {
|
|
|
|
|
|
|
+ if (cs_queue_is_empty (mcast_queue)) {
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
- message_item = (struct message_item *)queue_item_get (mcast_queue);
|
|
|
|
|
|
|
+ message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
|
|
|
/* preincrement required by algo */
|
|
/* preincrement required by algo */
|
|
|
if (instance->old_ring_state_saved &&
|
|
if (instance->old_ring_state_saved &&
|
|
|
(instance->memb_state == MEMB_STATE_GATHER ||
|
|
(instance->memb_state == MEMB_STATE_GATHER ||
|
|
@@ -2332,7 +2332,7 @@ static int orf_token_mcast (
|
|
|
/*
|
|
/*
|
|
|
* Delete item from pending queue
|
|
* Delete item from pending queue
|
|
|
*/
|
|
*/
|
|
|
- queue_item_remove (mcast_queue);
|
|
|
|
|
|
|
+ cs_queue_item_remove (mcast_queue);
|
|
|
|
|
|
|
|
/*
|
|
/*
|
|
|
* If messages mcasted, deliver any new messages to totempg
|
|
* If messages mcasted, deliver any new messages to totempg
|
|
@@ -2607,7 +2607,7 @@ static int orf_token_send_initial (struct totemsrp_instance *instance)
|
|
|
orf_token.retrans_flg = 1;
|
|
orf_token.retrans_flg = 1;
|
|
|
instance->my_set_retrans_flg = 1;
|
|
instance->my_set_retrans_flg = 1;
|
|
|
|
|
|
|
|
- if (queue_is_empty (&instance->retrans_message_queue) == 1) {
|
|
|
|
|
|
|
+ if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
|
|
|
orf_token.retrans_flg = 0;
|
|
orf_token.retrans_flg = 0;
|
|
|
instance->my_set_retrans_flg = 0;
|
|
instance->my_set_retrans_flg = 0;
|
|
|
} else {
|
|
} else {
|
|
@@ -3148,10 +3148,10 @@ static unsigned int backlog_get (struct totemsrp_instance *instance)
|
|
|
unsigned int backlog = 0;
|
|
unsigned int backlog = 0;
|
|
|
|
|
|
|
|
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
|
|
if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
|
|
|
- backlog = queue_used (&instance->new_message_queue);
|
|
|
|
|
|
|
+ backlog = cs_queue_used (&instance->new_message_queue);
|
|
|
} else
|
|
} else
|
|
|
if (instance->memb_state == MEMB_STATE_RECOVERY) {
|
|
if (instance->memb_state == MEMB_STATE_RECOVERY) {
|
|
|
- backlog = queue_used (&instance->retrans_message_queue);
|
|
|
|
|
|
|
+ backlog = cs_queue_used (&instance->retrans_message_queue);
|
|
|
}
|
|
}
|
|
|
return (backlog);
|
|
return (backlog);
|
|
|
}
|
|
}
|
|
@@ -3432,7 +3432,7 @@ static int message_handler_orf_token (
|
|
|
* has recovered all messages it can recover
|
|
* has recovered all messages it can recover
|
|
|
* (ie: its retrans queue is empty)
|
|
* (ie: its retrans queue is empty)
|
|
|
*/
|
|
*/
|
|
|
- if (queue_is_empty (&instance->retrans_message_queue) == 0) {
|
|
|
|
|
|
|
+ if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
|
|
|
|
|
|
|
|
if (token->retrans_flg == 0) {
|
|
if (token->retrans_flg == 0) {
|
|
|
token->retrans_flg = 1;
|
|
token->retrans_flg = 1;
|
|
@@ -3445,7 +3445,7 @@ static int message_handler_orf_token (
|
|
|
log_printf (instance->totemsrp_log_level_debug,
|
|
log_printf (instance->totemsrp_log_level_debug,
|
|
|
"token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x\n",
|
|
"token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x\n",
|
|
|
token->retrans_flg, instance->my_set_retrans_flg,
|
|
token->retrans_flg, instance->my_set_retrans_flg,
|
|
|
- queue_is_empty (&instance->retrans_message_queue),
|
|
|
|
|
|
|
+ cs_queue_is_empty (&instance->retrans_message_queue),
|
|
|
instance->my_retrans_flg_count, token->aru);
|
|
instance->my_retrans_flg_count, token->aru);
|
|
|
if (token->retrans_flg == 0) {
|
|
if (token->retrans_flg == 0) {
|
|
|
instance->my_retrans_flg_count += 1;
|
|
instance->my_retrans_flg_count += 1;
|