|
|
@@ -50,6 +50,7 @@
|
|
|
#include <corosync/totem/coropoll.h>
|
|
|
#include <corosync/list.h>
|
|
|
#include <corosync/cpg.h>
|
|
|
+#include "../../exec/crypto.h"
|
|
|
|
|
|
|
|
|
#define SERVER_PORT "9034"
|
|
|
@@ -60,16 +61,16 @@ typedef enum {
|
|
|
MSG_PID_ERR,
|
|
|
MSG_SEQ_ERR,
|
|
|
MSG_SIZE_ERR,
|
|
|
- MSG_HASH_ERR,
|
|
|
+ MSG_SHA1_ERR,
|
|
|
} msg_status_t;
|
|
|
|
|
|
typedef struct {
|
|
|
uint32_t nodeid;
|
|
|
pid_t pid;
|
|
|
- uint32_t hash;
|
|
|
+ unsigned char sha1[20];
|
|
|
uint32_t seq;
|
|
|
size_t size;
|
|
|
- char payload[1];
|
|
|
+ unsigned char payload[0];
|
|
|
} msg_t;
|
|
|
|
|
|
#define LOG_STR_SIZE 256
|
|
|
@@ -95,6 +96,37 @@ static int32_t my_msgs_to_send;
|
|
|
static int32_t total_stored_msgs = 0;
|
|
|
static hdb_handle_t poll_handle;
|
|
|
|
|
|
+static char* err_status_string (char * buf, size_t buf_len, msg_status_t status)
|
|
|
+{
|
|
|
+ switch (status) {
|
|
|
+ case MSG_OK:
|
|
|
+ strncpy (buf, "OK", buf_len);
|
|
|
+ return buf;
|
|
|
+ break;
|
|
|
+ case MSG_NODEID_ERR:
|
|
|
+ strncpy (buf, "NODEID_ERR", buf_len);
|
|
|
+ return buf;
|
|
|
+ break;
|
|
|
+ case MSG_PID_ERR:
|
|
|
+ strncpy (buf, "PID_ERR", buf_len);
|
|
|
+ return buf;
|
|
|
+ break;
|
|
|
+ case MSG_SEQ_ERR:
|
|
|
+ strncpy (buf, "SEQ_ERR", buf_len);
|
|
|
+ return buf;
|
|
|
+ break;
|
|
|
+ case MSG_SIZE_ERR:
|
|
|
+ strncpy (buf, "SIZE_ERR", buf_len);
|
|
|
+ return buf;
|
|
|
+ break;
|
|
|
+ case MSG_SHA1_ERR:
|
|
|
+ default:
|
|
|
+ strncpy (buf, "SHA1_ERR", buf_len);
|
|
|
+ return buf;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
|
|
|
static void delivery_callback (
|
|
|
cpg_handle_t handle,
|
|
|
@@ -107,6 +139,9 @@ static void delivery_callback (
|
|
|
log_entry_t *log_pt;
|
|
|
msg_t *msg_pt = (msg_t*)msg;
|
|
|
msg_status_t status = MSG_OK;
|
|
|
+ char status_buf[20];
|
|
|
+ unsigned char sha1_compare[20];
|
|
|
+ hash_state sha1_hash;
|
|
|
|
|
|
if (record_messages_g == 0) {
|
|
|
return;
|
|
|
@@ -124,13 +159,21 @@ static void delivery_callback (
|
|
|
if (msg_len != msg_pt->size) {
|
|
|
status = MSG_SIZE_ERR;
|
|
|
}
|
|
|
- /* TODO: check hash here.
|
|
|
- */
|
|
|
+ sha1_init (&sha1_hash);
|
|
|
+ sha1_process (&sha1_hash, msg_pt->payload, msg_pt->size);
|
|
|
+ sha1_done (&sha1_hash, sha1_compare);
|
|
|
+ if (memcmp (sha1_compare, msg_pt->sha1, 20) != 0) {
|
|
|
+ syslog (LOG_ERR, "%s(); msg seq:%d; incorrect hash",
|
|
|
+ __func__, msg_pt->seq);
|
|
|
+ status = MSG_SHA1_ERR;
|
|
|
+ }
|
|
|
|
|
|
log_pt = malloc (sizeof(log_entry_t));
|
|
|
list_init (&log_pt->list);
|
|
|
- snprintf (log_pt->log, LOG_STR_SIZE, "%d:%d:%d:%d;",
|
|
|
- msg_pt->nodeid, msg_pt->pid, msg_pt->seq, status);
|
|
|
+
|
|
|
+ snprintf (log_pt->log, LOG_STR_SIZE, "%d:%d:%d:%s;",
|
|
|
+ msg_pt->nodeid, msg_pt->pid, msg_pt->seq,
|
|
|
+ err_status_string (status_buf, 20, status));
|
|
|
list_add_tail (&log_pt->list, &msg_log_head);
|
|
|
total_stored_msgs++;
|
|
|
}
|
|
|
@@ -242,12 +285,15 @@ static void read_messages (int sock, char* atmost_str)
|
|
|
send (sock, big_and_buf, strlen (big_and_buf), 0);
|
|
|
}
|
|
|
|
|
|
+static unsigned char buffer[200000];
|
|
|
static void send_some_more_messages (void)
|
|
|
{
|
|
|
msg_t my_msg;
|
|
|
- struct iovec iov[1];
|
|
|
+ struct iovec iov[2];
|
|
|
int i;
|
|
|
int send_now;
|
|
|
+ size_t payload_size;
|
|
|
+ hash_state sha1_hash;
|
|
|
cs_error_t res;
|
|
|
cpg_flow_control_state_t fc_state;
|
|
|
|
|
|
@@ -259,26 +305,34 @@ static void send_some_more_messages (void)
|
|
|
syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
|
|
|
my_msg.pid = my_pid;
|
|
|
my_msg.nodeid = my_nodeid;
|
|
|
- my_msg.hash = 0;
|
|
|
- my_msg.size = sizeof (msg_t);
|
|
|
+ payload_size = (rand() % 100000);
|
|
|
+ my_msg.size = sizeof (msg_t) + payload_size;
|
|
|
my_msg.seq = 0;
|
|
|
+ for (i = 0; i < payload_size; i++) {
|
|
|
+ buffer[i] = i;
|
|
|
+ }
|
|
|
+ sha1_init (&sha1_hash);
|
|
|
+ sha1_process (&sha1_hash, buffer, payload_size);
|
|
|
+ sha1_done (&sha1_hash, my_msg.sha1);
|
|
|
|
|
|
- iov[0].iov_len = my_msg.size;
|
|
|
+ iov[0].iov_len = sizeof (msg_t);
|
|
|
iov[0].iov_base = &my_msg;
|
|
|
+ iov[1].iov_len = payload_size;
|
|
|
+ iov[1].iov_base = buffer;
|
|
|
|
|
|
for (i = 0; i < send_now; i++) {
|
|
|
|
|
|
res = cpg_flow_control_state_get (cpg_handle, &fc_state);
|
|
|
if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
|
|
|
/* lets do this later */
|
|
|
- syslog (LOG_DEBUG, "%s() flow control enabled.", __func__);
|
|
|
+ syslog (LOG_INFO, "%s() flow control enabled.", __func__);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 1);
|
|
|
+ res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 2);
|
|
|
if (res == CS_ERR_TRY_AGAIN) {
|
|
|
/* lets do this later */
|
|
|
- syslog (LOG_DEBUG, "%s() cpg_mcast_joined() says try again.",
|
|
|
+ syslog (LOG_INFO, "%s() cpg_mcast_joined() says try again.",
|
|
|
__func__);
|
|
|
return;
|
|
|
} else
|