Signed-off-by: Angus Salkeld asalkeld@redhat.com --- configure.ac | 10 +- lib/Makefile.am | 11 ++ lib/loop_poll.c | 277 ++++-------------------------------------------- lib/loop_poll_epoll.c | 210 ++++++++++++++++++++++++++++++++++++ lib/loop_poll_int.h | 91 ++++++++++++++++ lib/loop_poll_kqueue.c | 192 +++++++++++++++++++++++++++++++++ lib/loop_poll_poll.c | 109 +++++++++++++++++++ 7 files changed, 643 insertions(+), 257 deletions(-) create mode 100644 lib/loop_poll_epoll.c create mode 100644 lib/loop_poll_int.h create mode 100644 lib/loop_poll_kqueue.c create mode 100644 lib/loop_poll_poll.c
diff --git a/configure.ac b/configure.ac index f10834a..a957844 100644 --- a/configure.ac +++ b/configure.ac @@ -118,7 +118,7 @@ AC_HEADER_SYS_WAIT
AC_CHECK_HEADERS([arpa/inet.h link.h fcntl.h inttypes.h limits.h netinet/in.h stdint.h \ dlfcn.h time.h sys/time.h stdlib.h string.h strings.h sys/types.h sys/stat.h \ - sys/param.h sys/socket.h sys/time.h sys/poll.h sys/epoll.h sys/uio.h \ + sys/param.h sys/socket.h sys/time.h sys/poll.h sys/epoll.h sys/uio.h sys/event.h \ sys/sockio.h sys/un.h sys/resource.h syslog.h errno.h unistd.h sys/mman.h \ sys/sem.h sys/ipc.h sys/msg.h mqueue.h])
@@ -142,12 +142,18 @@ AC_FUNC_FORK AC_FUNC_MMAP AC_CHECK_FUNCS([alarm clock_gettime ftruncate gettimeofday localtime localtime_r \ memset munmap socket strchr strrchr strdup strerror strstr \ - poll epoll_create epoll_create1 random rand getrlimit sysconf \ + poll epoll_create epoll_create1 kqueue random rand getrlimit sysconf \ pthread_spin_lock pthread_setschedparam sem_timedwait semtimedop \ sched_get_priority_max sched_setscheduler getpeerucred getpeereid])
AM_CONDITIONAL(HAVE_SEM_TIMEDWAIT, [test "x$ac_cv_func_sem_timedwait" = xyes]) +AM_CONDITIONAL(HAVE_EPOLL, + [test "x$ac_cv_func_epoll_create" = xyes]) +AM_CONDITIONAL(HAVE_POLL, + [test "x$ac_cv_func_poll" = xyes]) +AM_CONDITIONAL(HAVE_KQUEUE, + [test "x$ac_cv_func_kqueue" = xyes])
AC_CONFIG_LIBOBJ_DIR(lib) AC_REPLACE_FUNCS(strlcpy strlcat strchrnul) diff --git a/lib/Makefile.am b/lib/Makefile.am index dec3d28..28a86fd 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -61,6 +61,17 @@ else libqb_la_SOURCES+=rpl_sem.c endif
+if HAVE_EPOLL + libqb_la_SOURCES+=loop_poll_epoll.c +else +if HAVE_KQUEUE + libqb_la_SOURCES+=loop_poll_kqueue.c +else + libqb_la_SOURCES+=loop_poll_poll.c +endif +endif + + pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libqb.pc
diff --git a/lib/loop_poll.c b/lib/loop_poll.c index 44f9601..987382f 100644 --- a/lib/loop_poll.c +++ b/lib/loop_poll.c @@ -23,27 +23,10 @@ #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> #endif -#ifdef HAVE_SYS_EPOLL_H -#include <sys/epoll.h> -#ifndef epoll_create1 -int epoll_create1(int flags); -#endif /* workaround a set of sparc and alpha broken headers */ -#endif /* HAVE_SYS_EPOLL_H */ - -#ifdef HAVE_SYS_POLL_H -#include <sys/poll.h> -#endif /* HAVE_SYS_POLL_H */ -#ifndef S_SPLINT_S -#endif /* S_SPLINT_S */ + #include <signal.h>
-#include <qb/qbdefs.h> -#include <qb/qblist.h> -#include <qb/qbarray.h> -#include <qb/qbloop.h> -#include <qb/qbutil.h> -#include "loop_int.h" -#include "util_int.h" +#include "loop_poll_int.h"
/* * Define this to log slow (>10ms) jobs. @@ -53,77 +36,9 @@ int epoll_create1(int flags); /* logs, std(in|out|err), pipe */ #define POLL_FDS_USED_MISC 50
-struct qb_poll_entry; - -typedef int32_t(*qb_poll_add_to_jobs_fn) (struct qb_loop * l, - struct qb_poll_entry * pe); - -struct qb_poll_entry { - struct qb_loop_item item; - qb_loop_poll_dispatch_fn poll_dispatch_fn; - enum qb_loop_priority p; - uint32_t install_pos; - struct pollfd ufd; - qb_poll_add_to_jobs_fn add_to_jobs; - uint32_t runs; - enum qb_poll_entry_state state; - uint32_t check; -}; - -struct qb_poll_source { - struct qb_loop_source s; -#ifdef HAVE_EPOLL - int32_t epollfd; -#else - struct pollfd *ufds; -#endif /* HAVE_EPOLL */ - int32_t poll_entry_count; - qb_array_t *poll_entries; - qb_loop_poll_low_fds_event_fn low_fds_event_fn; - int32_t not_enough_fds; -}; - static int32_t _qb_signal_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe);
-#ifdef HAVE_EPOLL -static int32_t -_poll_to_epoll_event_(int32_t event) -{ - int32_t out = 0; - if (event & POLLIN) - out |= EPOLLIN; - if (event & POLLOUT) - out |= EPOLLOUT; - if (event & POLLPRI) - out |= EPOLLPRI; - if (event & POLLERR) - out |= EPOLLERR; - if (event & POLLHUP) - out |= EPOLLHUP; - if (event & POLLNVAL) - out |= EPOLLERR; - return out; -} - -static int32_t -_epoll_to_poll_event_(int32_t event) -{ - int32_t out = 0; - if (event & EPOLLIN) - out |= POLLIN; - if (event & EPOLLOUT) - out |= POLLOUT; - if (event & EPOLLPRI) - out |= POLLPRI; - if (event & EPOLLERR) - out |= POLLERR; - if (event & EPOLLHUP) - out |= POLLHUP; - return out; -} -#endif /* HAVE_EPOLL */ - static void _poll_entry_check_generate_(struct qb_poll_entry *pe) { @@ -138,28 +53,6 @@ _poll_entry_check_generate_(struct qb_poll_entry *pe) } }
-#if defined(HAVE_EPOLL) -static int32_t -_poll_entry_from_handle_(struct qb_poll_source *s, - uint64_t handle_in, struct qb_poll_entry **pe_pt) -{ - int32_t res = 0; - uint32_t check = ((uint32_t) (((uint64_t) handle_in) >> 32)); - uint32_t handle = handle_in & 0xffffffff; - struct qb_poll_entry *pe; - - res = qb_array_index(s->poll_entries, handle, (void **)&pe); - if (res != 0) { - return res; - } - if (pe->check != check) { - return -EINVAL; - } - *pe_pt = pe; - return 0; -} -#endif /* HAVE_EPOLL */ - static void _poll_entry_mark_deleted_(struct qb_poll_entry *pe) { @@ -224,8 +117,8 @@ _poll_dispatch_and_take_back_(struct qb_loop_item *item, #endif /* DEBUG_DISPATCH_TIME */ }
-static void -_poll_fds_usage_check_(struct qb_poll_source *s) +void +qb_poll_fds_usage_check_(struct qb_poll_source *s) { struct rlimit lim; static int32_t socks_limit = 0; @@ -279,107 +172,6 @@ _poll_fds_usage_check_(struct qb_poll_source *s) } }
-#ifdef HAVE_EPOLL -#define MAX_EVENTS 12 -static int32_t -_poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout) -{ - int32_t i; - int32_t res; - int32_t event_count; - int32_t new_jobs = 0; - struct qb_poll_entry *pe = NULL; - struct qb_poll_source *s = (struct qb_poll_source *)src; - struct epoll_event events[MAX_EVENTS]; - - _poll_fds_usage_check_(s); - -retry_poll: - - event_count = epoll_wait(s->epollfd, events, MAX_EVENTS, ms_timeout); - - if (errno == EINTR && event_count == -1) { - goto retry_poll; - } else if (event_count == -1) { - return -errno; - } - - for (i = 0; i < event_count; i++) { - res = _poll_entry_from_handle_(s, events[i].data.u64, &pe); - if (res != 0) { - qb_util_log(LOG_WARNING, - "can't find poll entry for new event."); - continue; - } - if (pe->ufd.fd == -1 || pe->state == QB_POLL_ENTRY_DELETED) { - qb_util_log(LOG_WARNING, - "can't post new event to a deleted entry."); - /* - * empty/deleted - */ - continue; - } - if (events[i].events == pe->ufd.revents || - pe->state == QB_POLL_ENTRY_JOBLIST) { - /* - * entry already in the job queue. - */ - continue; - } - pe->ufd.revents = _epoll_to_poll_event_(events[i].events); - - new_jobs += pe->add_to_jobs(src->l, pe); - } - - return new_jobs; -} -#else -static int32_t -_poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout) -{ - int32_t i; - int32_t res; - int32_t new_jobs = 0; - struct qb_poll_entry *pe; - struct qb_poll_source *s = (struct qb_poll_source *)src; - - _poll_fds_usage_check_(s); - - for (i = 0; i < s->poll_entry_count; i++) { - assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0); - memcpy(&s->ufds[i], &pe->ufd, sizeof(struct pollfd)); - } - -retry_poll: - res = poll(s->ufds, s->poll_entry_count, ms_timeout); - if (errno == EINTR && res == -1) { - goto retry_poll; - } else if (res == -1) { - return -errno; - } - - for (i = 0; i < s->poll_entry_count; i++) { - if (s->ufds[i].fd == -1 || s->ufds[i].revents == 0) { - /* - * empty entry - */ - continue; - } - assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0); - if (pe->state != QB_POLL_ENTRY_ACTIVE || - s->ufds[i].revents == pe->ufd.revents) { - /* - * Wrong state to accept an event. - */ - continue; - } - pe->ufd.revents = s->ufds[i].revents; - new_jobs += pe->add_to_jobs(src->l, pe); - } - - return new_jobs; -} -#endif /* HAVE_EPOLL */
struct qb_loop_source * qb_loop_poll_create(struct qb_loop *l) @@ -390,7 +182,6 @@ qb_loop_poll_create(struct qb_loop *l) } s->s.l = l; s->s.dispatch_and_take_back = _poll_dispatch_and_take_back_; - s->s.poll = _poll_and_add_to_jobs_;
s->poll_entries = qb_array_create_2(16, sizeof(struct qb_poll_entry), 16); s->poll_entry_count = 0; @@ -398,9 +189,13 @@ qb_loop_poll_create(struct qb_loop *l) s->not_enough_fds = 0;
#ifdef HAVE_EPOLL - s->epollfd = epoll_create1(EPOLL_CLOEXEC); + (void)qb_epoll_init(s); #else - s->ufds = 0; +#ifdef HAVE_KQUEUE + (void)qb_kqueue_init(s); +#else + (void)qb_poll_init(s); +#endif /* HAVE_KQUEUE */ #endif /* HAVE_EPOLL */
return (struct qb_loop_source *)s; @@ -411,12 +206,9 @@ qb_loop_poll_destroy(struct qb_loop *l) { struct qb_poll_source *s = (struct qb_poll_source *)l->fd_source; qb_array_free(s->poll_entries); -#ifdef HAVE_EPOLL - if (s->epollfd != -1) { - close(s->epollfd); - s->epollfd = -1; - } -#endif /* HAVE_EPOLL */ + + s->driver.fini(s); + free(s); }
@@ -484,9 +276,6 @@ _poll_add_(struct qb_loop *l, uint32_t install_pos; int32_t res = 0; struct qb_poll_source *s; -#ifdef HAVE_EPOLL - struct epoll_event ev; -#endif /* HAVE_EPOLL */
if (l == NULL) { return -EINVAL; @@ -507,17 +296,14 @@ _poll_add_(struct qb_loop *l, pe->item.source = (struct qb_loop_source *)l->fd_source; pe->p = p; pe->runs = 0; -#ifdef HAVE_EPOLL - ev.events = _poll_to_epoll_event_(events); - ev.data.u64 = (((uint64_t) (pe->check)) << 32) | pe->install_pos; - if (epoll_ctl(s->epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) { - res = -errno; - qb_util_perror(LOG_ERR, "epoll_ctl(add)"); + res = s->driver.add(s, pe, fd, events); + if (res == 0) { + *pe_pt = pe; + return 0; + } else { + pe->state = QB_POLL_ENTRY_EMPTY; + return res; } -#endif /* HAVE_EPOLL */ - *pe_pt = pe; - - return (res); }
static int32_t @@ -573,9 +359,6 @@ qb_loop_poll_mod(struct qb_loop * lp, int32_t res = 0; struct qb_poll_entry *pe; struct qb_poll_source *s; -#ifdef HAVE_EPOLL - struct epoll_event ev; -#endif /* HAVE_EPOLL */ struct qb_loop *l = lp;
if (l == NULL) { @@ -600,14 +383,7 @@ qb_loop_poll_mod(struct qb_loop * lp, pe->item.user_data = data; pe->p = p; if (pe->ufd.events != events) { -#ifdef HAVE_EPOLL - ev.events = _poll_to_epoll_event_(events); - ev.data.u64 = (((uint64_t) (pe->check)) << 32) | i; - if (epoll_ctl(s->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) { - res = -errno; - qb_util_perror(LOG_ERR, "epoll_ctl(mod)"); - } -#endif /* HAVE_EPOLL */ + res = s->driver.mod(s, pe, fd, events); pe->ufd.events = events; } return res; @@ -641,16 +417,7 @@ qb_loop_poll_del(struct qb_loop * lp, int32_t fd) if (pe->state == QB_POLL_ENTRY_JOBLIST) { qb_loop_level_item_del(&l->level[pe->p], &pe->item); } -#ifdef HAVE_EPOLL - if (epoll_ctl(s->epollfd, EPOLL_CTL_DEL, fd, NULL) == -1) { - res = -errno; - qb_util_perror(LOG_WARNING, "epoll_ctl(del)"); - } -#else - s->ufds[i].fd = -1; - s->ufds[i].events = 0; - s->ufds[i].revents = 0; -#endif /* HAVE_EPOLL */ + res = s->driver.del(s, pe, fd, i); _poll_entry_mark_deleted_(pe); return res; } diff --git a/lib/loop_poll_epoll.c b/lib/loop_poll_epoll.c new file mode 100644 index 0000000..a2eb12d --- /dev/null +++ b/lib/loop_poll_epoll.c @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2012 Red Hat, Inc. + * + * Author: Angus Salkeld asalkeld@redhat.com + * + * This file is part of libqb. + * + * libqb is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 2.1 of the License, or + * (at your option) any later version. + * + * libqb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with libqb. If not, see http://www.gnu.org/licenses/. + */ + +#include "os_base.h" +#include "loop_poll_int.h" + +#ifdef HAVE_SYS_EPOLL_H +#include <sys/epoll.h> +#ifndef epoll_create1 +int epoll_create1(int flags); +#endif /* workaround a set of sparc and alpha broken headers */ +#endif /* HAVE_SYS_EPOLL_H */ + +#define MAX_EVENTS 12 + +static int32_t +_poll_to_epoll_event_(int32_t event) +{ + int32_t out = 0; + if (event & POLLIN) + out |= EPOLLIN; + if (event & POLLOUT) + out |= EPOLLOUT; + if (event & POLLPRI) + out |= EPOLLPRI; + if (event & POLLERR) + out |= EPOLLERR; + if (event & POLLHUP) + out |= EPOLLHUP; + if (event & POLLNVAL) + out |= EPOLLERR; + return out; +} + +static int32_t +_epoll_to_poll_event_(int32_t event) +{ + int32_t out = 0; + if (event & EPOLLIN) + out |= POLLIN; + if (event & EPOLLOUT) + out |= POLLOUT; + if (event & EPOLLPRI) + out |= POLLPRI; + if (event & EPOLLERR) + out |= POLLERR; + if (event & EPOLLHUP) + out |= POLLHUP; + return out; +} + +static void +_fini(struct qb_poll_source *s) +{ + if (s->epollfd != -1) { + close(s->epollfd); + s->epollfd = -1; + } +} + +static int32_t +_add(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) +{ + struct epoll_event ev; + int32_t res = 0; + + ev.events = _poll_to_epoll_event_(events); + ev.data.u64 = (((uint64_t) (pe->check)) << 32) | pe->install_pos; + if (epoll_ctl(s->epollfd, EPOLL_CTL_ADD, fd, &ev) == -1) { + res = -errno; + qb_util_perror(LOG_ERR, "epoll_ctl(add)"); + } + return res; +} + + +static int32_t +_mod(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) +{ + struct epoll_event ev; + int32_t res = 0; + + ev.events = _poll_to_epoll_event_(events); + ev.data.u64 = (((uint64_t) (pe->check)) << 32) | pe->install_pos; + if (epoll_ctl(s->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) { + res = -errno; + qb_util_perror(LOG_DEBUG, "epoll_ctl(mod)"); + } + return res; +} + +static int32_t +_del(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t arr_index) +{ + struct epoll_event ev; + int32_t res = 0; + + if (epoll_ctl(s->epollfd, EPOLL_CTL_DEL, fd, NULL) == -1) { + res = -errno; + qb_util_perror(LOG_DEBUG, "epoll_ctl(del)"); + } + return res; +} + +static int32_t +_poll_entry_from_handle_(struct qb_poll_source *s, + uint64_t handle_in, struct qb_poll_entry **pe_pt) +{ + int32_t res = 0; + uint32_t check = ((uint32_t) (((uint64_t) handle_in) >> 32)); + uint32_t handle = handle_in & 0xffffffff; + struct qb_poll_entry *pe; + + res = qb_array_index(s->poll_entries, handle, (void **)&pe); + if (res != 0) { + return res; + } + if (pe->check != check) { + return -EINVAL; + } + *pe_pt = pe; + return 0; +} + +static int32_t +_poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout) +{ + int32_t i; + int32_t res; + int32_t event_count; + int32_t new_jobs = 0; + struct qb_poll_entry *pe = NULL; + struct qb_poll_source *s = (struct qb_poll_source *)src; + struct epoll_event events[MAX_EVENTS]; + + qb_poll_fds_usage_check_(s); + +retry_poll: + + event_count = epoll_wait(s->epollfd, events, MAX_EVENTS, ms_timeout); + + if (errno == EINTR && event_count == -1) { + goto retry_poll; + } else if (event_count == -1) { + return -errno; + } + + for (i = 0; i < event_count; i++) { + res = _poll_entry_from_handle_(s, events[i].data.u64, &pe); + if (res != 0) { + qb_util_log(LOG_WARNING, + "can't find poll entry for new event."); + continue; + } + if (pe->ufd.fd == -1 || pe->state == QB_POLL_ENTRY_DELETED) { + qb_util_log(LOG_WARNING, + "can't post new event to a deleted entry."); + /* + * empty/deleted + */ + continue; + } + if (events[i].events == pe->ufd.revents || + pe->state == QB_POLL_ENTRY_JOBLIST) { + /* + * entry already in the job queue. + */ + continue; + } + pe->ufd.revents = _epoll_to_poll_event_(events[i].events); + + new_jobs += pe->add_to_jobs(src->l, pe); + } + + return new_jobs; +} + +int32_t +qb_epoll_init(struct qb_poll_source *s) +{ + s->epollfd = epoll_create1(EPOLL_CLOEXEC); + if (s->epollfd < 0) { + return -errno; + } + s->driver.fini = _fini; + s->driver.add = _add; + s->driver.mod = _mod; + s->driver.del = _del; + s->s.poll = _poll_and_add_to_jobs_; + return 0; +} + diff --git a/lib/loop_poll_int.h b/lib/loop_poll_int.h new file mode 100644 index 0000000..f99f442 --- /dev/null +++ b/lib/loop_poll_int.h @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2012 Red Hat, Inc. + * + * Author: Angus Salkeld asalkeld@redhat.com + * + * This file is part of libqb. + * + * libqb is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 2.1 of the License, or + * (at your option) any later version. + * + * libqb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with libqb. If not, see http://www.gnu.org/licenses/. + */ +#ifndef QB_LOOP_POLL_INT_DEFINED +#define QB_LOOP_POLL_INT_DEFINED + +#include "os_base.h" + +#ifdef HAVE_SYS_POLL_H +#include <sys/poll.h> +#endif /* HAVE_SYS_POLL_H */ + +#include <qb/qbdefs.h> +#include <qb/qblist.h> +#include <qb/qbarray.h> +#include <qb/qbutil.h> + +#include "loop_int.h" +#include "util_int.h" + +struct qb_poll_entry; + +typedef int32_t(*qb_poll_add_to_jobs_fn) (struct qb_loop * l, + struct qb_poll_entry * pe); + +struct qb_poll_entry { + struct qb_loop_item item; + qb_loop_poll_dispatch_fn poll_dispatch_fn; + enum qb_loop_priority p; + uint32_t install_pos; + struct pollfd ufd; + qb_poll_add_to_jobs_fn add_to_jobs; + uint32_t runs; + enum qb_poll_entry_state state; + uint32_t check; +}; + +struct qb_poll_source; + +struct qb_loop_driver { + int32_t (*poll)(struct qb_loop_source* s, int32_t ms_timeout); + void (*fini)(struct qb_poll_source *s); + int32_t (*add)(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events); + int32_t (*mod)(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events); + int32_t (*del)(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t arr_index); +}; + +struct qb_poll_source { + struct qb_loop_source s; + int32_t poll_entry_count; + qb_array_t *poll_entries; + qb_loop_poll_low_fds_event_fn low_fds_event_fn; + int32_t not_enough_fds; +#if defined(HAVE_EPOLL) || defined(HAVE_KQUEUE) + int32_t epollfd; +#else + struct pollfd *ufds; +#endif /* HAVE_EPOLL */ + struct qb_loop_driver driver; +}; + +void +qb_poll_fds_usage_check_(struct qb_poll_source *s); + +int32_t +qb_epoll_init(struct qb_poll_source *s); + +int32_t +qb_poll_init(struct qb_poll_source *s); + +int32_t +qb_kqueue_init(struct qb_poll_source *s); + +#endif /* QB_LOOP_POLL_INT_DEFINED */ diff --git a/lib/loop_poll_kqueue.c b/lib/loop_poll_kqueue.c new file mode 100644 index 0000000..6acbcc1 --- /dev/null +++ b/lib/loop_poll_kqueue.c @@ -0,0 +1,192 @@ +/* + * Copyright (C) 2012 Red Hat, Inc. + * + * Author: Angus Salkeld asalkeld@redhat.com + * + * This file is part of libqb. + * + * libqb is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 2.1 of the License, or + * (at your option) any later version. + * + * libqb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with libqb. If not, see http://www.gnu.org/licenses/. + */ + +#include "os_base.h" +#include "loop_poll_int.h" + +#ifdef HAVE_SYS_EVENT_H +#include <sys/event.h> +#endif /* HAVE_SYS_EVENT_H */ + +#define MAX_EVENTS 12 + +static int32_t +_poll_to_filter_(int32_t event) +{ + int32_t out = 0; + if (event & POLLIN) + out |= EVFILT_READ; + if (event & POLLOUT) + out |= EVFILT_WRITE; + return out; +} + +static int32_t +_filter_to_poll_event_(int32_t event) +{ + int32_t out = 0; + if (event & EPOLLIN) + out |= POLLIN; + if (event & EPOLLOUT) + out |= POLLOUT; + if (event & EPOLLPRI) + out |= POLLPRI; + if (event & EPOLLERR) + out |= POLLERR; + if (event & EPOLLHUP) + out |= POLLHUP; + return out; +} + +static void +_fini(struct qb_poll_source *s) +{ + if (s->epollfd != -1) { + close(s->epollfd); + s->epollfd = -1; + } +} + +static int32_t +_add(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) +{ + int32_t res = 0; + struct kevent ke; + int kents = _poll_to_filter_(events); + + /* fill out the kevent struct */ + EV_SET(&ke, pe->check, kents, EV_ADD, 0, NULL, pe); + + /* set the event */ + res = kevent(kq, &ke, 1, NULL, 0, NULL); + if (res == -1) { + res = -errno; + qb_util_perror(LOG_ERR, "kevent(add)"); + } + + return res; +} + + +static int32_t +_mod(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) +{ +} + +static int32_t +_del(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t arr_index) +{ + int32_t res = 0; + struct kevent ke; + int kents = _poll_to_filter_(events); + + /* fill out the kevent struct */ + EV_SET(&ke, pe->check, kents, EV_DELETE, 0, NULL, pe); + + /* set the event */ + res = kevent(kq, &ke, 1, NULL, 0, NULL); + if (res == -1) { + res = -errno; + qb_util_perror(LOG_ERR, "kevent(del)"); + } + return res; +} + +static int32_t +_poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout) +{ + int32_t i; + int32_t res; + int32_t event_count; + int32_t new_jobs = 0; + int32_t revents; + struct qb_poll_entry *pe = NULL; + struct qb_poll_source *s = (struct qb_poll_source *)src; + struct kevent events[MAX_EVENTS]; + struct timespec timeout = { 0, 0 }; + + qb_timespec_add_ms(&timeout, ms_timeout); + + qb_poll_fds_usage_check_(s); + +retry_poll: + + event_count = kevent(s->epollfd, NULL, 0, events, MAX_EVENTS, NULL); + if (errno == EINTR && event_count == -1) { + goto retry_poll; + } else if (event_count == -1) { + return -errno; + } + + for (i = 0; i < event_count; i++) { + if (evi.flags & EV_ERROR) { + revents = POLLHUP; + } + if (evi.filter == EVFILT_READ) { + revents |= POLLIN; + } + if (evi.filter == EVFILT_WRITE) { + revents |= POLLOUT; + } + pe = evi.udata; + if (pe->check != evi.ident) { + qb_util_log(LOG_WARNING, + "can't find poll entry for new event."); + continue; + } + if (pe->ufd.fd == -1 || pe->state == QB_POLL_ENTRY_DELETED) { + qb_util_log(LOG_WARNING, + "can't post new event to a deleted entry."); + /* + * empty/deleted + */ + continue; + } + if (revents == pe->ufd.revents || + pe->state == QB_POLL_ENTRY_JOBLIST) { + /* + * entry already in the job queue. + */ + continue; + } + pe->ufd.revents = revents; + + new_jobs += pe->add_to_jobs(src->l, pe); + } + + return new_jobs; +} + +int32_t +qb_epoll_init(struct qb_poll_source *s) +{ + s->epollfd = kqueue(); + + if (s->epollfd < 0) { + return -errno; + } + s->driver.fini = _fini; + s->driver.add = _add; + s->driver.mod = _mod; + s->driver.del = _del; + s->s.poll = _poll_and_add_to_jobs_; + return 0; +} diff --git a/lib/loop_poll_poll.c b/lib/loop_poll_poll.c new file mode 100644 index 0000000..ecc7418 --- /dev/null +++ b/lib/loop_poll_poll.c @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2012 Red Hat, Inc. + * + * Author: Angus Salkeld asalkeld@redhat.com + * + * This file is part of libqb. + * + * libqb is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 2.1 of the License, or + * (at your option) any later version. + * + * libqb is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with libqb. If not, see http://www.gnu.org/licenses/. + */ +#include "os_base.h" + +#include "loop_poll_int.h" + + +static void +_fini(struct qb_poll_source *s) +{ +} + +static int32_t +_add(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) +{ + return 0; +} + +static int32_t +_mod(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) +{ + return 0; +} + +static int32_t +_del(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t i) +{ + s->ufds[i].fd = -1; + s->ufds[i].events = 0; + s->ufds[i].revents = 0; + return 0; +} + +static int32_t +_poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout) +{ + int32_t i; + int32_t res; + int32_t new_jobs = 0; + struct qb_poll_entry *pe; + struct qb_poll_source *s = (struct qb_poll_source *)src; + + qb_poll_fds_usage_check_(s); + + for (i = 0; i < s->poll_entry_count; i++) { + assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0); + memcpy(&s->ufds[i], &pe->ufd, sizeof(struct pollfd)); + } + +retry_poll: + res = poll(s->ufds, s->poll_entry_count, ms_timeout); + if (errno == EINTR && res == -1) { + goto retry_poll; + } else if (res == -1) { + return -errno; + } + + for (i = 0; i < s->poll_entry_count; i++) { + if (s->ufds[i].fd == -1 || s->ufds[i].revents == 0) { + /* + * empty entry + */ + continue; + } + assert(qb_array_index(s->poll_entries, i, (void **)&pe) == 0); + if (pe->state != QB_POLL_ENTRY_ACTIVE || + s->ufds[i].revents == pe->ufd.revents) { + /* + * Wrong state to accept an event. + */ + continue; + } + pe->ufd.revents = s->ufds[i].revents; + new_jobs += pe->add_to_jobs(src->l, pe); + } + + return new_jobs; +} + +int32_t +qb_poll_init(struct qb_poll_source *s) +{ + s->ufds = 0; + s->driver.fini = _fini; + s->driver.add = _add; + s->driver.mod = _mod; + s->driver.del = _del; + s->s.poll = _poll_and_add_to_jobs_; + return 0; +} +
quarterback-devel@lists.fedorahosted.org