|
|
@@ -50,6 +50,7 @@
|
|
|
#include <corosync/totem/coropoll.h>
|
|
|
#include <corosync/list.h>
|
|
|
#include <corosync/cpg.h>
|
|
|
+#include <corosync/cfg.h>
|
|
|
#include "../../exec/crypto.h"
|
|
|
#include "common_test_agent.h"
|
|
|
|
|
|
@@ -82,7 +83,9 @@ static char big_and_buf[HOW_BIG_AND_BUF];
|
|
|
static int32_t record_config_events_g = 0;
|
|
|
static int32_t record_messages_g = 0;
|
|
|
static cpg_handle_t cpg_handle = 0;
|
|
|
+static corosync_cfg_handle_t cfg_handle = 0;
|
|
|
static int32_t cpg_fd = -1;
|
|
|
+static int32_t cfg_fd = -1;
|
|
|
static struct list_head config_chg_log_head;
|
|
|
static struct list_head msg_log_head;
|
|
|
static pid_t my_pid;
|
|
|
@@ -91,7 +94,8 @@ static int32_t my_seq;
|
|
|
static int32_t use_zcb = 0;
|
|
|
static int32_t my_msgs_to_send;
|
|
|
static int32_t total_stored_msgs = 0;
|
|
|
-
|
|
|
+static int32_t in_cnchg = 0;
|
|
|
+static int32_t pcmk_test = 0;
|
|
|
|
|
|
static void send_some_more_messages (void * unused);
|
|
|
|
|
|
@@ -171,6 +175,11 @@ static void delivery_callback (
|
|
|
err_status_string (status_buf, 20, status));
|
|
|
list_add_tail (&log_pt->list, &msg_log_head);
|
|
|
total_stored_msgs++;
|
|
|
+
|
|
|
+// if ((total_stored_msgs % 100) == 0) {
|
|
|
+// syslog (LOG_INFO, "%s(); %d", __func__, total_stored_msgs);
|
|
|
+// }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
static void config_change_callback (
|
|
|
@@ -209,8 +218,27 @@ static void config_change_callback (
|
|
|
list_add_tail (&log_pt->list, &config_chg_log_head);
|
|
|
}
|
|
|
}
|
|
|
+ if (pcmk_test == 1) {
|
|
|
+ in_cnchg = 1;
|
|
|
+ send_some_more_messages (NULL);
|
|
|
+ in_cnchg = 0;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+static void my_shutdown_callback (corosync_cfg_handle_t handle,
|
|
|
+ corosync_cfg_shutdown_flags_t flags)
|
|
|
+{
|
|
|
+ syslog (LOG_CRIT, "%s flags:%d", __func__, flags);
|
|
|
+ if (flags & COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST) {
|
|
|
+ corosync_cfg_replyto_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_YES);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+static corosync_cfg_callbacks_t cfg_callbacks = {
|
|
|
+ .corosync_cfg_shutdown_callback = my_shutdown_callback,
|
|
|
+ .corosync_cfg_state_track_callback = NULL,
|
|
|
+};
|
|
|
static cpg_callbacks_t callbacks = {
|
|
|
.cpg_deliver_fn = delivery_callback,
|
|
|
.cpg_confchg_fn = config_change_callback,
|
|
|
@@ -349,8 +377,10 @@ static void send_some_more_messages_zcb (void)
|
|
|
if (res == CS_ERR_TRY_AGAIN) {
|
|
|
/* lets do this later */
|
|
|
send_some_more_messages_later ();
|
|
|
- syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
|
|
|
- __func__);
|
|
|
+// if (i > 0) {
|
|
|
+// syslog (LOG_INFO, "%s() TRY_AGAIN %d to send.",
|
|
|
+// __func__, my_msgs_to_send);
|
|
|
+// }
|
|
|
goto free_buffer;
|
|
|
} else if (res != CS_OK) {
|
|
|
syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
|
|
|
@@ -365,6 +395,13 @@ free_buffer:
|
|
|
}
|
|
|
|
|
|
|
|
|
+#define cs_repeat(counter, max, code) do { \
|
|
|
+ code; \
|
|
|
+ if(res == CS_ERR_TRY_AGAIN) { \
|
|
|
+ counter++; \
|
|
|
+ sleep(counter); \
|
|
|
+ } \
|
|
|
+ } while(res == CS_ERR_TRY_AGAIN && counter < max)
|
|
|
|
|
|
static unsigned char buffer[200000];
|
|
|
static void send_some_more_messages_normal (void)
|
|
|
@@ -377,6 +414,8 @@ static void send_some_more_messages_normal (void)
|
|
|
hash_state sha1_hash;
|
|
|
cs_error_t res;
|
|
|
cpg_flow_control_state_t fc_state;
|
|
|
+ int retries = 0;
|
|
|
+ time_t before;
|
|
|
|
|
|
if (cpg_fd < 0)
|
|
|
return;
|
|
|
@@ -402,29 +441,43 @@ static void send_some_more_messages_normal (void)
|
|
|
iov[1].iov_base = buffer;
|
|
|
|
|
|
for (i = 0; i < send_now; i++) {
|
|
|
-
|
|
|
- res = cpg_flow_control_state_get (cpg_handle, &fc_state);
|
|
|
- if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
|
|
|
- /* lets do this later */
|
|
|
- send_some_more_messages_later ();
|
|
|
- syslog (LOG_INFO, "%s() flow control enabled.", __func__);
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
|
|
|
- if (res == CS_ERR_TRY_AGAIN) {
|
|
|
- /* lets do this later */
|
|
|
- send_some_more_messages_later ();
|
|
|
- syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
|
|
|
- __func__);
|
|
|
- return;
|
|
|
- } else
|
|
|
+ if (in_cnchg && pcmk_test) {
|
|
|
+ retries = 0;
|
|
|
+ before = time(NULL);
|
|
|
+ cs_repeat(retries, 30, res = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 1));
|
|
|
+ if (retries > 20) {
|
|
|
+ syslog (LOG_ERR, "%s() -> cs_repeat: blocked for :%lu secs.",
|
|
|
+ __func__, (unsigned long)(time(NULL) - before));
|
|
|
+ }
|
|
|
if (res != CS_OK) {
|
|
|
+ syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d.",
|
|
|
+ __func__, res);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ res = cpg_flow_control_state_get (cpg_handle, &fc_state);
|
|
|
+ if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
|
|
|
+ /* lets do this later */
|
|
|
+ send_some_more_messages_later ();
|
|
|
+ syslog (LOG_INFO, "%s() flow control enabled.", __func__);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
|
|
|
+ if (res == CS_ERR_TRY_AGAIN) {
|
|
|
+ /* lets do this later */
|
|
|
+ send_some_more_messages_later ();
|
|
|
+ if (i > 0) {
|
|
|
+ syslog (LOG_INFO, "%s() TRY_AGAIN %d to send.",
|
|
|
+ __func__, my_msgs_to_send);
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ } else if (res != CS_OK) {
|
|
|
syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
|
|
|
__func__, res);
|
|
|
exit (-2);
|
|
|
}
|
|
|
-
|
|
|
+ }
|
|
|
my_msgs_to_send--;
|
|
|
}
|
|
|
}
|
|
|
@@ -445,6 +498,7 @@ static void msg_blaster (int sock, char* num_to_send_str)
|
|
|
my_pid = getpid();
|
|
|
|
|
|
use_zcb = 0;
|
|
|
+ total_stored_msgs = 0;
|
|
|
|
|
|
cpg_local_get (cpg_handle, &my_nodeid);
|
|
|
|
|
|
@@ -481,6 +535,7 @@ static void msg_blaster_zcb (int sock, char* num_to_send_str)
|
|
|
my_pid = getpid();
|
|
|
|
|
|
use_zcb = 1;
|
|
|
+ total_stored_msgs = 0;
|
|
|
|
|
|
cpg_local_get (cpg_handle, &my_nodeid);
|
|
|
|
|
|
@@ -493,18 +548,52 @@ static void msg_blaster_zcb (int sock, char* num_to_send_str)
|
|
|
send_some_more_messages_zcb ();
|
|
|
}
|
|
|
|
|
|
+static corosync_cfg_state_notification_t notification_buffer;
|
|
|
+
|
|
|
+static int cfg_dispatch_wrapper_fn (hdb_handle_t handle,
|
|
|
+ int fd,
|
|
|
+ int revents,
|
|
|
+ void *data)
|
|
|
+{
|
|
|
+ cs_error_t error;
|
|
|
+ if (revents & POLLHUP || revents & POLLERR) {
|
|
|
+ syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CFG", __func__);
|
|
|
+ poll_dispatch_delete (ta_poll_handle_get(), cfg_fd);
|
|
|
+ close (cfg_fd);
|
|
|
+ cfg_fd = -1;
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ error = corosync_cfg_dispatch (cfg_handle, CS_DISPATCH_ALL);
|
|
|
+ if (error == CS_ERR_LIBRARY) {
|
|
|
+ syslog (LOG_ERR, "%s() got LIB error disconnecting from CFG.", __func__);
|
|
|
+ poll_dispatch_delete (ta_poll_handle_get(), cfg_fd);
|
|
|
+ close (cfg_fd);
|
|
|
+ cfg_fd = -1;
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+}
|
|
|
|
|
|
static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
|
|
|
int fd,
|
|
|
int revents,
|
|
|
void *data)
|
|
|
{
|
|
|
- cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
|
|
|
+ cs_error_t error;
|
|
|
+ if (revents & POLLHUP || revents & POLLERR) {
|
|
|
+ syslog (LOG_ERR, "%s() got POLLHUP disconnecting from CPG", __func__);
|
|
|
+ poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
|
|
|
+ close (cpg_fd);
|
|
|
+ cpg_fd = -1;
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
|
|
|
if (error == CS_ERR_LIBRARY) {
|
|
|
- syslog (LOG_ERR, "%s() got LIB error disconnecting from corosync.", __func__);
|
|
|
+ syslog (LOG_ERR, "%s() got LIB error disconnecting from CPG", __func__);
|
|
|
poll_dispatch_delete (ta_poll_handle_get(), cpg_fd);
|
|
|
close (cpg_fd);
|
|
|
cpg_fd = -1;
|
|
|
+ return -1;
|
|
|
}
|
|
|
return 0;
|
|
|
}
|
|
|
@@ -595,6 +684,8 @@ static void do_command (int sock, char* func, char*args[], int num_args)
|
|
|
read_messages (sock, args[0]);
|
|
|
} else if (strcmp ("msg_blaster_zcb", func) == 0) {
|
|
|
msg_blaster_zcb (sock, args[0]);
|
|
|
+ } else if (strcmp ("pcmk_test", func) == 0) {
|
|
|
+ pcmk_test = 1;
|
|
|
} else if (strcmp ("msg_blaster", func) == 0) {
|
|
|
msg_blaster (sock, args[0]);
|
|
|
} else if (strcmp ("context_test", func) == 0) {
|
|
|
@@ -602,6 +693,33 @@ static void do_command (int sock, char* func, char*args[], int num_args)
|
|
|
} else if (strcmp ("are_you_ok_dude", func) == 0) {
|
|
|
snprintf (response, 100, "%s", OK_STR);
|
|
|
send (sock, response, strlen (response), 0);
|
|
|
+
|
|
|
+ } else if (strcmp ("cfg_shutdown", func) == 0) {
|
|
|
+
|
|
|
+ corosync_cfg_try_shutdown (cfg_handle, COROSYNC_CFG_SHUTDOWN_FLAG_REQUEST);
|
|
|
+
|
|
|
+ } else if (strcmp ("cfg_initialize",func) == 0) {
|
|
|
+ int retry_count = 0;
|
|
|
+
|
|
|
+ syslog (LOG_INFO,"%s %s() called!", __func__, func);
|
|
|
+ result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
|
|
|
+ while (result != CS_OK) {
|
|
|
+ syslog (LOG_ERR,
|
|
|
+ "cfg_initialize error %d (attempt %d)\n",
|
|
|
+ result, retry_count);
|
|
|
+ if (retry_count >= 3) {
|
|
|
+ exit (1);
|
|
|
+ }
|
|
|
+ sleep(1);
|
|
|
+ retry_count++;
|
|
|
+ result = corosync_cfg_initialize (&cfg_handle, &cfg_callbacks);
|
|
|
+ }
|
|
|
+
|
|
|
+ corosync_cfg_fd_get (cfg_handle, &cfg_fd);
|
|
|
+
|
|
|
+ corosync_cfg_state_track (cfg_handle, 0, ¬ification_buffer);
|
|
|
+
|
|
|
+ poll_dispatch_add (ta_poll_handle_get(), cfg_fd, POLLIN|POLLNVAL, NULL, cfg_dispatch_wrapper_fn);
|
|
|
} else {
|
|
|
syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func);
|
|
|
}
|