|
@@ -49,13 +49,14 @@
|
|
|
#include "../include/ais_msg.h"
|
|
#include "../include/ais_msg.h"
|
|
|
#include "util.h"
|
|
#include "util.h"
|
|
|
|
|
|
|
|
-struct message_overlay {
|
|
|
|
|
- struct req_header header;
|
|
|
|
|
|
|
+struct res_overlay {
|
|
|
|
|
+ struct res_header header;
|
|
|
char data[4096];
|
|
char data[4096];
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
struct clmInstance {
|
|
struct clmInstance {
|
|
|
int fd;
|
|
int fd;
|
|
|
|
|
+ struct queue inq;
|
|
|
SaClmCallbacksT callbacks;
|
|
SaClmCallbacksT callbacks;
|
|
|
int finalize;
|
|
int finalize;
|
|
|
pthread_mutex_t mutex;
|
|
pthread_mutex_t mutex;
|
|
@@ -121,7 +122,16 @@ saClmInitialize (
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
clmInstance->fd = -1;
|
|
clmInstance->fd = -1;
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ /*
|
|
|
|
|
+ * An inq is needed to store async messages while waiting for a
|
|
|
|
|
+ * sync response
|
|
|
|
|
+ */
|
|
|
|
|
+ error = saQueueInit (&clmInstance->inq, 512, sizeof (void *));
|
|
|
|
|
+ if (error != SA_OK) {
|
|
|
|
|
+ goto error_put_destroy;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
error = saServiceConnect (&clmInstance->fd, MESSAGE_REQ_CLM_INIT);
|
|
error = saServiceConnect (&clmInstance->fd, MESSAGE_REQ_CLM_INIT);
|
|
|
if (error != SA_OK) {
|
|
if (error != SA_OK) {
|
|
|
goto error_put_destroy;
|
|
goto error_put_destroy;
|
|
@@ -177,7 +187,10 @@ saClmDispatch (
|
|
|
struct res_clm_trackcallback *res_clm_trackcallback;
|
|
struct res_clm_trackcallback *res_clm_trackcallback;
|
|
|
struct res_clm_nodegetcallback *res_clm_nodegetcallback;
|
|
struct res_clm_nodegetcallback *res_clm_nodegetcallback;
|
|
|
SaClmCallbacksT callbacks;
|
|
SaClmCallbacksT callbacks;
|
|
|
- struct message_overlay dispatch_data;
|
|
|
|
|
|
|
+ struct res_overlay dispatch_data;
|
|
|
|
|
+ int empty;
|
|
|
|
|
+ struct res_header **queue_msg;
|
|
|
|
|
+ struct res_header *msg;
|
|
|
|
|
|
|
|
error = saHandleInstanceGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
|
|
error = saHandleInstanceGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
|
|
|
if (error != SA_OK) {
|
|
if (error != SA_OK) {
|
|
@@ -225,33 +238,42 @@ saClmDispatch (
|
|
|
continue; /* next poll */
|
|
continue; /* next poll */
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /*
|
|
|
|
|
- * Read header
|
|
|
|
|
- */
|
|
|
|
|
- error = saRecvRetry (clmInstance->fd, &dispatch_data.header,
|
|
|
|
|
- sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL);
|
|
|
|
|
- if (error != SA_OK) {
|
|
|
|
|
- goto error_unlock;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /*
|
|
|
|
|
- * Read data payload
|
|
|
|
|
- */
|
|
|
|
|
- if (dispatch_data.header.size > sizeof (struct req_header)) {
|
|
|
|
|
- error = saRecvRetry (clmInstance->fd, &dispatch_data.data,
|
|
|
|
|
- dispatch_data.header.size - sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL);
|
|
|
|
|
|
|
+ saQueueIsEmpty(&clmInstance->inq, &empty);
|
|
|
|
|
+ if (empty == 0) {
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Queue is not empty, read data from queue
|
|
|
|
|
+ */
|
|
|
|
|
+ saQueueItemGet (&clmInstance->inq, (void *)&queue_msg);
|
|
|
|
|
+ msg = *queue_msg;
|
|
|
|
|
+ memcpy (&dispatch_data, msg, msg->size);
|
|
|
|
|
+ saQueueItemRemove (&clmInstance->inq);
|
|
|
|
|
+ free (msg);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ /*
|
|
|
|
|
+ * Queue empty, read response from socket
|
|
|
|
|
+ */
|
|
|
|
|
+ error = saRecvRetry (clmInstance->fd, &dispatch_data.header,
|
|
|
|
|
+ sizeof (struct res_header), MSG_WAITALL | MSG_NOSIGNAL);
|
|
|
if (error != SA_OK) {
|
|
if (error != SA_OK) {
|
|
|
goto error_unlock;
|
|
goto error_unlock;
|
|
|
}
|
|
}
|
|
|
|
|
+ if (dispatch_data.header.size > sizeof (struct res_header)) {
|
|
|
|
|
+ error = saRecvRetry (clmInstance->fd, &dispatch_data.data,
|
|
|
|
|
+ dispatch_data.header.size - sizeof (struct res_header),
|
|
|
|
|
+ MSG_WAITALL | MSG_NOSIGNAL);
|
|
|
|
|
+ if (error != SA_OK) {
|
|
|
|
|
+ goto error_unlock;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
/*
|
|
/*
|
|
|
* Make copy of callbacks, message data, unlock instance, and call callback
|
|
* Make copy of callbacks, message data, unlock instance, and call callback
|
|
|
* A risk of this dispatch method is that the callback routines may
|
|
* A risk of this dispatch method is that the callback routines may
|
|
|
- * operate at the same time that clmFinalize has been called.
|
|
|
|
|
- */
|
|
|
|
|
|
|
+ * operate at the same time that clmFinalize has been called in another thread.
|
|
|
|
|
+ */
|
|
|
memcpy (&callbacks, &clmInstance->callbacks, sizeof (SaClmCallbacksT));
|
|
memcpy (&callbacks, &clmInstance->callbacks, sizeof (SaClmCallbacksT));
|
|
|
-
|
|
|
|
|
pthread_mutex_unlock (&clmInstance->mutex);
|
|
pthread_mutex_unlock (&clmInstance->mutex);
|
|
|
|
|
+
|
|
|
/*
|
|
/*
|
|
|
* Dispatch incoming message
|
|
* Dispatch incoming message
|
|
|
*/
|
|
*/
|
|
@@ -274,7 +296,8 @@ saClmDispatch (
|
|
|
res_clm_nodegetcallback = (struct res_clm_nodegetcallback *)&dispatch_data;
|
|
res_clm_nodegetcallback = (struct res_clm_nodegetcallback *)&dispatch_data;
|
|
|
|
|
|
|
|
memcpy (res_clm_nodegetcallback->clusterNodeAddress,
|
|
memcpy (res_clm_nodegetcallback->clusterNodeAddress,
|
|
|
- &res_clm_nodegetcallback->clusterNode, sizeof (SaClmClusterNodeT));
|
|
|
|
|
|
|
+ &res_clm_nodegetcallback->clusterNode,
|
|
|
|
|
+ sizeof (SaClmClusterNodeT));
|
|
|
|
|
|
|
|
callbacks.saClmClusterNodeGetCallback (
|
|
callbacks.saClmClusterNodeGetCallback (
|
|
|
res_clm_nodegetcallback->invocation,
|
|
res_clm_nodegetcallback->invocation,
|
|
@@ -412,7 +435,6 @@ saClmClusterNodeGet (
|
|
|
int fd;
|
|
int fd;
|
|
|
struct req_clm_nodeget req_clm_nodeget;
|
|
struct req_clm_nodeget req_clm_nodeget;
|
|
|
struct res_clm_nodeget res_clm_nodeget;
|
|
struct res_clm_nodeget res_clm_nodeget;
|
|
|
- struct message_overlay message;
|
|
|
|
|
SaErrorT error = SA_OK;
|
|
SaErrorT error = SA_OK;
|
|
|
struct timeval select_timeout;
|
|
struct timeval select_timeout;
|
|
|
fd_set read_fds;
|
|
fd_set read_fds;
|
|
@@ -454,21 +476,21 @@ saClmClusterNodeGet (
|
|
|
goto error_close;
|
|
goto error_close;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- error = saRecvRetry (fd, &message.header, sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL);
|
|
|
|
|
|
|
+ error = saRecvRetry (fd, &res_clm_nodeget, sizeof (struct res_clm_nodeget),
|
|
|
|
|
+ MSG_WAITALL | MSG_NOSIGNAL);
|
|
|
if (error != SA_OK) {
|
|
if (error != SA_OK) {
|
|
|
goto error_close;
|
|
goto error_close;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- error = saRecvRetry (fd, &message.data, message.header.size - sizeof (struct req_header), MSG_WAITALL | MSG_NOSIGNAL);
|
|
|
|
|
- if (error != SA_OK) {
|
|
|
|
|
- goto error_close;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ error = res_clm_nodeget.header.error;
|
|
|
|
|
|
|
|
memcpy (clusterNode, &res_clm_nodeget.clusterNode, sizeof (SaClmClusterNodeT));
|
|
memcpy (clusterNode, &res_clm_nodeget.clusterNode, sizeof (SaClmClusterNodeT));
|
|
|
|
|
|
|
|
|
|
+error_noclose:
|
|
|
|
|
+ return (error);
|
|
|
|
|
+
|
|
|
error_close:
|
|
error_close:
|
|
|
close (fd);
|
|
close (fd);
|
|
|
-error_noclose:
|
|
|
|
|
return (error);
|
|
return (error);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -480,14 +502,15 @@ saClmClusterNodeGetAsync (
|
|
|
SaClmClusterNodeT *clusterNode)
|
|
SaClmClusterNodeT *clusterNode)
|
|
|
{
|
|
{
|
|
|
struct clmInstance *clmInstance;
|
|
struct clmInstance *clmInstance;
|
|
|
- struct req_clm_nodeget req_clm_nodeget;
|
|
|
|
|
|
|
+ struct req_clm_nodegetasync req_clm_nodegetasync;
|
|
|
|
|
+ struct res_clm_nodegetasync res_clm_nodegetasync;
|
|
|
SaErrorT error = SA_OK;
|
|
SaErrorT error = SA_OK;
|
|
|
|
|
|
|
|
- req_clm_nodeget.header.size = sizeof (struct req_clm_nodeget);
|
|
|
|
|
- req_clm_nodeget.header.id = MESSAGE_REQ_CLM_NODEGET;
|
|
|
|
|
- memcpy (&req_clm_nodeget.invocation, &invocation, sizeof (SaInvocationT));
|
|
|
|
|
- memcpy (&req_clm_nodeget.nodeId, &nodeId, sizeof (SaClmNodeIdT));
|
|
|
|
|
- req_clm_nodeget.clusterNodeAddress = clusterNode;
|
|
|
|
|
|
|
+ req_clm_nodegetasync.header.size = sizeof (struct req_clm_nodeget);
|
|
|
|
|
+ req_clm_nodegetasync.header.id = MESSAGE_REQ_CLM_NODEGETASYNC;
|
|
|
|
|
+ memcpy (&req_clm_nodegetasync.invocation, &invocation, sizeof (SaInvocationT));
|
|
|
|
|
+ memcpy (&req_clm_nodegetasync.nodeId, &nodeId, sizeof (SaClmNodeIdT));
|
|
|
|
|
+ req_clm_nodegetasync.clusterNodeAddress = clusterNode;
|
|
|
|
|
|
|
|
error = saHandleInstanceGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
|
|
error = saHandleInstanceGet (&clmHandleDatabase, *clmHandle, (void *)&clmInstance);
|
|
|
if (error != SA_OK) {
|
|
if (error != SA_OK) {
|
|
@@ -496,11 +519,20 @@ saClmClusterNodeGetAsync (
|
|
|
|
|
|
|
|
pthread_mutex_lock (&clmInstance->mutex);
|
|
pthread_mutex_lock (&clmInstance->mutex);
|
|
|
|
|
|
|
|
- error = saSendRetry (clmInstance->fd, &req_clm_nodeget,
|
|
|
|
|
- sizeof (struct req_clm_nodeget), MSG_NOSIGNAL);
|
|
|
|
|
|
|
+ error = saSendRetry (clmInstance->fd, &req_clm_nodegetasync,
|
|
|
|
|
+ sizeof (struct req_clm_nodegetasync), MSG_NOSIGNAL);
|
|
|
|
|
|
|
|
pthread_mutex_unlock (&clmInstance->mutex);
|
|
pthread_mutex_unlock (&clmInstance->mutex);
|
|
|
|
|
|
|
|
|
|
+ error = saRecvQueue (clmInstance->fd, &res_clm_nodegetasync,
|
|
|
|
|
+ &clmInstance->inq, MESSAGE_RES_CLM_NODEGETASYNC);
|
|
|
|
|
+ if (error != SA_OK) {
|
|
|
|
|
+ goto error_exit;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ error = res_clm_nodegetasync.header.error;
|
|
|
|
|
+
|
|
|
|
|
+error_exit:
|
|
|
saHandleInstancePut (&clmHandleDatabase, *clmHandle);
|
|
saHandleInstancePut (&clmHandleDatabase, *clmHandle);
|
|
|
|
|
|
|
|
return (error);
|
|
return (error);
|