|
|
@@ -88,6 +88,7 @@ static struct list_head msg_log_head;
|
|
|
static pid_t my_pid;
|
|
|
static uint32_t my_nodeid;
|
|
|
static int32_t my_seq;
|
|
|
+static int32_t use_zcb = 0;
|
|
|
static int32_t my_msgs_to_send;
|
|
|
static int32_t total_stored_msgs = 0;
|
|
|
|
|
|
@@ -285,13 +286,80 @@ static void send_some_more_messages_later (void)
|
|
|
cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
|
|
|
poll_timer_add (
|
|
|
ta_poll_handle_get(),
|
|
|
- 100, NULL,
|
|
|
+ 300, NULL,
|
|
|
send_some_more_messages,
|
|
|
&timer_handle);
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
+static void send_some_more_messages_zcb (void)
|
|
|
+{
|
|
|
+ msg_t *my_msg;
|
|
|
+ int i;
|
|
|
+ int send_now;
|
|
|
+ size_t payload_size;
|
|
|
+ size_t total_size;
|
|
|
+ hash_state sha1_hash;
|
|
|
+ cs_error_t res;
|
|
|
+ cpg_flow_control_state_t fc_state;
|
|
|
+ void *zcb_buffer;
|
|
|
+
|
|
|
+ if (cpg_fd < 0)
|
|
|
+ return;
|
|
|
+
|
|
|
+ send_now = my_msgs_to_send;
|
|
|
+ payload_size = (rand() % 100000);
|
|
|
+ total_size = payload_size + sizeof (msg_t);
|
|
|
+ cpg_zcb_alloc (cpg_handle, total_size, &zcb_buffer);
|
|
|
+
|
|
|
+ my_msg = (msg_t*)zcb_buffer;
|
|
|
+
|
|
|
+ //syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
|
|
|
+ my_msg->pid = my_pid;
|
|
|
+ my_msg->nodeid = my_nodeid;
|
|
|
+ my_msg->size = sizeof (msg_t) + payload_size;
|
|
|
+ my_msg->seq = 0;
|
|
|
+ for (i = 0; i < payload_size; i++) {
|
|
|
+ my_msg->payload[i] = i;
|
|
|
+ }
|
|
|
+ sha1_init (&sha1_hash);
|
|
|
+ sha1_process (&sha1_hash, my_msg->payload, payload_size);
|
|
|
+ sha1_done (&sha1_hash, my_msg->sha1);
|
|
|
+
|
|
|
+ for (i = 0; i < send_now; i++) {
|
|
|
+
|
|
|
+ res = cpg_flow_control_state_get (cpg_handle, &fc_state);
|
|
|
+ if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
|
|
|
+ /* lets do this later */
|
|
|
+ send_some_more_messages_later ();
|
|
|
+ syslog (LOG_INFO, "%s() flow control enabled.", __func__);
|
|
|
+ goto free_buffer;
|
|
|
+ }
|
|
|
+
|
|
|
+ res = cpg_zcb_mcast_joined (cpg_handle, CPG_TYPE_AGREED, zcb_buffer, total_size);
|
|
|
+ if (res == CS_ERR_TRY_AGAIN) {
|
|
|
+ /* lets do this later */
|
|
|
+ send_some_more_messages_later ();
|
|
|
+ syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
|
|
|
+ __func__);
|
|
|
+ goto free_buffer;
|
|
|
+ } else if (res != CS_OK) {
|
|
|
+ syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
|
|
|
+ __func__, res);
|
|
|
+ exit (-2);
|
|
|
+ }
|
|
|
+
|
|
|
+ my_msgs_to_send--;
|
|
|
+ }
|
|
|
+free_buffer:
|
|
|
+ cpg_zcb_free (cpg_handle, zcb_buffer);
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
static unsigned char buffer[200000];
|
|
|
-static void send_some_more_messages (void * unused)
|
|
|
+static void send_some_more_messages_normal (void)
|
|
|
{
|
|
|
msg_t my_msg;
|
|
|
struct iovec iov[2];
|
|
|
@@ -353,12 +421,23 @@ static void send_some_more_messages (void * unused)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+static void send_some_more_messages (void * unused)
|
|
|
+{
|
|
|
+ if (use_zcb) {
|
|
|
+ send_some_more_messages_zcb ();
|
|
|
+ } else {
|
|
|
+ send_some_more_messages_normal ();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
static void msg_blaster (int sock, char* num_to_send_str)
|
|
|
{
|
|
|
my_msgs_to_send = atoi (num_to_send_str);
|
|
|
my_seq = 1;
|
|
|
my_pid = getpid();
|
|
|
|
|
|
+ use_zcb = 0;
|
|
|
+
|
|
|
cpg_local_get (cpg_handle, &my_nodeid);
|
|
|
|
|
|
/* control the limits */
|
|
|
@@ -367,7 +446,26 @@ static void msg_blaster (int sock, char* num_to_send_str)
|
|
|
if (my_msgs_to_send > 10000)
|
|
|
my_msgs_to_send = 10000;
|
|
|
|
|
|
- send_some_more_messages (NULL);
|
|
|
+ send_some_more_messages_normal ();
|
|
|
+}
|
|
|
+
|
|
|
+static void msg_blaster_zcb (int sock, char* num_to_send_str)
|
|
|
+{
|
|
|
+ my_msgs_to_send = atoi (num_to_send_str);
|
|
|
+ my_seq = 1;
|
|
|
+ my_pid = getpid();
|
|
|
+
|
|
|
+ use_zcb = 1;
|
|
|
+
|
|
|
+ cpg_local_get (cpg_handle, &my_nodeid);
|
|
|
+
|
|
|
+ /* control the limits */
|
|
|
+ if (my_msgs_to_send <= 0)
|
|
|
+ my_msgs_to_send = 1;
|
|
|
+ if (my_msgs_to_send > 10000)
|
|
|
+ my_msgs_to_send = 10000;
|
|
|
+
|
|
|
+ send_some_more_messages_zcb ();
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -477,6 +575,10 @@ static void do_command (int sock, char* func, char*args[], int num_args)
|
|
|
|
|
|
read_messages (sock, args[0]);
|
|
|
|
|
|
+ } else if (strcmp ("msg_blaster_zcb", func) == 0) {
|
|
|
+
|
|
|
+ msg_blaster_zcb (sock, args[0]);
|
|
|
+
|
|
|
} else if (strcmp ("msg_blaster",func) == 0) {
|
|
|
|
|
|
msg_blaster (sock, args[0]);
|