|
|
@@ -75,7 +75,7 @@ typedef struct {
|
|
|
unsigned char payload[0];
|
|
|
} msg_t;
|
|
|
|
|
|
-#define LOG_STR_SIZE 256
|
|
|
+#define LOG_STR_SIZE 80
|
|
|
typedef struct {
|
|
|
char log[LOG_STR_SIZE];
|
|
|
struct list_head list;
|
|
|
@@ -103,7 +103,8 @@ static int32_t pcmk_test = 0;
|
|
|
|
|
|
static void send_some_more_messages (void * unused);
|
|
|
|
|
|
-static char* err_status_string (char * buf, size_t buf_len, msg_status_t status)
|
|
|
+static char*
|
|
|
+err_status_string (char * buf, size_t buf_len, msg_status_t status)
|
|
|
{
|
|
|
switch (status) {
|
|
|
case MSG_OK:
|
|
|
@@ -153,9 +154,6 @@ static void delivery_callback (
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- msg_pt->seq = my_seq;
|
|
|
- my_seq++;
|
|
|
-
|
|
|
if (nodeid != msg_pt->nodeid) {
|
|
|
status = MSG_NODEID_ERR;
|
|
|
}
|
|
|
@@ -178,14 +176,15 @@ static void delivery_callback (
|
|
|
list_init (&log_pt->list);
|
|
|
|
|
|
snprintf (log_pt->log, LOG_STR_SIZE, "%d:%d:%d:%s;",
|
|
|
- msg_pt->nodeid, msg_pt->pid, msg_pt->seq,
|
|
|
+ msg_pt->nodeid, msg_pt->seq, my_seq,
|
|
|
err_status_string (status_buf, 20, status));
|
|
|
list_add_tail (&log_pt->list, &msg_log_head);
|
|
|
total_stored_msgs++;
|
|
|
total_msgs_revd++;
|
|
|
+ my_seq++;
|
|
|
|
|
|
- if ((total_msgs_revd % 100) == 0) {
|
|
|
- qb_log (LOG_INFO, "%d",total_msgs_revd);
|
|
|
+ if ((total_msgs_revd % 1000) == 0) {
|
|
|
+ qb_log (LOG_INFO, "rx %d", total_msgs_revd);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -200,6 +199,11 @@ static void config_change_callback (
|
|
|
log_entry_t *log_pt;
|
|
|
|
|
|
/* group_name,ip,pid,join|leave */
|
|
|
+ if (record_config_events_g > 0) {
|
|
|
+ qb_log (LOG_INFO, "got cpg event[recording] for group %s", groupName->value);
|
|
|
+ } else {
|
|
|
+ qb_log (LOG_INFO, "got cpg event[ignoring] for group %s", groupName->value);
|
|
|
+ }
|
|
|
|
|
|
for (i = 0; i < left_list_entries; i++) {
|
|
|
if (record_config_events_g > 0) {
|
|
|
@@ -208,6 +212,7 @@ static void config_change_callback (
|
|
|
snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,left",
|
|
|
groupName->value, left_list[i].nodeid,left_list[i].pid);
|
|
|
list_add_tail(&log_pt->list, &config_chg_log_head);
|
|
|
+ qb_log (LOG_INFO, "cpg event %s", log_pt->log);
|
|
|
}
|
|
|
}
|
|
|
for (i = 0; i < joined_list_entries; i++) {
|
|
|
@@ -217,6 +222,7 @@ static void config_change_callback (
|
|
|
snprintf (log_pt->log, LOG_STR_SIZE, "%s,%d,%d,join",
|
|
|
groupName->value, joined_list[i].nodeid,joined_list[i].pid);
|
|
|
list_add_tail (&log_pt->list, &config_chg_log_head);
|
|
|
+ qb_log (LOG_INFO, "cpg event %s", log_pt->log);
|
|
|
}
|
|
|
}
|
|
|
if (pcmk_test == 1) {
|
|
|
@@ -248,7 +254,7 @@ static cpg_callbacks_t callbacks = {
|
|
|
static void record_messages (void)
|
|
|
{
|
|
|
record_messages_g = 1;
|
|
|
- qb_log (LOG_DEBUG,"record:%d", record_messages_g);
|
|
|
+ qb_log (LOG_INFO, "record:%d", record_messages_g);
|
|
|
}
|
|
|
|
|
|
static void record_config_events (int sock)
|
|
|
@@ -285,14 +291,13 @@ static void read_messages (int sock, char* atmost_str)
|
|
|
log_entry_t *entry;
|
|
|
int atmost = atoi (atmost_str);
|
|
|
int packed = 0;
|
|
|
+ ssize_t rc;
|
|
|
|
|
|
if (atmost == 0)
|
|
|
atmost = 1;
|
|
|
if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
|
|
|
atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
|
|
|
|
|
|
- qb_log(LOG_DEBUG, "atmost %d; total_stored_msgs:%d",
|
|
|
- atmost, total_stored_msgs);
|
|
|
big_and_buf[0] = '\0';
|
|
|
|
|
|
for (list = msg_log_head.next;
|
|
|
@@ -312,10 +317,13 @@ static void read_messages (int sock, char* atmost_str)
|
|
|
if (packed == 0) {
|
|
|
strcpy (big_and_buf, "None");
|
|
|
} else {
|
|
|
- qb_log(LOG_INFO, "sending %d; total_stored_msgs:%d; len:%d",
|
|
|
- packed, total_stored_msgs, (int)strlen (big_and_buf));
|
|
|
+ if ((total_stored_msgs % 1000) == 0) {
|
|
|
+ qb_log(LOG_INFO, "sending %d; total_stored_msgs:%d; len:%d",
|
|
|
+ packed, total_stored_msgs, (int)strlen (big_and_buf));
|
|
|
+ }
|
|
|
}
|
|
|
- send (sock, big_and_buf, strlen (big_and_buf), 0);
|
|
|
+ rc = send (sock, big_and_buf, strlen (big_and_buf), 0);
|
|
|
+ assert(rc = strlen (big_and_buf));
|
|
|
}
|
|
|
|
|
|
static qb_loop_timer_handle more_messages_timer_handle;
|
|
|
@@ -356,7 +364,7 @@ static void send_some_more_messages_zcb (void)
|
|
|
my_msg->pid = my_pid;
|
|
|
my_msg->nodeid = my_nodeid;
|
|
|
my_msg->size = sizeof (msg_t) + payload_size;
|
|
|
- my_msg->seq = 0;
|
|
|
+ my_msg->seq = my_msgs_sent;
|
|
|
for (i = 0; i < payload_size; i++) {
|
|
|
my_msg->payload[i] = i;
|
|
|
}
|
|
|
@@ -378,10 +386,6 @@ static void send_some_more_messages_zcb (void)
|
|
|
if (res == CS_ERR_TRY_AGAIN) {
|
|
|
/* lets do this later */
|
|
|
send_some_more_messages_later ();
|
|
|
-// if (i > 0) {
|
|
|
-// qb_log (LOG_INFO, "TRY_AGAIN %d to send.",
|
|
|
-// my_msgs_to_send);
|
|
|
-// }
|
|
|
goto free_buffer;
|
|
|
} else if (res != CS_OK) {
|
|
|
qb_log (LOG_ERR, "cpg_mcast_joined error:%d, exiting.",
|
|
|
@@ -398,11 +402,11 @@ free_buffer:
|
|
|
|
|
|
#define cs_repeat(counter, max, code) do { \
|
|
|
code; \
|
|
|
- if(res == CS_ERR_TRY_AGAIN) { \
|
|
|
- counter++; \
|
|
|
- sleep(counter); \
|
|
|
+ if (res == CS_ERR_TRY_AGAIN) { \
|
|
|
+ counter++; \
|
|
|
+ sleep(counter); \
|
|
|
} \
|
|
|
- } while(res == CS_ERR_TRY_AGAIN && counter < max)
|
|
|
+} while (res == CS_ERR_TRY_AGAIN && counter < max)
|
|
|
|
|
|
static unsigned char buffer[200000];
|
|
|
static void send_some_more_messages_normal (void)
|
|
|
@@ -423,13 +427,13 @@ static void send_some_more_messages_normal (void)
|
|
|
|
|
|
send_now = my_msgs_to_send;
|
|
|
|
|
|
- qb_log (LOG_DEBUG,"send_now:%d", send_now);
|
|
|
+ qb_log (LOG_TRACE, "send_now:%d", send_now);
|
|
|
|
|
|
my_msg.pid = my_pid;
|
|
|
my_msg.nodeid = my_nodeid;
|
|
|
payload_size = (rand() % 10000);
|
|
|
my_msg.size = sizeof (msg_t) + payload_size;
|
|
|
- my_msg.seq = 0;
|
|
|
+ my_msg.seq = my_msgs_sent;
|
|
|
for (i = 0; i < payload_size; i++) {
|
|
|
buffer[i] = i;
|
|
|
}
|
|
|
@@ -481,9 +485,10 @@ static void send_some_more_messages_normal (void)
|
|
|
}
|
|
|
}
|
|
|
my_msgs_sent++;
|
|
|
+ my_msg.seq = my_msgs_sent;
|
|
|
my_msgs_to_send--;
|
|
|
}
|
|
|
- qb_log (LOG_INFO, "sent %d; to send %d.",
|
|
|
+ qb_log (LOG_TRACE, "sent %d; to send %d.",
|
|
|
my_msgs_sent, my_msgs_to_send);
|
|
|
}
|
|
|
|
|
|
@@ -633,7 +638,7 @@ static void do_command (int sock, char* func, char*args[], int num_args)
|
|
|
"Could not join process group, error %d", result);
|
|
|
exit (1);
|
|
|
}
|
|
|
- qb_log (LOG_INFO, "called cpg_join()!");
|
|
|
+ qb_log (LOG_INFO, "called cpg_join(%s)!", group_name.value);
|
|
|
|
|
|
} else if (strcmp ("cpg_leave",func) == 0) {
|
|
|
|
|
|
@@ -646,7 +651,7 @@ static void do_command (int sock, char* func, char*args[], int num_args)
|
|
|
"Could not leave process group, error %d", result);
|
|
|
exit (1);
|
|
|
}
|
|
|
- qb_log (LOG_INFO, "called cpg_leave()!");
|
|
|
+ qb_log (LOG_INFO, "called cpg_leave(%s)!", group_name.value);
|
|
|
|
|
|
} else if (strcmp ("cpg_initialize",func) == 0) {
|
|
|
int retry_count = 0;
|
|
|
@@ -666,11 +671,11 @@ static void do_command (int sock, char* func, char*args[], int num_args)
|
|
|
|
|
|
cpg_fd_get (cpg_handle, &cpg_fd);
|
|
|
qb_loop_poll_add (ta_poll_handle_get(),
|
|
|
- QB_LOOP_MED,
|
|
|
- cpg_fd,
|
|
|
- POLLIN|POLLNVAL,
|
|
|
- NULL,
|
|
|
- cpg_dispatch_wrapper_fn);
|
|
|
+ QB_LOOP_MED,
|
|
|
+ cpg_fd,
|
|
|
+ POLLIN|POLLNVAL,
|
|
|
+ NULL,
|
|
|
+ cpg_dispatch_wrapper_fn);
|
|
|
|
|
|
} else if (strcmp ("cpg_local_get", func) == 0) {
|
|
|
unsigned int local_nodeid;
|
|
|
@@ -736,11 +741,11 @@ static void do_command (int sock, char* func, char*args[], int num_args)
|
|
|
qb_log (LOG_INFO,"corosync_cfg_state_track() == %d", result);
|
|
|
|
|
|
qb_loop_poll_add (ta_poll_handle_get(),
|
|
|
- QB_LOOP_MED,
|
|
|
- cfg_fd,
|
|
|
- POLLIN|POLLNVAL,
|
|
|
- NULL,
|
|
|
- cfg_dispatch_wrapper_fn);
|
|
|
+ QB_LOOP_MED,
|
|
|
+ cfg_fd,
|
|
|
+ POLLIN|POLLNVAL,
|
|
|
+ NULL,
|
|
|
+ cfg_dispatch_wrapper_fn);
|
|
|
} else {
|
|
|
qb_log(LOG_ERR, "RPC:%s not supported!", func);
|
|
|
}
|