[PATCH 1/5] LOOP: add per-level todo counters
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
lib/loop.c | 19 +++++++++++++++++++
lib/loop_int.h | 6 ++++++
lib/loop_poll.c | 12 +++++-------
3 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/lib/loop.c b/lib/loop.c
index 6eb802d..b4f35e2 100644
--- a/lib/loop.c
+++ b/lib/loop.c
@@ -24,6 +24,7 @@
#include <qb/qblist.h>
#include <qb/qbloop.h>
#include "loop_int.h"
+#include "util_int.h"
static int32_t qb_loop_run_level(struct qb_loop_level *level)
{
@@ -39,6 +40,7 @@ static int32_t qb_loop_run_level(struct qb_loop_level *level)
qb_list_del (&job->list);
qb_list_init (&job->list);
job->source->dispatch_and_take_back(job, level->priority);
+ level->todo--;
processed++;
if (level->l->stop_requested) {
return processed;
@@ -50,6 +52,21 @@ static int32_t qb_loop_run_level(struct qb_loop_level *level)
return processed;
}
+void qb_loop_level_item_add(struct qb_loop_level *level,
+ struct qb_loop_item *job)
+{
+ qb_list_init(&job->list);
+ qb_list_add_tail(&job->list, &level->job_head);
+ level->todo++;
+}
+
+void qb_loop_level_item_del(struct qb_loop_level *level,
+ struct qb_loop_item *job)
+{
+ qb_list_del(&job->list);
+ qb_list_init(&job->list);
+ level->todo--;
+}
struct qb_loop * qb_loop_create(void)
{
@@ -59,6 +76,7 @@ struct qb_loop * qb_loop_create(void)
for (p = QB_LOOP_LOW; p <= QB_LOOP_HIGH; p++) {
l->level[p].priority = p;
l->level[p].to_process = 4;
+ l->level[p].todo = 0;
l->level[p].l = l;
qb_list_init(&l->level[p].job_head);
@@ -120,6 +138,7 @@ void qb_loop_run(struct qb_loop *l)
}
}
todo += l->fd_source->poll(l->fd_source, ms_timeout);
+// qb_poll_print(l);
for (p = QB_LOOP_HIGH; p >= p_stop; p--) {
todo -= qb_loop_run_level(&l->level[p]);
diff --git a/lib/loop_int.h b/lib/loop_int.h
index e9da182..ddfefa5 100644
--- a/lib/loop_int.h
+++ b/lib/loop_int.h
@@ -36,6 +36,7 @@ struct qb_loop_item {
struct qb_loop_level {
enum qb_loop_priority priority;
int32_t to_process;
+ int32_t todo;
struct qb_list_head wait_head;
struct qb_list_head job_head;
struct qb_loop *l;
@@ -79,6 +80,11 @@ void qb_loop_signals_destroy(struct qb_loop *l);
int32_t qb_loop_timer_msec_duration_to_expire(struct qb_loop_source *timer_source);
+void qb_loop_level_item_add(struct qb_loop_level *level,
+ struct qb_loop_item *job);
+
+void qb_loop_level_item_del(struct qb_loop_level *level,
+ struct qb_loop_item *job);
#endif /* QB_LOOP_INT_DEFINED */
diff --git a/lib/loop_poll.c b/lib/loop_poll.c
index 6b1b937..a684c64 100644
--- a/lib/loop_poll.c
+++ b/lib/loop_poll.c
@@ -460,8 +460,8 @@ static int32_t _poll_add_(struct qb_loop *l,
static int32_t _qb_poll_add_to_jobs_(struct qb_loop* l, struct qb_poll_entry* pe)
{
- qb_list_init(&pe->item.list);
- qb_list_add_tail(&pe->item.list, &l->level[pe->p].job_head);
+ assert(pe->type == QB_POLL);
+ qb_loop_level_item_add(&l->level[pe->p], &pe->item);
return 1;
}
@@ -565,10 +565,10 @@ static int32_t _qb_timer_add_to_jobs_(struct qb_loop* l, struct qb_poll_entry* p
{
uint64_t expired = 0;
+ assert(pe->type == QB_TIMER);
if (pe->ufd.revents == POLLIN) {
(void)read(pe->ufd.fd, &expired, sizeof(expired));
- qb_list_init(&pe->item.list);
- qb_list_add_tail(&pe->item.list, &l->level[pe->p].job_head);
+ qb_loop_level_item_add(&l->level[pe->p], &pe->item);
} else {
qb_util_log(LOG_ERR, "timer revents: %d expected %d",
pe->ufd.revents, POLLIN);
@@ -847,9 +847,7 @@ static int32_t _qb_signal_add_to_jobs_(struct qb_loop* l,
memcpy(new_sig_job, sig, sizeof(struct qb_loop_sig));
new_sig_job->cloned_from = sig;
- qb_list_init(&new_sig_job->item.list);
- qb_list_add_tail(&new_sig_job->item.list,
- &l->level[pe->p].job_head);
+ qb_loop_level_item_add(&l->level[pe->p], &new_sig_job->item);
jobs_added++;
}
}
--
1.7.3.1
13 years, 4 months
Running suite loop_job fail
by Dietmar Maurer
Running suite(s): loop_job
loop_timers
average error for 1500000 ns timer is 19341 (ns) (1.289448)
timer expired early! by 52553
85%: Checks: 7, Failures: 1, Errors: 0
check_loop.c:100:P:limits:test_loop_job_input:0: Passed
check_loop.c:117:P:run_one:test_loop_job_1:0: Passed
check_loop.c:134:P:run_recursive:test_loop_job_4:0: Passed
check_loop.c:150:P:run_500:test_loop_job_nuts:0: Passed
check_loop.c:196:P:limits:test_loop_timer_input:0: Passed
check_loop.c:241:P:basic:test_loop_timer_basic:0: Passed
check_loop.c:270:F:precision:test_loop_timer_precision:0: Assertion 'diff >= sw->ns_timer' failed
FAIL: check_loop
Does that work for you?
13 years, 4 months
[PATCH 1/4] IPC: add a timeout to the client recv functions
by Angus Salkeld
Also allow the ringbuffer to pass ETIMEDOUT back to the
client applications.
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipcc.h | 9 ++++-----
lib/ipc_shm.c | 15 ++-------------
lib/ipc_us.c | 4 ++--
lib/ipcc.c | 47 ++++++++++++++++-------------------------------
lib/ipcs.c | 18 ++++++++++++------
tests/bmc.c | 12 ++----------
tests/bmcpt.c | 21 ++++++++-------------
tests/check_ipc.c | 21 ++++++++++-----------
8 files changed, 56 insertions(+), 91 deletions(-)
diff --git a/include/qb/qbipcc.h b/include/qb/qbipcc.h
index 1fa892c..b6f94a8 100644
--- a/include/qb/qbipcc.h
+++ b/include/qb/qbipcc.h
@@ -113,7 +113,7 @@ ssize_t qb_ipcc_sendv(qb_ipcc_connection_t* c, const struct iovec* iov,
* @return (size recv'ed, -errno == error)
*/
ssize_t qb_ipcc_recv(qb_ipcc_connection_t* c, void *msg_ptr,
- size_t msg_len);
+ size_t msg_len, int32_t ms_timeout);
/**
* This is a convenience function that simply sends and then recvs.
@@ -127,10 +127,9 @@ ssize_t qb_ipcc_recv(qb_ipcc_connection_t* c, void *msg_ptr,
* @see qb_ipcc_sendv() qb_ipcc_recv()
*/
ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t *c,
- const struct iovec *iov,
- unsigned int iov_len,
- void *msg_ptr,
- size_t msg_len);
+ const struct iovec *iov, uint32_t iov_len,
+ void *msg_ptr, size_t msg_len,
+ int32_t ms_timeout);
/**
* Receive an event.
diff --git a/lib/ipc_shm.c b/lib/ipc_shm.c
index 1b8fd6e..55670dc 100644
--- a/lib/ipc_shm.c
+++ b/lib/ipc_shm.c
@@ -86,34 +86,23 @@ static ssize_t qb_ipc_shm_recv(struct qb_ipc_one_way *one_way,
size_t msg_len,
int32_t ms_timeout)
{
- ssize_t res;
if (one_way->u.shm.rb == NULL) {
return -ENOTCONN;
}
- res = qb_rb_chunk_read(one_way->u.shm.rb,
+ return qb_rb_chunk_read(one_way->u.shm.rb,
(void *)msg_ptr,
msg_len,
ms_timeout);
- if (res == -ETIMEDOUT) {
- return -EAGAIN;
- }
- return res;
}
static ssize_t qb_ipc_shm_peek(struct qb_ipc_one_way *one_way, void **data_out, int32_t ms_timeout)
{
- ssize_t res;
-
if (one_way->u.shm.rb == NULL) {
return -ENOTCONN;
}
- res = qb_rb_chunk_peek(one_way->u.shm.rb,
+ return qb_rb_chunk_peek(one_way->u.shm.rb,
data_out,
ms_timeout);
- if (res == -ETIMEDOUT) {
- return -EAGAIN;
- }
- return res;
}
static void qb_ipc_shm_reclaim(struct qb_ipc_one_way *one_way)
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index 14da2dd..3fe0ddd 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -227,7 +227,7 @@ ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way,
retry_recv:
result = recv(one_way->u.us.sock, msg, len, MSG_NOSIGNAL | MSG_WAITALL);
- if (result == -1 && errno == EAGAIN) {
+ if (timeout == -1 && result == -1 && errno == EAGAIN) {
goto retry_recv;
}
if (result == -1) {
@@ -318,7 +318,7 @@ int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
return res;
}
- res = qb_ipc_us_recv(&c->setup, r, sizeof(struct qb_ipc_connection_response), 0);
+ res = qb_ipc_us_recv(&c->setup, r, sizeof(struct qb_ipc_connection_response), -1);
if (res < 0) {
return res;
}
diff --git a/lib/ipcc.c b/lib/ipcc.c
index 5fb8039..8248cdd 100644
--- a/lib/ipcc.c
+++ b/lib/ipcc.c
@@ -131,55 +131,40 @@ ssize_t qb_ipcc_sendv(struct qb_ipcc_connection* c, const struct iovec* iov,
}
ssize_t qb_ipcc_recv(struct qb_ipcc_connection * c, void *msg_ptr,
- size_t msg_len)
+ size_t msg_len, int32_t ms_timeout)
{
int32_t res = 0;
- int32_t retries = 0;
-
- recv_retry:
- retries++;
- res = c->funcs.recv(&c->response, msg_ptr, msg_len, 100);
- if (res == -EAGAIN && c->needs_sock_for_poll) {
- res = qb_ipc_us_recv_ready(&c->setup, 0);
- if (res == -EAGAIN && retries < 50) {
- goto recv_retry;
- } else if (res < 0) {
- return res;
+ int32_t res2 = 0;
+
+ res = c->funcs.recv(&c->response, msg_ptr, msg_len, ms_timeout);
+ if ((res == -EAGAIN || res == -ETIMEDOUT) && c->needs_sock_for_poll) {
+ res2 = qb_ipc_us_recv_ready(&c->setup, 0);
+ if (res2 < 0) {
+ return res2;
} else {
- return -EAGAIN;
+ return res;
}
}
return res;
}
-ssize_t qb_ipcc_sendv_recv (
- qb_ipcc_connection_t *c,
- const struct iovec *iov,
- unsigned int iov_len,
- void *res_msg,
- size_t res_len)
+ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t *c,
+ const struct iovec *iov, uint32_t iov_len,
+ void *res_msg, size_t res_len,
+ int32_t ms_timeout)
{
- ssize_t res;
+ ssize_t res = 0;
if (c->funcs.fc_get && c->funcs.fc_get(&c->request)) {
return -EAGAIN;
}
-repeat_send:
res = qb_ipcc_sendv(c, iov, iov_len);
if (res < 0) {
- if (res == -EAGAIN) {
- goto repeat_send;
- }
return res;
}
-repeat_recv:
- res = qb_ipcc_recv(c, res_msg, res_len);
- if (res == -EAGAIN) {
- goto repeat_recv;
- }
- return res;
+ return qb_ipcc_recv(c, res_msg, res_len, ms_timeout);
}
int32_t qb_ipcc_fd_get(struct qb_ipcc_connection * c, int32_t * fd)
@@ -217,7 +202,7 @@ ssize_t qb_ipcc_event_recv(struct qb_ipcc_connection * c, void *msg_pt,
return size;
}
if (c->needs_sock_for_poll) {
- res = qb_ipc_us_recv(&c->setup, &one_byte, 1, 0);
+ res = qb_ipc_us_recv(&c->setup, &one_byte, 1, -1);
if (res < 0) {
return res;
}
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 51a3164..2c615ba 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -211,7 +211,7 @@ ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
res = c->service->funcs.send(&c->response, data, size);
if (res == size) {
c->stats.responses++;
- } else if (res == -EAGAIN) {
+ } else if (res == -EAGAIN || res == -ETIMEDOUT) {
c->stats.send_retries++;
}
qb_ipcs_connection_unref(c);
@@ -227,7 +227,7 @@ ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection *c, const struct iovec
res = c->service->funcs.sendv(&c->response, iov, iov_len);
if (res > 0) {
c->stats.responses++;
- } else if (res == -EAGAIN) {
+ } else if (res == -EAGAIN || res == -ETIMEDOUT) {
c->stats.send_retries++;
}
qb_ipcs_connection_unref(c);
@@ -481,7 +481,7 @@ static int32_t _process_request_(struct qb_ipcs_connection *c,
ms_timeout);
}
if (size < 0) {
- if (size != -EAGAIN) {
+ if (size != -EAGAIN && size != -ETIMEDOUT) {
qb_util_log(LOG_ERR, "%s(): %s", __func__, strerror(-res));
} else {
c->stats.recv_retries++;
@@ -573,6 +573,12 @@ int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents,
return 0;
}
avail = _request_q_len_get(c);
+
+ if (c->service->needs_sock_for_poll && avail == 0) {
+ (void)qb_ipc_us_recv(&c->setup, bytes, 1, 0);
+ return 0;
+ }
+
res = avail; /* in case error */
do {
res = _process_request_(c, IPC_REQUEST_TIMEOUT);
@@ -585,15 +591,15 @@ int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents,
} while (avail > 0 && res > 0 && !c->fc_enabled);
if (c->service->needs_sock_for_poll && recvd > 0) {
- (void)qb_ipc_us_recv(&c->setup, bytes, recvd, 0);
+ (void)qb_ipc_us_recv(&c->setup, bytes, recvd, -1);
}
res = QB_MIN(0, res);
- if (res == -EAGAIN || res == -ENOBUFS) {
+ if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) {
res = 0;
}
if (res != 0) {
- qb_util_log(LOG_INFO, "%s returning %d : %s",
+ qb_util_log(LOG_DEBUG, "%s returning %d : %s",
__func__, res, strerror(-res));
qb_ipcs_connection_unref(c);
}
diff --git a/tests/bmc.c b/tests/bmc.c
index 5eafd56..e49d0e6 100644
--- a/tests/bmc.c
+++ b/tests/bmc.c
@@ -99,13 +99,9 @@ repeat_send:
}
if (blocking) {
- repeat_recv:
res = qb_ipcc_recv(conn,
&res_header,
- sizeof(struct qb_ipc_response_header));
- if (res == -EAGAIN) {
- goto repeat_recv;
- }
+ sizeof(struct qb_ipc_response_header), -1);
if (res == -EINTR) {
return -1;
}
@@ -117,13 +113,9 @@ repeat_send:
assert(res_header.size == sizeof(struct qb_ipc_response_header));
}
if (events) {
- repeat_event_recv:
res = qb_ipcc_event_recv(conn,
&res_header,
- sizeof(struct qb_ipc_response_header), 0);
- if (res == -EAGAIN) {
- goto repeat_event_recv;
- }
+ sizeof(struct qb_ipc_response_header), -1);
if (res == -EINTR) {
return -1;
}
diff --git a/tests/bmcpt.c b/tests/bmcpt.c
index e0d4fa0..b7b1369 100644
--- a/tests/bmcpt.c
+++ b/tests/bmcpt.c
@@ -123,19 +123,14 @@ repeat_send:
}
}
- repeat_recv:
- res = qb_ipcc_recv(ctx->conn,
- &res_header,
- sizeof(struct qb_ipc_response_header));
- if (res == -EAGAIN) {
- goto repeat_recv;
- }
- if (res == -EINTR) {
- return -1;
- }
- if (res < 0) {
- perror("qb_ipcc_recv");
- }
+ res = qb_ipcc_recv(ctx->conn, &res_header,
+ sizeof(struct qb_ipc_response_header), -1);
+ if (res == -EINTR) {
+ return -1;
+ }
+ if (res < 0) {
+ perror("qb_ipcc_recv");
+ }
assert(res == sizeof(struct qb_ipc_response_header));
assert(res_header.id == 13);
assert(res_header.size == sizeof(struct qb_ipc_response_header));
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 539e7ee..fcdf91a 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -111,6 +111,7 @@ static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c,
perror("qb_ipcs_event_send");
}
} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
+ qb_ipcs_destroy(s1);
exit(0);
}
return 0;
@@ -229,13 +230,8 @@ repeat_send:
}
}
- repeat_recv:
- res = qb_ipcc_recv(conn,
- &res_header,
- sizeof(struct qb_ipc_response_header));
- if (res == -EAGAIN) {
- goto repeat_recv;
- }
+ res = qb_ipcc_recv(conn, &res_header,
+ sizeof(struct qb_ipc_response_header), -1);
if (res == -EINTR) {
return -1;
}
@@ -371,7 +367,7 @@ static void test_ipc_dispatch(void)
repeat_event_recv:
res = qb_ipcc_event_recv(conn, res_header, IPC_BUF_SIZE, 0);
if (res < 0) {
- if (res == -EAGAIN) {
+ if (res == -EAGAIN || res == -ETIMEDOUT) {
goto repeat_event_recv;
} else {
errno = -res;
@@ -442,11 +438,14 @@ static void test_ipc_server_fail(void)
}
/*
+ * wait a bit for the server to die.
+ */
+ sleep(1);
+ /*
* try recv from the exit'ed server
*/
- res = qb_ipcc_recv(conn,
- &res_header,
- sizeof(struct qb_ipc_response_header));
+ res = qb_ipcc_recv(conn, &res_header,
+ sizeof(struct qb_ipc_response_header), 100);
/*
* confirm we get -ENOTCONN
*/
--
1.7.3.1
13 years, 4 months
qb_ipcc_send* return values
by Dietmar Maurer
Hi all,
I want my client to survive a server shutdown/restart. So what
return value indicates that the server is down?
I want to close the connection and reopen it later.
- Dietmar
13 years, 4 months
[PATCH] IPC: add a timeout to the client recv functions
by Angus Salkeld
Also:
- allow the ringbuffer to pass ETIMEDOUT back to client.
- handle -ETIMEDOUT in the calling functions.
This seems to work quite well, I'll test some more with corosync
then apply.
Dietmar, can you give this a go and tell me if this is what you
were after?
Thanks
Angus
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipcc.h | 9 ++++-----
lib/ipc_shm.c | 15 ++-------------
lib/ipc_us.c | 4 ++--
lib/ipcc.c | 47 ++++++++++++++++-------------------------------
lib/ipcs.c | 8 ++++----
tests/bmc.c | 12 ++----------
tests/bmcpt.c | 21 ++++++++-------------
tests/check_ipc.c | 21 ++++++++++-----------
8 files changed, 48 insertions(+), 89 deletions(-)
diff --git a/include/qb/qbipcc.h b/include/qb/qbipcc.h
index 1fa892c..b6f94a8 100644
--- a/include/qb/qbipcc.h
+++ b/include/qb/qbipcc.h
@@ -113,7 +113,7 @@ ssize_t qb_ipcc_sendv(qb_ipcc_connection_t* c, const struct iovec* iov,
* @return (size recv'ed, -errno == error)
*/
ssize_t qb_ipcc_recv(qb_ipcc_connection_t* c, void *msg_ptr,
- size_t msg_len);
+ size_t msg_len, int32_t ms_timeout);
/**
* This is a convenience function that simply sends and then recvs.
@@ -127,10 +127,9 @@ ssize_t qb_ipcc_recv(qb_ipcc_connection_t* c, void *msg_ptr,
* @see qb_ipcc_sendv() qb_ipcc_recv()
*/
ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t *c,
- const struct iovec *iov,
- unsigned int iov_len,
- void *msg_ptr,
- size_t msg_len);
+ const struct iovec *iov, uint32_t iov_len,
+ void *msg_ptr, size_t msg_len,
+ int32_t ms_timeout);
/**
* Receive an event.
diff --git a/lib/ipc_shm.c b/lib/ipc_shm.c
index 1b8fd6e..55670dc 100644
--- a/lib/ipc_shm.c
+++ b/lib/ipc_shm.c
@@ -86,34 +86,23 @@ static ssize_t qb_ipc_shm_recv(struct qb_ipc_one_way *one_way,
size_t msg_len,
int32_t ms_timeout)
{
- ssize_t res;
if (one_way->u.shm.rb == NULL) {
return -ENOTCONN;
}
- res = qb_rb_chunk_read(one_way->u.shm.rb,
+ return qb_rb_chunk_read(one_way->u.shm.rb,
(void *)msg_ptr,
msg_len,
ms_timeout);
- if (res == -ETIMEDOUT) {
- return -EAGAIN;
- }
- return res;
}
static ssize_t qb_ipc_shm_peek(struct qb_ipc_one_way *one_way, void **data_out, int32_t ms_timeout)
{
- ssize_t res;
-
if (one_way->u.shm.rb == NULL) {
return -ENOTCONN;
}
- res = qb_rb_chunk_peek(one_way->u.shm.rb,
+ return qb_rb_chunk_peek(one_way->u.shm.rb,
data_out,
ms_timeout);
- if (res == -ETIMEDOUT) {
- return -EAGAIN;
- }
- return res;
}
static void qb_ipc_shm_reclaim(struct qb_ipc_one_way *one_way)
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index 14da2dd..3fe0ddd 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -227,7 +227,7 @@ ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way,
retry_recv:
result = recv(one_way->u.us.sock, msg, len, MSG_NOSIGNAL | MSG_WAITALL);
- if (result == -1 && errno == EAGAIN) {
+ if (timeout == -1 && result == -1 && errno == EAGAIN) {
goto retry_recv;
}
if (result == -1) {
@@ -318,7 +318,7 @@ int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
return res;
}
- res = qb_ipc_us_recv(&c->setup, r, sizeof(struct qb_ipc_connection_response), 0);
+ res = qb_ipc_us_recv(&c->setup, r, sizeof(struct qb_ipc_connection_response), -1);
if (res < 0) {
return res;
}
diff --git a/lib/ipcc.c b/lib/ipcc.c
index 5fb8039..8248cdd 100644
--- a/lib/ipcc.c
+++ b/lib/ipcc.c
@@ -131,55 +131,40 @@ ssize_t qb_ipcc_sendv(struct qb_ipcc_connection* c, const struct iovec* iov,
}
ssize_t qb_ipcc_recv(struct qb_ipcc_connection * c, void *msg_ptr,
- size_t msg_len)
+ size_t msg_len, int32_t ms_timeout)
{
int32_t res = 0;
- int32_t retries = 0;
-
- recv_retry:
- retries++;
- res = c->funcs.recv(&c->response, msg_ptr, msg_len, 100);
- if (res == -EAGAIN && c->needs_sock_for_poll) {
- res = qb_ipc_us_recv_ready(&c->setup, 0);
- if (res == -EAGAIN && retries < 50) {
- goto recv_retry;
- } else if (res < 0) {
- return res;
+ int32_t res2 = 0;
+
+ res = c->funcs.recv(&c->response, msg_ptr, msg_len, ms_timeout);
+ if ((res == -EAGAIN || res == -ETIMEDOUT) && c->needs_sock_for_poll) {
+ res2 = qb_ipc_us_recv_ready(&c->setup, 0);
+ if (res2 < 0) {
+ return res2;
} else {
- return -EAGAIN;
+ return res;
}
}
return res;
}
-ssize_t qb_ipcc_sendv_recv (
- qb_ipcc_connection_t *c,
- const struct iovec *iov,
- unsigned int iov_len,
- void *res_msg,
- size_t res_len)
+ssize_t qb_ipcc_sendv_recv(qb_ipcc_connection_t *c,
+ const struct iovec *iov, uint32_t iov_len,
+ void *res_msg, size_t res_len,
+ int32_t ms_timeout)
{
- ssize_t res;
+ ssize_t res = 0;
if (c->funcs.fc_get && c->funcs.fc_get(&c->request)) {
return -EAGAIN;
}
-repeat_send:
res = qb_ipcc_sendv(c, iov, iov_len);
if (res < 0) {
- if (res == -EAGAIN) {
- goto repeat_send;
- }
return res;
}
-repeat_recv:
- res = qb_ipcc_recv(c, res_msg, res_len);
- if (res == -EAGAIN) {
- goto repeat_recv;
- }
- return res;
+ return qb_ipcc_recv(c, res_msg, res_len, ms_timeout);
}
int32_t qb_ipcc_fd_get(struct qb_ipcc_connection * c, int32_t * fd)
@@ -217,7 +202,7 @@ ssize_t qb_ipcc_event_recv(struct qb_ipcc_connection * c, void *msg_pt,
return size;
}
if (c->needs_sock_for_poll) {
- res = qb_ipc_us_recv(&c->setup, &one_byte, 1, 0);
+ res = qb_ipc_us_recv(&c->setup, &one_byte, 1, -1);
if (res < 0) {
return res;
}
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 51a3164..b293608 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -211,7 +211,7 @@ ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
res = c->service->funcs.send(&c->response, data, size);
if (res == size) {
c->stats.responses++;
- } else if (res == -EAGAIN) {
+ } else if (res == -EAGAIN || res == -ETIMEDOUT) {
c->stats.send_retries++;
}
qb_ipcs_connection_unref(c);
@@ -227,7 +227,7 @@ ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection *c, const struct iovec
res = c->service->funcs.sendv(&c->response, iov, iov_len);
if (res > 0) {
c->stats.responses++;
- } else if (res == -EAGAIN) {
+ } else if (res == -EAGAIN || res == -ETIMEDOUT) {
c->stats.send_retries++;
}
qb_ipcs_connection_unref(c);
@@ -481,7 +481,7 @@ static int32_t _process_request_(struct qb_ipcs_connection *c,
ms_timeout);
}
if (size < 0) {
- if (size != -EAGAIN) {
+ if (size != -EAGAIN && size != -ETIMEDOUT) {
qb_util_log(LOG_ERR, "%s(): %s", __func__, strerror(-res));
} else {
c->stats.recv_retries++;
@@ -589,7 +589,7 @@ int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents,
}
res = QB_MIN(0, res);
- if (res == -EAGAIN || res == -ENOBUFS) {
+ if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) {
res = 0;
}
if (res != 0) {
diff --git a/tests/bmc.c b/tests/bmc.c
index 5eafd56..e49d0e6 100644
--- a/tests/bmc.c
+++ b/tests/bmc.c
@@ -99,13 +99,9 @@ repeat_send:
}
if (blocking) {
- repeat_recv:
res = qb_ipcc_recv(conn,
&res_header,
- sizeof(struct qb_ipc_response_header));
- if (res == -EAGAIN) {
- goto repeat_recv;
- }
+ sizeof(struct qb_ipc_response_header), -1);
if (res == -EINTR) {
return -1;
}
@@ -117,13 +113,9 @@ repeat_send:
assert(res_header.size == sizeof(struct qb_ipc_response_header));
}
if (events) {
- repeat_event_recv:
res = qb_ipcc_event_recv(conn,
&res_header,
- sizeof(struct qb_ipc_response_header), 0);
- if (res == -EAGAIN) {
- goto repeat_event_recv;
- }
+ sizeof(struct qb_ipc_response_header), -1);
if (res == -EINTR) {
return -1;
}
diff --git a/tests/bmcpt.c b/tests/bmcpt.c
index e0d4fa0..b7b1369 100644
--- a/tests/bmcpt.c
+++ b/tests/bmcpt.c
@@ -123,19 +123,14 @@ repeat_send:
}
}
- repeat_recv:
- res = qb_ipcc_recv(ctx->conn,
- &res_header,
- sizeof(struct qb_ipc_response_header));
- if (res == -EAGAIN) {
- goto repeat_recv;
- }
- if (res == -EINTR) {
- return -1;
- }
- if (res < 0) {
- perror("qb_ipcc_recv");
- }
+ res = qb_ipcc_recv(ctx->conn, &res_header,
+ sizeof(struct qb_ipc_response_header), -1);
+ if (res == -EINTR) {
+ return -1;
+ }
+ if (res < 0) {
+ perror("qb_ipcc_recv");
+ }
assert(res == sizeof(struct qb_ipc_response_header));
assert(res_header.id == 13);
assert(res_header.size == sizeof(struct qb_ipc_response_header));
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 539e7ee..fcdf91a 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -111,6 +111,7 @@ static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c,
perror("qb_ipcs_event_send");
}
} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
+ qb_ipcs_destroy(s1);
exit(0);
}
return 0;
@@ -229,13 +230,8 @@ repeat_send:
}
}
- repeat_recv:
- res = qb_ipcc_recv(conn,
- &res_header,
- sizeof(struct qb_ipc_response_header));
- if (res == -EAGAIN) {
- goto repeat_recv;
- }
+ res = qb_ipcc_recv(conn, &res_header,
+ sizeof(struct qb_ipc_response_header), -1);
if (res == -EINTR) {
return -1;
}
@@ -371,7 +367,7 @@ static void test_ipc_dispatch(void)
repeat_event_recv:
res = qb_ipcc_event_recv(conn, res_header, IPC_BUF_SIZE, 0);
if (res < 0) {
- if (res == -EAGAIN) {
+ if (res == -EAGAIN || res == -ETIMEDOUT) {
goto repeat_event_recv;
} else {
errno = -res;
@@ -442,11 +438,14 @@ static void test_ipc_server_fail(void)
}
/*
+ * wait a bit for the server to die.
+ */
+ sleep(1);
+ /*
* try recv from the exit'ed server
*/
- res = qb_ipcc_recv(conn,
- &res_header,
- sizeof(struct qb_ipc_response_header));
+ res = qb_ipcc_recv(conn, &res_header,
+ sizeof(struct qb_ipc_response_header), 100);
/*
* confirm we get -ENOTCONN
*/
--
1.7.3.1
13 years, 4 months
Re: [libqb] libqb - Did you come right?
by Dietmar Maurer
Hi Angus,
> Had you come right with the problems you were having?
Please apply the following patch to the example test server:
--- bms.c.org 2010-12-14 08:40:23.000000000 +0100
+++ bms.c 2010-12-14 08:41:11.000000000 +0100
@@ -127,7 +127,7 @@
perror("qb_ipcs_event_send");
}
}
- return 0;
+ return -1;
}
After recompiling, the server continues to work - 'bmc' runs quite normally.
I thought it should be removed from the event loop if s1_msg_process_fn returns
something < 0?
The second problem is a bit artificial - don't know if we really need to handle that. When
you apply the following patch:
--- bms.c.org 2010-12-14 08:40:23.000000000 +0100
+++ bms.c 2010-12-14 08:49:13.000000000 +0100
@@ -113,6 +113,9 @@
response.size = sizeof(struct qb_ipc_response_header);
response.id = 13;
response.error = 0;
+
+ return 0;
+
if (blocking) {
res = qb_ipcs_response_send(c, &response,
sizeof(response));
The client (bmc) hangs forever. Well, the server misbehaves and never sends a response. I just
thought a timeout on the client side would be a good idea?
- Dietmar
13 years, 4 months
[PATCH 1/2] IPC: return the correct number of bytes sent
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
lib/ipcc.c | 10 +++++++---
1 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/lib/ipcc.c b/lib/ipcc.c
index e6faebb..ea79370 100644
--- a/lib/ipcc.c
+++ b/lib/ipcc.c
@@ -82,6 +82,7 @@ ssize_t qb_ipcc_send(struct qb_ipcc_connection * c, const void *msg_ptr,
size_t msg_len)
{
ssize_t res;
+ ssize_t res2;
if (msg_len > c->request.max_msg_size) {
return -EINVAL;
@@ -91,10 +92,13 @@ ssize_t qb_ipcc_send(struct qb_ipcc_connection * c, const void *msg_ptr,
}
res = c->funcs.send(&c->request, msg_ptr, msg_len);
- if (res > 0 && c->needs_sock_for_poll) {
+ if (res == msg_len && c->needs_sock_for_poll) {
do {
- res = qb_ipc_us_send(&c->setup, msg_ptr, 1);
- } while (res == -EAGAIN);
+ res2 = qb_ipc_us_send(&c->setup, msg_ptr, 1);
+ } while (res2 == -EAGAIN);
+ if (res2 != 1) {
+ res = res2;
+ }
}
return res;
}
--
1.7.3.1
13 years, 4 months
[PATCH 1/4] DOCS: fix some doxygen warnings for missing comments.
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipcs.h | 37 +++++++++++++++++++++++++++++++++----
1 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 5d2420b..ed55abd 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -162,7 +162,9 @@ void qb_ipcs_unref(qb_ipcs_service_t *s);
/**
* Set your poll callbacks.
+ *
* @param s service instance
+ * @param handlers the handlers that you want ipcs to use.
*/
void qb_ipcs_poll_handlers_set(qb_ipcs_service_t* s,
struct qb_ipcs_poll_handlers *handlers);
@@ -176,44 +178,68 @@ int32_t qb_ipcs_run(qb_ipcs_service_t* s);
/**
* Destroy the IPC server.
- * @param s service instance
+ *
+ * @param s service instance to destroy
*/
void qb_ipcs_destroy(qb_ipcs_service_t* s);
/**
- *
+ * Limit the incomming request rate.
* @param s service instance
+ * @param rl the new rate
*/
-void qb_ipcs_request_rate_limit(qb_ipcs_service_t* pt, enum qb_ipcs_rate_limit rl);
+void qb_ipcs_request_rate_limit(qb_ipcs_service_t* s, enum qb_ipcs_rate_limit rl);
/**
- * send a response to a incomming request.
+ * Send a response to a incomming request.
+ *
* @param c connection instance
+ * @param data the message to send
+ * @param size the size of the message
+ * @return size sent or -errno for errors
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data, size_t size);
+/**
+ * Send a response to a incomming request.
+ *
+ * @param c connection instance
+ * @param iov the iovec struct that points to the message to send
+ * @param iov_len the number of iovecs.
+ * @return size sent or -errno for errors
+ */
ssize_t qb_ipcs_response_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len);
/**
* Send an asyncronous event message to the client.
+ *
* @param c connection instance
+ * @param data the message to send
+ * @param size the size of the message
+ * @return size sent or -errno for errors
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
+ *
* @param c connection instance
+ * @param iov the iovec struct that points to the message to send
+ * @param iov_len the number of iovecs.
+ * @return size sent or -errno for errors
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len);
/**
* Increment the connection's reference counter.
+ *
* @param c connection instance
*/
void qb_ipcs_connection_ref_inc(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
+ *
* @param c connection instance
*/
void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
@@ -223,6 +249,7 @@ void qb_ipcs_disconnect(qb_ipcs_connection_t *c);
/**
* Get the service id related to this connection's service.
* (as passed into qb_ipcs_create()
+ *
* @return service id.
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
@@ -248,6 +275,7 @@ void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
/**
* Get the connection statistics.
*
+ * @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param c connection instance
* @return 0 == ok; -errno to indicate a failure
@@ -259,6 +287,7 @@ int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
/**
* Get the service statistics.
*
+ * @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param pt service instance
* @return 0 == ok; -errno to indicate a failure
--
1.7.3.1
13 years, 4 months
[PATCH] DOCS: fix some doxygen warnings for missing comments.
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipcs.h | 37 +++++++++++++++++++++++++++++++++----
1 files changed, 33 insertions(+), 4 deletions(-)
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 5d2420b..ed55abd 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -162,7 +162,9 @@ void qb_ipcs_unref(qb_ipcs_service_t *s);
/**
* Set your poll callbacks.
+ *
* @param s service instance
+ * @param handlers the handlers that you want ipcs to use.
*/
void qb_ipcs_poll_handlers_set(qb_ipcs_service_t* s,
struct qb_ipcs_poll_handlers *handlers);
@@ -176,44 +178,68 @@ int32_t qb_ipcs_run(qb_ipcs_service_t* s);
/**
* Destroy the IPC server.
- * @param s service instance
+ *
+ * @param s service instance to destroy
*/
void qb_ipcs_destroy(qb_ipcs_service_t* s);
/**
- *
+ * Limit the incomming request rate.
* @param s service instance
+ * @param rl the new rate
*/
-void qb_ipcs_request_rate_limit(qb_ipcs_service_t* pt, enum qb_ipcs_rate_limit rl);
+void qb_ipcs_request_rate_limit(qb_ipcs_service_t* s, enum qb_ipcs_rate_limit rl);
/**
- * send a response to a incomming request.
+ * Send a response to a incomming request.
+ *
* @param c connection instance
+ * @param data the message to send
+ * @param size the size of the message
+ * @return size sent or -errno for errors
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data, size_t size);
+/**
+ * Send a response to a incomming request.
+ *
+ * @param c connection instance
+ * @param iov the iovec struct that points to the message to send
+ * @param iov_len the number of iovecs.
+ * @return size sent or -errno for errors
+ */
ssize_t qb_ipcs_response_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len);
/**
* Send an asyncronous event message to the client.
+ *
* @param c connection instance
+ * @param data the message to send
+ * @param size the size of the message
+ * @return size sent or -errno for errors
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
+ *
* @param c connection instance
+ * @param iov the iovec struct that points to the message to send
+ * @param iov_len the number of iovecs.
+ * @return size sent or -errno for errors
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len);
/**
* Increment the connection's reference counter.
+ *
* @param c connection instance
*/
void qb_ipcs_connection_ref_inc(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
+ *
* @param c connection instance
*/
void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
@@ -223,6 +249,7 @@ void qb_ipcs_disconnect(qb_ipcs_connection_t *c);
/**
* Get the service id related to this connection's service.
* (as passed into qb_ipcs_create()
+ *
* @return service id.
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
@@ -248,6 +275,7 @@ void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
/**
* Get the connection statistics.
*
+ * @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param c connection instance
* @return 0 == ok; -errno to indicate a failure
@@ -259,6 +287,7 @@ int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
/**
* Get the service statistics.
*
+ * @param stats (out) the statistics structure
* @param clear_after_read clear stats after copying them into stats
* @param pt service instance
* @return 0 == ok; -errno to indicate a failure
--
1.7.3.1
13 years, 4 months
Re: [libqb] [Openais] question abou qb_ipcs_msg_process_fn()
by Angus Salkeld
On Thu, Dec 09, 2010 at 10:35:15AM +0000, Dietmar Maurer wrote:
> Hi all,
>
Can we move this to the libqb mailing list?
https://fedorahosted.org/mailman/listinfo/quarterback-devel
> I am playing around with libqb writing my first test server. Normal operation work
> quite good so far. I just wonder how to handle errors on the server side, especially
> in qb_ipcs_msg_process_fn:
>
> typedef int32_t (*qb_ipcs_msg_process_fn) (qb_ipcs_connection_t *c,
> void *data, size_t size);
>
> I thought the return value is used to indicate errors, but if I simply
> return a negative number my client hang forever.
Currently if you return anything but -ENOBUFS or -EAGAIN the server will
remove the socket from the poll loop.
To return an error to the client use qb_ipcs_response_send().
like:
{
struct qb_ipc_response_header response;
response.size = sizeof(struct qb_ipc_response_header);
response.id = MY_MSG_ID;
response.error = MY_ERROR;
qb_ipcs_response_send(c, &response, sizeof(response));
}
Regards
-Angus
>
> So I always need to send something back? What is the purpose of the return value then?
>
> On the client side there seems to be no timeout in qb_ipcc_sendv_recv()?
>
> - Dietmar
>
>
> _______________________________________________
> Openais mailing list
> Openais(a)lists.linux-foundation.org
> https://lists.linux-foundation.org/mailman/listinfo/openais
13 years, 4 months