Signed-off-by: Angus Salkeld asalkeld@redhat.com --- tests/check_ipc.c | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-)
diff --git a/tests/check_ipc.c b/tests/check_ipc.c index 70e41ef..f2124b2 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -383,99 +383,119 @@ test_ipc_exit(void)
START_TEST(test_ipc_exit_us) { + qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 5000; test_ipc_exit(); + qb_leave(); } END_TEST
#ifdef HAVE_SEM_TIMEDWAIT START_TEST(test_ipc_exit_shm) { + qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_exit(); + qb_leave(); } END_TEST
START_TEST(test_ipc_txrx_shm_tmo) { + qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); + qb_leave(); } END_TEST
START_TEST(test_ipc_txrx_shm_block) { + qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SHM; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); + qb_leave(); } END_TEST
START_TEST(test_ipc_fc_shm) { + qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SHM; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); + qb_leave(); } END_TEST #endif /* HAVE_SEM_TIMEDWAIT */
START_TEST(test_ipc_txrx_us_block) { + qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = -1; test_ipc_txrx(); + qb_leave(); } END_TEST
START_TEST(test_ipc_txrx_us_tmo) { + qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SOCKET; ipc_name = __func__; recv_timeout = 1000; test_ipc_txrx(); + qb_leave(); } END_TEST
START_TEST(test_ipc_fc_us) { + qb_enter(); turn_on_fc = QB_TRUE; ipc_type = QB_IPC_SOCKET; recv_timeout = 500; ipc_name = __func__; test_ipc_txrx(); + qb_leave(); } END_TEST
START_TEST(test_ipc_txrx_pmq) { + qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_POSIX_MQ; ipc_name = __func__; test_ipc_txrx(); + qb_leave(); } END_TEST
START_TEST(test_ipc_txrx_smq) { + qb_enter(); turn_on_fc = QB_FALSE; ipc_type = QB_IPC_SYSV_MQ; ipc_name = __func__; test_ipc_txrx(); + qb_leave(); } END_TEST
@@ -524,9 +544,11 @@ test_ipc_dispatch(void)
START_TEST(test_ipc_disp_us) { + qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_dispatch(); + qb_leave(); } END_TEST
@@ -594,9 +616,11 @@ test_ipc_bulk_events(void)
START_TEST(test_ipc_bulk_events_us) { + qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_bulk_events(); + qb_leave(); } END_TEST
@@ -649,34 +673,42 @@ test_ipc_server_fail(void)
START_TEST(test_ipc_server_fail_soc) { + qb_enter(); ipc_type = QB_IPC_SOCKET; ipc_name = __func__; test_ipc_server_fail(); + qb_leave(); } END_TEST
#ifdef HAVE_SEM_TIMEDWAIT START_TEST(test_ipc_disp_shm) { + qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_dispatch(); + qb_leave(); } END_TEST
START_TEST(test_ipc_bulk_events_shm) { + qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_bulk_events(); + qb_leave(); } END_TEST
START_TEST(test_ipc_server_fail_shm) { + qb_enter(); ipc_type = QB_IPC_SHM; ipc_name = __func__; test_ipc_server_fail(); + qb_leave(); } END_TEST #endif /* HAVE_SEM_TIMEDWAIT */ @@ -788,7 +820,7 @@ main(void) qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, QB_LOG_FILTER_FILE, "*", LOG_TRACE); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); - qb_log_format_set(QB_LOG_STDERR, "[%p] %f:%l %b"); + qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b");
srunner_run_all(sr, CK_VERBOSE); number_failed = srunner_ntests_failed(sr);
Type "events" into the client and the server will send 10 events. Type "kill" into the client to simulate the server dying.
Signed-off-by: Angus Salkeld asalkeld@redhat.com --- examples/ipcclient.c | 14 ++++++++++++++ examples/ipcserver.c | 33 ++++++++++++++++++++++++++------- 2 files changed, 40 insertions(+), 7 deletions(-)
diff --git a/examples/ipcclient.c b/examples/ipcclient.c index 412e6a8..8f62527 100644 --- a/examples/ipcclient.c +++ b/examples/ipcclient.c @@ -78,6 +78,7 @@ main(int argc, char *argv[]) rc = qb_ipcc_send(conn, &req, req.hdr.size); if (rc < 0) { perror("qb_ipcc_send"); + exit(0); } }
@@ -85,6 +86,19 @@ main(int argc, char *argv[]) rc = qb_ipcc_recv(conn, &res, sizeof(res), -1); if (rc < 0) { perror("qb_ipcc_recv"); + exit(0); + } + if (strcasecmp(req.message, "events") == 0) { + int32_t i; + printf("waiting for 10 events\n"); + for (i = 0; i < 10; i++) { + rc = qb_ipcc_event_recv(conn, &res, sizeof(res), -1); + if (rc < 0) { + perror("qb_ipcc_event_recv"); + } else { + printf("got event %d rc:%d\n", i, rc); + } + } } printf("Response[%d]: %s \n", res.hdr.id, res.message); } diff --git a/examples/ipcserver.c b/examples/ipcserver.c index e863690..383157d 100644 --- a/examples/ipcserver.c +++ b/examples/ipcserver.c @@ -91,32 +91,50 @@ s1_connection_closed_fn(qb_ipcs_connection_t * c) return 0; }
+struct my_req { + struct qb_ipc_request_header hdr; + char message[256]; +}; + static int32_t s1_msg_process_fn(qb_ipcs_connection_t * c, void *data, size_t size) { - struct qb_ipc_request_header *req_pt = - (struct qb_ipc_request_header *)data; + struct my_req *req_pt = (struct my_req *)data; struct qb_ipc_response_header response; ssize_t res; struct iovec iov[2]; char resp[100]; + int32_t sl; + + qb_log(LOG_DEBUG, "msg received (id:%d, size:%d, data:%s)", + req_pt->hdr.id, req_pt->hdr.size, req_pt->message);
- qb_log(LOG_DEBUG, "msg received (id:%d, size:%d)", - req_pt->id, req_pt->size); + if (strcmp(req_pt->message, "kill") == 0) { + exit(0); + } response.size = sizeof(struct qb_ipc_response_header); response.id = 13; response.error = 0;
- snprintf(resp, 100, "ACK %zd bytes", size); + sl = snprintf(resp, 100, "ACK %zd bytes", size) + 1; iov[0].iov_len = sizeof(response); iov[0].iov_base = &response; - iov[1].iov_len = strlen(resp) + 1; + iov[1].iov_len = sl; iov[1].iov_base = resp; + response.size += sl;
res = qb_ipcs_response_sendv(c, iov, 2); if (res < 0) { qb_perror(LOG_ERR, "qb_ipcs_response_send"); } + if (strcmp(req_pt->message, "events") == 0) { + int32_t i; + qb_log(LOG_INFO, "request to send 10 events"); + for (i = 0; i < 10; i++) { + res = qb_ipcs_event_sendv(c, iov, 2); + qb_log(LOG_INFO, "sent event %d res:%d", i, res); + } + } return 0; }
@@ -142,6 +160,7 @@ show_usage(const char *name) printf(" -s use sysv message queues\n"); printf(" -u use unix sockets\n"); printf(" -g use glib mainloop\n"); + printf(" -e use events\n"); printf("\n"); }
@@ -244,7 +263,7 @@ my_dispatch_del(int32_t fd) int32_t main(int32_t argc, char *argv[]) { - const char *options = "mpsugh"; + const char *options = "mpseugh"; int32_t opt; enum qb_ipc_type ipc_type = QB_IPC_NATIVE; struct qb_ipcs_service_handlers sh = {
The receive must read only as many bytes as the current message's size; otherwise a single recv can pull in more than one message, and the extra messages are effectively dropped (hidden at the end of the current message).
Signed-off-by: Angus Salkeld asalkeld@redhat.com --- lib/ipc_us.c | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-)
diff --git a/lib/ipc_us.c b/lib/ipc_us.c index 579fa90..15c731f 100644 --- a/lib/ipc_us.c +++ b/lib/ipc_us.c @@ -242,7 +242,8 @@ qb_ipc_us_recv(struct qb_ipc_one_way * one_way, struct ipc_us_control *ctl = NULL;
retry_recv: - result = recv(one_way->u.us.sock, &data[processed], to_recv, MSG_NOSIGNAL | MSG_WAITALL); + result = recv(one_way->u.us.sock, &data[processed], to_recv, + MSG_NOSIGNAL | MSG_WAITALL); if (timeout == -1) { if (result == -1 && errno == EAGAIN) { goto retry_recv; @@ -272,35 +273,52 @@ retry_recv:
/* * recv a message of unknow size. - * (could use MSG_PEEK here) */ static ssize_t qb_ipc_us_recv_at_most(struct qb_ipc_one_way * one_way, void *msg, size_t len, int32_t timeout) { int32_t result; + int32_t processed = 0; + int32_t to_recv = sizeof(struct qb_ipc_request_header); + char *data = msg; struct ipc_us_control *ctl = NULL; + struct qb_ipc_request_header *hdr = NULL;
retry_recv: - result = recv(one_way->u.us.sock, msg, len, MSG_NOSIGNAL | MSG_WAITALL); + result = recv(one_way->u.us.sock, &data[processed], to_recv, + MSG_NOSIGNAL | MSG_WAITALL); if (timeout == -1) { if (result == -1 && errno == EAGAIN) { goto retry_recv; } } if (result == 0) { - qb_util_log(LOG_DEBUG, "recv(fd %d) got 0 bytes assuming ENOTCONN", + qb_util_log(LOG_DEBUG, + "recv(fd %d) got 0 bytes assuming ENOTCONN", one_way->u.us.sock); return -ENOTCONN; } if (result == -1) { return -errno; } + processed += result; + if (processed >= sizeof(struct qb_ipc_request_header) && hdr == NULL) { + hdr = (struct qb_ipc_request_header*)msg; + } + if (hdr) { + to_recv = hdr->size - processed; + } else { + to_recv = len - processed; + } + if (to_recv > 0) { + goto retry_recv; + } ctl = (struct ipc_us_control *)one_way->u.us.shared_data; if (ctl) { (void)qb_atomic_int_dec_and_test(&ctl->sent); } - return result; + return processed; }
quarterback-devel@lists.fedorahosted.org