| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616 |
- /*
- * Copyright (c) 2010 Red Hat, Inc.
- *
- * All rights reserved.
- *
- * Author: Angus Salkeld (asalkeld@redhat.com)
- *
- * This software licensed under BSD license, the text of which follows:
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- * - Neither the name of the MontaVista Software, Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from this
- * software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- * THE POSSIBILITY OF SUCH DAMAGE.
- */
- #include <errno.h>
- #include <unistd.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <assert.h>
- #include <string.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <netdb.h>
- #include <syslog.h>
- #include <poll.h>
- #include <unistd.h>
- #include <fcntl.h>
- #include <corosync/totem/coropoll.h>
- #include <corosync/list.h>
- #include <corosync/cpg.h>
/* Service (port) string handed to getaddrinfo() for the listening socket. */
#define SERVER_PORT "9034"

/*
 * Result of validating one delivered CPG message.  The numeric value is
 * what delivery_callback() prints into the status field of a log entry.
 */
typedef enum {
	MSG_OK = 0,		/* no check failed */
	MSG_NODEID_ERR = 1,	/* node id in the message differs from the reported sender */
	MSG_PID_ERR = 2,	/* pid in the message differs from the reported sender */
	MSG_SEQ_ERR = 3,	/* not assigned by the visible code */
	MSG_SIZE_ERR = 4,	/* size field differs from the delivered length */
	MSG_HASH_ERR = 5,	/* not assigned by the visible code (hash check is a TODO) */
} msg_status_t;
/*
 * Test message multicast by send_some_more_messages() and checked by
 * delivery_callback().  sizeof (msg_t) is the number of bytes put on the
 * wire, so the member order and types must not change.
 */
struct test_msg {
	uint32_t nodeid;	/* sender's node id */
	pid_t pid;		/* sender's process id */
	uint32_t hash;		/* sent as 0; verification is still a TODO */
	uint32_t seq;		/* sent as 0; the receiver keeps its own counter */
	size_t size;		/* number of bytes the sender transmitted */
	char payload[1];	/* trailing data; the sender does not fill it */
};
typedef struct test_msg msg_t;
- #define LOG_STR_SIZE 128
- typedef struct {
- char log[LOG_STR_SIZE];
- struct list_head list;
- } log_entry_t;
- #define HOW_BIG_AND_BUF 4096
- static char big_and_buf[HOW_BIG_AND_BUF];
- static char big_and_buf_rx[HOW_BIG_AND_BUF];
- static int32_t parse_debug = 0;
- static int32_t record_config_events_g = 0;
- static int32_t record_messages_g = 0;
- static cpg_handle_t cpg_handle = 0;
- static int32_t cpg_fd = -1;
- static struct list_head config_chg_log_head;
- static struct list_head msg_log_head;
- static pid_t my_pid;
- static uint32_t my_nodeid;
- static int32_t my_seq;
- static int32_t my_msgs_to_send;
- static int32_t total_stored_msgs = 0;
- static hdb_handle_t poll_handle;
- static void delivery_callback (
- cpg_handle_t handle,
- const struct cpg_name *groupName,
- uint32_t nodeid,
- uint32_t pid,
- void *msg,
- size_t msg_len)
- {
- log_entry_t *log_pt;
- msg_t *msg_pt = (msg_t*)msg;
- msg_status_t status = MSG_OK;
- if (record_messages_g == 0) {
- return;
- }
- msg_pt->seq = my_seq;
- my_seq++;
- if (nodeid != msg_pt->nodeid) {
- status = MSG_NODEID_ERR;
- }
- if (pid != msg_pt->pid) {
- status = MSG_PID_ERR;
- }
- if (msg_len != msg_pt->size) {
- status = MSG_SIZE_ERR;
- }
- /* TODO: check hash here.
- */
- log_pt = malloc (sizeof(log_entry_t));
- list_init (&log_pt->list);
- snprintf (log_pt->log, 128, "%d:%d:%d:%d;",
- msg_pt->nodeid, msg_pt->pid, msg_pt->seq, status);
- list_add_tail (&log_pt->list, &msg_log_head);
- total_stored_msgs++;
- }
- static void config_change_callback (
- cpg_handle_t handle,
- const struct cpg_name *groupName,
- const struct cpg_address *member_list, size_t member_list_entries,
- const struct cpg_address *left_list, size_t left_list_entries,
- const struct cpg_address *joined_list, size_t joined_list_entries)
- {
- int i;
- log_entry_t *log_pt;
- /* group_name,ip,pid,join|leave */
- if (record_config_events_g == 0) {
- return;
- }
- for (i = 0; i < left_list_entries; i++) {
- syslog (LOG_DEBUG, "%s() inserting leave event into list", __func__);
- log_pt = malloc (sizeof(log_entry_t));
- list_init (&log_pt->list);
- snprintf (log_pt->log, 256, "%s,%d,%d,left",
- groupName->value, left_list[i].nodeid,left_list[i].pid);
- list_add_tail(&log_pt->list, &config_chg_log_head);
- }
- for (i = 0; i < joined_list_entries; i++) {
- syslog (LOG_DEBUG, "%s() inserting join event into list", __func__);
- log_pt = malloc (sizeof(log_entry_t));
- list_init (&log_pt->list);
- snprintf (log_pt->log, 256, "%s,%d,%d,join",
- groupName->value, joined_list[i].nodeid,joined_list[i].pid);
- list_add_tail (&log_pt->list, &config_chg_log_head);
- }
- }
- static cpg_callbacks_t callbacks = {
- .cpg_deliver_fn = delivery_callback,
- .cpg_confchg_fn = config_change_callback,
- };
- static void record_messages (void)
- {
- record_messages_g = 1;
- syslog (LOG_DEBUG,"%s() record:%d", __func__, record_messages_g);
- }
- static void record_config_events (void)
- {
- record_config_events_g = 1;
- syslog (LOG_DEBUG,"%s() record:%d", __func__, record_config_events_g);
- }
- static void read_config_event (int sock)
- {
- const char *empty = "None";
- struct list_head * list = config_chg_log_head.next;
- log_entry_t *entry;
- if (list != &config_chg_log_head) {
- entry = list_entry (list, log_entry_t, list);
- send (sock, entry->log, strlen (entry->log) + 1, 0);
- list_del (&entry->list);
- free (entry);
- } else {
- syslog (LOG_DEBUG,"%s() no events in list", __func__);
- send (sock, empty, strlen (empty) + 1, 0);
- }
- }
- static void read_messages (int sock, char* atmost_str)
- {
- struct list_head * list;
- log_entry_t *entry;
- int atmost = atoi (atmost_str);
- int packed = 0;
- if (atmost == 0)
- atmost = 1;
- if (atmost > (HOW_BIG_AND_BUF / LOG_STR_SIZE))
- atmost = (HOW_BIG_AND_BUF / LOG_STR_SIZE);
- syslog (LOG_DEBUG, "%s() atmost %d; total_stored_msgs:%d",
- __func__, atmost, total_stored_msgs);
- big_and_buf[0] = '\0';
- for (list = msg_log_head.next;
- (!list_empty (&msg_log_head) && packed < atmost); ) {
- entry = list_entry (list, log_entry_t, list);
- strcat (big_and_buf, entry->log);
- packed++;
- list = list->next;
- list_del (&entry->list);
- free (entry);
- total_stored_msgs--;
- }
- syslog (LOG_DEBUG, "%s() sending %d; total_stored_msgs:%d; len:%d",
- __func__, packed, total_stored_msgs, (int)strlen (big_and_buf));
- if (packed == 0) {
- strcpy (big_and_buf, "None");
- }
- send (sock, big_and_buf, strlen (big_and_buf), 0);
- }
- static void send_some_more_messages (void)
- {
- msg_t my_msg;
- struct iovec iov[1];
- int i;
- int send_now;
- cs_error_t res;
- cpg_flow_control_state_t fc_state;
- if (cpg_fd < 0)
- return;
- send_now = my_msgs_to_send;
- syslog (LOG_DEBUG,"%s() send_now:%d", __func__, send_now);
- my_msg.pid = my_pid;
- my_msg.nodeid = my_nodeid;
- my_msg.hash = 0;
- my_msg.size = sizeof (msg_t);
- my_msg.seq = 0;
- iov[0].iov_len = my_msg.size;
- iov[0].iov_base = &my_msg;
- for (i = 0; i < send_now; i++) {
- res = cpg_flow_control_state_get (cpg_handle, &fc_state);
- if (res == CS_OK && fc_state == CPG_FLOW_CONTROL_ENABLED) {
- /* lets do this later */
- syslog (LOG_DEBUG, "%s() flow control enabled.", __func__);
- return;
- }
- res = cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, 1);
- if (res == CS_ERR_TRY_AGAIN) {
- /* lets do this later */
- syslog (LOG_DEBUG, "%s() cpg_mcast_joined() says try again.",
- __func__);
- return;
- } else
- if (res != CS_OK) {
- syslog (LOG_ERR, "%s() -> cpg_mcast_joined error:%d, exiting.",
- __func__, res);
- exit (-2);
- }
- my_msgs_to_send--;
- }
- }
- static void msg_blaster (int sock, char* num_to_send_str)
- {
- my_msgs_to_send = atoi (num_to_send_str);
- my_seq = 1;
- my_pid = getpid();
- cpg_local_get (cpg_handle, &my_nodeid);
- /* control the limits */
- if (my_msgs_to_send <= 0)
- my_msgs_to_send = 1;
- if (my_msgs_to_send > 1000)
- my_msgs_to_send = 1000;
- send_some_more_messages ();
- }
- static int cpg_dispatch_wrapper_fn (hdb_handle_t handle,
- int fd,
- int revents,
- void *data)
- {
- cs_error_t error = cpg_dispatch (cpg_handle, CS_DISPATCH_ALL);
- if (error == CS_ERR_LIBRARY) {
- syslog (LOG_ERR, "%s() got LIB error disconnecting from corosync.", __func__);
- poll_dispatch_delete (poll_handle, cpg_fd);
- close (cpg_fd);
- cpg_fd = -1;
- }
- return 0;
- }
- static void do_command (int sock, char* func, char*args[], int num_args)
- {
- int result;
- struct cpg_name group_name;
- if (parse_debug)
- syslog (LOG_DEBUG,"RPC:%s() called.", func);
- if (strcmp ("cpg_mcast_joined",func) == 0) {
- struct iovec iov[5];
- int a;
- for (a = 0; a < num_args; a++) {
- iov[a].iov_base = args[a];
- iov[a].iov_len = strlen(args[a])+1;
- }
- cpg_mcast_joined (cpg_handle, CPG_TYPE_AGREED, iov, num_args);
- } else if (strcmp ("cpg_join",func) == 0) {
- strcpy (group_name.value, args[0]);
- group_name.length = strlen(args[0]);
- result = cpg_join (cpg_handle, &group_name);
- if (result != CS_OK) {
- syslog (LOG_ERR,
- "Could not join process group, error %d\n", result);
- exit (1);
- }
- } else if (strcmp ("cpg_leave",func) == 0) {
- strcpy (group_name.value, args[0]);
- group_name.length = strlen(args[0]);
- result = cpg_leave (cpg_handle, &group_name);
- if (result != CS_OK) {
- syslog (LOG_ERR,
- "Could not leave process group, error %d\n", result);
- exit (1);
- }
- syslog (LOG_INFO, "called cpg_leave()!");
- } else if (strcmp ("cpg_initialize",func) == 0) {
- int retry_count = 0;
- result = cpg_initialize (&cpg_handle, &callbacks);
- while (result != CS_OK) {
- syslog (LOG_ERR,
- "cpg_initialize error %d (attempt %d)\n",
- result, retry_count);
- if (retry_count >= 3) {
- exit (1);
- }
- sleep(1);
- retry_count++;
- }
- cpg_fd_get (cpg_handle, &cpg_fd);
- poll_dispatch_add (poll_handle, cpg_fd, POLLIN|POLLNVAL, NULL, cpg_dispatch_wrapper_fn);
- } else if (strcmp ("cpg_local_get", func) == 0) {
- unsigned int local_nodeid;
- char response[100];
- cpg_local_get (cpg_handle, &local_nodeid);
- snprintf (response, 100, "%u",local_nodeid);
- send (sock, response, strlen (response) + 1, 0);
- } else if (strcmp ("cpg_finalize",func) == 0) {
- cpg_finalize (cpg_handle);
- poll_dispatch_delete (poll_handle, cpg_fd);
- cpg_fd = -1;
- } else if (strcmp ("record_config_events",func) == 0) {
- record_config_events ();
- } else if (strcmp ("record_messages",func) == 0) {
- record_messages ();
- } else if (strcmp ("read_config_event",func) == 0) {
- read_config_event (sock);
- } else if (strcmp ("read_messages",func) == 0) {
- read_messages (sock, args[0]);
- } else if (strcmp ("msg_blaster",func) == 0) {
- msg_blaster (sock, args[0]);
- } else {
- syslog (LOG_ERR,"%s RPC:%s not supported!", __func__, func);
- }
- }
- static void handle_command (int sock, char* msg)
- {
- int num_args;
- char *saveptr = NULL;
- char *str = strdup (msg);
- char *str_len;
- char *str_arg;
- char *args[5];
- int i = 0;
- int a = 0;
- char* func = NULL;
- if (parse_debug)
- syslog (LOG_DEBUG,"%s (MSG:%s)\n", __func__, msg);
- str_len = strtok_r (str, ":", &saveptr);
- assert (str_len);
- num_args = atoi (str_len) * 2;
- for (i = 0; i < num_args / 2; i++) {
- str_len = strtok_r (NULL, ":", &saveptr);
- str_arg = strtok_r (NULL, ":", &saveptr);
- if (func == NULL) {
- /* first "arg" is the function */
- if (parse_debug)
- syslog (LOG_DEBUG, "(LEN:%s, FUNC:%s)", str_len, str_arg);
- func = str_arg;
- a = 0;
- } else {
- args[a] = str_arg;
- a++;
- if (parse_debug)
- syslog (LOG_DEBUG, "(LEN:%s, ARG:%s)", str_len, str_arg);
- }
- }
- do_command (sock, func, args, a+1);
- free (str);
- }
- static int server_process_data_fn (hdb_handle_t handle,
- int fd,
- int revents,
- void *data)
- {
- char *saveptr;
- char *msg;
- char *cmd;
- int32_t nbytes;
- if ((nbytes = recv (fd, big_and_buf_rx, sizeof (big_and_buf_rx), 0)) <= 0) {
- /* got error or connection closed by client */
- if (nbytes == 0) {
- /* connection closed */
- syslog (LOG_WARNING, "socket %d hung up: exiting...\n", fd);
- } else {
- syslog (LOG_ERR,"recv() failed: %s", strerror(errno));
- }
- close (fd);
- exit (0);
- } else {
- if (my_msgs_to_send > 0)
- send_some_more_messages ();
- big_and_buf_rx[nbytes] = '\0';
- msg = strtok_r (big_and_buf_rx, ";", &saveptr);
- assert (msg);
- while (msg) {
- cmd = strdup (msg);
- handle_command (fd, cmd);
- free (cmd);
- msg = strtok_r (NULL, ";", &saveptr);
- }
- }
- return 0;
- }
- static int server_accept_fn (hdb_handle_t handle,
- int fd,
- int revents,
- void *data)
- {
- socklen_t addrlen;
- struct sockaddr_in in_addr;
- int new_fd;
- int res;
- addrlen = sizeof (struct sockaddr_in);
- retry_accept:
- new_fd = accept (fd, (struct sockaddr *)&in_addr, &addrlen);
- if (new_fd == -1 && errno == EINTR) {
- goto retry_accept;
- }
- if (new_fd == -1) {
- syslog (LOG_ERR,
- "Could not accept connection: %s\n", strerror (errno));
- return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
- }
- res = fcntl (new_fd, F_SETFL, O_NONBLOCK);
- if (res == -1) {
- syslog (LOG_ERR,
- "Could not set non-blocking operation on connection: %s\n",
- strerror (errno));
- close (new_fd);
- return (0); /* This is an error, but -1 would indicate disconnect from poll loop */
- }
- poll_dispatch_add (poll_handle, new_fd, POLLIN|POLLNVAL, NULL, server_process_data_fn);
- return 0;
- }
/*
 * Create the TCP socket the agent listens on.
 *
 * Every address getaddrinfo() offers for SERVER_PORT is tried in turn until
 * one can be bound; the bound socket is put into listening state with a
 * backlog of 10 and returned.
 *
 * Failures terminate the process: exit status 1 when the address lookup
 * fails, 2 when no address could be bound, 3 when listen() fails.
 */
static int create_server_sockect (void)
{
	struct addrinfo hints;
	struct addrinfo *candidates;
	struct addrinfo *cur;
	int sock_fd = -1;
	int reuse = 1;
	int gai_rc;

	/* any address family, stream socket, wildcard address */
	memset (&hints, 0, sizeof hints);
	hints.ai_flags = AI_PASSIVE;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_family = AF_UNSPEC;

	gai_rc = getaddrinfo (NULL, SERVER_PORT, &hints, &candidates);
	if (gai_rc != 0) {
		syslog (LOG_ERR, "%s\n", gai_strerror (gai_rc));
		exit (1);
	}

	cur = candidates;
	while (cur != NULL) {
		sock_fd = socket (cur->ai_family, cur->ai_socktype, cur->ai_protocol);
		if (sock_fd >= 0) {
			/* lose the pesky "address already in use" error message
			 */
			if (setsockopt (sock_fd, SOL_SOCKET, SO_REUSEADDR,
					&reuse, sizeof reuse) < 0) {
				syslog (LOG_ERR, "setsockopt() failed: %s\n", strerror (errno));
			}

			if (bind (sock_fd, cur->ai_addr, cur->ai_addrlen) >= 0) {
				/* bound: keep this socket */
				break;
			}
			syslog (LOG_ERR, "bind() failed: %s\n", strerror (errno));
			close (sock_fd);
		}
		cur = cur->ai_next;
	}

	if (cur == NULL) {
		syslog (LOG_ERR, "failed to bind\n");
		exit (2);
	}

	freeaddrinfo (candidates);

	if (listen (sock_fd, 10) == -1) {
		syslog (LOG_ERR, "listen() failed: %s", strerror(errno));
		exit (3);
	}

	return sock_fd;
}
- int main (int argc, char *argv[])
- {
- int listener;
- openlog (NULL, LOG_CONS|LOG_PID, LOG_DAEMON);
- list_init (&msg_log_head);
- list_init (&config_chg_log_head);
- poll_handle = poll_create ();
- listener = create_server_sockect ();
- poll_dispatch_add (poll_handle, listener, POLLIN|POLLNVAL, NULL, server_accept_fn);
- poll_run (poll_handle);
- return -1;
- }
|