Signed-off-by: Angus Salkeld <asalkeld@redhat.com> --- lib/loop_poll.c | 50 ++++++++++++++++++++++++---------------- lib/loop_poll_kqueue.c | 60 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 73 insertions(+), 37 deletions(-)
diff --git a/lib/loop_poll.c b/lib/loop_poll.c index 1f6b960..30f8666 100644 --- a/lib/loop_poll.c +++ b/lib/loop_poll.c @@ -36,6 +36,16 @@ /* logs, std(in|out|err), pipe */ #define POLL_FDS_USED_MISC 50
+#ifdef HAVE_EPOLL +#define USE_EPOLL 1 +#else + #ifdef HAVE_KQUEUE + #define USE_KQUEUE 1 + #else + #define USE_POLL 1 + #endif /* HAVE_KQUEUE */ +#endif /* HAVE_EPOLL */ + static int32_t _qb_signal_add_to_jobs_(struct qb_loop *l, struct qb_poll_entry *pe);
@@ -188,15 +198,15 @@ qb_loop_poll_create(struct qb_loop *l) s->low_fds_event_fn = NULL; s->not_enough_fds = 0;
-#ifdef HAVE_EPOLL +#ifdef USE_EPOLL (void)qb_epoll_init(s); -#else -#ifdef HAVE_KQUEUE +#endif +#ifdef USE_KQUEUE (void)qb_kqueue_init(s); -#else +#endif +#ifdef USE_POLL (void)qb_poll_init(s); -#endif /* HAVE_KQUEUE */ -#endif /* HAVE_EPOLL */ +#endif /* USE_POLL */
return (struct qb_loop_source *)s; } @@ -229,10 +239,6 @@ _get_empty_array_position_(struct qb_poll_source *s) uint32_t install_pos; int32_t res = 0; struct qb_poll_entry *pe; -#ifndef HAVE_EPOLL - struct pollfd *ufds; - int32_t new_size = 0; -#endif /* HAVE_EPOLL */
for (found = 0, install_pos = 0; install_pos < s->poll_entry_count; install_pos++) { @@ -245,6 +251,15 @@ _get_empty_array_position_(struct qb_poll_source *s) }
if (found == 0) { +#ifdef USE_POLL + struct pollfd *ufds; + int32_t new_size = (s->poll_entry_count + 1) * sizeof(struct pollfd); + ufds = realloc(s->ufds, new_size); + if (ufds == NULL) { + return -ENOMEM; + } + s->ufds = ufds; +#endif /* USE_POLL */ /* * Grow pollfd list */ @@ -252,16 +267,6 @@ _get_empty_array_position_(struct qb_poll_source *s) if (res != 0) { return res; } -#ifndef HAVE_EPOLL -#ifndef HAVE_KQUEUE - new_size = (s->poll_entry_count + 1) * sizeof(struct pollfd); - ufds = realloc(s->ufds, new_size); - if (ufds == NULL) { - return -ENOMEM; - } - s->ufds = ufds; -#endif -#endif /* HAVE_EPOLL */
s->poll_entry_count += 1; install_pos = s->poll_entry_count - 1; @@ -336,6 +341,11 @@ qb_loop_poll_add(struct qb_loop * lp,
size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count; res = _poll_add_(l, p, fd, events, data, &pe); + if (res != 0) { + qb_util_perror(LOG_ERR, + "couldn't add poll entryfor FD %d", fd); + return res; + } new_size = ((struct qb_poll_source *)l->fd_source)->poll_entry_count;
pe->poll_dispatch_fn = dispatch_fn; diff --git a/lib/loop_poll_kqueue.c b/lib/loop_poll_kqueue.c index b33825b..8a59496 100644 --- a/lib/loop_poll_kqueue.c +++ b/lib/loop_poll_kqueue.c @@ -53,12 +53,10 @@ _add(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t eve { int32_t res = 0; struct kevent ke; - int kents = _poll_to_filter_(events); + short filters = _poll_to_filter_(events);
- /* fill out the kevent struct */ - EV_SET(&ke, pe->check, kents, EV_ADD, 0, NULL, pe); + EV_SET(&ke, fd, filters, EV_ADD, 0, NULL, pe);
- /* set the event */ res = kevent(s->epollfd, &ke, 1, NULL, 0, NULL); if (res == -1) { res = -errno; @@ -68,23 +66,34 @@ _add(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t eve return res; }
- static int32_t _mod(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) { + int32_t res = 0; + struct kevent ke[2]; + short new_filters = _poll_to_filter_(events); + short old_filters = _poll_to_filter_(pe->ufd.events); + + EV_SET(&ke[0], fd, old_filters, EV_DELETE, 0, NULL, pe); + EV_SET(&ke[1], fd, new_filters, EV_ADD, 0, NULL, pe); + + res = kevent(s->epollfd, ke, 2, NULL, 0, NULL); + if (res == -1) { + res = -errno; + qb_util_perror(LOG_ERR, "kevent(mod)"); + } + return res; }
static int32_t -_del(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t arr_index) +_del(struct qb_poll_source *s, struct qb_poll_entry *pe, int32_t fd, int32_t events) { int32_t res = 0; struct kevent ke; - int kents = 0; //_poll_to_filter_(events); + short filters = _poll_to_filter_(events);
- /* fill out the kevent struct */ - EV_SET(&ke, pe->check, kents, EV_DELETE, 0, NULL, pe); + EV_SET(&ke, fd, filters, EV_DELETE, 0, NULL, pe);
- /* set the event */ res = kevent(s->epollfd, &ke, 1, NULL, 0, NULL); if (res == -1) { res = -errno; @@ -97,31 +106,47 @@ static int32_t _poll_and_add_to_jobs_(struct qb_loop_source *src, int32_t ms_timeout) { int32_t i; - int32_t res; int32_t event_count; int32_t new_jobs = 0; - int32_t revents; + int32_t revents = 0; struct qb_poll_entry *pe = NULL; struct qb_poll_source *s = (struct qb_poll_source *)src; struct kevent events[MAX_EVENTS]; struct timespec timeout = { 0, 0 }; + struct timespec *timeout_pt = &timeout;
- qb_timespec_add_ms(&timeout, ms_timeout); - + if (ms_timeout > 0) { + qb_timespec_add_ms(&timeout, ms_timeout); + } else if (ms_timeout < 0) { + timeout_pt = NULL; + } qb_poll_fds_usage_check_(s);
retry_poll:
- event_count = kevent(s->epollfd, NULL, 0, events, MAX_EVENTS, NULL); + event_count = kevent(s->epollfd, NULL, 0, events, MAX_EVENTS, timeout_pt); if (errno == EINTR && event_count == -1) { goto retry_poll; } else if (event_count == -1) { + qb_util_perror(LOG_ERR, "kevent(poll)"); return -errno; }
for (i = 0; i < event_count; i++) { + revents = 0; + if (events[i].flags) { + qb_util_log(LOG_INFO, + "got flags %d on fd %d.", events[i].flags, pe->ufd.fd); + } if (events[i].flags & EV_ERROR) { - revents = POLLHUP; + qb_util_log(LOG_WARNING, + "got EV_ERROR on fd %d.", pe->ufd.fd); + revents |= POLLERR; + } + if (events[i].flags & EV_EOF) { + qb_util_log(LOG_INFO, + "got EV_EOF on fd %d.", pe->ufd.fd); + revents |= POLLHUP; } if (events[i].filter == EVFILT_READ) { revents |= POLLIN; @@ -130,7 +155,7 @@ retry_poll: revents |= POLLOUT; } pe = events[i].udata; - if (pe->check != events[i].ident) { + if (pe->ufd.fd != events[i].ident) { qb_util_log(LOG_WARNING, "can't find poll entry for new event."); continue; @@ -164,6 +189,7 @@ qb_kqueue_init(struct qb_poll_source *s) s->epollfd = kqueue();
if (s->epollfd < 0) { + qb_util_perror(LOG_ERR, "kqueue()"); return -errno; } s->driver.fini = _fini;