Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipcs.h | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++
lib/ipc_int.h | 2 +
lib/ipc_us.c | 2 +
lib/ipcs.c | 44 +++++++++++++++++++++++++++++++++-
tests/bms.c | 33 +++++++++++++++++++++----
5 files changed, 139 insertions(+), 7 deletions(-)
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 35e0868..3517eed 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -46,6 +46,22 @@ typedef struct qb_ipcs_connection qb_ipcs_connection_t;
typedef qb_handle_t qb_ipcs_service_pt;
+/** Service-wide connection counters, copied out by qb_ipcs_stats_get(). */
+struct qb_ipcs_stats {
+ uint32_t active_connections; /* ++ in lib/ipc_us.c while the setup response for a new connection is filled in; -- in qb_ipcs_connection_ref_dec() when the refcount reaches zero */
+ uint32_t closed_connections; /* ++ in qb_ipcs_connection_ref_dec() when the refcount reaches zero */
+};
+
+/** Per-connection counters, copied out by qb_ipcs_connection_stats_get(). */
+struct qb_ipcs_connection_stats {
+ int32_t client_pid; /* pid of the client, recorded in handle_new_connection() */
+ uint64_t requests; /* requests received without error in _process_request_() */
+ uint64_t responses; /* qb_ipcs_response_send() calls where the full size was sent */
+ uint64_t events; /* successful sends from qb_ipcs_event_send() / qb_ipcs_event_sendv() */
+ uint64_t send_retries; /* response/event send attempts that returned -EAGAIN */
+ uint64_t recv_retries; /* request receive attempts that returned -EAGAIN */
+ int32_t flow_control_state; /* last fc_enable value applied by qb_ipcs_flowcontrol_set() */
+ uint64_t flow_control_count; /* number of times the flow-control state changed */
+};
+
typedef int32_t (*qb_ipcs_dispatch_fn_t) (int32_t fd, int32_t revents,
void *data);
@@ -109,44 +125,57 @@ qb_ipcs_service_pt qb_ipcs_create(const char *name,
/**
* Set your poll callbacks.
+ * @param s service instance
*/
void qb_ipcs_poll_handlers_set(qb_ipcs_service_pt s,
struct qb_ipcs_poll_handlers *handlers);
/**
* run the new IPC server.
+ * @param s service instance
+ * @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_run(qb_ipcs_service_pt s);
/**
* Destroy the IPC server.
+ * @param s service instance
*/
void qb_ipcs_destroy(qb_ipcs_service_pt s);
+/**
+ * Set the request rate limit for the service.
+ * @param pt service instance
+ * @param rl the rate limit to apply
+ */
void qb_ipcs_request_rate_limit(qb_ipcs_service_pt pt, enum qb_ipcs_rate_limit rl);
/**
* send a response to a incomming request.
+ * @param c connection instance
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
+ * @param c connection instance
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
+ * @param c connection instance
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t
iov_len);
/**
* Increment the connection's reference counter.
+ * @param c connection instance
*/
void qb_ipcs_connection_ref_inc(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
+ * @param c connection instance
*/
void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
@@ -157,10 +186,46 @@ void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
+/**
+ * Associate a "user" pointer with this connection.
+ *
+ * @param context the pointer to associate with this connection.
+ * @param c connection instance
+ * @see qb_ipcs_context_get()
+ */
void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context);
+/**
+ * Get the context (set previously)
+ *
+ * @param c connection instance
+ * @return the context
+ * @see qb_ipcs_context_set()
+ */
void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
+/**
+ * Get the connection statistics.
+ *
+ * @param stats (out) structure the statistics are copied into
+ * @param clear_after_read clear stats after copying them into stats
+ * @param c connection instance
+ * @return 0 == ok; -errno to indicate a failure
+ */
+int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
+ struct qb_ipcs_connection_stats* stats,
+ int32_t clear_after_read);
+
+/**
+ * Get the service statistics.
+ *
+ * @param stats (out) structure the statistics are copied into
+ * @param clear_after_read clear stats after copying them into stats
+ * @param s service instance
+ * @return 0 == ok; -errno to indicate a failure
+ */
+int32_t qb_ipcs_stats_get(qb_ipcs_service_pt s,
+ struct qb_ipcs_stats* stats,
+ int32_t clear_after_read);
+
/* *INDENT-OFF* */
#ifdef __cplusplus
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 10183b9..5ba2147 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -163,6 +163,7 @@ struct qb_ipcs_service {
enum qb_loop_priority poll_priority;
struct qb_list_head connections;
+ struct qb_ipcs_stats stats;
};
struct qb_ipcs_connection {
@@ -179,6 +180,7 @@ struct qb_ipcs_connection {
char *receive_buf;
void *context;
int32_t fc_enabled;
+ struct qb_ipcs_connection_stats stats;
};
void qb_ipcs_pmq_init(struct qb_ipcs_service *s);
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index 32c687d..9df5345 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -520,6 +520,7 @@ static int32_t handle_new_connection(struct qb_ipcs_service *s,
c->pid = ugp->pid;
c->euid = ugp->uid;
c->egid = ugp->gid;
+ c->stats.client_pid = ugp->pid;
if (c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
@@ -567,6 +568,7 @@ send_response:
response.connection = (intptr_t)c;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
+ s->stats.active_connections++;
}
qb_ipc_us_send(&c->setup, &response, response.hdr.size);
diff --git a/lib/ipcs.c b/lib/ipcs.c
index c92baa4..4f268b2 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -203,6 +203,11 @@ ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const
void *data,
qb_ipcs_connection_ref_inc(c);
res = c->service->funcs.send(&c->response, data, size);
+ if (res == size) {
+ c->stats.responses++;
+ } else if (res == -EAGAIN) {
+ c->stats.send_retries++;
+ }
qb_ipcs_connection_ref_dec(c);
return res;
@@ -220,6 +225,11 @@ ssize_t qb_ipcs_event_send(struct qb_ipcs_connection *c, const void
*data,
do {
try_count++;
res = c->service->funcs.send(&c->event, data, size);
+ if (res == size) {
+ c->stats.events++;
+ } else if (res == -EAGAIN) {
+ c->stats.send_retries++;
+ }
} while (res == -EAGAIN && try_count < 20);
if (res > 0) {
if (c->service->needs_sock_for_poll) {
@@ -249,6 +259,11 @@ ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection *c, const
struct iovec * i
do {
try_count++;
res = c->service->funcs.sendv(&c->event, iov, iov_len);
+ if (res > 0) {
+ c->stats.events++;
+ } else if (res == -EAGAIN) {
+ c->stats.send_retries++;
+ }
} while (res == -EAGAIN && try_count < 20);
if (res > 0) {
if (c->service->needs_sock_for_poll) {
@@ -269,7 +284,7 @@ ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection *c, const struct
iovec * i
struct qb_ipcs_connection *qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
{
- struct qb_ipcs_connection *c = malloc(sizeof(struct qb_ipcs_connection));
+ struct qb_ipcs_connection *c = calloc(1, sizeof(struct qb_ipcs_connection));
c->refcount = 1;
c->service = s;
@@ -295,6 +310,8 @@ void qb_ipcs_connection_ref_dec(struct qb_ipcs_connection *c)
kill_it = qb_atomic_int_dec_and_test(&c->refcount);
if (kill_it) {
qb_util_log(LOG_DEBUG, "%s() %d", __func__, c->refcount);
+ c->service->stats.active_connections--;
+ c->service->stats.closed_connections++;
qb_list_del(&c->list);
if (c->service->serv_fns.connection_destroyed) {
c->service->serv_fns.connection_destroyed(c);
@@ -325,6 +342,8 @@ static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c,
int32_t fc_ena
if (c->fc_enabled != fc_enable) {
c->service->funcs.fc_set(&c->request, fc_enable);
c->fc_enabled = fc_enable;
+ c->stats.flow_control_state = fc_enable;
+ c->stats.flow_control_count++;
}
}
@@ -348,10 +367,13 @@ static int32_t _process_request_(struct qb_ipcs_connection *c,
if (size < 0) {
if (size != -EAGAIN) {
qb_util_log(LOG_ERR, "%s(): %s", __func__, strerror(-res));
+ } else {
+ c->stats.recv_retries++;
}
res = size;
goto cleanup;
}
+ c->stats.requests++;
if (hdr->id == QB_IPC_MSG_DISCONNECT) {
qb_util_log(LOG_DEBUG, "%s() QB_IPC_MSG_DISCONNECT", __func__);
@@ -466,3 +488,23 @@ void *qb_ipcs_context_get(struct qb_ipcs_connection *c)
return c->context;
}
+/*
+ * Copy the connection's counters into *stats.  When clear_after_read is
+ * non-zero the accumulated counters are reset once the copy has been made.
+ * Returns 0 on success, -EINVAL if c or stats is NULL.
+ */
+int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
+	struct qb_ipcs_connection_stats* stats,
+	int32_t clear_after_read)
+{
+	if (c == NULL || stats == NULL) {
+		return -EINVAL;
+	}
+	memcpy(stats, &c->stats, sizeof(c->stats));
+	if (clear_after_read) {
+		memset(&c->stats, 0, sizeof(c->stats));
+		/* these two describe the connection itself rather than
+		 * activity since the last read, so put them back */
+		c->stats.client_pid = c->pid;
+		c->stats.flow_control_state = c->fc_enabled;
+	}
+	return 0;
+}
+
+/*
+ * Copy the service-wide counters into *stats.  When clear_after_read is
+ * non-zero the cumulative closed_connections counter is reset afterwards.
+ * Returns 0 on success, -EINVAL if stats is NULL, or the error reported
+ * by the handle lookup when pt does not resolve to a service.
+ */
+int32_t qb_ipcs_stats_get(qb_ipcs_service_pt pt,
+	struct qb_ipcs_stats* stats,
+	int32_t clear_after_read)
+{
+	struct qb_ipcs_service *s = NULL;
+	int32_t res;
+
+	if (stats == NULL) {
+		return -EINVAL;
+	}
+	/* NOTE(review): assumes qb_hdb_handle_get() returns 0 on success
+	 * and -errno on failure - confirm against qbhdb.h */
+	res = qb_hdb_handle_get(&qb_ipc_services, pt, (void **)&s);
+	if (res != 0) {
+		return res;
+	}
+	memcpy(stats, &s->stats, sizeof(s->stats));
+	if (clear_after_read) {
+		/* active_connections is a live count that
+		 * qb_ipcs_connection_ref_dec() still decrements; zeroing it
+		 * would make the uint32_t wrap, so only the cumulative
+		 * counter is cleared */
+		s->stats.closed_connections = 0;
+	}
+	qb_hdb_handle_put(&qb_ipc_services, pt);
+	return 0;
+}
+
diff --git a/tests/bms.c b/tests/bms.c
index 546bd65..3f1a3cd 100644
--- a/tests/bms.c
+++ b/tests/bms.c
@@ -61,15 +61,36 @@ static int32_t s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t
uid, gid_t
static void s1_connection_created_fn(qb_ipcs_connection_t *c)
{
- if (verbose) {
- printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
- }
+	struct qb_ipcs_stats srv_stats;
+
+	/* Print the service-wide counters; QB_FALSE reads without clearing. */
+	qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+	/* both counters are uint32_t, so use PRIu32 rather than %d */
+	printf("\n Connection created\n > active:%"PRIu32"\n > closed:%"PRIu32"\n\n",
+	       srv_stats.active_connections,
+	       srv_stats.closed_connections);
}
+
static void s1_connection_destroyed_fn(qb_ipcs_connection_t *c)
{
- if (verbose) {
- printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
- }
+	struct qb_ipcs_connection_stats stats;
+	struct qb_ipcs_stats srv_stats;
+
+	/* Dump the service totals and this connection's counters;
+	 * QB_FALSE reads both without clearing them. */
+	qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+	qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
+
+	/* format macros match the field types: int32_t pid, uint32_t counters */
+	printf("\n Connection to pid:%"PRId32" destroyed\n > active:%"PRIu32"\n > closed:%"PRIu32"\n\n",
+	       stats.client_pid,
+	       srv_stats.active_connections,
+	       srv_stats.closed_connections);
+
+	printf(" Requests %"PRIu64"\n", stats.requests);
+	printf(" Responses %"PRIu64"\n", stats.responses);
+	printf(" Events %"PRIu64"\n", stats.events);
+	printf(" Send retries %"PRIu64"\n", stats.send_retries);
+	printf(" Recv retries %"PRIu64"\n", stats.recv_retries);
+	printf(" FC state %"PRId32"\n", stats.flow_control_state);
+	printf(" FC count %"PRIu64"\n\n", stats.flow_control_count);
}
static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c,
--
1.7.2.3