|
|
@@ -67,7 +67,6 @@
|
|
|
#include "gmi.h"
|
|
|
#include "../include/queue.h"
|
|
|
#include "../include/sq.h"
|
|
|
-#include "print.h"
|
|
|
|
|
|
extern struct sockaddr_in this_ip;
|
|
|
|
|
|
@@ -190,6 +189,15 @@ poll_timer_handle timer_single_member = 0;
|
|
|
*/
|
|
|
int (*gmi_recv) (char *group, struct iovec *iovec, int iov_len);
|
|
|
|
|
|
+/*
|
|
|
+ * Function and data used to log messages
|
|
|
+ */
|
|
|
+static void (*gmi_log_printf) (int level, char *format, ...);
|
|
|
+int gmi_log_level_error;
|
|
|
+int gmi_log_level_warning;
|
|
|
+int gmi_log_level_notice;
|
|
|
+int gmi_log_level_debug;
|
|
|
+
|
|
|
struct message_header {
|
|
|
int type;
|
|
|
int seqid;
|
|
|
@@ -382,6 +390,20 @@ struct message_handlers gmi_message_handlers = {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+void gmi_log_printf_init (
|
|
|
+ void (*log_printf) (int , char *, ...),
|
|
|
+ int log_level_error,
|
|
|
+ int log_level_warning,
|
|
|
+ int log_level_notice,
|
|
|
+ int log_level_debug)
|
|
|
+{
|
|
|
+ gmi_log_level_error = log_level_error;
|
|
|
+ gmi_log_level_warning = log_level_warning;
|
|
|
+ gmi_log_level_notice = log_level_notice;
|
|
|
+ gmi_log_level_debug = log_level_debug;
|
|
|
+ gmi_log_printf = log_printf;
|
|
|
+}
|
|
|
+
|
|
|
/*
|
|
|
* Exported interfaces
|
|
|
*/
|
|
|
@@ -499,7 +521,7 @@ static int gmi_pend_trans_item_store (
|
|
|
}
|
|
|
gmi_pend_trans_item.iov_len = iov_len;
|
|
|
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "mcasted message added to pending queue\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "mcasted message added to pending queue\n");
|
|
|
queue_item_add (&queues_pend_trans[priority], &gmi_pend_trans_item);
|
|
|
|
|
|
return (0);
|
|
|
@@ -580,7 +602,7 @@ int gmi_mcast (
|
|
|
|
|
|
packet_size = FRAGMENT_SIZE;
|
|
|
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "MCASTING MESSAGE\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "MCASTING MESSAGE\n");
|
|
|
|
|
|
/*
|
|
|
* Determine size of total message
|
|
|
@@ -593,7 +615,7 @@ int gmi_mcast (
|
|
|
|
|
|
packet_count = (total_size / packet_size);
|
|
|
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "Message size is %d\n", total_size);
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "Message size is %d\n", total_size);
|
|
|
|
|
|
/*
|
|
|
* Break message up into individual packets and publish them
|
|
|
@@ -747,7 +769,7 @@ static int gmi_build_sockets (struct sockaddr_in *sockaddr_mcast,
|
|
|
*/
|
|
|
if (setsockopt(*fd_mcast, SOL_SOCKET, SO_BINDTODEVICE,
|
|
|
(char *)&interface, sizeof(interface)) < 0) {
|
|
|
- log_printf (LOG_LEVEL_WARNING, "Could not bind to device for multicast, group messaging may not work properly. (%s)\n", strerror (errno));
|
|
|
+ gmi_log_printf (gmi_log_level_warning, "Could not bind to device for multicast, group messaging may not work properly. (%s)\n", strerror (errno));
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
@@ -935,7 +957,7 @@ static int messages_free (int group_arut)
|
|
|
}
|
|
|
|
|
|
sq_items_release (&queue_rtr_items, lesser);
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "releasing messages up to and including %d\n", lesser);
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "releasing messages up to and including %d\n", lesser);
|
|
|
return (0);
|
|
|
}
|
|
|
|
|
|
@@ -1308,7 +1330,7 @@ printf ("EVS STATE group arut %d gmi arut %d highest %d barrier %d starting grou
|
|
|
//printf ("group arut is %d %d %d\n", orf_token->group_arut, gmi_arut, gmi_highest_seq);
|
|
|
// TODO
|
|
|
if (memb_state == MEMB_STATE_EVS && gmi_arut == gmi_barrier_seq && orf_token->group_arut == gmi_barrier_seq) {
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "EVS recovery of messages complete, transitioning to operational.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "EVS recovery of messages complete, transitioning to operational.\n");
|
|
|
/*
|
|
|
* EVS recovery complete, reset local variables
|
|
|
*/
|
|
|
@@ -1466,7 +1488,7 @@ static int orf_fcc_allowed (struct orf_token *token)
|
|
|
|
|
|
void timer_function_form_token_timeout (void *data)
|
|
|
{
|
|
|
- log_printf (LOG_LEVEL_WARNING, "Token loss in FORM state\n");
|
|
|
+ gmi_log_printf (gmi_log_level_warning, "Token loss in FORM state\n");
|
|
|
memb_list_entries = 1;
|
|
|
|
|
|
/*
|
|
|
@@ -1482,7 +1504,7 @@ void orf_timer_function_token_timeout (void *data)
|
|
|
{
|
|
|
switch (memb_state) {
|
|
|
case MEMB_STATE_OPERATIONAL:
|
|
|
- log_printf (LOG_LEVEL_WARNING, "Token loss in OPERATIONAL.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_warning, "Token loss in OPERATIONAL.\n");
|
|
|
memb_conf_id.rep.s_addr = memb_local_sockaddr_in.sin_addr.s_addr;
|
|
|
memb_list_entries = 1;
|
|
|
|
|
|
@@ -1491,12 +1513,12 @@ void orf_timer_function_token_timeout (void *data)
|
|
|
|
|
|
case MEMB_STATE_GATHER:
|
|
|
case MEMB_STATE_COMMIT:
|
|
|
- log_printf (LOG_LEVEL_WARNING, "Token loss in GATHER or COMMIT.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_warning, "Token loss in GATHER or COMMIT.\n");
|
|
|
memb_list_entries = 1;
|
|
|
break;
|
|
|
|
|
|
case MEMB_STATE_EVS:
|
|
|
- log_printf (LOG_LEVEL_WARNING, "Token loss in EVS state\n");
|
|
|
+ gmi_log_printf (gmi_log_level_warning, "Token loss in EVS state\n");
|
|
|
memb_list_entries = 1;
|
|
|
memb_state_gather_enter ();
|
|
|
break;
|
|
|
@@ -1535,7 +1557,7 @@ static int orf_token_send (
|
|
|
|
|
|
// THIS IS FOR TESTING ERRORS IN THE EVS STATE
|
|
|
//if ((memb_state == MEMB_STATE_EVS) && ((random () % 3) == 0)) {
|
|
|
-//log_printf (LOG_LEVEL_DEBUG, "CAUSING TOKEN LOSS AT EVS STATE\n");
|
|
|
+//gmi_log_printf (gmi_log_level_debug, "CAUSING TOKEN LOSS AT EVS STATE\n");
|
|
|
// return (1);
|
|
|
//}
|
|
|
|
|
|
@@ -1671,7 +1693,7 @@ static void memb_timer_function_state_commit_timeout (void *data)
|
|
|
* send a configuration change because no messages can be recovered in EVS
|
|
|
*/
|
|
|
if (memb_list_entries == 0) {
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "I am the only member.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "I am the only member.\n");
|
|
|
if (gmi_confchg_fn) {
|
|
|
/*
|
|
|
* Determine nodes that left the configuration
|
|
|
@@ -1691,7 +1713,7 @@ static void memb_timer_function_state_commit_timeout (void *data)
|
|
|
0, 0);
|
|
|
}
|
|
|
} else {
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "No members sent join, keeping old ring and transitioning to operational.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "No members sent join, keeping old ring and transitioning to operational.\n");
|
|
|
}
|
|
|
memb_state = MEMB_STATE_OPERATIONAL;
|
|
|
return;
|
|
|
@@ -1769,9 +1791,9 @@ static void memb_timer_function_state_gather (void *data)
|
|
|
* GATHER period expired, sort gather sets and send JOIN
|
|
|
*/
|
|
|
memb_state_commit_enter ();
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "GATHER timeout:\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "GATHER timeout:\n");
|
|
|
for (i = 0; i < memb_gather_set_entries; i++) {
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "host %d attempted to join %s\n", i, inet_ntoa (memb_gather_set[i]));
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "host %d attempted to join %s\n", i, inet_ntoa (memb_gather_set[i]));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -1779,14 +1801,14 @@ static void memb_print_commit_set (void)
|
|
|
{
|
|
|
int i, j;
|
|
|
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "Gather list\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "Gather list\n");
|
|
|
for (i = 0; i < memb_gather_set_entries; i++) {
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "\tmember %d %s\n", i, inet_ntoa (memb_gather_set[i]));
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "\tmember %d %s\n", i, inet_ntoa (memb_gather_set[i]));
|
|
|
}
|
|
|
for (i = 0; i < memb_commit_set_entries; i++) {
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "Join from rep %d %s\n", i, inet_ntoa (memb_commit_set[i].rep.sin_addr));
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "Join from rep %d %s\n", i, inet_ntoa (memb_commit_set[i].rep.sin_addr));
|
|
|
for (j = 0; j < memb_commit_set[i].join_rep_list_entries; j++) {
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "\tmember %d %s\n", j, inet_ntoa (memb_commit_set[i].join_rep_list[j]));
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "\tmember %d %s\n", j, inet_ntoa (memb_commit_set[i].join_rep_list[j]));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -2087,11 +2109,11 @@ void print_stats (void)
|
|
|
struct timeval tv_end;
|
|
|
gettimeofday (&tv_end, NULL);
|
|
|
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "Bytes recv %d\n", stats_recv);
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "Bytes sent %d\n", stats_sent);
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "Messages delivered %d\n", stats_delv);
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "Re-Mcasts %d\n", stats_remcasts);
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "Tokens process %d\n", stats_orf_token);
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "Bytes recv %d\n", stats_recv);
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "Bytes sent %d\n", stats_sent);
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "Messages delivered %d\n", stats_delv);
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "Re-Mcasts %d\n", stats_remcasts);
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "Tokens process %d\n", stats_orf_token);
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
@@ -2152,7 +2174,7 @@ static int message_handler_orf_token (
|
|
|
#endif
|
|
|
|
|
|
if (memb_state == MEMB_STATE_FORM) {
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "swallowing ORF token %d.\n", stats_orf_token);
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "swallowing ORF token %d.\n", stats_orf_token);
|
|
|
poll_timer_delete (*gmi_poll_handle, timer_orf_token_timeout);
|
|
|
timer_orf_token_timeout = 0;
|
|
|
return (0);
|
|
|
@@ -2226,7 +2248,7 @@ static int memb_state_gather_enter (void) {
|
|
|
struct memb_attempt_join memb_attempt_join;
|
|
|
int res = 0;
|
|
|
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "entering GATHER state.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "entering GATHER state.\n");
|
|
|
memb_state = MEMB_STATE_GATHER;
|
|
|
|
|
|
/*
|
|
|
@@ -2245,7 +2267,7 @@ static int memb_state_gather_enter (void) {
|
|
|
* If this node is the representative, send attempt join
|
|
|
*/
|
|
|
if (memb_local_sockaddr_in.sin_addr.s_addr == memb_conf_id.rep.s_addr) {
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "SENDING attempt join because this node is ring rep.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "SENDING attempt join because this node is ring rep.\n");
|
|
|
memb_attempt_join.header.seqid = 0;
|
|
|
memb_attempt_join.header.type = MESSAGE_TYPE_MEMB_ATTEMPT_JOIN;
|
|
|
|
|
|
@@ -2418,7 +2440,7 @@ static int user_deliver ()
|
|
|
* Deliver message if this is the last packet
|
|
|
*/
|
|
|
if (mcast->packet_number == mcast->packet_count) {
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "Last packet, delivering iovec %d entries seq %d\n",
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "Last packet, delivering iovec %d entries seq %d\n",
|
|
|
iov_len_delv, i);
|
|
|
|
|
|
gmi_deliver_fn (
|
|
|
@@ -2527,7 +2549,7 @@ static void pending_queues_deliver (void)
|
|
|
/*
|
|
|
* Message found
|
|
|
*/
|
|
|
- log_printf (LOG_LEVEL_DEBUG,
|
|
|
+ gmi_log_printf (gmi_log_level_debug,
|
|
|
"Delivering MCAST message with seqid %d to pending delivery queue\n",
|
|
|
mcast->header.seqid);
|
|
|
|
|
|
@@ -2641,12 +2663,12 @@ static int message_handler_memb_attempt_join (
|
|
|
int found;
|
|
|
int i;
|
|
|
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "Got attempt join from %s\n", inet_ntoa (system_from->sin_addr));
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "Got attempt join from %s\n", inet_ntoa (system_from->sin_addr));
|
|
|
|
|
|
for (token_lost = 0, i = 0; i < memb_list_entries; i++) {
|
|
|
if (memb_list[i].sin_addr.s_addr == system_from->sin_addr.s_addr &&
|
|
|
memb_conf_id.rep.s_addr != system_from->sin_addr.s_addr) {
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "ATTEMPT JOIN, token lost, taking attempt join msg.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "ATTEMPT JOIN, token lost, taking attempt join msg.\n");
|
|
|
poll_timer_delete (*gmi_poll_handle, timer_orf_token_timeout);
|
|
|
timer_orf_token_timeout = 0;
|
|
|
memb_conf_id.rep.s_addr = memb_local_sockaddr_in.sin_addr.s_addr;
|
|
|
@@ -2661,7 +2683,7 @@ static int message_handler_memb_attempt_join (
|
|
|
if (token_lost == 0 &&
|
|
|
memb_conf_id.rep.s_addr != memb_local_sockaddr_in.sin_addr.s_addr) {
|
|
|
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "not the rep for this ring, not handling attempt join.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "not the rep for this ring, not handling attempt join.\n");
|
|
|
return (0);
|
|
|
}
|
|
|
|
|
|
@@ -2674,7 +2696,7 @@ static int message_handler_memb_attempt_join (
|
|
|
*/
|
|
|
|
|
|
case MEMB_STATE_GATHER:
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "ATTEMPT JOIN: state gather\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "ATTEMPT JOIN: state gather\n");
|
|
|
for (found = 0, i = 0; i < memb_gather_set_entries; i++) {
|
|
|
if (memb_gather_set[i].s_addr == system_from->sin_addr.s_addr) {
|
|
|
found = 1;
|
|
|
@@ -2694,7 +2716,7 @@ static int message_handler_memb_attempt_join (
|
|
|
|
|
|
default:
|
|
|
// TODO what about other states
|
|
|
- log_printf (LOG_LEVEL_ERROR, "memb_attempt_join: EVS or FORM state attempt join occured %d\n", memb_state);
|
|
|
+ gmi_log_printf (gmi_log_level_error, "memb_attempt_join: EVS or FORM state attempt join occured %d\n", memb_state);
|
|
|
}
|
|
|
|
|
|
return (0);
|
|
|
@@ -2715,7 +2737,7 @@ static int message_handler_memb_join (
|
|
|
* Not representative
|
|
|
*/
|
|
|
if (memb_conf_id.rep.s_addr != memb_local_sockaddr_in.sin_addr.s_addr) {
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "not the rep for this ring, not handling join.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "not the rep for this ring, not handling join.\n");
|
|
|
return (0);
|
|
|
}
|
|
|
|
|
|
@@ -2728,7 +2750,7 @@ static int message_handler_memb_join (
|
|
|
*/
|
|
|
|
|
|
case MEMB_STATE_COMMIT:
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "JOIN in commit\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "JOIN in commit\n");
|
|
|
memb_join = (struct memb_join *)iovec[0].iov_base;
|
|
|
/*
|
|
|
* Find gather set that matches the system message was from
|
|
|
@@ -2772,9 +2794,9 @@ static int message_handler_memb_join (
|
|
|
|
|
|
consensus = memb_state_consensus_commit ();
|
|
|
if (consensus) {
|
|
|
- log_printf (LOG_LEVEL_NOTICE, "CONSENSUS reached!\n");
|
|
|
+ gmi_log_printf (gmi_log_level_notice, "CONSENSUS reached!\n");
|
|
|
if (memb_local_sockaddr_in.sin_addr.s_addr == memb_gather_set[0].s_addr) {
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "This node responsible for sending the FORM token.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "This node responsible for sending the FORM token.\n");
|
|
|
|
|
|
poll_timer_delete (*gmi_poll_handle, timer_memb_state_commit_timeout);
|
|
|
timer_memb_state_commit_timeout = 0;
|
|
|
@@ -2787,12 +2809,12 @@ static int message_handler_memb_join (
|
|
|
* All other cases are ignored on JOINs
|
|
|
*/
|
|
|
case MEMB_STATE_FORM:
|
|
|
- log_printf (LOG_LEVEL_WARNING, "JOIN in form, ignoring since consensus reached in state machine.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_warning, "JOIN in form, ignoring since consensus reached in state machine.\n");
|
|
|
break;
|
|
|
|
|
|
default:
|
|
|
// TODO HANDLE THIS CASE
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "memb_join: DEFAULT case %d, shouldn't happen!!\n", memb_state);
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "memb_join: DEFAULT case %d, shouldn't happen!!\n", memb_state);
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
@@ -2911,14 +2933,14 @@ printf ("setting barrier seq to %d\n", gmi_barrier_seq);
|
|
|
break;
|
|
|
|
|
|
case MEMB_STATE_EVS:
|
|
|
- log_printf (LOG_LEVEL_DEBUG, "Swallowing FORM token in EVS state.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_debug, "Swallowing FORM token in EVS state.\n");
|
|
|
printf ("FORM CONF ENTRIES %d\n", memb_form_token.conf_desc_list_entries);
|
|
|
orf_token_send_initial();
|
|
|
return (0);
|
|
|
|
|
|
default:
|
|
|
// TODO
|
|
|
- log_printf (LOG_LEVEL_ERROR, "memb_form_token: default case, shouldn't happen.\n");
|
|
|
+ gmi_log_printf (gmi_log_level_error, "memb_form_token: default case, shouldn't happen.\n");
|
|
|
return (0);
|
|
|
}
|
|
|
|