Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
lib/ipc_int.h | 2 +
lib/ipcs.c | 178 ++++++++++++++++++++++++++++++++++++---------------------
2 files changed, 115 insertions(+), 65 deletions(-)
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index e609302..ed36ca8 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -178,6 +178,8 @@ struct qb_ipcs_connection {
char *receive_buf;
void *context;
int32_t fc_enabled;
+ int32_t poll_events;
+ int32_t outstanding_notifiers;
struct qb_ipcs_connection_stats stats;
};
diff --git a/lib/ipcs.c b/lib/ipcs.c
index 37d950b..5fd014c 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -105,22 +105,49 @@ int32_t qb_ipcs_run(struct qb_ipcs_service* s)
return res;
}
-void qb_ipcs_request_rate_limit(struct qb_ipcs_service* s, enum qb_ipcs_rate_limit rl)
+static int32_t _modify_dispatch_descriptor_(struct qb_ipcs_connection *c)
+{
+ if (c->service->type == QB_IPC_POSIX_MQ
+ && !c->service->needs_sock_for_poll) {
+ return c->service->poll_fns.dispatch_mod(c->service->
+ poll_priority,
+ (int32_t) c->request.u.
+ pmq.q, c->poll_events,
+ c,
+ qb_ipcs_dispatch_service_request);
+ } else if (c->service->type == QB_IPC_SOCKET) {
+ return c->service->poll_fns.dispatch_mod(c->service->
+ poll_priority,
+ c->event.u.us.sock,
+ c->poll_events, c,
+ qb_ipcs_dispatch_connection_request);
+ } else {
+ return c->service->poll_fns.dispatch_mod(c->service->
+ poll_priority,
+ c->setup.u.us.sock,
+ c->poll_events, c,
+ qb_ipcs_dispatch_connection_request);
+ }
+ return -EINVAL;
+}
+
+void qb_ipcs_request_rate_limit(struct qb_ipcs_service *s,
+ enum qb_ipcs_rate_limit rl)
{
struct qb_ipcs_connection *c;
- enum qb_loop_priority p;
+ enum qb_loop_priority old_p = s->poll_priority;
switch (rl) {
case QB_IPCS_RATE_FAST:
- p = QB_LOOP_HIGH;
+ s->poll_priority = QB_LOOP_HIGH;
break;
case QB_IPCS_RATE_SLOW:
case QB_IPCS_RATE_OFF:
- p = QB_LOOP_LOW;
+ s->poll_priority = QB_LOOP_LOW;
break;
default:
case QB_IPCS_RATE_NORMAL:
- p = QB_LOOP_MED;
+ s->poll_priority = QB_LOOP_MED;
break;
}
@@ -128,29 +155,14 @@ void qb_ipcs_request_rate_limit(struct qb_ipcs_service* s, enum qb_ipcs_rate_lim
qb_ipcs_connection_ref_inc(c);
qb_ipcs_flowcontrol_set(c, (rl == QB_IPCS_RATE_OFF));
- if (s->poll_priority == p) {
+ if (old_p == s->poll_priority) {
qb_ipcs_connection_ref_dec(c);
continue;
}
- if (s->type == QB_IPC_POSIX_MQ && !s->needs_sock_for_poll) {
- (void)s->poll_fns.dispatch_mod(p, (int32_t)c->request.u.pmq.q,
- POLLIN | POLLPRI | POLLNVAL,
- c, qb_ipcs_dispatch_service_request);
- } else if (s->type == QB_IPC_SOCKET) {
- (void)s->poll_fns.dispatch_mod(p, c->event.u.us.sock,
- POLLIN | POLLPRI | POLLNVAL,
- c,
- qb_ipcs_dispatch_connection_request);
- } else {
- (void)s->poll_fns.dispatch_mod(p, c->setup.u.us.sock,
- POLLIN | POLLPRI | POLLNVAL,
- c,
- qb_ipcs_dispatch_connection_request);
- }
+ (void)_modify_dispatch_descriptor_(c);
qb_ipcs_connection_ref_dec(c);
}
- s->poll_priority = p;
}
void qb_ipcs_ref(struct qb_ipcs_service *s)
@@ -223,76 +235,102 @@ ssize_t qb_ipcs_response_sendv(struct qb_ipcs_connection *c, const struct iovec
return res;
}
-ssize_t qb_ipcs_event_send(struct qb_ipcs_connection *c, const void *data,
+static int32_t send_event_notification(int32_t fd, int32_t revents, void *data)
+{
+ ssize_t res = 0;
+ struct qb_ipcs_connection *c = data;
+
+ if (c->outstanding_notifiers > 0) {
+ res = qb_ipc_us_send(&c->setup, data, c->outstanding_notifiers);
+ }
+ if (res > 0) {
+ c->outstanding_notifiers -= res;
+ }
+ if (c->outstanding_notifiers > 0) {
+ return 0;
+ } else {
+ c->outstanding_notifiers = 0;
+ c->poll_events = POLLIN;
+ (void)_modify_dispatch_descriptor_(c);
+ }
+ return 0;
+}
+
+ssize_t qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data,
size_t size)
{
ssize_t res;
ssize_t res2 = 0;
- int32_t try_count = 0;
qb_ipcs_connection_ref_inc(c);
- do {
- try_count++;
- res = c->service->funcs.send(&c->event, data, size);
- if (res == size) {
- c->stats.events++;
- } else if (res == -EAGAIN) {
- c->stats.send_retries++;
- }
- } while (res == -EAGAIN && try_count < 20);
- if (res > 0) {
- if (c->service->needs_sock_for_poll) {
- do {
- res2 = qb_ipc_us_send(&c->setup, &res, 1);
- } while (res2 == -EAGAIN);
+ res = c->service->funcs.send(&c->event, data, size);
+ if (res != size) {
+ goto deref_and_return;
+ }
+ c->stats.events++;
+ if (c->service->needs_sock_for_poll) {
+ if (c->outstanding_notifiers > 0) {
+ c->outstanding_notifiers++;
+ } else {
+ res2 = qb_ipc_us_send(&c->setup, data, 1);
+ if (res2 == 1) {
+ goto deref_and_return;
+ }
+ /*
+ * notify the client later, when we can.
+ */
+ c->outstanding_notifiers++;
+ c->poll_events = POLLOUT | POLLIN;
+ (void)_modify_dispatch_descriptor_(c);
}
- } else if (res != -EAGAIN) {
- qb_util_log(LOG_ERR,
- "failed to send event : %s",
- strerror(-res));
}
+
+deref_and_return:
+
qb_ipcs_connection_ref_dec(c);
return res;
}
-
-ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection *c, const struct iovec * iov, size_t iov_len)
+ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection * c,
+ const struct iovec * iov, size_t iov_len)
{
ssize_t res;
ssize_t res2;
- int32_t try_count = 0;
qb_ipcs_connection_ref_inc(c);
- do {
- try_count++;
- res = c->service->funcs.sendv(&c->event, iov, iov_len);
- if (res > 0) {
- c->stats.events++;
- } else if (res == -EAGAIN) {
- c->stats.send_retries++;
- }
- } while (res == -EAGAIN && try_count < 20);
- if (res > 0) {
- if (c->service->needs_sock_for_poll) {
- do {
- res2 = qb_ipc_us_send(&c->setup, &res, 1);
- } while (res2 == -EAGAIN);
+ res = c->service->funcs.sendv(&c->event, iov, iov_len);
+ if (res < 0) {
+ goto deref_and_return;
+ }
+ c->stats.events++;
+ if (c->service->needs_sock_for_poll) {
+ if (c->outstanding_notifiers > 0) {
+ c->outstanding_notifiers++;
+ } else {
+			res2 = qb_ipc_us_send(&c->setup, &res, 1);
+ if (res2 == 1) {
+ goto deref_and_return;
+ }
+ /*
+ * notify the client later, when we can.
+ */
+ c->outstanding_notifiers++;
+ c->poll_events = POLLOUT | POLLIN;
+ (void)_modify_dispatch_descriptor_(c);
}
- } else if (res != -EAGAIN) {
- qb_util_log(LOG_ERR,
- "failed to send event : %s",
- strerror(-res));
}
+deref_and_return:
+
qb_ipcs_connection_ref_dec(c);
return res;
}
-qb_ipcs_connection_t * qb_ipcs_connection_first_get(struct qb_ipcs_service* s)
+qb_ipcs_connection_t *qb_ipcs_connection_first_get(struct qb_ipcs_service * s)
{
struct qb_ipcs_connection *c;
struct qb_list_head *iter;
@@ -523,6 +561,16 @@ int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents,
qb_ipcs_disconnect(c);
return -ESHUTDOWN;
}
+
+ if (revents & POLLOUT) {
+ res = send_event_notification(fd, revents, data);
+ if ((revents & POLLIN) == 0) {
+ return 0;
+ }
+ }
+ if (c->fc_enabled) {
+ return 0;
+ }
avail = _request_q_len_get(c);
do {
res = _process_request_(c, IPC_REQUEST_TIMEOUT);
@@ -532,7 +580,7 @@ int32_t qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents,
if (res > 0) {
avail--;
}
- } while (avail > 0 && res > 0);
+ } while (avail > 0 && res > 0 && !c->fc_enabled);
if (c->service->needs_sock_for_poll && recvd > 0) {
(void)qb_ipc_us_recv(&c->setup, bytes, recvd, 0);
--
1.7.3.1