[PATCH 1/5] LOOP: fix job poll and simplify main loop
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
lib/loop.c | 58 +++++++---------
lib/loop_job.c | 12 +++-
tests/.gitignore | 1 +
tests/Makefile.am | 8 ++-
tests/check_loop.c | 197 ++++++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 239 insertions(+), 37 deletions(-)
create mode 100644 tests/check_loop.c
diff --git a/lib/loop.c b/lib/loop.c
index 5a98d67..08b709a 100644
--- a/lib/loop.c
+++ b/lib/loop.c
@@ -32,25 +32,24 @@ static struct qb_loop_source * fd_source;
static int32_t qb_loop_run_level(struct qb_loop_level *level)
{
struct qb_loop_item *job;
- struct qb_list_head *iter, *iter_next;
+ struct qb_list_head *iter;
int32_t processed = 0;
- for (iter = level->job_head.next;
- iter != &level->job_head;
- iter = iter_next) {
- if (processed >= level->to_process) {
- break;
- }
- iter_next = iter->next;
+ Ill_have_another:
+
+ iter = level->job_head.next;
+ if (iter != &level->job_head) {
job = qb_list_entry(iter, struct qb_loop_item, list);
qb_list_del (&job->list);
qb_list_init (&job->list);
job->source->dispatch_and_take_back(job, level->priority);
+ processed++;
if (level->l->stop_requested) {
- printf("%s:%d pr:%d STOP!!!\n", __func__, __LINE__, level->priority);
return processed;
}
- processed++;
+ if (processed < level->to_process) {
+ goto Ill_have_another;
+ }
}
return processed;
}
@@ -88,40 +87,33 @@ void qb_loop_stop(struct qb_loop *l)
void qb_loop_run(struct qb_loop *l)
{
int32_t p;
- int32_t p_stop;
- int32_t todo;
- int32_t done;
+ int32_t p_stop = QB_LOOP_LOW;
+ int32_t todo = 0;
int32_t ms_timeout;
- int32_t fd_poll_done = QB_FALSE;
do {
- p_stop = QB_LOOP_HIGH;
- todo = 0;
- poll_again:
- if (!fd_poll_done) {
- todo += fd_source->poll(fd_source, 0);
+ if (p_stop == QB_LOOP_LOW) {
+ p_stop = QB_LOOP_HIGH;
+ } else {
+ p_stop--;
}
+
todo += job_source->poll(job_source, 0);
todo += timer_source->poll(timer_source, 0);
+ if (todo > 0) {
+ ms_timeout = 0;
+ } else {
+ todo = 0;
+ ms_timeout = qb_loop_timer_msec_duration_to_expire(timer_source);
+ }
+ todo += fd_source->poll(fd_source, ms_timeout);
+
for (p = QB_LOOP_HIGH; p >= p_stop; p--) {
- done = qb_loop_run_level(&l->level[p]);
+ todo -= qb_loop_run_level(&l->level[p]);
if (l->stop_requested) {
return;
}
- todo -= done;
- }
- if (p_stop > QB_LOOP_LOW) {
- p_stop--;
- fd_poll_done = QB_FALSE;
- goto poll_again;
- }
- if (todo == 0) {
- ms_timeout = qb_loop_timer_msec_duration_to_expire(timer_source);
- todo = fd_source->poll(fd_source, ms_timeout);
- fd_poll_done = QB_TRUE;
- } else {
- fd_poll_done = QB_FALSE;
}
} while (!l->stop_requested);
}
diff --git a/lib/loop_job.c b/lib/loop_job.c
index 9323995..e3e83e8 100644
--- a/lib/loop_job.c
+++ b/lib/loop_job.c
@@ -52,10 +52,10 @@ static int32_t get_more_jobs(struct qb_loop_source* s, int32_t ms_timeout)
// this is simple, move jobs from wait_head to job_head
for (p = QB_LOOP_LOW; p <= QB_LOOP_HIGH; p++) {
if (!qb_list_empty(&s->l->level[p].wait_head)) {
+ new_jobs += qb_list_length(&s->l->level[p].wait_head);
qb_list_splice(&s->l->level[p].wait_head, &s->l->level[p].job_head);
qb_list_init(&s->l->level[p].wait_head);
}
- new_jobs += qb_list_length(&s->l->level[p].job_head);
}
return new_jobs;
}
@@ -78,7 +78,15 @@ int32_t qb_loop_job_add(struct qb_loop *l,
void *data,
qb_loop_job_dispatch_fn dispatch_fn)
{
- struct qb_loop_job *job = malloc(sizeof(struct qb_loop_job));
+ struct qb_loop_job *job;
+
+ if (l == NULL || dispatch_fn == NULL) {
+ return -EINVAL;
+ }
+ if (p < QB_LOOP_LOW || p > QB_LOOP_HIGH) {
+ return -EINVAL;
+ }
+ job = malloc(sizeof(struct qb_loop_job));
job->dispatch_fn = dispatch_fn;
job->item.user_data = data;
diff --git a/tests/.gitignore b/tests/.gitignore
index 7d7d1b8..e2fa575 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -1,4 +1,5 @@
check_ipc
+check_loop
check_array
check_rb
bmc
diff --git a/tests/Makefile.am b/tests/Makefile.am
index dcc0911..001c412 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -47,9 +47,9 @@ loop_LDADD = -lrt $(top_builddir)/lib/libqb.la
if HAVE_CHECK
-TESTS = check_array check_rb check_ipc
+TESTS = check_array check_rb check_loop check_ipc
-check_PROGRAMS = check_array check_rb check_ipc
+check_PROGRAMS = check_array check_rb check_loop check_ipc
check_array_SOURCES = check_array.c $(top_builddir)/include/qb/qbarray.h
check_array_CFLAGS = @CHECK_CFLAGS@ -I$(top_srcdir)/include
@@ -59,6 +59,10 @@ check_rb_SOURCES = check_rb.c $(top_builddir)/include/qb/qbrb.h
check_rb_CFLAGS = @CHECK_CFLAGS@ -I$(top_srcdir)/include
check_rb_LDADD = $(top_builddir)/lib/libqb.la -lrt @CHECK_LIBS@
+check_loop_SOURCES = check_loop.c $(top_builddir)/include/qb/qbloop.h
+check_loop_CFLAGS = @CHECK_CFLAGS@ -I$(top_srcdir)/include
+check_loop_LDADD = $(top_builddir)/lib/libqb.la -lrt @CHECK_LIBS@
+
check_ipc_SOURCES = check_ipc.c $(top_builddir)/include/qb/qbipcc.h $(top_builddir)/include/qb/qbipcs.h
check_ipc_CFLAGS = @CHECK_CFLAGS@ -I$(top_srcdir)/include
check_ipc_LDADD = $(top_builddir)/lib/libqb.la -lrt @CHECK_LIBS@
diff --git a/tests/check_loop.c b/tests/check_loop.c
new file mode 100644
index 0000000..74d54f4
--- /dev/null
+++ b/tests/check_loop.c
@@ -0,0 +1,197 @@
+/*
+ * Copyright (c) 2010 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Angus Salkeld <asalkeld(a)redhat.com>
+ *
+ * This file is part of libqb.
+ *
+ * libqb is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation, either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * libqb is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with libqb. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "os_base.h"
+#include <check.h>
+
+#include <qb/qbdefs.h>
+#include <qb/qbutil.h>
+#include <qb/qbloop.h>
+
+static int32_t job_1_run_count = 0;
+static int32_t job_2_run_count = 0;
+static int32_t job_3_run_count = 0;
+static int32_t job_4_run_count = 0;
+
+static void job_1(void *data)
+{
+ job_1_run_count++;
+}
+
+static void job_stop(void *data)
+{
+ qb_loop_t *l = (qb_loop_t *)data;
+ job_3_run_count++;
+ qb_loop_stop(l);
+}
+static void job_2(void *data)
+{
+ int32_t res;
+ qb_loop_t *l = (qb_loop_t *)data;
+ job_2_run_count++;
+ res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_stop);
+ ck_assert_int_eq(res, 0);
+}
+static void job_1_r(void *data)
+{
+ int32_t res;
+ qb_loop_t *l = (qb_loop_t *)data;
+ job_1_run_count++;
+ res = qb_loop_job_add(l, QB_LOOP_MED, data, job_2);
+ ck_assert_int_eq(res, 0);
+}
+static void job_1_add_nuts(void *data)
+{
+ int32_t res;
+ qb_loop_t *l = (qb_loop_t *)data;
+ job_1_run_count++;
+ res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_HIGH, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_MED, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1);
+ res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1);
+ if (job_1_run_count < 500) {
+ res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_1_add_nuts);
+ } else {
+ res = qb_loop_job_add(l, QB_LOOP_LOW, data, job_stop);
+ }
+ ck_assert_int_eq(res, 0);
+}
+
+START_TEST(test_loop_job_input)
+{
+ int32_t res;
+ qb_loop_t *l = qb_loop_create();
+ fail_if(l == NULL);
+
+ res = qb_loop_job_add(NULL, QB_LOOP_LOW, NULL, job_2);
+ ck_assert_int_eq(res, -EINVAL);
+ res = qb_loop_job_add(l, 89, NULL, job_2);
+ ck_assert_int_eq(res, -EINVAL);
+ res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, NULL);
+ ck_assert_int_eq(res, -EINVAL);
+
+}
+END_TEST
+
+START_TEST(test_loop_job_1)
+{
+ int32_t res;
+ qb_loop_t *l = qb_loop_create();
+ fail_if(l == NULL);
+
+ res = qb_loop_job_add(l, QB_LOOP_LOW, NULL, job_1);
+ ck_assert_int_eq(res, 0);
+ res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_stop);
+ ck_assert_int_eq(res, 0);
+
+ qb_loop_run(l);
+ ck_assert_int_eq(job_1_run_count, 1);
+}
+END_TEST
+
+START_TEST(test_loop_job_4)
+{
+ int32_t res;
+ qb_loop_t *l = qb_loop_create();
+ fail_if(l == NULL);
+
+ res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_1_r);
+ ck_assert_int_eq(res, 0);
+
+ qb_loop_run(l);
+ ck_assert_int_eq(job_1_run_count, 1);
+ ck_assert_int_eq(job_2_run_count, 1);
+ ck_assert_int_eq(job_3_run_count, 1);
+}
+END_TEST
+
+
+START_TEST(test_loop_job_nuts)
+{
+ int32_t res;
+ qb_loop_t *l = qb_loop_create();
+ fail_if(l == NULL);
+
+ res = qb_loop_job_add(l, QB_LOOP_LOW, l, job_1_add_nuts);
+ ck_assert_int_eq(res, 0);
+
+ qb_loop_run(l);
+ fail_if(job_1_run_count < 500);
+}
+END_TEST
+
+
+static Suite *rb_suite(void)
+{
+ TCase *tc;
+ Suite *s = suite_create("qb_loop_job");
+
+ tc = tcase_create("limits");
+ tcase_add_test(tc, test_loop_job_input);
+ suite_add_tcase(s, tc);
+
+ tc = tcase_create("run_one");
+ tcase_add_test(tc, test_loop_job_1);
+ suite_add_tcase(s, tc);
+
+ tc = tcase_create("run_recursive");
+ tcase_add_test(tc, test_loop_job_4);
+ suite_add_tcase(s, tc);
+
+ tc = tcase_create("run_500");
+ tcase_add_test(tc, test_loop_job_nuts);
+ suite_add_tcase(s, tc);
+
+ return s;
+}
+
+static void libqb_log_fn(const char *file_name,
+ int32_t file_line, int32_t severity, const char *msg)
+{
+ printf("libqb: %s:%d %s\n", file_name, file_line, msg);
+}
+
+int32_t main(void)
+{
+ int32_t number_failed;
+
+ Suite *s = rb_suite();
+ SRunner *sr = srunner_create(s);
+
+ qb_util_set_log_function(libqb_log_fn);
+
+ srunner_run_all(sr, CK_VERBOSE);
+ number_failed = srunner_ntests_failed(sr);
+ srunner_free(sr);
+ return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
+}
--
1.7.2.3
13 years, 5 months
[PATCH 1/5] IPC: add job_add() API to the poll abstraction.
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipcs.h | 4 ++++
lib/ipcs.c | 1 +
tests/bms.c | 7 +++++++
3 files changed, 12 insertions(+), 0 deletions(-)
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index b04563c..890c757 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -77,8 +77,12 @@ typedef int32_t (*qb_ipcs_dispatch_mod_fn)(enum qb_loop_priority p,
qb_ipcs_dispatch_fn_t fn);
typedef int32_t (*qb_ipcs_dispatch_del_fn)(int32_t fd);
+typedef int32_t (*qb_ipcs_job_add_fn)(enum qb_loop_priority p,
+ void *data,
+ qb_loop_job_dispatch_fn dispatch_fn);
struct qb_ipcs_poll_handlers {
+ qb_ipcs_job_add_fn job_add;
qb_ipcs_dispatch_add_fn dispatch_add;
qb_ipcs_dispatch_mod_fn dispatch_mod;
qb_ipcs_dispatch_del_fn dispatch_del;
diff --git a/lib/ipcs.c b/lib/ipcs.c
index c04956e..dbfcdb0 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -71,6 +71,7 @@ void qb_ipcs_poll_handlers_set(qb_ipcs_service_pt pt,
qb_hdb_handle_get(&qb_ipc_services, pt, (void **)&s);
+ s->poll_fns.job_add = handlers->job_add;
s->poll_fns.dispatch_add = handlers->dispatch_add;
s->poll_fns.dispatch_mod = handlers->dispatch_mod;
s->poll_fns.dispatch_del = handlers->dispatch_del;
diff --git a/tests/bms.c b/tests/bms.c
index 3f1a3cd..f3327b2 100644
--- a/tests/bms.c
+++ b/tests/bms.c
@@ -232,6 +232,11 @@ static int32_t my_g_dispatch_del(int32_t fd)
#endif /* HAVE_GLIB */
+static int32_t my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn)
+{
+ return qb_loop_job_add(bms_loop, p, data, fn);
+}
+
static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn)
{
@@ -262,12 +267,14 @@ int32_t main(int32_t argc, char *argv[])
.connection_destroyed = s1_connection_destroyed_fn,
};
struct qb_ipcs_poll_handlers ph = {
+ .job_add = my_job_add,
.dispatch_add = my_dispatch_add,
.dispatch_mod = my_dispatch_mod,
.dispatch_del = my_dispatch_del,
};
#ifdef HAVE_GLIB
struct qb_ipcs_poll_handlers glib_ph = {
+ .job_add = NULL, // FIXME
.dispatch_add = my_g_dispatch_add,
.dispatch_mod = my_g_dispatch_mod,
.dispatch_del = my_g_dispatch_del,
--
1.7.2.3
13 years, 5 months
[PATCH 1/2] fix some build issues on FreeBSD
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
configure.ac | 6 ++++++
include/os_base.h | 4 ++++
include/qb/qbloop.h | 1 -
lib/Makefile.am | 4 ++--
lib/atomic_lock.c | 1 +
lib/hdb.c | 2 +-
lib/loop_poll.c | 2 +-
lib/ringbuffer_helper.c | 12 +++++++++++-
lib/util.c | 2 ++
9 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/configure.ac b/configure.ac
index 12d2565..4a2e8a4 100644
--- a/configure.ac
+++ b/configure.ac
@@ -182,6 +182,10 @@ if test x"$GCC" = xyes; then
memory_barrier_needed=no
;;
esac
+ AC_MSG_WARN([-----------------------------])
+ AC_MSG_WARN([You have gcc but not __sync_bool_compare_and_swap])
+ AC_MSG_WARN([try CFLAGS="-march=<your arch> -mtune=native" ./configure])
+ AC_MSG_WARN([-----------------------------])
fi
fi
@@ -236,6 +240,7 @@ case "$host_os" in
*linux*)
AC_DEFINE_UNQUOTED([QB_LINUX], [1],
[Compiling for Linux platform])
+ LINT_FLAGS+=" -expect 41"
;;
darwin*)
AC_DEFINE_UNQUOTED([QB_DARWIN], [1],
@@ -261,6 +266,7 @@ case "$host_os" in
[Compiling for FreeBSD >= 8 platform])
;;
esac
+ LINT_FLAGS+=" -expect 55"
;;
*solaris*)
AC_DEFINE_UNQUOTED([QB_SOLARIS], [1],
diff --git a/include/os_base.h b/include/os_base.h
index 533e027..6123289 100644
--- a/include/os_base.h
+++ b/include/os_base.h
@@ -75,6 +75,10 @@
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */
+#ifdef HAVE_SYS_STAT_H
+#include <sys/stat.h>
+#endif /* HAVE_SYS_STAT_H */
+
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif /* HAVE_FCNTL_H */
diff --git a/include/qb/qbloop.h b/include/qb/qbloop.h
index b5c9d3a..cb47085 100644
--- a/include/qb/qbloop.h
+++ b/include/qb/qbloop.h
@@ -44,7 +44,6 @@ typedef struct qb_loop qb_loop_t;
typedef void *qb_loop_job_handle;
typedef void *qb_loop_timer_handle;
-#define qb_poll_timer_handle qb_loop_timer_handle
typedef int32_t (*qb_loop_poll_dispatch_fn) (int32_t fd, int32_t revents, void *data);
typedef void (*qb_loop_job_dispatch_fn)(void *data);
diff --git a/lib/Makefile.am b/lib/Makefile.am
index c46fff6..a8906da 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -56,9 +56,9 @@ TESTS = $(check_SCRIPTS)
ALL_LINT_FLAGS = $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) \
$(libqb_la_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) \
- $(LINT_FLAGS) -expect 41
+ $(LINT_FLAGS)
-run_splint.sh:
+run_splint.sh: $(top_srcdir)/configure.ac
echo "$(SPLINT) $(ALL_LINT_FLAGS) $(source_to_lint)" > $@
chmod +x $@
endif
diff --git a/lib/atomic_lock.c b/lib/atomic_lock.c
index 3384849..595cc3d 100644
--- a/lib/atomic_lock.c
+++ b/lib/atomic_lock.c
@@ -50,6 +50,7 @@ void qb_atomic_init(void)
if (qb_atomic_mutex == NULL) {
qb_atomic_mutex = qb_thread_lock_create(QB_THREAD_LOCK_SHORT);
}
+ assert(qb_atomic_mutex);
}
int32_t
diff --git a/lib/hdb.c b/lib/hdb.c
index bbb9108..837eefc 100644
--- a/lib/hdb.c
+++ b/lib/hdb.c
@@ -33,8 +33,8 @@ static void qb_hdb_create_first_run(struct qb_hdb *hdb)
{
if (hdb->first_run == 1) {
hdb->first_run = 0;
- hdb->handles = qb_array_create(32, sizeof(struct qb_hdb_handle));
qb_atomic_init();
+ hdb->handles = qb_array_create(32, sizeof(struct qb_hdb_handle));
}
}
diff --git a/lib/loop_poll.c b/lib/loop_poll.c
index ad5f199..b39bf05 100644
--- a/lib/loop_poll.c
+++ b/lib/loop_poll.c
@@ -225,7 +225,7 @@ static int32_t poll_and_add_to_jobs(struct qb_loop_source* src, int32_t ms_timeo
// empty
continue;
}
- pe = &s->poll_entries[i];
+ assert(qb_array_index(my_src->poll_entries, i, (void**)&pe) == 0);
if (s->ufds[i].revents == pe->ufd.revents) {
// entry already in the job queue.
continue;
diff --git a/lib/ringbuffer_helper.c b/lib/ringbuffer_helper.c
index 7a2034c..08548ab 100644
--- a/lib/ringbuffer_helper.c
+++ b/lib/ringbuffer_helper.c
@@ -64,9 +64,10 @@ sem_wait_again:
static int32_t my_sysv_sem_timedwait(qb_ringbuffer_t * rb, int32_t ms_timeout)
{
struct sembuf sops[1];
+ int32_t res = 0;
+#ifndef QB_FREEBSD_GE_8
struct timespec ts_timeout;
struct timespec *ts_pt;
- int32_t res = 0;
if (ms_timeout >= 0) {
/*
@@ -80,16 +81,25 @@ static int32_t my_sysv_sem_timedwait(qb_ringbuffer_t * rb, int32_t ms_timeout)
} else {
ts_pt = NULL;
}
+#endif /* bsd */
/*
* wait for sem post.
*/
sops[0].sem_num = 0;
sops[0].sem_op = -1;
+#ifdef QB_FREEBSD_GE_8
+ sops[0].sem_flg = IPC_NOWAIT;
+#else
sops[0].sem_flg = 0;
+#endif /* bsd */
semop_again:
+#ifdef QB_FREEBSD_GE_8
+ if (semop(rb->sem_id, sops, 1) == -1) {
+#else
if (semtimedop(rb->sem_id, sops, 1, ts_pt) == -1) {
+#endif
if (errno == EINTR) {
goto semop_again;
} else if (errno == EAGAIN) {
diff --git a/lib/util.c b/lib/util.c
index 3104358..947e7cb 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -47,11 +47,13 @@ qb_thread_lock_t *qb_thread_lock_create(qb_thread_lock_type_t type)
int32_t res;
#if defined(HAVE_PTHREAD_SPIN_LOCK)
+#if _POSIX_THREAD_PROCESS_SHARED > 0
if (type == QB_THREAD_LOCK_SHORT) {
tl->type = QB_THREAD_LOCK_SHORT;
res = pthread_spin_init(&tl->spinlock, 1);
} else
#endif
+#endif
{
tl->type = QB_THREAD_LOCK_LONG;
res = pthread_mutex_init(&tl->mutex, NULL);
--
1.7.2.3
13 years, 6 months
[PATCH 1/2] IPC: add stats to server end.
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipcs.h | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++
lib/ipc_int.h | 2 +
lib/ipc_us.c | 2 +
lib/ipcs.c | 51 +++++++++++++++++++++++++++++++++++++++-
tests/bms.c | 33 +++++++++++++++++++++----
5 files changed, 146 insertions(+), 7 deletions(-)
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 35e0868..3517eed 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -46,6 +46,22 @@ typedef struct qb_ipcs_connection qb_ipcs_connection_t;
typedef qb_handle_t qb_ipcs_service_pt;
+struct qb_ipcs_stats {
+ uint32_t active_connections;
+ uint32_t closed_connections;
+};
+
+struct qb_ipcs_connection_stats {
+ int32_t client_pid;
+ uint64_t requests;
+ uint64_t responses;
+ uint64_t events;
+ uint64_t send_retries;
+ uint64_t recv_retries;
+ int32_t flow_control_state;
+ uint64_t flow_control_count;
+};
+
typedef int32_t (*qb_ipcs_dispatch_fn_t) (int32_t fd, int32_t revents,
void *data);
@@ -109,44 +125,57 @@ qb_ipcs_service_pt qb_ipcs_create(const char *name,
/**
* Set your poll callbacks.
+ * @param s service instance
*/
void qb_ipcs_poll_handlers_set(qb_ipcs_service_pt s,
struct qb_ipcs_poll_handlers *handlers);
/**
* run the new IPC server.
+ * @param s service instance
+ * @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_run(qb_ipcs_service_pt s);
/**
* Destroy the IPC server.
+ * @param s service instance
*/
void qb_ipcs_destroy(qb_ipcs_service_pt s);
+/**
+ *
+ * @param s service instance
+ */
void qb_ipcs_request_rate_limit(qb_ipcs_service_pt pt, enum qb_ipcs_rate_limit rl);
/**
* send a response to a incomming request.
+ * @param c connection instance
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
+ * @param c connection instance
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
+ * @param c connection instance
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len);
/**
* Increment the connection's reference counter.
+ * @param c connection instance
*/
void qb_ipcs_connection_ref_inc(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
+ * @param c connection instance
*/
void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
@@ -157,10 +186,46 @@ void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
+/**
+ * Associate a "user" pointer with this connection.
+ *
+ * @param context the point to associate with this connection.
+ * @param c connection instance
+ * @see qb_ipcs_context_get()
+ */
void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context);
+/**
+ * Get the context (set previously)
+ *
+ * @param c connection instance
+ * @return the context
+ * @see qb_ipcs_context_set()
+ */
void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
+/**
+ * Get the connection statistics.
+ *
+ * @param clear_after_read clear stats after copying them into stats
+ * @param c connection instance
+ * @return 0 == ok; -errno to indicate a failure
+ */
+int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
+ struct qb_ipcs_connection_stats* stats,
+ int32_t clear_after_read);
+
+/**
+ * Get the service statistics.
+ *
+ * @param clear_after_read clear stats after copying them into stats
+ * @param s service instance
+ * @return 0 == ok; -errno to indicate a failure
+ */
+int32_t qb_ipcs_stats_get(qb_ipcs_service_pt s,
+ struct qb_ipcs_stats* stats,
+ int32_t clear_after_read);
+
/* *INDENT-OFF* */
#ifdef __cplusplus
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 10183b9..5ba2147 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -163,6 +163,7 @@ struct qb_ipcs_service {
enum qb_loop_priority poll_priority;
struct qb_list_head connections;
+ struct qb_ipcs_stats stats;
};
struct qb_ipcs_connection {
@@ -179,6 +180,7 @@ struct qb_ipcs_connection {
char *receive_buf;
void *context;
int32_t fc_enabled;
+ struct qb_ipcs_connection_stats stats;
};
void qb_ipcs_pmq_init(struct qb_ipcs_service *s);
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index 32c687d..9df5345 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -520,6 +520,7 @@ static int32_t handle_new_connection(struct qb_ipcs_service *s,
c->pid = ugp->pid;
c->euid = ugp->uid;
c->egid = ugp->gid;
+ c->stats.client_pid = ugp->pid;
if (c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
@@ -567,6 +568,7 @@ send_response:
response.connection = (intptr_t)c;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
+ s->stats.active_connections++;
}
qb_ipc_us_send(&c->setup, &response, response.hdr.size);
diff --git a/lib/ipcs.c b/lib/ipcs.c
index c92baa4..f45561c 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -203,6 +203,11 @@ ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
qb_ipcs_connection_ref_inc(c);
res = c->service->funcs.send(&c->response, data, size);
+ if (res == size) {
+ c->stats.responses++;
+ } else if (res == -EAGAIN) {
+ c->stats.send_retries++;
+ }
qb_ipcs_connection_ref_dec(c);
return res;
@@ -220,6 +225,11 @@ ssize_t qb_ipcs_event_send(struct qb_ipcs_connection *c, const void *data,
do {
try_count++;
res = c->service->funcs.send(&c->event, data, size);
+ if (res == size) {
+ c->stats.events++;
+ } else if (res == -EAGAIN) {
+ c->stats.send_retries++;
+ }
} while (res == -EAGAIN && try_count < 20);
if (res > 0) {
if (c->service->needs_sock_for_poll) {
@@ -249,6 +259,11 @@ ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection *c, const struct iovec * i
do {
try_count++;
res = c->service->funcs.sendv(&c->event, iov, iov_len);
+ if (res > 0) {
+ c->stats.events++;
+ } else if (res == -EAGAIN) {
+ c->stats.send_retries++;
+ }
} while (res == -EAGAIN && try_count < 20);
if (res > 0) {
if (c->service->needs_sock_for_poll) {
@@ -269,7 +284,7 @@ ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection *c, const struct iovec * i
struct qb_ipcs_connection *qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
{
- struct qb_ipcs_connection *c = malloc(sizeof(struct qb_ipcs_connection));
+ struct qb_ipcs_connection *c = calloc(1, sizeof(struct qb_ipcs_connection));
c->refcount = 1;
c->service = s;
@@ -295,6 +310,8 @@ void qb_ipcs_connection_ref_dec(struct qb_ipcs_connection *c)
kill_it = qb_atomic_int_dec_and_test(&c->refcount);
if (kill_it) {
qb_util_log(LOG_DEBUG, "%s() %d", __func__, c->refcount);
+ c->service->stats.active_connections--;
+ c->service->stats.closed_connections++;
qb_list_del(&c->list);
if (c->service->serv_fns.connection_destroyed) {
c->service->serv_fns.connection_destroyed(c);
@@ -325,6 +342,8 @@ static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_ena
if (c->fc_enabled != fc_enable) {
c->service->funcs.fc_set(&c->request, fc_enable);
c->fc_enabled = fc_enable;
+ c->stats.flow_control_state = fc_enable;
+ c->stats.flow_control_count++;
}
}
@@ -348,10 +367,13 @@ static int32_t _process_request_(struct qb_ipcs_connection *c,
if (size < 0) {
if (size != -EAGAIN) {
qb_util_log(LOG_ERR, "%s(): %s", __func__, strerror(-res));
+ } else {
+ c->stats.recv_retries++;
}
res = size;
goto cleanup;
}
+ c->stats.requests++;
if (hdr->id == QB_IPC_MSG_DISCONNECT) {
qb_util_log(LOG_DEBUG, "%s() QB_IPC_MSG_DISCONNECT", __func__);
@@ -466,3 +488,30 @@ void *qb_ipcs_context_get(struct qb_ipcs_connection *c)
return c->context;
}
+int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
+ struct qb_ipcs_connection_stats* stats,
+ int32_t clear_after_read)
+{
+ memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats));
+ if (clear_after_read) {
+ memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats));
+ c->stats.client_pid = c->pid;
+ }
+ return 0;
+}
+
+int32_t qb_ipcs_stats_get(qb_ipcs_service_pt pt,
+ struct qb_ipcs_stats* stats,
+ int32_t clear_after_read)
+{
+ struct qb_ipcs_service *s;
+
+ qb_hdb_handle_get(&qb_ipc_services, pt, (void **)&s);
+ memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats));
+ if (clear_after_read) {
+ memset(&s->stats, 0, sizeof(struct qb_ipcs_stats));
+ }
+ qb_hdb_handle_put(&qb_ipc_services, pt);
+ return 0;
+}
+
diff --git a/tests/bms.c b/tests/bms.c
index 546bd65..3f1a3cd 100644
--- a/tests/bms.c
+++ b/tests/bms.c
@@ -61,15 +61,36 @@ static int32_t s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t uid, gid_t
static void s1_connection_created_fn(qb_ipcs_connection_t *c)
{
- if (verbose) {
- printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
- }
+ struct qb_ipcs_stats srv_stats;
+
+ qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+ printf("\n Connection created\n > active:%d\n > closed:%d\n\n",
+ srv_stats.active_connections,
+ srv_stats.closed_connections);
}
+
static void s1_connection_destroyed_fn(qb_ipcs_connection_t *c)
{
- if (verbose) {
- printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
- }
+ struct qb_ipcs_connection_stats stats;
+ struct qb_ipcs_stats srv_stats;
+
+ qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+
+ qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
+
+ printf("\n Connection to pid:%d destroyed\n > active:%d\n > closed:%d\n\n",
+ stats.client_pid,
+ srv_stats.active_connections,
+ srv_stats.closed_connections);
+
+ printf(" Requests %"PRIu64"\n", stats.requests);
+ printf(" Responses %"PRIu64"\n", stats.responses);
+ printf(" Events %"PRIu64"\n", stats.events);
+ printf(" Send retries %"PRIu64"\n", stats.send_retries);
+ printf(" Recv retries %"PRIu64"\n", stats.recv_retries);
+ printf(" FC state %d\n", stats.flow_control_state);
+ printf(" FC count %"PRIu64"\n\n", stats.flow_control_count);
+
}
static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c,
--
1.7.2.3
13 years, 6 months
[PATCH] IPC: add stats to server end.
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qbipcs.h | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++
lib/ipc_int.h | 2 +
lib/ipc_us.c | 2 +
lib/ipcs.c | 44 +++++++++++++++++++++++++++++++++-
tests/bms.c | 33 +++++++++++++++++++++----
5 files changed, 139 insertions(+), 7 deletions(-)
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 35e0868..3517eed 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -46,6 +46,22 @@ typedef struct qb_ipcs_connection qb_ipcs_connection_t;
typedef qb_handle_t qb_ipcs_service_pt;
+struct qb_ipcs_stats {
+ uint32_t active_connections;
+ uint32_t closed_connections;
+};
+
+struct qb_ipcs_connection_stats {
+ int32_t client_pid;
+ uint64_t requests;
+ uint64_t responses;
+ uint64_t events;
+ uint64_t send_retries;
+ uint64_t recv_retries;
+ int32_t flow_control_state;
+ uint64_t flow_control_count;
+};
+
typedef int32_t (*qb_ipcs_dispatch_fn_t) (int32_t fd, int32_t revents,
void *data);
@@ -109,44 +125,57 @@ qb_ipcs_service_pt qb_ipcs_create(const char *name,
/**
* Set your poll callbacks.
+ * @param s service instance
*/
void qb_ipcs_poll_handlers_set(qb_ipcs_service_pt s,
struct qb_ipcs_poll_handlers *handlers);
/**
* run the new IPC server.
+ * @param s service instance
+ * @return 0 == ok; -errno to indicate a failure
*/
int32_t qb_ipcs_run(qb_ipcs_service_pt s);
/**
* Destroy the IPC server.
+ * @param s service instance
*/
void qb_ipcs_destroy(qb_ipcs_service_pt s);
+/**
+ *
+ * @param s service instance
+ */
void qb_ipcs_request_rate_limit(qb_ipcs_service_pt pt, enum qb_ipcs_rate_limit rl);
/**
* send a response to a incomming request.
+ * @param c connection instance
*/
ssize_t qb_ipcs_response_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
+ * @param c connection instance
*/
ssize_t qb_ipcs_event_send(qb_ipcs_connection_t *c, const void *data, size_t size);
/**
* Send an asyncronous event message to the client.
+ * @param c connection instance
*/
ssize_t qb_ipcs_event_sendv(qb_ipcs_connection_t *c, const struct iovec * iov, size_t iov_len);
/**
* Increment the connection's reference counter.
+ * @param c connection instance
*/
void qb_ipcs_connection_ref_inc(qb_ipcs_connection_t *c);
/**
* Decrement the connection's reference counter.
+ * @param c connection instance
*/
void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
@@ -157,10 +186,46 @@ void qb_ipcs_connection_ref_dec(qb_ipcs_connection_t *c);
*/
int32_t qb_ipcs_service_id_get(qb_ipcs_connection_t *c);
+/**
+ * Associate a "user" pointer with this connection.
+ *
+ * @param context the point to associate with this connection.
+ * @param c connection instance
+ * @see qb_ipcs_context_get()
+ */
void qb_ipcs_context_set(qb_ipcs_connection_t *c, void *context);
+/**
+ * Get the context (set previously)
+ *
+ * @param c connection instance
+ * @return the context
+ * @see qb_ipcs_context_set()
+ */
void *qb_ipcs_context_get(qb_ipcs_connection_t *c);
+/**
+ * Get the connection statistics.
+ *
+ * @param clear_after_read clear stats after copying them into stats
+ * @param c connection instance
+ * @return 0 == ok; -errno to indicate a failure
+ */
+int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
+ struct qb_ipcs_connection_stats* stats,
+ int32_t clear_after_read);
+
+/**
+ * Get the service statistics.
+ *
+ * @param clear_after_read clear stats after copying them into stats
+ * @param s service instance
+ * @return 0 == ok; -errno to indicate a failure
+ */
+int32_t qb_ipcs_stats_get(qb_ipcs_service_pt s,
+ struct qb_ipcs_stats* stats,
+ int32_t clear_after_read);
+
/* *INDENT-OFF* */
#ifdef __cplusplus
diff --git a/lib/ipc_int.h b/lib/ipc_int.h
index 10183b9..5ba2147 100644
--- a/lib/ipc_int.h
+++ b/lib/ipc_int.h
@@ -163,6 +163,7 @@ struct qb_ipcs_service {
enum qb_loop_priority poll_priority;
struct qb_list_head connections;
+ struct qb_ipcs_stats stats;
};
struct qb_ipcs_connection {
@@ -179,6 +180,7 @@ struct qb_ipcs_connection {
char *receive_buf;
void *context;
int32_t fc_enabled;
+ struct qb_ipcs_connection_stats stats;
};
void qb_ipcs_pmq_init(struct qb_ipcs_service *s);
diff --git a/lib/ipc_us.c b/lib/ipc_us.c
index 32c687d..9df5345 100644
--- a/lib/ipc_us.c
+++ b/lib/ipc_us.c
@@ -520,6 +520,7 @@ static int32_t handle_new_connection(struct qb_ipcs_service *s,
c->pid = ugp->pid;
c->euid = ugp->uid;
c->egid = ugp->gid;
+ c->stats.client_pid = ugp->pid;
if (c->service->serv_fns.connection_accept) {
res = c->service->serv_fns.connection_accept(c,
@@ -567,6 +568,7 @@ send_response:
response.connection = (intptr_t)c;
response.connection_type = s->type;
response.max_msg_size = c->request.max_msg_size;
+ s->stats.active_connections++;
}
qb_ipc_us_send(&c->setup, &response, response.hdr.size);
diff --git a/lib/ipcs.c b/lib/ipcs.c
index c92baa4..4f268b2 100644
--- a/lib/ipcs.c
+++ b/lib/ipcs.c
@@ -203,6 +203,11 @@ ssize_t qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
qb_ipcs_connection_ref_inc(c);
res = c->service->funcs.send(&c->response, data, size);
+ if (res == size) {
+ c->stats.responses++;
+ } else if (res == -EAGAIN) {
+ c->stats.send_retries++;
+ }
qb_ipcs_connection_ref_dec(c);
return res;
@@ -220,6 +225,11 @@ ssize_t qb_ipcs_event_send(struct qb_ipcs_connection *c, const void *data,
do {
try_count++;
res = c->service->funcs.send(&c->event, data, size);
+ if (res == size) {
+ c->stats.events++;
+ } else if (res == -EAGAIN) {
+ c->stats.send_retries++;
+ }
} while (res == -EAGAIN && try_count < 20);
if (res > 0) {
if (c->service->needs_sock_for_poll) {
@@ -249,6 +259,11 @@ ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection *c, const struct iovec * i
do {
try_count++;
res = c->service->funcs.sendv(&c->event, iov, iov_len);
+ if (res > 0) {
+ c->stats.events++;
+ } else if (res == -EAGAIN) {
+ c->stats.send_retries++;
+ }
} while (res == -EAGAIN && try_count < 20);
if (res > 0) {
if (c->service->needs_sock_for_poll) {
@@ -269,7 +284,7 @@ ssize_t qb_ipcs_event_sendv(struct qb_ipcs_connection *c, const struct iovec * i
struct qb_ipcs_connection *qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
{
- struct qb_ipcs_connection *c = malloc(sizeof(struct qb_ipcs_connection));
+ struct qb_ipcs_connection *c = calloc(1, sizeof(struct qb_ipcs_connection));
c->refcount = 1;
c->service = s;
@@ -295,6 +310,8 @@ void qb_ipcs_connection_ref_dec(struct qb_ipcs_connection *c)
kill_it = qb_atomic_int_dec_and_test(&c->refcount);
if (kill_it) {
qb_util_log(LOG_DEBUG, "%s() %d", __func__, c->refcount);
+ c->service->stats.active_connections--;
+ c->service->stats.closed_connections++;
qb_list_del(&c->list);
if (c->service->serv_fns.connection_destroyed) {
c->service->serv_fns.connection_destroyed(c);
@@ -325,6 +342,8 @@ static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_ena
if (c->fc_enabled != fc_enable) {
c->service->funcs.fc_set(&c->request, fc_enable);
c->fc_enabled = fc_enable;
+ c->stats.flow_control_state = fc_enable;
+ c->stats.flow_control_count++;
}
}
@@ -348,10 +367,13 @@ static int32_t _process_request_(struct qb_ipcs_connection *c,
if (size < 0) {
if (size != -EAGAIN) {
qb_util_log(LOG_ERR, "%s(): %s", __func__, strerror(-res));
+ } else {
+ c->stats.recv_retries++;
}
res = size;
goto cleanup;
}
+ c->stats.requests++;
if (hdr->id == QB_IPC_MSG_DISCONNECT) {
qb_util_log(LOG_DEBUG, "%s() QB_IPC_MSG_DISCONNECT", __func__);
@@ -466,3 +488,23 @@ void *qb_ipcs_context_get(struct qb_ipcs_connection *c)
return c->context;
}
+int32_t qb_ipcs_connection_stats_get(qb_ipcs_connection_t *c,
+ struct qb_ipcs_connection_stats* stats,
+ int32_t clear_after_read)
+{
+ memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats));
+ return 0;
+}
+
+int32_t qb_ipcs_stats_get(qb_ipcs_service_pt pt,
+ struct qb_ipcs_stats* stats,
+ int32_t clear_after_read)
+{
+ struct qb_ipcs_service *s;
+
+ qb_hdb_handle_get(&qb_ipc_services, pt, (void **)&s);
+ memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats));
+ qb_hdb_handle_put(&qb_ipc_services, pt);
+ return 0;
+}
+
diff --git a/tests/bms.c b/tests/bms.c
index 546bd65..3f1a3cd 100644
--- a/tests/bms.c
+++ b/tests/bms.c
@@ -61,15 +61,36 @@ static int32_t s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t uid, gid_t
static void s1_connection_created_fn(qb_ipcs_connection_t *c)
{
- if (verbose) {
- printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
- }
+ struct qb_ipcs_stats srv_stats;
+
+ qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+ printf("\n Connection created\n > active:%d\n > closed:%d\n\n",
+ srv_stats.active_connections,
+ srv_stats.closed_connections);
}
+
static void s1_connection_destroyed_fn(qb_ipcs_connection_t *c)
{
- if (verbose) {
- printf("%s:%d %s\n", __FILE__, __LINE__, __func__);
- }
+ struct qb_ipcs_connection_stats stats;
+ struct qb_ipcs_stats srv_stats;
+
+ qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
+
+ qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
+
+ printf("\n Connection to pid:%d destroyed\n > active:%d\n > closed:%d\n\n",
+ stats.client_pid,
+ srv_stats.active_connections,
+ srv_stats.closed_connections);
+
+ printf(" Requests %"PRIu64"\n", stats.requests);
+ printf(" Responses %"PRIu64"\n", stats.responses);
+ printf(" Events %"PRIu64"\n", stats.events);
+ printf(" Send retries %"PRIu64"\n", stats.send_retries);
+ printf(" Recv retries %"PRIu64"\n", stats.recv_retries);
+ printf(" FC state %d\n", stats.flow_control_state);
+ printf(" FC count %"PRIu64"\n\n", stats.flow_control_count);
+
}
static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c,
--
1.7.2.3
13 years, 6 months
[PATCH 1/2] TEST: add glib mainloop option to bms
by Angus Salkeld
Note: glib is only linked into the test app, so
libqb not dependant on glib. This is just testing
integration.
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
configure.ac | 10 ++++
tests/Makefile.am | 4 +-
tests/bmc.c | 12 +----
tests/bms.c | 147 +++++++++++++++++++++++++++++++++++++++++------------
4 files changed, 128 insertions(+), 45 deletions(-)
diff --git a/configure.ac b/configure.ac
index 35fba01..f8d4678 100644
--- a/configure.ac
+++ b/configure.ac
@@ -92,9 +92,19 @@ AC_CHECK_LIB([dl], [dlopen])
AC_CHECK_LIB([pthread], [pthread_create])
AC_CHECK_LIB([socket], [socket])
+# look for testing harness "check"
PKG_CHECK_MODULES([CHECK], [check >= 0.9.4],[with_check=yes],[with_check=no])
AM_CONDITIONAL(HAVE_CHECK, test "${with_check}" = "yes")
+# look for GLIB (used for testing integration)
+PKG_CHECK_MODULES(GLIB, glib-2.0 >= 2.0, have_glib=yes, have_glib=no)
+AM_CONDITIONAL(HAVE_GLIB, test x$have_glib = xyes)
+AC_SUBST(GLIB_CFLAGS)
+AC_SUBST(GLIB_LIBS)
+if test x"$have_glib" = xyes; then
+AC_DEFINE_UNQUOTED([HAVE_GLIB], [1], [We have glib])
+fi
+
# Checks for header files.
AC_HEADER_STDC
AC_HEADER_SYS_WAIT
diff --git a/tests/Makefile.am b/tests/Makefile.am
index e4975c6..dcc0911 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -29,8 +29,8 @@ bmcpt_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
bmcpt_LDADD = -lrt $(top_builddir)/lib/libqb.la
bms_SOURCES = bms.c $(top_builddir)/include/qb/qbipcs.h
-bms_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
-bms_LDADD = -lrt $(top_builddir)/lib/libqb.la
+bms_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include $(GLIB_CFLAGS)
+bms_LDADD = -lrt $(top_builddir)/lib/libqb.la $(GLIB_LIBS)
rbwriter_SOURCES = rbwriter.c $(top_builddir)/include/qb/qbrb.h
rbwriter_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include
diff --git a/tests/bmc.c b/tests/bmc.c
index c44e416..5eafd56 100644
--- a/tests/bmc.c
+++ b/tests/bmc.c
@@ -18,17 +18,7 @@
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "config.h"
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <errno.h>
-#include <assert.h>
-#include <stdio.h>
-#include <stdint.h>
-#include <string.h>
-#include <sys/time.h>
-#include <time.h>
+#include "os_base.h"
#include <signal.h>
#include <qb/qbdefs.h>
diff --git a/tests/bms.c b/tests/bms.c
index 63a39d1..6b156e1 100644
--- a/tests/bms.c
+++ b/tests/bms.c
@@ -18,40 +18,27 @@
* You should have received a copy of the GNU Lesser General Public License
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
-
-#include <pthread.h>
-#include <assert.h>
-#include <sys/types.h>
-#include <sys/poll.h>
-#include <sys/uio.h>
-#include <sys/mman.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <sys/time.h>
-#include <sys/resource.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <unistd.h>
-#include <fcntl.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <errno.h>
+#include "os_base.h"
#include <signal.h>
-#include <sched.h>
-#include <time.h>
-#include <stdarg.h>
-#include <sched.h>
#include <qb/qbdefs.h>
#include <qb/qbutil.h>
#include <qb/qbloop.h>
#include <qb/qbipcs.h>
+#ifdef HAVE_GLIB
+#include <glib.h>
+#endif
int32_t blocking = QB_TRUE;
int32_t events = QB_FALSE;
+int32_t use_glib = QB_FALSE;
int32_t verbose = 0;
static qb_loop_t *bms_loop;
+#ifdef HAVE_GLIB
+static GMainLoop *glib_loop;
+static qb_array_t *gio_map;
+#endif
static qb_ipcs_service_pt s1;
static int32_t s1_connection_accept_fn(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
@@ -145,9 +132,77 @@ static void show_usage(const char *name)
printf(" -p use posix message queues\n");
printf(" -s use sysv message queues\n");
printf(" -u use unix sockets\n");
+ printf(" -g use glib mainloop\n");
printf("\n");
}
+#ifdef HAVE_GLIB
+
+struct gio_to_qb_poll {
+ int32_t is_used;
+ GIOChannel *channel;
+ int32_t events;
+ void * data;
+ qb_ipcs_dispatch_fn_t fn;
+ enum qb_loop_priority p;
+};
+
+
+static gboolean
+gio_read_socket (GIOChannel *gio, GIOCondition condition, gpointer data)
+{
+ struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
+ gint fd = g_io_channel_unix_get_fd(gio);
+
+ return (adaptor->fn(fd, condition, adaptor->data) == 0);
+}
+
+static int32_t my_g_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
+ void *data, qb_ipcs_dispatch_fn_t fn)
+{
+ struct gio_to_qb_poll *adaptor;
+ GIOChannel *channel = g_io_channel_unix_new(fd);
+ if (!channel) {
+ g_error("couldn't create GIOChannel\n");
+ }
+ qb_array_grow(gio_map, fd + 1);
+ qb_array_index(gio_map, fd, (void**)&adaptor);
+
+ adaptor->is_used = QB_TRUE;
+ adaptor->channel = channel;
+ adaptor->fn = fn;
+ adaptor->events = evts;
+ adaptor->data = data;
+ adaptor->p = p;
+
+ if (!g_io_add_watch(channel, evts, gio_read_socket, adaptor))
+ {
+ g_error("Cannot add watch on GIOChannel!\n");
+ return -EINVAL;
+ }
+ return 0;
+}
+
+static int32_t my_g_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
+ void *data, qb_ipcs_dispatch_fn_t fn)
+{
+ //return qb_loop_poll_mod(bms_loop, p, fd, evts, data, fn);
+ return 0;
+}
+
+static int32_t my_g_dispatch_del(int32_t fd)
+{
+ struct gio_to_qb_poll *adaptor;
+ if (qb_array_index(gio_map, fd, (void**)&adaptor) == 0) {
+ g_io_channel_shutdown(adaptor->channel,
+ QB_FALSE, NULL);
+ adaptor->is_used = QB_FALSE;
+ }
+ return 0;
+}
+
+#endif /* HAVE_GLIB */
+
static int32_t my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
void *data, qb_ipcs_dispatch_fn_t fn)
{
@@ -165,9 +220,10 @@ static int32_t my_dispatch_del(int32_t fd)
return qb_loop_poll_del(bms_loop, fd);
}
+
int32_t main(int32_t argc, char *argv[])
{
- const char *options = "nevhmpsu";
+ const char *options = "nevhmpsug";
int32_t opt;
enum qb_ipc_type ipc_type = QB_IPC_SHM;
struct qb_ipcs_service_handlers sh = {
@@ -181,6 +237,13 @@ int32_t main(int32_t argc, char *argv[])
.dispatch_mod = my_dispatch_mod,
.dispatch_del = my_dispatch_del,
};
+#ifdef HAVE_GLIB
+ struct qb_ipcs_poll_handlers glib_ph = {
+ .dispatch_add = my_g_dispatch_add,
+ .dispatch_mod = my_g_dispatch_mod,
+ .dispatch_del = my_g_dispatch_del,
+ };
+#endif /* HAVE_GLIB */
while ((opt = getopt(argc, argv, options)) != -1) {
switch (opt) {
@@ -202,6 +265,9 @@ int32_t main(int32_t argc, char *argv[])
case 'e': /* events */
events = QB_TRUE;
break;
+ case 'g':
+ use_glib = QB_TRUE;
+ break;
case 'v':
verbose++;
break;
@@ -218,17 +284,34 @@ int32_t main(int32_t argc, char *argv[])
qb_util_set_log_function(ipc_log_fn);
- bms_loop = qb_loop_create();
+ if (!use_glib) {
+ bms_loop = qb_loop_create();
+ s1 = qb_ipcs_create("bm1", 0, ipc_type, &sh);
+ if (s1 == 0) {
+ perror("qb_ipcs_create");
+ exit(1);
+ }
+ qb_ipcs_poll_handlers_set(s1, &ph);
+ qb_ipcs_run(s1);
+ qb_loop_run(bms_loop);
+ } else {
+#ifdef HAVE_GLIB
+ glib_loop = g_main_loop_new(NULL, FALSE);
- s1 = qb_ipcs_create("bm1", 0, ipc_type, &sh);
- if (s1 == 0) {
- perror("qb_ipcs_create");
- exit(1);
- }
- qb_ipcs_poll_handlers_set(s1, &ph);
+ gio_map = qb_array_create(64, sizeof(struct gio_to_qb_poll));
- qb_ipcs_run(s1);
- qb_loop_run(bms_loop);
+ s1 = qb_ipcs_create("bm1", 0, ipc_type, &sh);
+ if (s1 == 0) {
+ perror("qb_ipcs_create");
+ exit(1);
+ }
+ qb_ipcs_poll_handlers_set(s1, &glib_ph);
+ qb_ipcs_run(s1);
+ g_main_loop_run(glib_loop);
+#else
+ printf("You don't seem to have glib-devel installed.\n");
+#endif
+ }
return EXIT_SUCCESS;
}
--
1.7.2.3
13 years, 6 months
[PATCH 1/3] TEST: add some more array tests.
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
lib/array.c | 11 +++++-
tests/check_array.c | 100 ++++++++++++++++++++++++++++++++++++---------------
2 files changed, 80 insertions(+), 31 deletions(-)
diff --git a/lib/array.c b/lib/array.c
index 75df814..0288416 100644
--- a/lib/array.c
+++ b/lib/array.c
@@ -44,6 +44,10 @@ qb_array_t* qb_array_create(size_t max_elements, size_t element_size)
errno = -EINVAL;
return NULL;
}
+ if (element_size < 1) {
+ errno = -EINVAL;
+ return NULL;
+ }
a->element_size = element_size;
a->max_elements = max_elements;
a->num_bins = (max_elements / MAX_BIN_ELEMENTS) + 1;
@@ -64,7 +68,10 @@ int32_t qb_array_index(struct qb_array* a, int32_t idx, void** element_out)
int32_t elem;
char *bin;
- if (idx >= a->max_elements) {
+ if (a == NULL || element_out == NULL) {
+ return -EINVAL;
+ }
+ if (idx >= a->max_elements || idx < 0) {
return -EINVAL;
}
b = BIN_NUM_GET(idx);
@@ -82,7 +89,7 @@ int32_t qb_array_grow(struct qb_array* a, size_t max_elements)
int32_t i;
int32_t old_bins;
- if (max_elements > (MAX_BIN_ELEMENTS*MAX_BINS)) {
+ if (a == NULL || max_elements > (MAX_BIN_ELEMENTS*MAX_BINS)) {
return -EINVAL;
}
if (max_elements <= a->max_elements) {
diff --git a/tests/check_array.c b/tests/check_array.c
index 777d924..99ba148 100644
--- a/tests/check_array.c
+++ b/tests/check_array.c
@@ -21,10 +21,7 @@
* along with libqb. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <stdio.h>
-#include <stdlib.h>
-#include <syslog.h>
-#include <errno.h>
+#include "os_base.h"
#include <check.h>
#include <qb/qbdefs.h>
@@ -39,17 +36,46 @@ struct test_my_st {
};
-START_TEST(test_array1)
+START_TEST(test_array_limits)
+{
+ qb_array_t *a;
+ int32_t res;
+ struct test_my_st *st;
+
+ a = qb_array_create(INT_MAX, sizeof(struct test_my_st));
+ fail_unless(a == NULL);
+ a = qb_array_create(-56, sizeof(struct test_my_st));
+ fail_unless(a == NULL);
+ a = qb_array_create(67, 0);
+ fail_unless(a == NULL);
+
+ /* working array */
+ a = qb_array_create(10, sizeof(struct test_my_st));
+ fail_if(a == NULL);
+
+ /* out-of-bounds */
+ res = qb_array_index(a, 10, (void**)&st);
+ ck_assert_int_eq(res, -EINVAL);
+ res = qb_array_index(a, -10, (void**)&st);
+ ck_assert_int_eq(res, -EINVAL);
+ res = qb_array_index(NULL, 1, (void**)&st);
+ ck_assert_int_eq(res, -EINVAL);
+ res = qb_array_index(a, -10, NULL);
+ ck_assert_int_eq(res, -EINVAL);
+
+ qb_array_free(a);
+}
+END_TEST
+
+START_TEST(test_array_correct_retrieval)
{
qb_array_t *a;
int32_t i;
int32_t res;
- struct test_my_st *st_old;
struct test_my_st *st;
a = qb_array_create(112, sizeof(struct test_my_st));
- /* valid */
for (i = 0; i < 112; i++) {
res = qb_array_index(a, i, (void**)&st);
ck_assert_int_eq(res, 0);
@@ -58,20 +84,6 @@ START_TEST(test_array1)
st->c = i+2;
st->d = i+3;
}
- res = qb_array_index(a, 99, (void**)&st_old);
- ck_assert_int_eq(res, 0);
-
- /* out-of-bounds */
- res = qb_array_index(a, 112, (void**)&st);
- ck_assert_int_eq(res, -EINVAL);
-
-
- res = qb_array_grow(a, 1453);
- ck_assert_int_eq(res, 0);
-
- res = qb_array_index(a, 345, (void**)&st);
- st->a = 411;
-
/* read back */
for (i = 0; i < 112; i++) {
res = qb_array_index(a, i, (void**)&st);
@@ -81,10 +93,33 @@ START_TEST(test_array1)
ck_assert_int_eq(st->c, i+2);
ck_assert_int_eq(st->d, i+3);
}
+
+ qb_array_free(a);
+}
+END_TEST
+
+START_TEST(test_array_static_memory)
+{
+ qb_array_t *a;
+ int32_t res;
+ struct test_my_st *st_old;
+ struct test_my_st *st;
+
+ a = qb_array_create(112, sizeof(struct test_my_st));
+
+ res = qb_array_index(a, 99, (void**)&st_old);
+ ck_assert_int_eq(res, 0);
+
+ res = qb_array_grow(a, 1453);
+ ck_assert_int_eq(res, 0);
+
+ res = qb_array_index(a, 345, (void**)&st);
+ st->a = 411;
+
/* confirm the pointer is the same after a grow */
res = qb_array_index(a, 99, (void**)&st);
ck_assert_int_eq(res, 0);
- fail_if(st != st_old);
+ fail_unless(st == st_old);
qb_array_free(a);
}
@@ -92,12 +127,20 @@ END_TEST
static Suite *rb_suite(void)
{
- TCase *tc_load;
- Suite *s = suite_create("array");
+ TCase *tc;
+ Suite *s = suite_create("qb_array");
+
+ tc = tcase_create("limits");
+ tcase_add_test(tc, test_array_limits);
+ suite_add_tcase(s, tc);
+
+ tc = tcase_create("correct_retrieval");
+ tcase_add_test(tc, test_array_correct_retrieval);
+ suite_add_tcase(s, tc);
- tc_load = tcase_create("test01");
- tcase_add_test(tc_load, test_array1);
- suite_add_tcase(s, tc_load);
+ tc = tcase_create("static_memory");
+ tcase_add_test(tc, test_array_static_memory);
+ suite_add_tcase(s, tc);
return s;
}
@@ -105,8 +148,7 @@ static Suite *rb_suite(void)
static void libqb_log_fn(const char *file_name,
int32_t file_line, int32_t severity, const char *msg)
{
-// if (severity < LOG_INFO)
- printf("libqb: %s:%d %s\n", file_name, file_line, msg);
+ printf("libqb: %s:%d %s\n", file_name, file_line, msg);
}
int32_t main(void)
--
1.7.2.3
13 years, 6 months
ANNOUNCE libqb-v0.3.0
by Angus Salkeld
Hi
I am pleased to announce libqb 0.3.0.
This release mainly includes the new atomic & array API.
Also fixes to IPC to handle a multi-threaded client.
Find all the releases here: http://libqb.org/wiki/index.php/Releases
The Changelog is below.
Regards
Angus Salkeld
Angus Salkeld (16):
Add atomic operations.
IPC: use atomic for ref counting.
LIST/LOOP: allow empty list items but don't splice an empty head.
ATOMIC: fix make distcheck
Add a resizable array that doesn't move memory.
LOOP: use qbarray for poll_entries.
IPC: get bmcpt working
IPC/RB: name the ringbuffer's files better.
RB: prevent fd's from been leaked
RB: remove locking from ringbuffer.
RB: use the semaphore to return chunks_used.
RB: fix the sem init logic (always use some semaphore).
DOCS: add some doxygen comments to array & hdb.
HDB: use qb_array.
HDB: remove locks and use atomic.
Fix the current warnings
The Quarterback Library Release Team (2):
Tweek the release.mk file to produce tags like vX.Y.Z
Bump version to 0.3.0
13 years, 6 months
[PATCH 1/3] DOCS: add some doxygen comments to array & hdb.
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
docs/mainpage.h | 11 +++++++++--
include/qb/qbarray.h | 20 +++++++++++++++++---
include/qb/qbhdb.h | 3 ++-
3 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/docs/mainpage.h b/docs/mainpage.h
index e10927d..1139707 100644
--- a/docs/mainpage.h
+++ b/docs/mainpage.h
@@ -12,12 +12,13 @@
* tuned for maximum performance for client/server applications.
*
* See the following pages for more info:
- * - @subpage qb_rb_overview
* - @subpage qb_list_overview
+ * - @subpage qb_atomic_overview
+ * - @subpage qb_array_overview
* - @subpage qb_hdb_overview
+ * - @subpage qb_rb_overview
* - @subpage qb_loop_overview
* - @subpage qb_ipc_overview
- * - @subpage qb_atomic_overview
*/
/**
@@ -32,6 +33,12 @@
* @see qblist.h
*/
+/**
+ * @page qb_array_overview Array
+ * @copydoc qbarray.h
+ * @see qbarray.h
+ */
+
/**
* @page qb_hdb_overview Handle Database
* @copydoc qbhdb.h
diff --git a/include/qb/qbarray.h b/include/qb/qbarray.h
index a3c5ce1..4f8d8d6 100644
--- a/include/qb/qbarray.h
+++ b/include/qb/qbarray.h
@@ -33,32 +33,46 @@ extern "C" {
/**
* @file qbarray.h
+ * This is a dynamic array (it can grow, but without moving memory).
*/
struct qb_array;
/**
- *
+ * This is an opaque data type representing an instance of an array.
*/
typedef struct qb_array qb_array_t;
/**
+ * Create an array with fixed sized elements.
*
+ * @param max_elements initial max elements.
+ * @param element_size size of each element.
+ * @return array instance.
*/
qb_array_t* qb_array_create(size_t max_elements, size_t element_size);
/**
- *
+ * Get an element at a particular index.
+ * @param a array instance.
+ * @param idx the index
+ * @param element_out the pointer to the element data.
+ * @return (0 == success, else -errno)
*/
int32_t qb_array_index(qb_array_t* a, int32_t idx, void** element_out);
/**
+ * Grow the array.
*
+ * @param a array instance.
+ * @param max_elements the new maximum size of the array.
+ * @return (0 == success, else -errno)
*/
int32_t qb_array_grow(qb_array_t* a, size_t max_elements);
/**
- *
+ * Free all the memory used by the array.
+ * @param a array instance.
*/
void qb_array_free(qb_array_t * a);
diff --git a/include/qb/qbhdb.h b/include/qb/qbhdb.h
index df0f767..0ea385e 100644
--- a/include/qb/qbhdb.h
+++ b/include/qb/qbhdb.h
@@ -84,7 +84,8 @@ static struct qb_hdb (database_name) = { \
void qb_hdb_create(struct qb_hdb *hdb);
/**
- *
+ * Destroy a handle database.
+ * @param hdb the database to destroy.
*/
void qb_hdb_destroy(struct qb_hdb *hdb);
--
1.7.2.3
13 years, 6 months
[PATCH 1/4] LIST: fix logic in qb_list_splice()
by Angus Salkeld
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
include/qb/qblist.h | 2 +-
1 files changed, 1 insertions(+), 1 deletions(-)
diff --git a/include/qb/qblist.h b/include/qb/qblist.h
index a52c035..dadae9b 100644
--- a/include/qb/qblist.h
+++ b/include/qb/qblist.h
@@ -130,7 +130,7 @@ static inline void qb_list_splice(struct qb_list_head *list,
struct qb_list_head *last = list->prev;
struct qb_list_head *at = head->next;
- if (qb_list_empty(list) == 0) {
+ if (qb_list_empty(list)) {
return;
}
first->prev = head;
--
1.7.2.3
13 years, 6 months