|
|
@@ -505,9 +505,15 @@ static void outq_flush (void *data)
|
|
|
outq_item = list_entry (list, struct outq_item, list);
|
|
|
|
|
|
rc = qb_ipcs_event_send(conn, outq_item->msg, outq_item->mlen);
|
|
|
- if (rc != outq_item->mlen) {
|
|
|
+ if (rc < 0 && rc != -EAGAIN) {
|
|
|
+ errno = -rc;
|
|
|
+ qb_perror(LOG_ERR, "qb_ipcs_event_send");
|
|
|
+ qb_ipcs_connection_unref(conn);
|
|
|
+ return;
|
|
|
+ } else if (rc == -EAGAIN) {
|
|
|
break;
|
|
|
}
|
|
|
+ assert(rc == outq_item->mlen);
|
|
|
context->sent++;
|
|
|
context->queued--;
|
|
|
|
|
|
@@ -521,11 +527,9 @@ static void outq_flush (void *data)
|
|
|
context->queued, context->sent);
|
|
|
context->queued = 0;
|
|
|
context->sent = 0;
|
|
|
- return;
|
|
|
- }
|
|
|
- qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush);
|
|
|
- if (rc < 0 && rc != -EAGAIN) {
|
|
|
- log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d!", rc);
|
|
|
+ qb_ipcs_connection_unref(conn);
|
|
|
+ } else {
|
|
|
+ qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -553,6 +557,7 @@ static void msg_send_or_queue(qb_ipcs_connection_t *conn, const struct iovec *io
|
|
|
context->queued = 0;
|
|
|
context->sent = 0;
|
|
|
context->queuing = QB_TRUE;
|
|
|
+ qb_ipcs_connection_ref(conn);
|
|
|
qb_loop_job_add(cs_poll_handle_get(), QB_LOOP_HIGH, conn, outq_flush);
|
|
|
} else {
|
|
|
log_printf(LOGSYS_LEVEL_ERROR, "event_send retuned %d, expected %d!", rc, bytes_msg);
|
|
|
@@ -561,12 +566,14 @@ static void msg_send_or_queue(qb_ipcs_connection_t *conn, const struct iovec *io
|
|
|
}
|
|
|
outq_item = malloc (sizeof (struct outq_item));
|
|
|
if (outq_item == NULL) {
|
|
|
+ qb_ipcs_connection_unref(conn);
|
|
|
qb_ipcs_disconnect(conn);
|
|
|
return;
|
|
|
}
|
|
|
outq_item->msg = malloc (bytes_msg);
|
|
|
if (outq_item->msg == NULL) {
|
|
|
free (outq_item);
|
|
|
+ qb_ipcs_connection_unref(conn);
|
|
|
qb_ipcs_disconnect(conn);
|
|
|
return;
|
|
|
}
|