* [PATCH 1/7] auto fiber scheduling for I/O
@ 2017-05-30 19:03 Eric Wong
  2017-05-30 19:03 ` [PATCH 2/7] iom: implement waitpid generically Eric Wong
                   ` (5 more replies)
  0 siblings, 6 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
  To: spew

Currently, basic I/O scheduling works with a single FD per Fiber.
The new Ruby API methods for Fiber are named after existing
Thread methods.

main Ruby API:

    Fiber#start -> enable auto-scheduling and run Fiber until it
                   automatically yields (due to EAGAIN/EWOULDBLOCK)

The following behave like their Thread counterparts (usage sketch below):

    Fiber.start - Fiber.new + Fiber#start (prelude.rb)
    Fiber#join - run internal scheduler until Fiber is terminated
    Fiber#value - ditto
    Fiber#run - like Fiber#start (prelude.rb)
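
A minimal usage sketch (hypothetical pipe example; the FDs need to be
non-blocking for the scheduler to kick in, as in the test script below):

    require 'fiber'
    require 'io/nonblock'

    r, w = IO.pipe
    r.nonblock = w.nonblock = true
    f = Fiber.start { r.read(1) }  # hits EAGAIN, yields to the scheduler
    p f.join(0.1)                  # => nil, still waiting after 100ms
    w.write('a')
    p f.value                      # runs the scheduler until done, => "a"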

Right now, it takes over the rb_wait_for_single_fd() function
if the running Fiber is auto-enabled (see cont.c::rb_fiber_auto_sched_p).
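
The thread.c hunk for that hook is truncated at the end of this mail;
it amounts to roughly the following check at the top of
rb_wait_for_single_fd (sketch only, using the rb_iom_waitfd_tv helper
added to thread.c below):

    rb_thread_t *th = GET_THREAD();

    if (rb_fiber_auto_sched_p(th)) {
        /* auto-fiber: yield this fiber instead of blocking the thread */
        return rb_iom_waitfd_tv(th, &fd, events, tv);
    }
    /* otherwise, fall through to the existing poll()/select() path */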

Changes to existing functions are minimal.

New files (all new structs and relations should be documented):

    iom.h - internal API for the rest of RubyVM (incomplete?)
    iom_internal.h - internal header for iom_(select|epoll|kqueue).h
    iom_epoll.h - epoll-specific pieces
    iom_kqueue.h - kqueue-specific pieces
    iom_select.h - select()-specific pieces
    iom_pingable_common.h - common code for iom_(epoll|kqueue).h
    iom_common.h - common footer for iom_(select|epoll|kqueue).h

Changes to existing data structures:

    rb_thread_t.afhead   - list of fibers to auto-resume
    rb_fiber_t.afnode    - link to rb_thread_t->afhead
    rb_vm_t.iom          - Ruby I/O Manager (rb_iom_t) :)

Besides rb_iom_t, all the new structs are stack-only and rely
extensively on ccan/list for branchless O(1) insert/delete.

As usual, understanding the data structures first should help
you understand the code.
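
For anyone unfamiliar with ccan/list, here is a self-contained sketch
of the embedded-node pattern the waiter queues use (assuming
ccan/list/list.h from the Ruby source tree):

    #include <stdio.h>
    #include "ccan/list/list.h"

    struct waiter {
        struct list_node wnode; /* embedded link; lives on the stack */
        int fd;
    };

    int
    main(void)
    {
        struct waiter a, b, *w;
        LIST_HEAD(waiters); /* like rb_thread_t.afhead or rb_iom_t.fds */

        a.fd = 3;
        b.fd = 4;
        list_add_tail(&waiters, &a.wnode); /* O(1) insert */
        list_add_tail(&waiters, &b.wnode);
        list_del_init(&a.wnode); /* O(1) delete, safe to repeat */
        list_for_each(&waiters, w, wnode) /* container_of under the hood */
            printf("fd=%d\n", w->fd); /* only b remains */
        return 0;
    }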

Right now, I reuse some static functions in thread.c,
so thread.c includes iom_(select|epoll|kqueue).h.
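
The implementation is chosen at build time; a sketch of what the
dispatch in thread.c looks like, based on the IOM_* macros defined in
the configure.in hunk below (the exact thread.c hunk is not shown in
this mail):

    #if RUBYVM_IOM == IOM_KQUEUE
    #  include "iom_kqueue.h"
    #elif RUBYVM_IOM == IOM_EPOLL
    #  include "iom_epoll.h"
    #else /* IOM_SELECT */
    #  include "iom_select.h"
    #endif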

TODO:

    Hijack other blocking functions (waitpid, IO.select, ...)

I am using "double" for timeout since it is more convenient for
arithmetic like parts of thread.c.   Most platforms have good FP,
I think.  Also, all "blocking" functions (rb_iom_wait*) will
have timeout support.
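
For callers which still deal in struct timeval, the conversion is
trivial; this is what the new rb_iom_waitfd_tv helper in the thread.c
hunk below does:

    double tout, *to = NULL;

    if (tv) { /* NULL tv means "wait forever" */
        tout = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
        to = &tout;
    }
    return rb_iom_waitfd(th, fdp, events, to);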

kqueue support can be tested portably with libkqueue:

	./configure cflags="$(pkg-config --cflags libkqueue)" \
		LIBS="$(pkg-config --libs libkqueue)"

./configure gains a new --with-iom=(select|epoll|kqueue) switch

With libkqueue installed, I can test all 3 iom implementations on
Debian 8.x

Test script I used to download a file from my server:
----8<---
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'

url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx'

uri = URI(url)
use_ssl = "https" == uri.scheme
fibs = 10.times.map do
  Fiber.start do
    cur = Fiber.current.object_id
    # XXX getaddrinfo() and connect() are blocking
    # XXX resolv/replace + connect_nonblock
    Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http|
      req = Net::HTTP::Get.new(uri)
      http.request(req) do |res|
        dig = Digest::SHA1.new
        res.read_body do |buf|
          dig.update(buf)
          #warn "#{cur} #{buf.bytesize}\n"
        end
        warn "#{cur} #{dig.hexdigest}\n"
      end
    end
    warn "done\n"
    :done
  end
end

warn "joining #{Time.now}\n"
fibs[-1].join(4)
warn "joined #{Time.now}\n"
all = fibs.dup

warn "1 joined, wait for the rest\n"
until fibs.empty?
  fibs.each(&:join)
  fibs.keep_if(&:alive?)
  warn fibs.inspect
end

p all.map(&:value)

Fiber.new do
  puts 'HI'
end.run.join
__END__
---
 common.mk                    |   7 +
 configure.in                 |  32 ++++
 cont.c                       | 156 ++++++++++++++-
 include/ruby/io.h            |   2 +
 iom.h                        |  91 +++++++++
 iom_common.h                 |  90 +++++++++
 iom_epoll.h                  | 392 ++++++++++++++++++++++++++++++++++++++
 iom_internal.h               | 153 +++++++++++++++
 iom_kqueue.h                 | 440 +++++++++++++++++++++++++++++++++++++++++++
 iom_pingable_common.h        |  55 ++++++
 iom_select.h                 | 341 +++++++++++++++++++++++++++++++++
 prelude.rb                   |  12 ++
 test/lib/leakchecker.rb      |   9 +
 test/ruby/test_fiber_auto.rb | 126 +++++++++++++
 thread.c                     |  42 +++++
 thread_pthread.c             |   5 +
 vm.c                         |   9 +
 vm_core.h                    |   4 +
 18 files changed, 1961 insertions(+), 5 deletions(-)
 create mode 100644 iom.h
 create mode 100644 iom_common.h
 create mode 100644 iom_epoll.h
 create mode 100644 iom_internal.h
 create mode 100644 iom_kqueue.h
 create mode 100644 iom_pingable_common.h
 create mode 100644 iom_select.h
 create mode 100644 test/ruby/test_fiber_auto.rb

diff --git a/common.mk b/common.mk
index 654533ce8d..2a7c3866cf 100644
--- a/common.mk
+++ b/common.mk
@@ -2598,6 +2598,13 @@ thread.$(OBJEXT): {$(VPATH)}id.h
 thread.$(OBJEXT): {$(VPATH)}intern.h
 thread.$(OBJEXT): {$(VPATH)}internal.h
 thread.$(OBJEXT): {$(VPATH)}io.h
+thread.$(OBJEXT): {$(VPATH)}iom.h
+thread.$(OBJEXT): {$(VPATH)}iom_internal.h
+thread.$(OBJEXT): {$(VPATH)}iom_common.h
+thread.$(OBJEXT): {$(VPATH)}iom_epoll.h
+thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h
+thread.$(OBJEXT): {$(VPATH)}iom_pingable_common.h
+thread.$(OBJEXT): {$(VPATH)}iom_select.h
 thread.$(OBJEXT): {$(VPATH)}method.h
 thread.$(OBJEXT): {$(VPATH)}missing.h
 thread.$(OBJEXT): {$(VPATH)}node.h
diff --git a/configure.in b/configure.in
index 24035c7c1a..f6577a89dc 100644
--- a/configure.in
+++ b/configure.in
@@ -1389,6 +1389,8 @@ AC_CHECK_HEADERS(process.h)
 AC_CHECK_HEADERS(pwd.h)
 AC_CHECK_HEADERS(setjmpex.h)
 AC_CHECK_HEADERS(sys/attr.h)
+AC_CHECK_HEADERS(sys/epoll.h)
+AC_CHECK_HEADERS(sys/event.h)
 AC_CHECK_HEADERS(sys/fcntl.h)
 AC_CHECK_HEADERS(sys/file.h)
 AC_CHECK_HEADERS(sys/id.h)
@@ -2405,6 +2407,10 @@ AC_CHECK_FUNCS(dladdr)
 AC_CHECK_FUNCS(dup)
 AC_CHECK_FUNCS(dup3)
 AC_CHECK_FUNCS(eaccess)
+AC_CHECK_FUNCS(epoll_create)
+AC_CHECK_FUNCS(epoll_create1)
+AC_CHECK_FUNCS(epoll_ctl)
+AC_CHECK_FUNCS(epoll_wait)
 AC_CHECK_FUNCS(endgrent)
 AC_CHECK_FUNCS(fchmod)
 AC_CHECK_FUNCS(fchown)
@@ -2438,7 +2444,9 @@ AC_CHECK_FUNCS(initgroups)
 AC_CHECK_FUNCS(ioctl)
 AC_CHECK_FUNCS(isfinite)
 AC_CHECK_FUNCS(issetugid)
+AC_CHECK_FUNCS(kevent)
 AC_CHECK_FUNCS(killpg)
+AC_CHECK_FUNCS(kqueue)
 AC_CHECK_FUNCS(lchmod)
 AC_CHECK_FUNCS(lchown)
 AC_CHECK_FUNCS(link)
@@ -3590,6 +3598,29 @@ AC_ARG_WITH(valgrind,
 AS_IF([test x$with_valgrind != xno],
         [AC_CHECK_HEADERS(valgrind/memcheck.h)])
 
+AC_DEFINE_UNQUOTED(IOM_SELECT, 0)
+AC_DEFINE_UNQUOTED(IOM_KQUEUE, 1)
+AC_DEFINE_UNQUOTED(IOM_EPOLL, 2)
+
+iom_default=select
+AS_CASE([$ac_cv_func_kqueue:$ac_cv_func_kevent:$ac_cv_header_sys_event_h],
+[yes:yes:yes], [iom_default=kqueue],
+[*],
+  [AS_CASE(
+    [$ac_cv_func_epoll_wait:$ac_cv_func_epoll_create:$ac_cv_header_sys_epoll_h],
+    [yes:yes:yes], [iom_default=epoll])]
+)
+
+AC_ARG_WITH(iom,
+  AS_HELP_STRING([--with-iom=XXXXX],
+		 [I/O manager (select|kqueue|epoll)]),
+  [with_iom="$withval"], [with_iom="$iom_default"])
+AS_CASE(["$with_iom"],
+  [select], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_SELECT)],
+  [kqueue], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_KQUEUE)],
+  [epoll], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_EPOLL)],
+  [AC_MSG_ERROR(unknown I/O manager: $with_iom)])
+
 dln_a_out_works=no
 if test "$ac_cv_header_a_out_h" = yes; then
   if test "$with_dln_a_out" = yes || test "$rb_cv_dlopen" = unknown; then
@@ -4745,5 +4776,6 @@ config_summary "install doc"         "$install_doc"
 config_summary "man page type"       "$MANTYPE"
 config_summary "search path"         "$search_path"
 config_summary "static-linked-ext"   ${EXTSTATIC:+"yes"}
+config_summary "I/O manager"         ${with_iom}
 echo ""
 echo "---"
diff --git a/cont.c b/cont.c
index 4d6176f00c..ced496f61b 100644
--- a/cont.c
+++ b/cont.c
@@ -13,6 +13,7 @@
 #include "vm_core.h"
 #include "gc.h"
 #include "eval_intern.h"
+#include "iom.h"
 
 /* FIBER_USE_NATIVE enables Fiber performance improvement using system
  * dependent method such as make/setcontext on POSIX system or
@@ -126,6 +127,14 @@ static machine_stack_cache_t terminated_machine_stack;
 struct rb_fiber_struct {
     rb_context_t cont;
     struct rb_fiber_struct *prev;
+
+    /*
+     * afnode.next == 0          auto fiber disabled
+     * afnode.next == &afnode    auto fiber enabled, but not enqueued (running)
+     * afnode.next != &afnode    auto fiber runnable (enqueued in th->afhead)
+     */
+    struct list_node afnode;
+
     enum fiber_status status;
     /* If a fiber invokes "transfer",
      * then this fiber can't "resume" any more after that.
@@ -1496,19 +1505,24 @@ rb_fiber_terminate(rb_fiber_t *fib)
     fiber_switch(return_fiber(), 1, &value, 0);
 }
 
-VALUE
-rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+static void
+fiber_check_resume(const rb_fiber_t *fib)
 {
-    rb_fiber_t *fib;
-    GetFiberPtr(fibval, fib);
-
     if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
 	rb_raise(rb_eFiberError, "double resume");
     }
     if (fib->transferred != 0) {
 	rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
     }
+}
 
+VALUE
+rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+{
+    rb_fiber_t *fib;
+    GetFiberPtr(fibval, fib);
+
+    fiber_check_resume(fib);
     return fiber_switch(fib, argc, argv, 1);
 }
 
@@ -1651,7 +1665,136 @@ rb_fiber_s_current(VALUE klass)
     return rb_fiber_current();
 }
 
+/* enqueue given Fiber so it may be auto-resumed */
+void
+rb_fiber_auto_enqueue(VALUE fibval)
+{
+    rb_fiber_t *fib;
+    rb_thread_t *th;
+
+    GetFiberPtr(fibval, fib);
+    GetThreadPtr(fib->cont.saved_thread.self, th);
+    list_add_tail(&th->afhead, &fib->afnode);
+}
+
+/* for vm.c::rb_thread_mark */
+void
+rb_fiber_auto_schedule_mark(const rb_thread_t *th)
+{
+    rb_fiber_t *fib = 0;
+    list_for_each(&th->afhead, fib, afnode) {
+	rb_gc_mark(fib->cont.self);
+    }
+}
+
+/* Returns true if auto-fiber is enabled for current fiber */
+int
+rb_fiber_auto_sched_p(const rb_thread_t *th)
+{
+    const rb_fiber_t *cur = th->fiber;
+
+    return (cur && cur->afnode.next && th->root_fiber != cur);
+}
+
+/*
+ * Resumes any ready fibers in the auto-Fiber run queue.
+ * Returns true if the current Fiber needs to yield, false if not.
+ */
+int
+rb_fiber_auto_do_yield_p(rb_thread_t *th)
+{
+    rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0;
+    rb_fiber_t *fib = 0, *next = 0;
+    LIST_HEAD(tmp);
+
+    /*
+     * avoid an infinite loop as new fibers get added to
+     * th->afhead; only work off a temporary list:
+     */
+    list_append_list(&tmp, &th->afhead);
+    list_for_each_safe(&tmp, fib, next, afnode) {
+	if (fib == current_auto || (fib->prev && fib != th->root_fiber)) {
+	    /* tell the caller to yield */
+	    list_prepend_list(&th->afhead, &tmp);
+	    return 1;
+	}
+	list_del_init(&fib->afnode);
+	fiber_check_resume(fib);
+	fiber_switch(fib, 0, 0, 1);
+    }
+    return 0;
+}
+
+/*
+ * Enable auto-scheduling for the Fiber and resume it
+ */
+static VALUE
+rb_fiber_auto_start(int argc, VALUE *argv, VALUE self)
+{
+    rb_fiber_t *fib;
+    GetFiberPtr(self, fib);
+    if (fib->afnode.next) {
+	rb_raise(rb_eFiberError, "Fiber already started");
+    }
+    list_node_init(&fib->afnode);
+    fiber_check_resume(fib);
+    return fiber_switch(fib, argc, argv, 1);
+}
+
+static void
+fiber_auto_join(rb_fiber_t *fib, double *timeout)
+{
+    rb_thread_t *th = GET_THREAD();
+    rb_fiber_t *cur = fiber_current();
 
+    if (cur == fib) {
+	rb_raise(rb_eFiberError, "Target fiber must not be current fiber");
+    }
+    if (th->root_fiber == fib) {
+	rb_raise(rb_eFiberError, "Target fiber must not be root fiber");
+    }
+    if (fib->cont.saved_thread.self != th->self) {
+	rb_raise(rb_eFiberError, "Target fiber not owned by current thread");
+    }
+    if (!fib->afnode.next) {
+	rb_raise(rb_eFiberError, "Target fiber is not an auto-fiber");
+    }
+
+    while (fib->status != TERMINATED && (timeout == 0 || *timeout >= 0.0)) {
+	rb_iom_schedule(th, timeout);
+    }
+}
+
+static VALUE
+rb_fiber_auto_join(int argc, VALUE *argv, VALUE self)
+{
+    rb_fiber_t *fib;
+    double timeout, *t;
+    VALUE limit;
+
+    GetFiberPtr(self, fib);
+    rb_scan_args(argc, argv, "01", &limit);
+
+    if (NIL_P(limit)) {
+	t = 0;
+    } else {
+	timeout = rb_num2dbl(limit);
+	t = &timeout;
+    }
+
+    fiber_auto_join(fib, t);
+    return fib->status == TERMINATED ? fib->cont.self : Qnil;
+}
+
+static VALUE
+rb_fiber_auto_value(VALUE self)
+{
+    rb_fiber_t *fib;
+    GetFiberPtr(self, fib);
+
+    fiber_auto_join(fib, 0);
+    return fib->cont.value;
+}
 
 /*
  *  Document-class: FiberError
@@ -1688,6 +1831,9 @@ Init_Cont(void)
     rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
     rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0);
     rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
+    rb_define_method(rb_cFiber, "start", rb_fiber_auto_start, -1);
+    rb_define_method(rb_cFiber, "join", rb_fiber_auto_join, -1);
+    rb_define_method(rb_cFiber, "value", rb_fiber_auto_value, 0);
 }
 
 RUBY_SYMBOL_EXPORT_BEGIN
diff --git a/include/ruby/io.h b/include/ruby/io.h
index 60d6f6d32e..3bd6f06cf3 100644
--- a/include/ruby/io.h
+++ b/include/ruby/io.h
@@ -116,6 +116,8 @@ typedef struct rb_io_t {
 /* #define FMODE_UNIX                  0x00200000 */
 /* #define FMODE_INET                  0x00400000 */
 /* #define FMODE_INET6                 0x00800000 */
+/* #define FMODE_IOM_PRIVATE1          0x01000000 */ /* OS-dependent */
+/* #define FMODE_IOM_PRIVATE2          0x02000000 */ /* OS-dependent */
 
 #define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
 
diff --git a/iom.h b/iom.h
new file mode 100644
index 0000000000..d412885baa
--- /dev/null
+++ b/iom.h
@@ -0,0 +1,91 @@
+/*
+ * iom -> I/O Manager for RubyVM (auto-Fiber-aware)
+ *
+ * On platforms with epoll or kqueue, this should be ready for multicore;
+ * even if the rest of the RubyVM is not.
+ *
+ * Some inspiration taken from Mio in GHC:
+ * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf
+ */
+#ifndef RUBY_IOM_H
+#define RUBY_IOM_H
+#include "ruby.h"
+#include "ruby/io.h"
+#include "ruby/intern.h"
+#include "vm_core.h"
+
+typedef struct rb_iom_struct rb_iom_t;
+
+/* WARNING: unstable API, only for Ruby internal use */
+
+/*
+ * Note: the first "rb_thread_t *" is a placeholder and may be replaced
+ * with "rb_execution_context_t *" in the future.
+ */
+
+/*
+ * All functions with "wait" in it take an optional double * +timeout+
+ * argument specifying the timeout in seconds.  If NULL, it can wait
+ * forever until the event happens (or the fiber is explicitly resumed).
+ *
+ * (maybe) TODO: If non-NULL, the timeout will be updated to the
+ * remaining time upon returning.  Not sure if useful, could just be
+ * a waste of cycles; so it is not implemented, yet.
+ */
+
+/*
+ * Relinquish calling fiber while waiting for +events+ on the given
+ * +rb_io_t+
+ *
+ * Multiple native threads can enter this function at the same time.
+ *
+ * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI
+ *
+ * Returns a mask of events.
+ */
+
+int rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, double *timeout);
+
+/*
+ * Identical to rb_iom_waitio, but takes a pointer to an integer file
+ * descriptor, instead of rb_io_t.  Use rb_iom_waitio when possible,
+ * since it allows us to optimize epoll (and perhaps avoid kqueue
+ * portability bugs across different *BSDs).
+ */
+int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout);
+
+/*
+ * Relinquish calling fiber to wait for the given PID to change status.
+ * Multiple native threads can enter this function at the same time.
+ * If timeout is negative, wait forever.
+ */
+rb_pid_t rb_iom_waitpid(rb_thread_t *,
+			rb_pid_t, int *status, int options, double *timeout);
+
+/*
+ * Relinquish calling fiber for at least the duration of given timeout
+ * in seconds.  If timeout is negative, wait forever (until explicitly
+ * resumed).
+ * Multiple native threads can enter this function at the same time.
+ */
+void rb_iom_sleep(rb_thread_t *, double *timeout);
+
+/* callback for SIGCHLD, needs to be implemented for rb_iom_waitpid */
+void rb_iom_sigchld(rb_vm_t *);
+
+/*
+ * There is no public create function; creation is lazy to avoid incurring
+ * overhead for small scripts which do not need fibers.  We only need this
+ * at VM destruction.
+ */
+void rb_iom_destroy(rb_vm_t *);
+
+/*
+ * run the scheduler: resume ready auto-fibers and wait for events/timeouts
+ */
+void rb_iom_schedule(rb_thread_t *th, double *timeout);
+
+/* cont.c */
+int rb_fiber_auto_sched_p(const rb_thread_t *);
+
+#endif /* RUBY_IOM_H */
diff --git a/iom_common.h b/iom_common.h
new file mode 100644
index 0000000000..c9e7f716d8
--- /dev/null
+++ b/iom_common.h
@@ -0,0 +1,90 @@
+
+/* we lazily create this, small scripts may never need iom */
+static rb_iom_t *
+iom_new(rb_thread_t *th)
+{
+    rb_iom_t *iom = ALLOC(rb_iom_t);
+    rb_iom_init(iom);
+    return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+    VM_ASSERT(th && th->vm);
+    if (!th->vm->iom) {
+	th->vm->iom = iom_new(th);
+    }
+    return th->vm->iom;
+}
+
+
+/* included by iom_(epoll|select|kqueue).h */
+#ifdef HAVE_SYS_TYPES_H
+#  include <sys/types.h>
+#endif
+#ifdef HAVE_SYS_WAIT_H
+#  include <sys/wait.h>
+#endif
+#if defined(WNOHANG) && WNOHANG != 0 && \
+    (defined(HAVE_WAITPID) || defined(HAVE_WAIT4))
+
+static VALUE
+iom_schedule_pid(VALUE ptr)
+{
+    struct rb_iom_pid_waiter *pw  = (struct rb_iom_pid_waiter *)ptr;
+    rb_thread_t *th = pw->th;
+
+    rb_fiber_auto_do_yield_p(th);
+    return rb_fiber_yield(0, 0);
+}
+
+static rb_iom_t *rb_iom_get(rb_thread_t *);
+
+rb_pid_t
+rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
+		double *timeout)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    struct rb_iom_pid_waiter pw;
+
+    VM_ASSERT((options & WNOHANG) == 0 &&
+		"WNOHANG should be handled in rb_waitpid");
+    pw.th = th;
+    pw.pid = pid;
+    pw.options = options;
+    pw.errnum = 0;
+    rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+
+    /* LIFO, to match Linux wait4() blocking behavior */
+    list_add(&iom->pids, &pw.w.wnode);
+    rb_ensure(iom_schedule_pid, (VALUE)&pw, rb_iom_waiter_done, (VALUE)&pw.w);
+
+    *status = pw.status;
+    rb_last_status_set(pw.status, pw.pid);
+    return pw.pid;
+}
+
+void
+rb_iom_sigchld(rb_vm_t *vm)
+{
+    rb_iom_t *iom = vm->iom;
+    if (iom) {
+	struct rb_iom_pid_waiter *pw = 0, *next = 0;
+
+	list_for_each_safe(&iom->pids, pw, next, w.wnode) {
+	    pid_t r = rb_waitpid(pw->pid, &pw->status, pw->options | WNOHANG);
+
+	    if (r < 0) {
+		pw->errnum = errno;
+		if (pw->errnum == ECHILD) {
+		    continue;
+		}
+	    }
+	    pw->pid = r;
+	    rb_iom_waiter_ready(&pw->w);
+	}
+    }
+}
+
+#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */
diff --git a/iom_epoll.h b/iom_epoll.h
new file mode 100644
index 0000000000..aa1ec087fa
--- /dev/null
+++ b/iom_epoll.h
@@ -0,0 +1,392 @@
+/*
+ * Linux-only epoll-based implementation of I/O Manager for RubyVM
+ *
+ * Notes:
+ *
+ * TODO: epoll_wait only has millisecond resolution; if we need higher
+ * resolution we can use timerfd or ppoll on the epoll_fd itself.
+ *
+ * Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the
+ * same notification callback (struct file_operations)->poll.
+ * Unlike with kqueue across different *BSDs; we do not need to worry
+ * about inconsistencies between these syscalls.
+ *
+ * See also notes in iom_kqueue.h
+ */
+#include "iom_internal.h"
+#include <sys/epoll.h>
+#include <math.h> /* round() */
+#define FMODE_IOM_ADDED           FMODE_IOM_PRIVATE1
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+    /*
+     * Everything here is protected by GVL at this time,
+     * URCU lists (LGPL-2.1+) may be used in the future
+     */
+    struct list_head epws; /* -epw.w.wnode, order agnostic */
+    struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
+    struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+    int epoll_fd;
+    int maxevents; /* auto-increases */
+
+    /*
+     * Any number of threads may use epoll_wait concurrently on
+     * the same epoll_fd, but we only allow one thread to use
+     * a non-zero timeout; since it is possible for a long sleeper
+     * to miss fibers queued by other threads for the sleeping thread.
+     */
+    rb_thread_t *waiter;
+};
+
+/* allocated on stack */
+struct epw {
+    struct rb_iom_waiter w;
+    union {
+	rb_thread_t *th;
+	short revents;
+    } as;
+    int *fdp;
+    int *flags; /* &fptr->mode */
+    struct epoll_event ev;
+};
+
+static void
+increase_maxevents(rb_iom_t *iom, int retries)
+{
+    /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */
+    const int max_alloca = 1024 / sizeof(struct epoll_event);
+    const int max = max_alloca * 2;
+
+    if (retries) {
+	iom->maxevents *= retries;
+	if (iom->maxevents > max || iom->maxevents <= 0) {
+	    iom->maxevents = max;
+	}
+    }
+}
+
+static int
+double2msec(double sec)
+{
+    /*
+     * clamp timeout to workaround a Linux <= 2.6.37 bug,
+     * see epoll_wait(2) manpage
+     */
+    const int max_msec = 35 * 60 * 1000; /* floor(35.79 minutes) */
+    double msec = round(sec * 1000);
+
+    if (msec < (double)max_msec) {
+	int ret = (int)msec;
+	return ret < 0 ? 0 : ret;
+    }
+    return max_msec;
+}
+
+/* we can avoid branches when mapping RB_WAITFD_* bits to EPOLL* bits */
+STATIC_ASSERT(epbits_matches_waitfd_bits,
+    RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT &&
+    RB_WAITFD_PRI == EPOLLPRI);
+
+/* what goes into epoll_ctl */
+static int
+rb_events2ep(int events)
+{
+    return EPOLLONESHOT | events;
+}
+
+/* what comes out of epoll_wait */
+static short
+rb_ep2revents(int revents)
+{
+    return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI));
+}
+
+/* lazily create the epoll FD, since not everybody waits on I/O */
+static int
+iom_epfd(rb_iom_t *iom)
+{
+    if (iom->epoll_fd < 0) {
+#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+	iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+	if (iom->epoll_fd < 0) {
+	    int err = errno;
+	    if (rb_gc_for_fd(err)) {
+		iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+		if (iom->epoll_fd < 0) {
+		    rb_sys_fail("epoll_create1");
+		}
+	    }
+	    else if (err != ENOSYS) {
+		rb_syserr_fail(err, "epoll_create1");
+	    }
+	    else { /* libc >= kernel || build-env > run-env */
+#endif /* HAVE_EPOLL_CREATE1 */
+		iom->epoll_fd = epoll_create(1);
+		if (iom->epoll_fd < 0) {
+		    if (rb_gc_for_fd(errno)) {
+			iom->epoll_fd = epoll_create(1);
+		    }
+		}
+		if (iom->epoll_fd < 0) {
+		    rb_sys_fail("epoll_create");
+		}
+		rb_maygvl_fd_fix_cloexec(iom->epoll_fd);
+#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+	    }
+	}
+#endif /* HAVE_EPOLL_CREATE1 */
+	rb_update_max_fd(iom->epoll_fd);
+    }
+    return iom->epoll_fd;
+}
+
+static void
+rb_iom_init(rb_iom_t *iom)
+{
+    list_head_init(&iom->timers);
+    list_head_init(&iom->epws);
+    list_head_init(&iom->pids);
+    iom->waiter = 0;
+    iom->maxevents = 8;
+    iom->epoll_fd = -1;
+}
+
+static int
+next_timeout(rb_iom_t *iom)
+{
+    struct rb_iom_timer *t;
+    t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+    if (t) {
+	double diff = t->expires_at - timeofday();
+	return double2msec(diff);
+    }
+    else {
+	return -1; /* forever */
+    }
+}
+
+static void
+check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev)
+{
+    if (nr >= 0) {
+	int i;
+
+	for (i = 0; i < nr; i++) {
+	    struct epw *epw = ev[i].data.ptr;
+
+	    epw->as.revents = rb_ep2revents(ev[i].events);
+	    rb_iom_waiter_ready(&epw->w);
+	}
+
+	/* notify the waiter thread in case we enqueued fibers for them */
+	if (nr > 0 && th->vm->iom->waiter) {
+	    ubf_select(th->vm->iom->waiter);
+	    rb_thread_schedule();
+	}
+    }
+    else {
+	int err = errno;
+	if (err != EINTR) {
+	    rb_syserr_fail(err, "epoll_wait");
+	}
+    }
+    rb_iom_timer_check(&th->vm->iom->timers);
+    RUBY_VM_CHECK_INTS_BLOCKING(th);
+}
+
+/* perform a non-blocking epoll_wait while holding GVL */
+static void
+ping_events(rb_thread_t *th)
+{
+    rb_iom_t *iom = th->vm->iom;
+    int epfd = iom->epoll_fd;
+
+    if (epfd >= 0) {
+	VALUE v;
+	int nr;
+	int maxevents = iom->maxevents;
+	struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
+	int retries = 0;
+
+	do {
+	    nr = epoll_wait(epfd, ev, maxevents, 0);
+	    check_epoll_wait(th, nr, ev);
+	} while (nr == maxevents && ++retries);
+	if (v) {
+	    ALLOCV_END(v);
+	}
+	increase_maxevents(iom, retries);
+    }
+}
+
+/* for iom_pingable_common.h */
+static void
+rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
+{
+    int maxevents = iom->maxevents;
+    int nr = maxevents;
+
+    if (!iom->waiter) {
+	int msec = next_timeout(iom);
+
+	if (list_empty(&iom->epws)) {
+	    if (msec == 0) {
+		rb_thread_schedule();
+	    }
+	    else {
+		struct timeval *tvp = 0;
+		struct timeval tv;
+		if (msec > 0) {
+		    tv.tv_sec = msec / 1000;
+		    tv.tv_usec = (msec % 1000) * 1000;
+		    tvp = &tv;
+		}
+		native_sleep(th, tvp);
+		RUBY_VM_CHECK_INTS_BLOCKING(th);
+	    }
+	}
+	else {
+	    VALUE v;
+	    struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
+	    int epfd = iom->epoll_fd;
+
+	    VM_ASSERT(epfd >= 0);
+	    iom->waiter = th;
+	    BLOCKING_REGION({
+		nr = epoll_wait(epfd, ev, maxevents, msec);
+	    }, ubf_select, th, FALSE);
+	    iom->waiter = 0;
+	    check_epoll_wait(th, nr, ev);
+	    if (v) {
+		ALLOCV_END(v);
+	    }
+	}
+    }
+    if (nr == maxevents) { /* || msec == 0 */
+	ping_events(th);
+    }
+}
+
+static VALUE
+epmod_yield(VALUE ptr)
+{
+    struct epw *epw = (struct epw *)ptr;
+    rb_thread_t *th = epw->as.th;
+    rb_iom_t *iom = rb_iom_get(th);
+    int e;
+    int fd = *epw->fdp;
+    int epfd = iom_epfd(iom);
+
+    /*
+     * if we did not have GVL, revents may be set immediately
+     * upon epoll_ctl by another thread running epoll_wait,
+     * so we must initialize it before epoll_ctl:
+     */
+    epw->as.revents = 0;
+
+    /* we want to track if an FD is already being watched ourselves */
+    if (epw->flags) {
+	if (*epw->flags & FMODE_IOM_ADDED) {
+	    e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+	}
+	else {
+	    e = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epw->ev);
+	    if (e == 0) {
+		*epw->flags |= FMODE_IOM_ADDED;
+	    }
+	    else if (e < 0 && errno == EEXIST) {
+		/*
+		 * It's possible to get EEXIST if fptrs point to the same FD:
+		 *   f1 = Fiber.start { io1.read(1) }
+		 *   io2 = IO.for_fd(io1.fileno)
+		 *   f2 = Fiber.start { io2.read(1) }
+		 */
+		*epw->flags |= FMODE_IOM_ADDED;
+		e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+	    }
+	}
+    }
+    else { /* don't know if added or not, fall back to adding on ENOENT */
+	e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+	if (e < 0 && errno == ENOENT) {
+	    e = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epw->ev);
+	}
+    }
+    if (e < 0) {
+	rb_sys_fail("epoll_ctl");
+    }
+
+    ping_events(th);
+    (void)rb_fiber_auto_do_yield_p(th);
+    return rb_fiber_yield(0, 0);
+}
+
+static int
+iom_waitfd(rb_thread_t *th, int *fdp, int *flags, int events, double *timeout)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    struct epw epw;
+
+    epw.as.th = th;
+    epw.fdp = fdp;
+    epw.flags = flags;
+    epw.ev.events = rb_events2ep(events);
+    epw.ev.data.ptr = &epw;
+
+    list_add(&iom->epws, &epw.w.wnode);
+    rb_iom_timer_add(&iom->timers, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+    rb_ensure(epmod_yield, (VALUE)&epw, rb_iom_waiter_done, (VALUE)&epw.w);
+
+    return (int)epw.as.revents; /* may be zero if timed out */
+}
+
+int
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
+{
+    return iom_waitfd(th, &fptr->fd, &fptr->mode, events, timeout);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+    return iom_waitfd(th, fdp, 0, events, timeout);
+}
+
+void
+rb_iom_destroy(rb_vm_t *vm)
+{
+    rb_iom_t *iom = vm->iom;
+    vm->iom = 0;
+    if (iom) {
+	/*
+	 * it's confusing to share epoll FDs across processes
+	 * (kqueue has a rather unique close-on-fork behavior)
+	 */
+	if (iom->epoll_fd >= 0) {
+	    close(iom->epoll_fd);
+	}
+	xfree(iom);
+    }
+}
+
+/* used by thread.c::rb_thread_atfork */
+static void
+rb_iom_atfork_child(rb_thread_t *th)
+{
+    rb_iom_destroy(th->vm);
+}
+
+/* used by thread_pthread.c */
+static int
+rb_iom_reserved_fd(int fd)
+{
+    rb_iom_t *iom = GET_VM()->iom;
+
+    return iom && fd == iom->epoll_fd;
+}
+
+#include "iom_pingable_common.h"
+#include "iom_common.h"
diff --git a/iom_internal.h b/iom_internal.h
new file mode 100644
index 0000000000..b5aa321274
--- /dev/null
+++ b/iom_internal.h
@@ -0,0 +1,153 @@
+#ifndef RB_IOM_COMMON_H
+#define RB_IOM_COMMON_H
+
+#include "internal.h"
+#include "iom.h"
+
+/* cont.c */
+void rb_fiber_auto_enqueue(VALUE fibval);
+int rb_fiber_auto_do_yield_p(rb_thread_t *);
+
+#define FMODE_IOM_PRIVATE1          0x01000000
+#define FMODE_IOM_PRIVATE2          0x02000000
+
+#define IOM_FIBMASK ((VALUE)0x1)
+#define IOM_FIB     (0x2)
+#define IOM_WAIT    (0x1) /* container_of(..., struct rb_iom_waiter, timer) */
+
+/* allocated on stack */
+struct rb_iom_timer {
+    struct list_node tnode; /* <=> rb_iom_struct.timers */
+    double expires_at; /* absolute monotonic time */
+    VALUE _fibval;
+};
+
+struct rb_iom_waiter {
+    struct rb_iom_timer timer;
+    struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
+};
+
+struct rb_iom_fd_waiter {
+    struct rb_iom_waiter w;
+    int *fdp;
+    short events;
+    short revents;
+};
+
+struct rb_iom_pid_waiter {
+    struct rb_iom_waiter w;
+    rb_thread_t *th;
+    rb_pid_t pid;
+    int status;
+    int options;
+    int errnum;
+};
+
+static VALUE
+rb_iom_timer_fibval(const struct rb_iom_timer *t)
+{
+    return t->_fibval & ~IOM_FIBMASK;
+}
+
+static struct rb_iom_waiter *
+rb_iom_waiter_of(struct rb_iom_timer *t)
+{
+    if (t->_fibval & IOM_FIBMASK) {
+	return 0;
+    }
+    return container_of(t, struct rb_iom_waiter, timer);
+}
+
+/* check for expired timers */
+static void
+rb_iom_timer_check(struct list_head *timers)
+{
+    struct rb_iom_timer *t = 0, *next = 0;
+    double now = timeofday();
+
+    list_for_each_safe(timers, t, next, tnode) {
+	if (t->expires_at <= now) {
+	    struct rb_iom_waiter *w = rb_iom_waiter_of(t);
+	    VALUE fibval = rb_iom_timer_fibval(t);
+
+	    list_del_init(&t->tnode);
+	    if (w) {
+		list_del_init(&w->wnode);
+	    }
+	    t->_fibval = Qfalse;
+
+	    /* non-auto-fibers may set timer in rb_iom_schedule */
+	    if (fibval != Qfalse) {
+		rb_fiber_auto_enqueue(fibval);
+	    }
+	}
+	return; /* done, timers is a sorted list */
+    }
+}
+
+/* insert a new +timer+ into +timers+, maintain sort order by expires_at */
+static void
+rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
+		const double *timeout, int flags)
+{
+    rb_iom_timer_check(timers);
+
+    add->_fibval = flags & IOM_FIB ? rb_fiber_current() : Qfalse;
+    add->_fibval |= flags & IOM_WAIT ? 0 : IOM_FIBMASK;
+
+    if (timeout) {
+	struct rb_iom_timer *i = 0;
+	add->expires_at = timeofday() + *timeout;
+
+	/*
+	 * search backwards: assume typical projects have multiple objects
+	 * sharing the same timeout values, so new timers will expire later
+	 * than existing timers
+	 */
+	list_for_each_rev(timers, i, tnode) {
+	    if (add->expires_at > i->expires_at) {
+		list_add_after(timers, &i->tnode, &add->tnode);
+		return;
+	    }
+	}
+	list_add(timers, &add->tnode);
+    }
+    else {
+	/* not active, just allow list_del to function in cleanup */
+	list_node_init(&add->tnode);
+    }
+}
+
+static VALUE
+rb_iom_timer_done(VALUE ptr)
+{
+    struct rb_iom_timer *t = (struct rb_iom_timer *)ptr;
+    list_del(&t->tnode);
+    return Qfalse;
+}
+
+static void
+rb_iom_waiter_ready(struct rb_iom_waiter *w)
+{
+    VALUE fibval = rb_iom_timer_fibval(&w->timer);
+
+    w->timer._fibval = Qfalse;
+    list_del_init(&w->timer.tnode);
+    list_del_init(&w->wnode);
+    if (fibval != Qfalse) {
+	rb_fiber_auto_enqueue(fibval);
+    }
+}
+
+static VALUE
+rb_iom_waiter_done(VALUE ptr)
+{
+    struct rb_iom_waiter *w = (struct rb_iom_waiter *)ptr;
+    list_del(&w->timer.tnode);
+    list_del(&w->wnode);
+    return Qfalse;
+}
+
+static rb_iom_t *rb_iom_get(rb_thread_t *);
+
+#endif /* RB_IOM_COMMON_H */
diff --git a/iom_kqueue.h b/iom_kqueue.h
new file mode 100644
index 0000000000..f0776b2f58
--- /dev/null
+++ b/iom_kqueue.h
@@ -0,0 +1,440 @@
+/*
+ * kqueue-based implementation of I/O Manager for RubyVM on *BSD
+ *
+ * The kevent API has an advantage over epoll_ctl+epoll_wait since
+ * it can simultaneously add filters and check for events with one
+ * syscall.  It also allows EV_ADD to be used idempotently for
+ * enabling filters, whereas epoll_ctl requires separate ADD and
+ * MOD operations.
+ *
+ * These are advantages in the common case.
+ *
+ * The epoll API has advantages in more esoteric cases:
+ *
+ *   epoll has the advantage over kqueue when watching for multiple
+ *   events (POLLIN|POLLOUT|POLLPRI) (which is rare).  We also have
+ *   to install two kevent filters to watch POLLIN|POLLOUT simultaneously.
+ *
+ *   Also, kevent does not support POLLPRI directly; we need to use
+ *   select() (or perhaps poll() on some platforms) with a zero
+ *   timeout to check for POLLPRI after EVFILT_READ returns.
+ *
+ * Finally, several *BSDs implement kqueue; and the quality of each
+ * implementation may vary.  Anecdotally, *BSDs are not known to even
+ * support poll() consistently across different types of files.
+ * We will need to be selective and careful about which FDs we
+ * inject into kevent().
+ */
+#include "iom_internal.h"
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+    /*
+     * Everything here is protected by GVL at this time,
+     * URCU lists (LGPL-2.1+) may be used in the future
+     */
+    struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */
+    struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
+    struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+    int kqueue_fd;
+    int nevents; /* auto-increases */
+
+    /*
+     * Any number of threads may use kevent concurrently on
+     * the same kqueue_fd, but we only allow one thread to use
+     * a non-zero timeout.  It is possible for a long sleeper
+     * to miss fibers queued by other threads for the sleeping thread.
+     */
+    rb_thread_t *waiter;
+};
+
+/* allocated on stack */
+struct kev {
+    struct rb_iom_fd_waiter fdw;
+    rb_thread_t *th; /* 0 - kev is disabled */
+};
+
+/*
+ * like our epoll implementation, we "ping" using kevent with zero-timeout
+ * and can do so on any thread.
+ */
+static const struct timespec zero;
+
+static void
+increase_nevents(rb_iom_t *iom, int retries)
+{
+    /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */
+    const int max_alloca = 1024 / sizeof(struct kevent);
+    const int max = max_alloca * 2;
+
+    if (retries) {
+	iom->nevents *= retries;
+	if (iom->nevents > max || iom->nevents <= 0) {
+	    iom->nevents = max;
+	}
+    }
+}
+
+static int
+iom_kqfd(rb_iom_t *iom)
+{
+    if (iom->kqueue_fd >= 0) {
+	return iom->kqueue_fd;
+    }
+    iom->kqueue_fd = kqueue();
+    if (iom->kqueue_fd < 0) {
+	if (rb_gc_for_fd(errno)) {
+	    iom->kqueue_fd = kqueue();
+	}
+	if (iom->kqueue_fd < 0) {
+	    rb_sys_fail("kqueue");
+	}
+    }
+    rb_fd_fix_cloexec(iom->kqueue_fd);
+    return iom->kqueue_fd;
+}
+
+static void
+rb_iom_init(rb_iom_t *iom)
+{
+    list_head_init(&iom->timers);
+    list_head_init(&iom->kevs);
+    list_head_init(&iom->pids);
+    iom->waiter = 0;
+    iom->nevents = 8;
+    iom->kqueue_fd = -1;
+}
+
+static struct timespec *
+next_timeout(rb_iom_t *iom, struct timespec *ts)
+{
+    struct rb_iom_timer *t;
+    t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+    if (t) {
+	double diff = t->expires_at - timeofday();
+
+	ts->tv_sec = (long)diff;
+	ts->tv_nsec = (long)((diff - (double)ts->tv_sec) * 1e9);
+	if (ts->tv_sec < 0) {
+	    ts->tv_sec = 0;
+	}
+	if (ts->tv_nsec < 0) {
+	    ts->tv_nsec = 0;
+	}
+	return ts;
+    }
+    return NULL; /* forever */
+}
+
+/*
+ * kevent does not have an EVFILT_* flag for (rarely-used) urgent data,
+ * at least not on FreeBSD 11.0, so we call select() separately after
+ * EVFILT_READ returns to set the RB_WAITFD_PRI bit:
+ */
+static void
+check_pri(rb_thread_t *th, int fd, struct kev *kev)
+{
+    rb_fdset_t efds;
+    int r;
+    struct timeval tv = { 0 };
+
+    rb_fd_init(&efds);
+    rb_fd_set(fd, &efds);
+    r = native_fd_select(fd + 1, 0, 0, &efds, &tv, th);
+    rb_fd_term(&efds);
+    /* TODO: use poll() if possible */
+
+    if (r > 0) {
+	kev->fdw.revents |= RB_WAITFD_PRI;
+    }
+}
+
+static void
+check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
+{
+    int err = 0;
+    if (nr >= 0) {
+	int i;
+	struct kevent *changelist = eventlist;
+	int nchanges = 0;
+
+	for (i = 0; i < nr; i++) {
+	    struct kevent *ev = &eventlist[i];
+	    struct kev *kev = ev->udata;
+	    int fd = *kev->fdw.fdp;
+
+	    if (ev->filter == EVFILT_READ) {
+		int rd = kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI);
+		if (rd == RB_WAITFD_IN) { /* common */
+		    kev->fdw.revents |= rd;
+		}
+		else if (rd & RB_WAITFD_PRI) { /* ugh... */
+		    if (fd >= 0) {
+			check_pri(th, fd, kev);
+		    }
+		}
+		assert(rd && "got EVFILT_READ unexpectedly");
+
+		/* disable the other filter if we waited on both read+write */
+		if (fd >= 0 && (kev->fdw.events & RB_WAITFD_OUT) && kev->th) {
+		    struct kevent *chg = &changelist[nchanges++];
+
+		    kev->th = 0;
+		    EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+		}
+	    }
+	    else if (ev->filter == EVFILT_WRITE) {
+		kev->fdw.revents |= RB_WAITFD_OUT;
+
+		assert(kev->fdw.events & RB_WAITFD_OUT &&
+			"unexpected EVFILT_WRITE");
+
+		/* disable the other filter if we waited on both read+write */
+		if (fd >= 0 &&
+			kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI) &&
+			kev->th) {
+		    struct kevent *chg = &changelist[nchanges++];
+
+		    kev->th = 0;
+		    EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+		}
+	    }
+	    else {
+		rb_bug("unexpected filter: %d", ev->filter);
+	    }
+	    rb_iom_waiter_ready(&kev->fdw.w);
+	}
+
+	/*
+	 * kqueue is a little more complicated because we install
+	 * separate filters for simultaneously watching read and write
+	 * on the same FD, and now we must clear shared filters
+	 */
+	if (nchanges && kevent(th->vm->iom->kqueue_fd, changelist, nchanges,
+				0, 0, &zero) < 0) {
+	    err = errno;
+	}
+
+	/* notify the waiter thread in case we enqueued fibers for them */
+	if (nr > 0 && th->vm->iom->waiter) {
+	    ubf_select(th->vm->iom->waiter);
+	    rb_thread_schedule();
+	}
+    }
+    else {
+	err = errno;
+    }
+    if (err && err != EINTR) {
+	rb_syserr_fail(err, "kevent");
+    }
+    rb_iom_timer_check(&th->vm->iom->timers);
+    RUBY_VM_CHECK_INTS_BLOCKING(th);
+}
+
+/* perform a non-blocking kevent check while holding GVL */
+static void
+ping_events(rb_thread_t *th)
+{
+    rb_iom_t *iom = th->vm->iom;
+    int kqfd = iom->kqueue_fd;
+
+    if (kqfd >= 0) {
+	VALUE v;
+	int nr;
+	int nevents = iom->nevents;
+	struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
+	int retries = 0;
+
+	do {
+	    nr = kevent(kqfd, 0, 0, eventlist, nevents, &zero);
+	    check_kevent(th, nr, eventlist);
+	} while (nr == nevents && ++retries);
+	if (v) {
+	    ALLOCV_END(v);
+	}
+	increase_nevents(iom, retries);
+    }
+}
+
+/* for iom_pingable_common.h */
+static void
+rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
+{
+    int nevents = iom->nevents;
+    int nr = nevents;
+
+    if (!iom->waiter) {
+	struct timespec ts;
+	const struct timespec *tsp = next_timeout(iom, &ts);
+
+	if (list_empty(&iom->kevs)) {
+	    if (tsp) {
+		struct timeval tv;
+		tv.tv_sec = tsp->tv_sec;
+		tv.tv_usec = (int)(tsp->tv_nsec / 1000);
+		native_sleep(th, &tv);
+		RUBY_VM_CHECK_INTS_BLOCKING(th);
+	    }
+	    else {
+		rb_thread_schedule();
+	    }
+	}
+	else {
+	    VALUE v;
+	    struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
+	    int kqfd = iom->kqueue_fd;
+
+	    VM_ASSERT(kqfd >= 0);
+	    iom->waiter = th;
+	    BLOCKING_REGION({
+		nr = kevent(kqfd, 0, 0, eventlist, nevents, tsp);
+	    }, ubf_select, th, FALSE);
+	    iom->waiter = 0;
+	    check_kevent(th, nr, eventlist);
+	    if (v) {
+		ALLOCV_END(v);
+	    }
+	}
+    }
+    if (nr == nevents) { /* || tsp == &zero */
+	ping_events(th);
+    }
+}
+
+static void
+kevxchg_ping(struct kev *kev)
+{
+    rb_thread_t *th = kev->th;
+    rb_iom_t *iom = rb_iom_get(th);
+    int retries = 0;
+    int nchanges = 0;
+    int nr;
+    VALUE v;
+    int nevents = iom->nevents;
+    struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
+    struct kevent *changelist = eventlist; /* allowed, see kevent manpage */
+    int fd = *kev->fdw.fdp;
+    int kqfd = iom_kqfd(iom);
+
+    VM_ASSERT(nevents > 2);
+
+    /* EVFILT_READ handles urgent data (POLLPRI) */
+    if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
+	struct kevent *chg = &changelist[nchanges++];
+	EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, kev);
+    }
+
+    if (kev->fdw.events & RB_WAITFD_OUT) {
+	struct kevent *chg = &changelist[nchanges++];
+	EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, kev);
+    }
+
+    do {
+	int err;
+
+	nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero);
+
+	/* kevent may fail with ENOMEM when processing changelist */
+	if (nr < 0 && nchanges && rb_gc_for_fd(errno)) {
+	    continue;
+	}
+	check_kevent(th, nr, eventlist);
+    } while (nr == nevents && ++retries && (nchanges = 0) == 0);
+    if (v) {
+	ALLOCV_END(v);
+    }
+    increase_nevents(iom, retries);
+}
+
+static VALUE
+kevxchg_yield(VALUE ptr)
+{
+    struct kev *kev = (struct kev *)ptr;
+    kevxchg_ping(kev);
+    (void)rb_fiber_auto_do_yield_p(kev->th);
+    return rb_fiber_yield(0, 0);
+}
+
+static int
+iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    struct kev kev;
+
+    kev.th = th;
+    kev.fdw.fdp = fdp;
+    kev.fdw.events = (short)events;
+    kev.fdw.revents = 0;
+    list_add(&iom->kevs, &kev.fdw.w.wnode);
+    rb_iom_timer_add(&iom->timers, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+    rb_ensure(kevxchg_yield, (VALUE)&kev,
+		rb_iom_waiter_done, (VALUE)&kev.fdw.w);
+
+    return kev.fdw.revents; /* may be zero if timed out */
+}
+
+/*
+ * XXX we may use fptr->mode to mark FDs which kqueue cannot handle;
+ * different BSDs may have different bugs which prevent certain
+ * FD types from being handled by kqueue...
+ */
+int
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
+{
+    return iom_waitfd(th, &fptr->fd, events, timeout);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+    return iom_waitfd(th, fdp, events, timeout);
+}
+
+void
+rb_iom_destroy(rb_vm_t *vm)
+{
+    rb_iom_t *iom = vm->iom;
+    vm->iom = 0;
+    if (iom) {
+	if (iom->kqueue_fd >= 0) {
+	    close(iom->kqueue_fd);
+	}
+	xfree(iom);
+    }
+}
+
+/* used by thread.c::rb_thread_atfork */
+static void
+rb_iom_atfork_child(rb_thread_t *th)
+{
+#ifdef LIBKQUEUE
+    /*
+     * libkqueue uses epoll, /dev/poll FD, or poll/select on a pipe,
+     * so there is an FD.
+     * I guess we're going to hit an error in case somebody uses
+     * libkqueue with a real native kqueue backend, but nobody does
+     * that in production, hopefully.
+     */
+    rb_iom_destroy(th->vm);
+#else /* native kqueue is close-on-fork, so we do not close ourselves */
+    xfree(th->vm->iom);
+    th->vm->iom = 0;
+#endif
+}
+
+/* used by thread_pthread.c */
+static int
+rb_iom_reserved_fd(int fd)
+{
+    rb_iom_t *iom = GET_VM()->iom;
+
+    return iom && fd == iom->kqueue_fd;
+}
+
+#include "iom_pingable_common.h"
+#include "iom_common.h"
diff --git a/iom_pingable_common.h b/iom_pingable_common.h
new file mode 100644
index 0000000000..b3caf2f1df
--- /dev/null
+++ b/iom_pingable_common.h
@@ -0,0 +1,55 @@
+/* shared between "pingable" implementations (iom_kqueue.h and iom_epoll.h) */
+
+/* only for iom_kqueue.h and iom_epoll.h */
+static void rb_iom_do_wait(rb_thread_t *, rb_iom_t *);
+
+static VALUE
+rb_iom_do_schedule(VALUE ptr)
+{
+    rb_thread_t *th = (rb_thread_t *)ptr;
+    rb_iom_t *iom = th->vm->iom;
+
+    if (rb_fiber_auto_do_yield_p(th)) {
+	rb_fiber_yield(0, 0);
+    }
+    if (iom) {
+	rb_iom_do_wait(th, iom);
+    }
+    if (rb_fiber_auto_do_yield_p(th)) {
+	rb_fiber_yield(0, 0);
+    }
+    return Qfalse;
+}
+
+void
+rb_iom_schedule(rb_thread_t *th, double *timeout)
+{
+    if (rb_fiber_auto_sched_p(th)) {
+	rb_iom_t *iom = th->vm->iom;
+
+	if (iom) {
+	    rb_iom_timer_check(&iom->timers);
+	    ping_events(th);
+	}
+	if (rb_fiber_auto_do_yield_p(th)) {
+	    rb_fiber_yield(0, 0);
+	}
+    }
+    else if (timeout) {
+	double t0 = timeofday();
+	struct rb_iom_timer t;
+	rb_iom_t *iom = rb_iom_get(th);
+
+	rb_iom_timer_add(&iom->timers, &t, timeout, 0);
+	rb_ensure(rb_iom_do_schedule, (VALUE)th, rb_iom_timer_done, (VALUE)&t);
+	*timeout -= timeofday() - t0;
+    }
+    else {
+	rb_iom_t *iom = th->vm->iom;
+
+	if (iom) {
+	    rb_iom_timer_check(&iom->timers);
+	}
+	rb_iom_do_schedule((VALUE)th);
+    }
+}
diff --git a/iom_select.h b/iom_select.h
new file mode 100644
index 0000000000..d6a93e7555
--- /dev/null
+++ b/iom_select.h
@@ -0,0 +1,341 @@
+/*
+ * select()-based implementation of I/O Manager for RubyVM
+ *
+ * This is crippled and relies heavily on GVL compared to the
+ * epoll and kqueue versions
+ */
+#include "iom_internal.h"
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+    /*
+     * Everything here is protected by GVL at this time,
+     * URCU lists (LGPL-2.1+) may be used in the future
+     */
+    struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
+    struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
+    struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+    rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */
+    rb_serial_t select_gen; /* RDWR protected by GVL */
+    rb_thread_t *selector; /* rb_thread_fd_select owner (acts as lock w/ GVL) */
+
+    /* these could be on stack, but they're huge on some platforms: */
+    rb_fdset_t rfds;
+    rb_fdset_t wfds;
+    rb_fdset_t efds;
+};
+
+/* allocated on stack */
+struct select_do {
+    rb_thread_t *th;
+    int do_wait;
+    int ret;
+};
+
+static void
+rb_iom_init(rb_iom_t *iom)
+{
+    iom->select_start = 0;
+    iom->select_gen = 0;
+    iom->selector = 0;
+    list_head_init(&iom->timers);
+    list_head_init(&iom->fds);
+    list_head_init(&iom->pids);
+}
+
+static struct timeval *
+next_timeout(rb_iom_t *iom, struct timeval *tv)
+{
+    struct rb_iom_timer *t;
+    t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+    if (t) {
+	double diff = t->expires_at - timeofday();
+	if (diff > 0) {
+	    *tv = double2timeval(diff);
+	}
+	else {
+	    tv->tv_sec = 0;
+	    tv->tv_usec = 0;
+	}
+
+	return tv;
+    }
+    else {
+	return 0;
+    }
+}
+
+static VALUE
+iom_select_wait(VALUE ptr)
+{
+    struct select_do *sd = (struct select_do *)ptr;
+    rb_thread_t *th = sd->th;
+    rb_iom_t *iom = th->vm->iom;
+    int max = -1;
+    struct timeval tv, *tvp;
+    struct rb_iom_fd_waiter *fdw = 0, *next = 0;
+    rb_fdset_t *rfds, *wfds, *efds;
+
+    iom->select_start = iom->select_gen;
+    sd->ret = 0;
+    rfds = wfds = efds = 0;
+    rb_fd_init(&iom->rfds);
+    rb_fd_init(&iom->wfds);
+    rb_fd_init(&iom->efds);
+
+    list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
+	int fd = *fdw->fdp;
+	if (fd < 0) { /* closed */
+	    fdw->revents = fdw->events;
+	    rb_iom_waiter_ready(&fdw->w);
+	    sd->do_wait = 0;
+	    continue;
+	}
+	if (fd > max) {
+	    max = fd;
+	}
+	if (fdw->events & RB_WAITFD_IN) {
+	    rb_fd_set(fd, rfds = &iom->rfds);
+	}
+	if (fdw->events & RB_WAITFD_OUT) {
+	    rb_fd_set(fd, wfds = &iom->wfds);
+	}
+	if (fdw->events & RB_WAITFD_PRI) {
+	    rb_fd_set(fd, efds = &iom->efds);
+	}
+    }
+
+    if (sd->do_wait) {
+	tvp = next_timeout(iom, &tv);
+	if (tvp == 0 && max < 0) {
+	    goto nowait;
+	}
+    }
+    else {
+  nowait:
+	tv.tv_sec = 0;
+	tv.tv_usec = 0;
+	tvp = &tv;
+    }
+    BLOCKING_REGION({
+	sd->ret = native_fd_select(max + 1, rfds, wfds, efds, tvp, th);
+    }, ubf_select, th, FALSE);
+    RUBY_VM_CHECK_INTS_BLOCKING(th);
+
+    return Qfalse;
+}
+
+static VALUE
+iom_select_done(VALUE ptr)
+{
+    struct select_do *sd = (struct select_do *)ptr;
+    rb_iom_t *iom = sd->th->vm->iom;
+    struct rb_iom_fd_waiter *fdw = 0, *next = 0;
+    int ret = sd->ret;
+
+    iom->selector = 0;
+    if (ret > 0) {
+	list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
+	    int fd = *fdw->fdp;
+
+	    if (fd < 0) { /* closed */
+		fdw->revents = fdw->events;
+		rb_iom_waiter_ready(&fdw->w);
+		continue;
+	    }
+	    if (fdw->events & RB_WAITFD_IN && rb_fd_isset(fd, &iom->rfds)) {
+		fdw->revents |= RB_WAITFD_IN;
+	    }
+	    if (fdw->events & RB_WAITFD_OUT && rb_fd_isset(fd, &iom->wfds)) {
+		fdw->revents |= RB_WAITFD_OUT;
+	    }
+	    if (fdw->events & RB_WAITFD_PRI && rb_fd_isset(fd, &iom->efds)) {
+		fdw->revents |= RB_WAITFD_PRI;
+	    }
+
+	    /* got revents? enqueue ourselves to be run! */
+	    if (fdw->revents) {
+		rb_iom_waiter_ready(&fdw->w);
+		if (--ret <= 0) {
+		    break;
+		}
+	    }
+	}
+    }
+
+    rb_fd_term(&iom->rfds);
+    rb_fd_term(&iom->wfds);
+    rb_fd_term(&iom->efds);
+
+    return Qfalse;
+}
+
+static int
+retry_select_p(struct select_do *sd, rb_iom_t *iom)
+{
+    /*
+     * if somebody changed iom->fds while we were inside select,
+     * rerun it with zero timeout to avoid a race condition.
+     * This is not necessary for epoll or kqueue because the kernel
+     * is constantly monitoring the watched set.
+     */
+    if (iom->select_start != iom->select_gen) {
+	sd->do_wait = 0;
+	return 1;
+    }
+    return 0;
+}
+
+static VALUE
+iom_do_select(VALUE ptr)
+{
+    struct select_do *sd = (struct select_do *)ptr;
+    rb_thread_t *th = sd->th;
+    rb_iom_t *iom = th->vm->iom;
+    rb_thread_t *s;
+
+    if (rb_fiber_auto_do_yield_p(th)) {
+	rb_fiber_yield(0, 0);
+    }
+    if (!iom) {
+	return Qfalse;
+    }
+    s = iom->selector;
+    if (s) { /* somebody else is running select() */
+	if (iom->select_start != iom->select_gen) {
+	    ubf_select(s);
+	}
+	if (rb_fiber_auto_sched_p(th)) {
+	    rb_fiber_yield(0, 0);
+	}
+    }
+    else {
+	iom->selector = th;
+
+	do {
+	    rb_ensure(iom_select_wait, (VALUE)sd, iom_select_done, (VALUE)sd);
+	    rb_iom_timer_check(&iom->timers);
+	} while (retry_select_p(sd, iom));
+
+	if (rb_fiber_auto_do_yield_p(th)) {
+	    rb_fiber_yield(0, 0);
+	}
+    }
+    return Qfalse;
+}
+
+void
+rb_iom_schedule(rb_thread_t *th, double *timeout)
+{
+    struct select_do sd;
+
+    sd.th = th;
+    sd.do_wait = !rb_fiber_auto_sched_p(th);
+
+    if (timeout && sd.do_wait) {
+	double t0 = timeofday();
+	rb_iom_t *iom = rb_iom_get(th);
+	struct rb_iom_timer t;
+
+	rb_iom_timer_add(&iom->timers, &t, timeout, 0);
+	rb_ensure(iom_do_select, (VALUE)&sd, rb_iom_timer_done, (VALUE)&t);
+	*timeout -= timeofday() - t0;
+    }
+    else {
+	rb_iom_t *iom = th->vm->iom;
+	if (iom) {
+	    rb_iom_timer_check(&iom->timers);
+	}
+	iom_do_select((VALUE)&sd);
+    }
+}
+
+static VALUE
+iom_schedule_fd(VALUE ptr)
+{
+    rb_thread_t *th = (rb_thread_t *)ptr;
+    rb_iom_t *iom = rb_iom_get(th);
+    rb_thread_t *s;
+
+    if (rb_fiber_auto_do_yield_p(th)) {
+	return rb_fiber_yield(0, 0);
+    }
+
+    s = iom->selector;
+    if (s) {
+	/*
+	 * kick the select thread, retry_select_p will return true since
+	 * caller bumped iom->select_gen
+	 */
+	ubf_select(s);
+    }
+    else {
+	struct select_do sd;
+
+	iom->selector = sd.th = th;
+	sd.do_wait = 0;
+	do {
+	    rb_ensure(iom_select_wait, (VALUE)&sd, iom_select_done, (VALUE)&sd);
+	    rb_iom_timer_check(&iom->timers);
+	} while (retry_select_p(&sd, iom));
+	(void)rb_fiber_auto_do_yield_p(th);
+    }
+    return rb_fiber_yield(0, 0);
+}
+
+
+/* only epoll takes advantage of this (kqueue may for portability bugs) */
+int
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
+{
+    return rb_iom_waitfd(th, &fptr->fd, events, timeout);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    struct rb_iom_fd_waiter fdw;
+
+    if (*fdp < 0) return -1;
+    fdw.fdp = fdp;
+    fdw.events = (short)events;
+    fdw.revents = 0;
+
+    /* use FIFO order for fairness in iom_select_done */
+    list_add_tail(&iom->fds, &fdw.w.wnode);
+    rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+    iom->select_gen++;
+    rb_ensure(iom_schedule_fd, (VALUE)th, rb_iom_waiter_done, (VALUE)&fdw.w);
+
+    if (*fdp < 0) return -1;
+
+    return (int)fdw.revents; /* may be zero if timed out */
+}
+
+void
+rb_iom_destroy(rb_vm_t *vm)
+{
+    if (vm->iom) {
+	xfree(vm->iom);
+	vm->iom = 0;
+    }
+}
+
+/* used by thread.c::rb_thread_atfork */
+static void
+rb_iom_atfork_child(rb_thread_t *th)
+{
+    rb_iom_destroy(th->vm);
+}
+
+/* used by thread_pthread.c */
+static int
+rb_iom_reserved_fd(int fd)
+{
+    return 0;
+}
+
+#include "iom_common.h"
diff --git a/prelude.rb b/prelude.rb
index 7b98e28285..1e71a3f168 100644
--- a/prelude.rb
+++ b/prelude.rb
@@ -16,6 +16,18 @@ def self.exclusive
   end
 end
 
+class Fiber
+  def self.start(*args, &block)
+    # loses Fiber#resume return value, but maybe people do not care
+    new(&block).run
+  end
+
+  def run
+    start
+    self
+  end
+end
+
 class IO
 
   # call-seq:
diff --git a/test/lib/leakchecker.rb b/test/lib/leakchecker.rb
index c0cf718635..562f32efca 100644
--- a/test/lib/leakchecker.rb
+++ b/test/lib/leakchecker.rb
@@ -28,6 +28,15 @@ def find_fds
         if d.respond_to? :fileno
           a -= [d.fileno]
         end
+
+        # only place we use epoll is internally, do not count it against us
+        a.delete_if do |fd|
+          begin
+            File.readlink("#{fd_dir}/#{fd}").match?(/\beventpoll\b/)
+          rescue
+            false
+          end
+        end
         a
       }
       fds.sort
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
new file mode 100644
index 0000000000..0dd798644c
--- /dev/null
+++ b/test/ruby/test_fiber_auto.rb
@@ -0,0 +1,126 @@
+# frozen_string_literal: true
+require 'test/unit'
+require 'fiber'
+require 'io/nonblock'
+require 'io/wait'
+
+class TestFiberAuto < Test::Unit::TestCase
+  def test_value
+    nr = 0
+    f = Fiber.start { nr += 1 }
+    assert_equal 1, f.value, 'value returns as expected'
+    assert_equal 1, f.value, 'value is idempotent'
+  end
+
+  def test_join
+    nr = 0
+    f = Fiber.start { nr += 1 }
+    assert_equal f, f.join
+    assert_equal 1, f.value
+    assert_equal f, f.join, 'join is idempotent'
+    assert_equal f, f.join(10), 'join is idempotent w/ timeout'
+  end
+
+  def test_io_wait_read
+    tick = 0.1 # 100ms, TIME_QUANTUM_USEC in thread_pthread.c
+    IO.pipe do |r, w|
+      r.nonblock = w.nonblock = true
+      rdr = Fiber.start { r.read(1) }
+      t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+      assert_nil rdr.join(tick), 'join returns nil on timeout'
+      diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
+      assert_operator diff, :>=, tick, 'Fiber still running'
+      w.write('a')
+      assert_equal 'a', rdr.value, 'finished'
+
+      t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+      rwait = Fiber.start { r.wait_readable(tick) }
+      assert_nil rwait.value, 'IO returned no events after timeout'
+      diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
+      assert_operator diff, :>=, tick, 'Fiber waited for timeout'
+    end
+  end
+
+  # make sure we do not create extra FDs (from epoll/kqueue)
+  # for operations which do not need it
+  def test_fd_creation_and_fork
+    assert_separately(%w(-r io/nonblock), <<~'end;') # do
+      fdrange = (3..64)
+      reserved = -'The given fd is not accessible because RubyVM reserves it'
+      fdstats = lambda do
+        stats = Hash.new(0)
+        fdrange.map do |fd|
+          begin
+            IO.for_fd(fd, autoclose: false)
+            stats[:good] += 1
+          rescue Errno::EBADF # ignore
+          rescue ArgumentError => e
+            raise if e.message != reserved
+            stats[:reserved] += 1
+          end
+        end
+        stats.freeze
+      end
+
+      before = fdstats.call
+      Fiber.start { :fib }.join
+      assert_equal before, fdstats.call,
+                  'Fiber.start + Fiber#join does not create new FD'
+
+      # now, epoll/kqueue implementations create FDs, ensure they're reserved
+      IO.pipe do |r, w|
+        r.nonblock = w.nonblock = true
+
+        before_io = fdstats.call
+        assert_equal before[:reserved], before_io[:reserved],
+                    'no reserved FDs created before I/O attempted'
+
+        f1 = Fiber.start { r.read(1) }
+        after_io = fdstats.call
+        assert_equal before_io[:good], after_io[:good],
+                    'no change in unreserved FDs during I/O wait'
+
+        w.write('!')
+        assert_equal '!', f1.value, 'auto-Fiber IO#read works'
+        assert_operator before_io[:reserved], :<=, after_io[:reserved],
+          'a reserved FD may be created on some implementations'
+
+        # ensure we do not share epoll/kqueue FDs across fork
+        if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2)
+          pid = fork do
+            after_fork = fdstats.call
+            w.write(after_fork.inspect == before_io.inspect ? '1' : '0')
+
+            # ensure auto-Fiber works properly after forking
+            IO.pipe do |a, b|
+              a.nonblock = b.nonblock = true
+              f1 = Fiber.start { a.read(1) }
+              f2 = Fiber.start { b.write('!') }
+              assert_equal 1, f2.value
+              w.write(f1.value == '!' ? '!' : '?')
+            end
+
+            exit!(0)
+          end
+          assert_equal '1', r.read(1), 'reserved FD cleared after fork'
+          assert_equal '!', r.read(1), 'auto-fiber works after forking'
+          _, status = Process.waitpid2(pid)
+          assert_predicate status, :success?, 'forked child exited properly'
+        end
+      end
+    end;
+  end
+
+  # As usual, cannot resume/run fibers across threads, but the
+  # scheduler works across threads
+  def test_cross_thread_schedule
+    IO.pipe do |r, w|
+      r.nonblock = w.nonblock = true
+      t = Thread.new { Fiber.start { r.read(1) }.value }
+      assert_nil t.join(0.1)
+      f = Fiber.start { w.write('a') }
+      assert_equal 'a', t.value
+      assert_equal 1, f.value
+    end
+  end
+end
diff --git a/thread.c b/thread.c
index 4d954c76de..7899aeae0b 100644
--- a/thread.c
+++ b/thread.c
@@ -70,6 +70,7 @@
 #include "ruby/thread.h"
 #include "ruby/thread_native.h"
 #include "internal.h"
+#include "iom.h"
 
 #ifndef USE_NATIVE_THREAD_PRIORITY
 #define USE_NATIVE_THREAD_PRIORITY 0
@@ -3484,6 +3485,18 @@ rb_thread_priority_set(VALUE thread, VALUE prio)
 
 /* for IO */
 
+static int
+rb_iom_waitfd_tv(rb_thread_t *th, int *fdp, int events, struct timeval *tv)
+{
+    double *to = NULL;
+    double tout;
+    if (tv) {
+	tout = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
+	to = &tout;
+    }
+    return rb_iom_waitfd(th, fdp, events, to);
+}
+
 #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT)
 
 /*
@@ -3919,6 +3932,10 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
     struct timespec *timeout = NULL;
     rb_thread_t *th = GET_THREAD();
 
+    if (rb_fiber_auto_sched_p(th)) {
+	return rb_iom_waitfd_tv(th, &fd, events, tv);
+    }
+
 #define poll_update() \
     (update_timespec(timeout, limit), \
      TRUE)
@@ -4030,7 +4047,11 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
     struct select_args args;
     int r;
     VALUE ptr = (VALUE)&args;
+    rb_thread_t *th = GET_THREAD();
 
+    if (rb_fiber_auto_sched_p(th)) {
+	return rb_iom_waitfd_tv(th, &fd, events, tv);
+    }
     args.as.fd = fd;
     args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
     args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
@@ -4177,10 +4198,13 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
     }
 }
 
+static void rb_iom_atfork_child(rb_thread_t *);
+
 void
 rb_thread_atfork(void)
 {
     rb_thread_t *th = GET_THREAD();
+    rb_iom_atfork_child(th);
     rb_thread_atfork_internal(th, terminate_atfork_i);
     th->join_list = NULL;
 
@@ -5094,3 +5118,21 @@ ruby_kill(rb_pid_t pid, int sig)
 	rb_sys_fail(0);
     }
 }
+
+#ifndef RUBYVM_IOM
+#  if defined(HAVE_SYS_EVENT_H) && defined(HAVE_KQUEUE) && defined(HAVE_KEVENT)
+#    define RUBYVM_IOM IOM_KQUEUE
+#  elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \
+        defined(HAVE_EPOLL_CTL) && defined(HAVE_EPOLL_WAIT)
+#    define RUBYVM_IOM IOM_EPOLL
+#  else
+#    define RUBYVM_IOM IOM_SELECT
+#  endif
+#endif
+#if RUBYVM_IOM == IOM_KQUEUE
+#  include "iom_kqueue.h"
+#elif RUBYVM_IOM == IOM_EPOLL
+#  include "iom_epoll.h"
+#else
+#  include "iom_select.h"
+#endif
diff --git a/thread_pthread.c b/thread_pthread.c
index 437ff370d5..14945ff40c 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -1746,9 +1746,14 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
 }
 #endif
 
+static int rb_iom_reserved_fd(int); /* check kqueue or epoll FD */
+
 int
 rb_reserved_fd_p(int fd)
 {
+    if (rb_iom_reserved_fd(fd)) {
+	return 1;
+    }
 #if USE_SLEEPY_TIMER_THREAD
     if ((fd == timer_thread_pipe.normal[0] ||
 	 fd == timer_thread_pipe.normal[1] ||
diff --git a/vm.c b/vm.c
index 4810321d6f..44bc4e73f9 100644
--- a/vm.c
+++ b/vm.c
@@ -11,6 +11,7 @@
 #include "internal.h"
 #include "ruby/vm.h"
 #include "ruby/st.h"
+#include "iom.h"
 
 #include "gc.h"
 #include "vm_core.h"
@@ -2179,6 +2180,7 @@ ruby_vm_destruct(rb_vm_t *vm)
 	    rb_fiber_reset_root_local_storage(th->self);
 	    thread_free(th);
 	}
+	rb_iom_destroy(vm);
 	rb_vm_living_threads_init(vm);
 	ruby_vm_run_at_exit_hooks(vm);
 	if (vm->loading_table) {
@@ -2345,6 +2347,7 @@ rb_thread_recycle_stack_release(VALUE *stack)
 }
 
 void rb_fiber_mark_self(rb_fiber_t *fib);
+void rb_fiber_auto_schedule_mark(const rb_thread_t *th);
 
 void
 rb_thread_mark(void *ptr)
@@ -2387,6 +2390,11 @@ rb_thread_mark(void *ptr)
     RUBY_MARK_UNLESS_NULL(th->top_wrapper);
     rb_fiber_mark_self(th->fiber);
     rb_fiber_mark_self(th->root_fiber);
+
+    /* th->afhead may be 0 early on */
+    if (th->afhead.n.next && !list_empty(&th->afhead)) {
+	rb_fiber_auto_schedule_mark(th);
+    }
     RUBY_MARK_UNLESS_NULL(th->stat_insn_usage);
     RUBY_MARK_UNLESS_NULL(th->last_status);
 
@@ -2499,6 +2507,7 @@ static void
 th_init(rb_thread_t *th, VALUE self)
 {
     th->self = self;
+    list_head_init(&th->afhead);
 
     /* allocate thread stack */
 #ifdef USE_SIGALTSTACK
diff --git a/vm_core.h b/vm_core.h
index 21136a8874..50a3f7d2fe 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -481,6 +481,8 @@ typedef struct rb_hook_list_struct {
     int need_clean;
 } rb_hook_list_t;
 
+struct rb_iom_struct;
+
 typedef struct rb_vm_struct {
     VALUE self;
 
@@ -489,6 +491,7 @@ typedef struct rb_vm_struct {
 
     struct rb_thread_struct *main_thread;
     struct rb_thread_struct *running_thread;
+    struct rb_iom_struct *iom;
 
     struct list_head waiting_fds; /* <=> struct waiting_fd */
     struct list_head living_threads;
@@ -808,6 +811,7 @@ typedef struct rb_thread_struct {
     rb_fiber_t *fiber;
     rb_fiber_t *root_fiber;
     rb_jmpbuf_t root_jmpbuf;
+    struct list_head afhead; /* -rb_fiber_t.afnode */
 
     /* ensure & callcc */
     rb_ensure_list_t *ensure_list;
-- 
EW
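
Note on the leakchecker hunk above: on Linux, the epoll descriptor the
iom creates appears in /proc/self/fd as an anonymous inode, which is
what the /\beventpoll\b/ match skips.  The fd number here is only an
example:

    File.readlink("/proc/self/fd/5") # => "anon_inode:[eventpoll]"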



* [PATCH 2/7] iom: implement waitpid generically
  2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
  2017-05-30 19:03 ` [PATCH 3/7] iom: move to fdmap Eric Wong
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
  To: spew

This requires us to constantly hijack the SIGCHLD handler,
so special handler values like nil, 'SYSTEM_DEFAULT', 'SIG_IGN',
and 'IGNORE' are always ignored in favor of Ruby's internal
signal handling.
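
A minimal sketch of the user-visible effect, based on the test added
below (the handler value and the child here are only illustrative):

    trap(:CHLD, 'IGNORE')     # overridden; Ruby keeps its own handler
    pid = fork { exit!(0) }
    f = Fiber.start { Process.waitpid2(pid) }
    _wpid, status = f.value   # scheduler resumes f when SIGCHLD fires
    status.success?           # => true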
---
 iom_common.h                 | 19 +++++++++-----
 process.c                    | 14 ++++++++---
 signal.c                     | 40 ++++++++++++++++++++++++-----
 test/ruby/test_fiber_auto.rb | 60 ++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 117 insertions(+), 16 deletions(-)

diff --git a/iom_common.h b/iom_common.h
index c9e7f716d8..af0646b6cb 100644
--- a/iom_common.h
+++ b/iom_common.h
@@ -1,4 +1,3 @@
-
 /* we lazily create this, small scripts may never need iom */
 static rb_iom_t *
 iom_new(rb_thread_t *th)
@@ -18,8 +17,12 @@ rb_iom_get(rb_thread_t *th)
     return th->vm->iom;
 }
 
-
-/* included by iom_(epoll|select|kqueue).h */
+/*
+ * TODO: consider EVFILT_PROC for kqueue and netlink+epoll on Linux;
+ * see the "god" RubyGem for usage examples.
+ * However, I doubt rb_waitpid scalability will be a problem and
+ * the simplicity of a single implementation for all is appealing.
+ */
 #ifdef HAVE_SYS_TYPES_H
 #  include <sys/types.h>
 #endif
@@ -39,8 +42,6 @@ iom_schedule_pid(VALUE ptr)
     return rb_fiber_yield(0, 0);
 }
 
-static rb_iom_t *rb_iom_get(rb_thread_t *);
-
 rb_pid_t
 rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
 		double *timeout)
@@ -86,5 +87,11 @@ rb_iom_sigchld(rb_vm_t *vm)
 	}
     }
 }
-
+#else
+rb_pid_t
+rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
+		double *timeout)
+{
+    rb_bug("Should not get here, WNOHANG not implemented");
+}
 #endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */
diff --git a/process.c b/process.c
index 67f51db847..bd384e86e1 100644
--- a/process.c
+++ b/process.c
@@ -16,6 +16,7 @@
 #include "ruby/thread.h"
 #include "ruby/util.h"
 #include "vm_core.h"
+#include "iom.h"
 
 #include <stdio.h>
 #include <errno.h>
@@ -907,10 +908,15 @@ rb_waitpid(rb_pid_t pid, int *st, int flags)
 	result = do_waitpid(pid, st, flags);
     }
     else {
-	while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 &&
-	       (errno == EINTR)) {
-	    rb_thread_t *th = GET_THREAD();
-	    RUBY_VM_CHECK_INTS(th);
+	rb_thread_t *th = GET_THREAD();
+	if (rb_fiber_auto_sched_p(th) && WNOHANG) {
+	    return rb_iom_waitpid(th, pid, st, flags, 0);
+	}
+	else {
+	    while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 &&
+		   (errno == EINTR)) {
+		RUBY_VM_CHECK_INTS(th);
+	    }
 	}
     }
     if (result > 0) {
diff --git a/signal.c b/signal.c
index 8ee0963b8a..8f5dee375f 100644
--- a/signal.c
+++ b/signal.c
@@ -13,6 +13,7 @@
 
 #include "internal.h"
 #include "vm_core.h"
+#include "iom.h"
 #include <signal.h>
 #include <stdio.h>
 #include <errno.h>
@@ -1028,6 +1029,17 @@ rb_trap_exit(void)
     }
 }
 
+static int
+sig_is_chld(int sig)
+{
+#if defined(SIGCLD)
+    return (sig == SIGCLD);
+#elif defined(SIGCHLD)
+    return (sig == SIGCHLD);
+#endif
+    return 0;
+}
+
 void
 rb_signal_exec(rb_thread_t *th, int sig)
 {
@@ -1035,6 +1047,9 @@ rb_signal_exec(rb_thread_t *th, int sig)
     VALUE cmd = vm->trap_list[sig].cmd;
     int safe = vm->trap_list[sig].safe;
 
+    if (sig_is_chld(sig)) {
+	rb_iom_sigchld(vm);
+    }
     if (cmd == 0) {
 	switch (sig) {
 	  case SIGINT:
@@ -1094,6 +1109,11 @@ default_handler(int sig)
 #ifdef SIGUSR2
       case SIGUSR2:
 #endif
+#ifdef SIGCLD
+      case SIGCLD:
+#elif defined(SIGCHLD)
+      case SIGCHLD:
+#endif
         func = sighandler;
         break;
 #ifdef SIGBUS
@@ -1131,6 +1151,9 @@ trap_handler(VALUE *cmd, int sig)
     VALUE command;
 
     if (NIL_P(*cmd)) {
+	if (sig_is_chld(sig)) {
+	    goto sig_dfl;
+	}
 	func = SIG_IGN;
     }
     else {
@@ -1151,6 +1174,9 @@ trap_handler(VALUE *cmd, int sig)
 		break;
               case 14:
 		if (memcmp(cptr, "SYSTEM_DEFAULT", 14) == 0) {
+		    if (sig_is_chld(sig)) {
+			goto sig_dfl;
+		    }
                     func = SIG_DFL;
                     *cmd = 0;
 		}
@@ -1158,6 +1184,9 @@ trap_handler(VALUE *cmd, int sig)
 	      case 7:
 		if (memcmp(cptr, "SIG_IGN", 7) == 0) {
 sig_ign:
+		    if (sig_is_chld(sig)) {
+			goto sig_dfl;
+		    }
                     func = SIG_IGN;
                     *cmd = Qtrue;
 		}
@@ -1406,15 +1435,14 @@ static int
 init_sigchld(int sig)
 {
     sighandler_t oldfunc;
+    sighandler_t func = sighandler;
 
     oldfunc = ruby_signal(sig, SIG_DFL);
     if (oldfunc == SIG_ERR) return -1;
-    if (oldfunc != SIG_DFL && oldfunc != SIG_IGN) {
-	ruby_signal(sig, oldfunc);
-    }
-    else {
-	GET_VM()->trap_list[sig].cmd = 0;
-    }
+
+    ruby_signal(sig, func);
+    GET_VM()->trap_list[sig].cmd = 0;
+
     return 0;
 }
 #  ifndef __native_client__
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
index 0dd798644c..67a9fe87f3 100644
--- a/test/ruby/test_fiber_auto.rb
+++ b/test/ruby/test_fiber_auto.rb
@@ -111,6 +111,66 @@ def test_fd_creation_and_fork
     end;
   end
 
+  def test_waitpid
+    assert_separately(%w(-r io/nonblock), <<~'end;') # do
+      t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+      pid = fork do
+        sleep 0.1
+        exit!(0)
+      end
+      f = Fiber.start { Process.waitpid2(pid) }
+      wpid, st = f.value
+      diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
+      assert_operator diff, :>=, 0.1, 'child did not return immediately'
+      assert_equal pid, wpid, 'waited pid matches'
+      assert_predicate st, :success?, 'process exited successfully'
+
+      IO.pipe do |r, w|
+        r.nonblock = w.nonblock = true
+
+        # "blocking" Process.waitpid2 takes precedence over trap(:CHLD)
+        waited = []
+        chld_called = false
+        trap(:CHLD) { chld_called = true; waited.concat(Process.waitall) }
+        pid = fork do
+          r.read(1)
+          exit!(0)
+        end
+        assert_nil Process.waitpid2(pid, Process::WNOHANG), 'child is waiting'
+        assert_nil(Fiber.start do
+            Process.waitpid2(pid, Process::WNOHANG)
+          end.value, 'WNOHANG works normally in fiber')
+        f = Fiber.start { Process.waitpid2(pid) }
+        assert_equal 1, w.write('.'), 'woke child'
+        wpid, st = f.value
+        assert_equal pid, wpid, 'waited pid matches'
+        assert_predicate st, :success?, 'child exited successfully'
+        assert_empty waited, 'waitpid had precedence'
+        assert chld_called, 'CHLD handler got called anyway'
+
+        chld_called = false
+        [ 'DEFAULT', 'SIG_DFL', 'IGNORE', 'SYSTEM_DEFAULT', nil
+        ].each do |handler|
+          trap(:CHLD, handler)
+          pid = fork do
+            r.read(1)
+            exit!(0)
+          end
+          t = "trap(:CHLD, #{handler.inspect})"
+          f = Fiber.start { Process.waitpid2(pid) }
+          assert_equal 1, w.write('.'), "woke child for #{t}"
+          wpid, st = f.value
+          assert_predicate st, :success?, "child exited successfully for #{t}"
+          assert_equal wpid, pid, "return value correct for #{t}"
+          assert_equal false, chld_called,
+            "old CHLD handler did not fire for #{t}"
+        end
+      end
+    end;
+  end if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) &&
+      Process.const_defined?(:WNOHANG)
+
+
   # As usual, cannot resume/run fibers across threads, but the
   # scheduler works across threads
   def test_cross_thread_schedule
-- 
EW



* [PATCH 3/7] iom: move to fdmap
  2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
  2017-05-30 19:03 ` [PATCH 2/7] iom: implement waitpid generically Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
  2017-05-30 19:03 ` [PATCH 4/7] epoll fdmap inversion Eric Wong
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
  To: spew

---
 iom_epoll.h                  |  41 +++++++++-
 iom_internal.h               |  63 +++++++++++++++
 iom_kqueue.h                 | 178 ++++++++++++++++++++++++++++++-------------
 test/ruby/test_fiber_auto.rb |  14 ++++
 4 files changed, 240 insertions(+), 56 deletions(-)

diff --git a/iom_epoll.h b/iom_epoll.h
index aa1ec087fa..888f8a696c 100644
--- a/iom_epoll.h
+++ b/iom_epoll.h
@@ -27,6 +27,7 @@ struct rb_iom_struct {
     struct list_head epws; /* -epw.w.wnode, order agnostic */
     struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
     struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+    struct rb_iom_fdmap fdmap; /* maps each FD to multiple epws */
 
     int epoll_fd;
     int maxevents; /* auto-increases */
@@ -47,7 +48,11 @@ struct epw {
 	rb_thread_t *th;
 	short revents;
     } as;
-    int *fdp;
+    struct list_node fdnode; /* self->fd.fdm->fdhead */
+    union {
+	int *fdp;
+	struct rb_iom_fdm *fdm;
+    } fd;
     int *flags; /* &fptr->mode */
     struct epoll_event ev;
 };
@@ -151,6 +156,7 @@ rb_iom_init(rb_iom_t *iom)
     iom->waiter = 0;
     iom->maxevents = 8;
     iom->epoll_fd = -1;
+    rb_iom_fdmap_init(&iom->fdmap);
 }
 
 static int
@@ -178,7 +184,18 @@ check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev)
 	    struct epw *epw = ev[i].data.ptr;
 
 	    epw->as.revents = rb_ep2revents(ev[i].events);
+
+	    list_del_init(&epw->fdnode);
 	    rb_iom_waiter_ready(&epw->w);
+
+	    /* are there other epw using the same FD? */
+	    if (!list_empty(&epw->fd.fdm->fdhead)) {
+		struct epw *cur = 0, *nxt = 0;
+		list_for_each_safe(&epw->fd.fdm->fdhead, cur, nxt, fdnode) {
+		    list_del_init(&cur->fdnode);
+		    rb_iom_waiter_ready(&cur->w);
+		}
+	    }
 	}
 
 	/* notify the waiter thread in case we enqueued fibers for them */
@@ -277,7 +294,7 @@ epmod_yield(VALUE ptr)
     rb_thread_t *th = epw->as.th;
     rb_iom_t *iom = rb_iom_get(th);
     int e;
-    int fd = *epw->fdp;
+    int fd = *epw->fd.fdp;
     int epfd = iom_epfd(iom);
 
     /*
@@ -286,6 +303,12 @@ epmod_yield(VALUE ptr)
      * so we must initialize it before epoll_ctl:
      */
     epw->as.revents = 0;
+    if (fd < 0) {
+	list_node_init(&epw->fdnode); /* for list_del in ensure */
+	return Qfalse;
+    }
+    epw->fd.fdm = rb_iom_fdm_get(&iom->fdmap, fd);
+    list_add(&epw->fd.fdm->fdhead, &epw->fdnode);
 
     /* we want to track if an FD is already being watched ourselves */
     if (epw->flags) {
@@ -324,6 +347,15 @@ epmod_yield(VALUE ptr)
     return rb_fiber_yield(0, 0);
 }
 
+static VALUE
+epw_done(VALUE ptr)
+{
+    struct epw *epw = (struct epw *)ptr;
+
+    list_del(&epw->fdnode);
+    return rb_iom_waiter_done((VALUE)&epw->w);
+}
+
 static int
 iom_waitfd(rb_thread_t *th, int *fdp, int *flags, int events, double *timeout)
 {
@@ -331,14 +363,14 @@ iom_waitfd(rb_thread_t *th, int *fdp, int *flags, int events, double *timeout)
     struct epw epw;
 
     epw.as.th = th;
-    epw.fdp = fdp;
+    epw.fd.fdp = fdp;
     epw.flags = flags;
     epw.ev.events = rb_events2ep(events);
     epw.ev.data.ptr = &epw;
 
     list_add(&iom->epws, &epw.w.wnode);
     rb_iom_timer_add(&iom->timers, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT);
-    rb_ensure(epmod_yield, (VALUE)&epw, rb_iom_waiter_done, (VALUE)&epw.w);
+    rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw);
 
     return (int)epw.as.revents; /* may be zero if timed out */
 }
@@ -368,6 +400,7 @@ rb_iom_destroy(rb_vm_t *vm)
 	if (iom->epoll_fd >= 0) {
 	    close(iom->epoll_fd);
 	}
+	rb_iom_fdmap_destroy(&iom->fdmap);
 	xfree(iom);
     }
 }
diff --git a/iom_internal.h b/iom_internal.h
index b5aa321274..45fda397db 100644
--- a/iom_internal.h
+++ b/iom_internal.h
@@ -15,6 +15,19 @@ int rb_fiber_auto_do_yield_p(rb_thread_t *);
 #define IOM_FIB     (0x2)
 #define IOM_WAIT    (0x1) /* container_of(..., struct rb_iom_waiter, timer) */
 
+#define RB_IOM_FD_PER_HEAP 256
+/* on-heap and persistent, keep as small as possible */
+struct rb_iom_fdm {
+    struct list_head fdhead; /* -rb_iom_fd_waiter.fdnode */
+};
+
+/* singleton (per-rb_iom_t) */
+struct rb_iom_fdmap {
+    struct rb_iom_fdm **map;
+    unsigned int heaps;
+    int max_fd;
+};
+
 /* allocated on stack */
 struct rb_iom_timer {
     struct list_node tnode; /* <=> rb_iom_struct.timers */
@@ -29,6 +42,7 @@ struct rb_iom_waiter {
 
 struct rb_iom_fd_waiter {
     struct rb_iom_waiter w;
+    struct list_node fdnode; /* <=> &self->fdm->fdhead */
     int *fdp;
     short events;
     short revents;
@@ -43,6 +57,55 @@ struct rb_iom_pid_waiter {
     int errnum;
 };
 
+#if (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL)
+static struct rb_iom_fdm *
+iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd)
+{
+    VM_ASSERT(fd >= 0);
+    return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP];
+}
+
+static struct rb_iom_fdm *
+rb_iom_fdm_get(struct rb_iom_fdmap *fdmap, int fd)
+{
+    if (fd >= fdmap->max_fd) {
+	struct rb_iom_fdm *base, *h;
+	unsigned n = fdmap->heaps + 1;
+	unsigned i;
+
+	fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct rb_iom_fdm *));
+	base = h = ALLOC_N(struct rb_iom_fdm, RB_IOM_FD_PER_HEAP);
+	for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) {
+	    list_head_init(&h->fdhead);
+	    h++;
+	}
+	fdmap->map[fdmap->heaps] = base;
+	fdmap->max_fd += RB_IOM_FD_PER_HEAP;
+    }
+    return iom_fdhead_aref(fdmap, fd);
+}
+
+static void
+rb_iom_fdmap_init(struct rb_iom_fdmap *fdmap)
+{
+    fdmap->max_fd = 0;
+    fdmap->heaps = 0;
+    fdmap->map = 0;
+}
+
+static void
+rb_iom_fdmap_destroy(struct rb_iom_fdmap *fdmap)
+{
+    unsigned n;
+
+    for (n = 0; n < fdmap->heaps; n++) {
+	xfree(fdmap->map[n]);
+    }
+    xfree(fdmap->map);
+    rb_iom_fdmap_init(fdmap);
+}
+#endif /* (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) */
+
 static VALUE
 rb_iom_timer_fibval(const struct rb_iom_timer *t)
 {
diff --git a/iom_kqueue.h b/iom_kqueue.h
index f0776b2f58..52298b9055 100644
--- a/iom_kqueue.h
+++ b/iom_kqueue.h
@@ -26,6 +26,9 @@
  * kevent().
  */
 #include "iom_internal.h"
+
+/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */
+#undef LIST_HEAD
 #include <sys/types.h>
 #include <sys/event.h>
 #include <sys/time.h>
@@ -39,6 +42,8 @@ struct rb_iom_struct {
     struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */
     struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
     struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+    struct rb_iom_fdmap rfdmap; /* holds fdm for EVFILT_READ */
+    struct rb_iom_fdmap wfdmap; /* holds fdm for EVFILT_WRITE */
 
     int kqueue_fd;
     int nevents; /* auto-increases */
@@ -56,6 +61,8 @@ struct rb_iom_struct {
 struct kev {
     struct rb_iom_fd_waiter fdw;
     rb_thread_t *th; /* 0 - kev is disabled */
+    struct list_node rfdnode; /* -(ev.udata==fdm)->fdhead (EVFILT_READ) */
+    struct list_node wfdnode; /* -(ev.udata==fdm)->fdhead (EVFILT_WRITE) */
 };
 
 /*
@@ -107,6 +114,8 @@ rb_iom_init(rb_iom_t *iom)
     iom->waiter = 0;
     iom->nevents = 8;
     iom->kqueue_fd = -1;
+    rb_iom_fdmap_init(&iom->rfdmap);
+    rb_iom_fdmap_init(&iom->wfdmap);
 }
 
 static struct timespec *
@@ -137,21 +146,40 @@ next_timeout(rb_iom_t *iom, struct timespec *ts)
  * EVFILT_READ returns to set the RB_WAITFD_PRI bit:
  */
 static void
-check_pri(rb_thread_t *th, int fd, struct kev *kev)
+check_pri(rb_thread_t *th, struct list_head *pri)
 {
     rb_fdset_t efds;
     int r;
     struct timeval tv = { 0 };
+    struct kev *kev = 0, *next = 0;
+    int max = -1;
 
     rb_fd_init(&efds);
-    rb_fd_set(fd, &efds);
-    r = native_fd_select(fd + 1, 0, 0, &efds, &tv, th);
-    rb_fd_term(&efds);
-    /* TODO: use poll() if possible */
+    list_for_each(pri, kev, fdw.w.wnode) {
+	int fd = *kev->fdw.fdp;
+	if (fd >= 0) {
+	    rb_fd_set(fd, &efds);
+	    if (fd > max) {
+		max = fd;
+	    }
+	}
+    }
 
+    r = native_fd_select(max + 1, 0, 0, &efds, &tv, th);
     if (r > 0) {
-	kev->fdw.revents |= RB_WAITFD_PRI;
+	list_for_each(pri, kev, fdw.w.wnode) {
+	    int fd = *kev->fdw.fdp;
+	    if (fd >= 0 && rb_fd_isset(fd, &efds)) {
+		kev->fdw.revents |= RB_WAITFD_PRI;
+	    }
+	}
     }
+
+    /* needed for rb_ensure */
+    list_for_each_safe(pri, kev, next, fdw.w.wnode) {
+	list_del_init(&kev->fdw.w.wnode);
+    }
+    rb_fd_term(&efds);
 }
 
 static void
@@ -162,53 +190,68 @@ check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
 	int i;
 	struct kevent *changelist = eventlist;
 	int nchanges = 0;
+	struct list_head pri = LIST_HEAD_INIT(pri);
 
 	for (i = 0; i < nr; i++) {
 	    struct kevent *ev = &eventlist[i];
-	    struct kev *kev = ev->udata;
-	    int fd = *kev->fdw.fdp;
+	    struct rb_iom_fdm *fdm = ev->udata;
+	    struct kev *kev = 0, *next = 0;
+	    int fd = (int)ev->ident;
+	    int ok_fd = -1;
+	    int paired = 0;
 
 	    if (ev->filter == EVFILT_READ) {
-		int rd = kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI);
-		if (rd == RB_WAITFD_IN) { /* common */
-		    kev->fdw.revents |= rd;
-		}
-		else if (rd & RB_WAITFD_PRI) { /* ugh... */
-		    if (fd >= 0) {
-			check_pri(th, fd, kev);
+		list_for_each_safe(&fdm->fdhead, kev, next, rfdnode) {
+		    int rbits = kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI);
+
+		    list_del_init(&kev->rfdnode);
+		    assert(rbits && "unexpected EVFILT_READ");
+		    rb_iom_waiter_ready(&kev->fdw.w);
+		    paired |= (kev->fdw.events & RB_WAITFD_OUT);
+		    if (rbits == RB_WAITFD_IN) { /* common */
+			kev->fdw.revents |= RB_WAITFD_IN;
+		    }
+		    else if (rbits & RB_WAITFD_PRI) { /* ugh... */
+			kev->fdw.revents |= rbits & RB_WAITFD_IN;
+			list_add_tail(&pri, &kev->fdw.w.wnode);
+		    }
+		    if (ok_fd != fd) {
+			ok_fd = *kev->fdw.fdp;
 		    }
-		}
-		assert(rd && "got EVFILT_READ unexpectedly");
-
-		/* disable the other filter if we waited on both read+write */
-		if (fd >= 0 && (kev->fdw.events & RB_WAITFD_OUT) && kev->th) {
-		    struct kevent *chg = &changelist[nchanges++];
-
-		    kev->th = 0;
-		    EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
 		}
 	    }
 	    else if (ev->filter == EVFILT_WRITE) {
-		kev->fdw.revents |= RB_WAITFD_OUT;
-
-		assert(kev->fdw.events & RB_WAITFD_OUT &&
-			"unexpected EVFILT_WRITE");
-
-		/* disable the other filter if we waited on both read+write */
-		if (fd >= 0 &&
-			kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI) &&
-			kev->th) {
-		    struct kevent *chg = &changelist[nchanges++];
-
-		    kev->th = 0;
-		    EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+		list_for_each_safe(&fdm->fdhead, kev, next, wfdnode) {
+		    list_del_init(&kev->wfdnode);
+		    kev->fdw.revents |= RB_WAITFD_OUT;
+		    assert(kev->fdw.events & RB_WAITFD_OUT &&
+			    "unexpected EVFILT_WRITE");
+		    rb_iom_waiter_ready(&kev->fdw.w);
+		    paired |= (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI));
+		    if (ok_fd != fd) {
+			ok_fd = *kev->fdw.fdp;
+		    }
 		}
 	    }
 	    else {
 		rb_bug("unexpected filter: %d", ev->filter);
 	    }
-	    rb_iom_waiter_ready(&kev->fdw.w);
+	    /* delete the corresponding event when one FD has 2 filters */
+	    if (ok_fd == fd) {
+		if (paired & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
+		    struct kevent *chg = &changelist[nchanges++];
+		    EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+		}
+		if (paired & RB_WAITFD_OUT) {
+		    struct kevent *chg = &changelist[nchanges++];
+		    EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+		}
+	    }
 	}
+	if (!list_empty(&pri)) {
+	    check_pri(th, &pri);
+	}
+
 
 	/*
 	 * kqueue is a little more complicated because we install
@@ -319,24 +362,32 @@ kevxchg_ping(struct kev *kev)
     struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
     struct kevent *changelist = eventlist; /* allowed, see kevent manpage */
     int fd = *kev->fdw.fdp;
-    int kqfd = iom_kqfd(iom);
+    int kqfd;
 
     VM_ASSERT(nevents > 2);
+    if (fd < 0) {
+	return;
+    }
 
+    kqfd = iom_kqfd(iom);
     /* EVFILT_READ handles urgent data (POLLPRI) */
     if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
-	struct kevent *chg = &changelist[nchanges++];
-	EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, kev);
+	struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->rfdmap, fd);
+	if (list_empty(&fdm->fdhead)) {
+	    struct kevent *chg = &changelist[nchanges++];
+	    EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, fdm);
+	}
+	list_add(&fdm->fdhead, &kev->rfdnode);
     }
-
     if (kev->fdw.events & RB_WAITFD_OUT) {
-	struct kevent *chg = &changelist[nchanges++];
-	EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, kev);
+	struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->wfdmap, fd);
+	if (list_empty(&fdm->fdhead)) {
+	    struct kevent *chg = &changelist[nchanges++];
+	    EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, fdm);
+	}
+	list_add(&fdm->fdhead, &kev->wfdnode);
     }
-
     do {
-	int err;
-
 	nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero);
 
 	/* kevent may fail with ENOMEM when processing changelist */
@@ -360,6 +411,19 @@ kevxchg_yield(VALUE ptr)
     return rb_fiber_yield(0, 0);
 }
 
+static VALUE
+kev_done(VALUE ptr)
+{
+    struct kev *kev = (struct kev *)ptr;
+    if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
+	list_del(&kev->rfdnode);
+    }
+    if (kev->fdw.events & RB_WAITFD_OUT) {
+	list_del(&kev->wfdnode);
+    }
+    return rb_iom_waiter_done((VALUE)&kev->fdw.w);
+}
+
 static int
 iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
 {
@@ -372,8 +436,7 @@ iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
     kev.fdw.revents = 0;
     list_add(&iom->kevs, &kev.fdw.w.wnode);
     rb_iom_timer_add(&iom->timers, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
-    rb_ensure(kevxchg_yield, (VALUE)&kev,
-		rb_iom_waiter_done, (VALUE)&kev.fdw.w);
+    rb_ensure(kevxchg_yield, (VALUE)&kev, kev_done, (VALUE)&kev);
 
     return kev.fdw.revents; /* may be zero if timed out */
 }
@@ -395,6 +458,14 @@ rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
     return iom_waitfd(th, fdp, events, timeout);
 }
 
+static void
+iom_free(rb_iom_t *iom)
+{
+    rb_iom_fdmap_destroy(&iom->rfdmap);
+    rb_iom_fdmap_destroy(&iom->wfdmap);
+    xfree(iom);
+}
+
 void
 rb_iom_destroy(rb_vm_t *vm)
 {
@@ -404,7 +475,7 @@ rb_iom_destroy(rb_vm_t *vm)
 	if (iom->kqueue_fd >= 0) {
 	    close(iom->kqueue_fd);
 	}
-	xfree(iom);
+	iom_free(iom);
     }
 }
 
@@ -422,8 +493,11 @@ rb_iom_atfork_child(rb_thread_t *th)
      */
     rb_iom_destroy(th->vm);
 #else /* native kqueue is close-on-fork, so we do not close ourselves */
-    xfree(th->vm->iom);
-    th->vm->iom = 0;
+    rb_iom_t *iom = th->vm->iom;
+    if (iom) {
+	iom_free(iom);
+	th->vm->iom = 0;
+    }
 #endif
 }
 
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
index 67a9fe87f3..196361c674 100644
--- a/test/ruby/test_fiber_auto.rb
+++ b/test/ruby/test_fiber_auto.rb
@@ -183,4 +183,18 @@ def test_cross_thread_schedule
       assert_equal 1, f.value
     end
   end
+
+  # tricky for kqueue and epoll implementations
+  def test_multi_readers
+    IO.pipe do |r, w|
+      r.nonblock = w.nonblock = true
+      nr = 30
+      fibs = nr.times.map { Fiber.start { r.read(1) } }
+      fibs.each { |f| assert_nil f.join(0.001) }
+
+      exp = nr.times.map { -'a' }.freeze
+      w.write(exp.join)
+      assert_equal exp, fibs.map(&:value);
+    end
+  end
 end
-- 
EW
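
The fdmap added to iom_internal.h here is a two-level array keyed by
fd, with RB_IOM_FD_PER_HEAP (256) slots per lazily-allocated heap.  A
worked example of the lookup arithmetic, written as plain Ruby just to
show the indexing (fd 300 lands in heap 1, slot 44):

    fd = 300
    per_heap = 256                  # RB_IOM_FD_PER_HEAP
    [fd / per_heap, fd % per_heap]  # => [1, 44]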



* [PATCH 4/7] epoll fdmap inversion
  2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
  2017-05-30 19:03 ` [PATCH 2/7] iom: implement waitpid generically Eric Wong
  2017-05-30 19:03 ` [PATCH 3/7] iom: move to fdmap Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
  2017-05-30 19:03 ` [PATCH 5/7] kq-share Eric Wong
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
  To: spew

---
 iom_epoll.h    | 130 ++++++++++++++++++++++++++++++++-------------------------
 iom_internal.h |   3 +-
 2 files changed, 75 insertions(+), 58 deletions(-)

diff --git a/iom_epoll.h b/iom_epoll.h
index 888f8a696c..3afb73325c 100644
--- a/iom_epoll.h
+++ b/iom_epoll.h
@@ -27,7 +27,7 @@ struct rb_iom_struct {
     struct list_head epws; /* -epw.w.wnode, order agnostic */
     struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
     struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
-    struct rb_iom_fdmap fdmap; /* maps each FD to multiple epws */
+    struct rb_iom_fdmap fdmap; /* maps each FD to multiple epw */
 
     int epoll_fd;
     int maxevents; /* auto-increases */
@@ -45,16 +45,16 @@ struct rb_iom_struct {
 struct epw {
     struct rb_iom_waiter w;
     union {
-	rb_thread_t *th;
-	short revents;
+	struct list_node fdnode;
+	struct {
+	    rb_thread_t *th;
+	    struct rb_iom_fdm *fdm;
+	} pre_ctl;
     } as;
-    struct list_node fdnode; /* self->fd.fdm->fdhead */
-    union {
-	int *fdp;
-	struct rb_iom_fdm *fdm;
-    } fd;
+    int fd; /* no need for "int *", here */
+    short events;
+    short revents;
     int *flags; /* &fptr->mode */
-    struct epoll_event ev;
 };
 
 static void
@@ -181,20 +181,14 @@ check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev)
 	int i;
 
 	for (i = 0; i < nr; i++) {
-	    struct epw *epw = ev[i].data.ptr;
-
-	    epw->as.revents = rb_ep2revents(ev[i].events);
-
-	    list_del_init(&epw->fdnode);
-	    rb_iom_waiter_ready(&epw->w);
-
-	    /* are there other epw using the same FD? */
-	    if (!list_empty(&epw->fd.fdm->fdhead)) {
-		struct epw *cur = 0, *nxt = 0;
-		list_for_each_safe(&epw->fd.fdm->fdhead, cur, nxt, fdnode) {
-		    list_del_init(&cur->fdnode);
-		    rb_iom_waiter_ready(&cur->w);
-		}
+	    struct rb_iom_fdm *fdm = ev[i].data.ptr;
+	    struct epw *epw = 0, *next = 0;
+	    short revents = rb_ep2revents(ev[i].events);
+
+	    list_for_each_safe(&fdm->fdhead, epw, next, as.fdnode) {
+		epw->revents = epw->events & revents;
+		list_del_init(&epw->as.fdnode);
+		rb_iom_waiter_ready(&epw->w);
 	    }
 	}
 
@@ -287,61 +281,74 @@ rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
     }
 }
 
-static VALUE
-epmod_yield(VALUE ptr)
+static void
+epoll_ctl_or_raise(rb_thread_t *th, struct epw *epw)
 {
-    struct epw *epw = (struct epw *)ptr;
-    rb_thread_t *th = epw->as.th;
-    rb_iom_t *iom = rb_iom_get(th);
     int e;
-    int fd = *epw->fd.fdp;
-    int epfd = iom_epfd(iom);
+    int epfd;
+    struct epoll_event ev;
 
-    /*
-     * if we did not have GVL, revents may be set immediately
-     * upon epoll_ctl by another thread running epoll_wait,
-     * so we must initialize it before epoll_ctl:
-     */
-    epw->as.revents = 0;
-    if (fd < 0) {
-	list_node_init(&epw->fdnode); /* for list_del in ensure */
-	return Qfalse;
+    /* we cannot raise until list_add: */
+    {
+	struct rb_iom_fdm *fdm = epw->as.pre_ctl.fdm;
+
+	ev.data.ptr = fdm;
+	ev.events = rb_events2ep(epw->events);
+	/*
+	 * merge events from other threads/fibers waiting on the same
+	 * [ descriptor (int fd), description (struct file *) ] tuple
+	 */
+	if (!list_empty(&fdm->fdhead)) { /* uncommon, I hope... */
+	    struct epw *cur;
+	    list_for_each(&fdm->fdhead, cur, as.fdnode) {
+		ev.events |= rb_events2ep(cur->events);
+	    }
+	}
+	list_add(&fdm->fdhead, &epw->as.fdnode);
     }
-    epw->fd.fdm = rb_iom_fdm_get(&iom->fdmap, fd);
-    list_add(&epw->fd.fdm->fdhead, &epw->fdnode);
+
+    epfd = iom_epfd(th->vm->iom); /* may raise */
 
     /* we want to track if an FD is already being watched ourselves */
     if (epw->flags) {
-	if (*epw->flags & FMODE_IOM_ADDED) {
-	    e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+	if (*epw->flags & FMODE_IOM_ADDED) { /* ideal situation */
+	    e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
 	}
 	else {
-	    e = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epw->ev);
+	    e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
 	    if (e == 0) {
 		*epw->flags |= FMODE_IOM_ADDED;
 	    }
 	    else if (e < 0 && errno == EEXIST) {
 		/*
-		 * It's possible to get EEXIST if fptrs point to the same FD:
+		 * possible EEXIST if several fptrs point to the same FD:
 		 *   f1 = Fiber.start { io1.read(1) }
 		 *   io2 = IO.for_fd(io1.fileno)
 		 *   f2 = Fiber.start { io2.read(1) }
 		 */
 		*epw->flags |= FMODE_IOM_ADDED;
-		e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+		e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
 	    }
 	}
     }
-    else { /* don't know if added or not, fall back to addiing on ENOENT */
-	e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+    else { /* don't know if added or not, fall back to add on ENOENT */
+	e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
 	if (e < 0 && errno == ENOENT) {
-	    e = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epw->ev);
+	    e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
 	}
     }
     if (e < 0) {
 	rb_sys_fail("epoll_ctl");
     }
+}
 
+static VALUE
+epmod_yield(VALUE ptr)
+{
+    /* we must have no possibility of raising until list_add: */
+    struct epw *epw = (struct epw *)ptr;
+    rb_thread_t *th = epw->as.pre_ctl.th;
+    epoll_ctl_or_raise(th, epw);
     ping_events(th);
     (void)rb_fiber_auto_do_yield_p(th);
     return rb_fiber_yield(0, 0);
@@ -351,8 +358,7 @@ static VALUE
 epw_done(VALUE ptr)
 {
     struct epw *epw = (struct epw *)ptr;
-
-    list_del(&epw->fdnode);
+    list_del(&epw->as.fdnode);
     return rb_iom_waiter_done((VALUE)&epw->w);
 }
 
@@ -362,17 +368,29 @@ iom_waitfd(rb_thread_t *th, int *fdp, int *flags, int events, double *timeout)
     rb_iom_t *iom = rb_iom_get(th);
     struct epw epw;
 
-    epw.as.th = th;
-    epw.fd.fdp = fdp;
+    /* unlike kqueue or select, we never need to reread fdp after yielding */
+    epw.fd = *fdp;
+    if (epw.fd < 0) {
+	return -1;
+    }
+
+    /* may raise on OOM: */
+    epw.as.pre_ctl.fdm = rb_iom_fdm_get(&iom->fdmap, epw.fd);
+    epw.as.pre_ctl.th = th;
     epw.flags = flags;
-    epw.ev.events = rb_events2ep(events);
-    epw.ev.data.ptr = &epw;
+    /*
+     * if we did not have GVL, revents may be set immediately
+     * upon epoll_ctl by another thread running epoll_wait,
+     * so we must initialize it before epoll_ctl:
+     */
+    epw.revents = 0;
+    epw.events = (short)events;
 
     list_add(&iom->epws, &epw.w.wnode);
     rb_iom_timer_add(&iom->timers, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT);
     rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw);
 
-    return (int)epw.as.revents; /* may be zero if timed out */
+    return (int)epw.revents; /* may be zero if timed out */
 }
 
 int
diff --git a/iom_internal.h b/iom_internal.h
index 45fda397db..1a4050d62d 100644
--- a/iom_internal.h
+++ b/iom_internal.h
@@ -18,7 +18,7 @@ int rb_fiber_auto_do_yield_p(rb_thread_t *);
 #define RB_IOM_FD_PER_HEAP 256
 /* on-heap and persistent, keep as small as possible */
 struct rb_iom_fdm {
-    struct list_head fdhead; /* -rb_iom_fd_waiter.fdnode */
+    struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */
 };
 
 /* singleton (per-rb_iom_t) */
@@ -42,7 +42,6 @@ struct rb_iom_waiter {
 
 struct rb_iom_fd_waiter {
     struct rb_iom_waiter w;
-    struct list_node fdnode; /* <=> &self->fdm->fdhead */
     int *fdp;
     short events;
     short revents;
-- 
EW
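
The EEXIST comment in epoll_ctl_or_raise already sketches the tricky
case; expanded into something runnable (my reading of the intended
behavior: both fibers get woken and each reads one byte, order not
guaranteed):

    require 'io/nonblock'
    r, w = IO.pipe
    r.nonblock = w.nonblock = true
    r2 = IO.for_fd(r.fileno, autoclose: false) # second fptr, same fd
    f1 = Fiber.start { r.read(1) }  # EPOLL_CTL_ADD succeeds
    f2 = Fiber.start { r2.read(1) } # ADD fails with EEXIST, retried as MOD
    w.write('ab')
    [f1.value, f2.value].sort       # => ["a", "b"]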



* [PATCH 5/7] kq-share
  2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
                   ` (2 preceding siblings ...)
  2017-05-30 19:03 ` [PATCH 4/7] epoll fdmap inversion Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
  2017-05-30 19:03 ` [PATCH 6/7] wip Eric Wong
  2017-05-30 19:03 ` [PATCH 7/7] wip Eric Wong
  5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
  To: spew

---
 iom_kqueue.h | 229 +++++++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 153 insertions(+), 76 deletions(-)

diff --git a/iom_kqueue.h b/iom_kqueue.h
index 52298b9055..31cddd54be 100644
--- a/iom_kqueue.h
+++ b/iom_kqueue.h
@@ -7,13 +7,14 @@
  * enabling filters, where as epoll_ctl requires separate ADD and
  * MOD operations.
  *
- * These are advantages in the common case.
+ * These are advantages in the common case...
  *
  * The epoll API has advantages in more esoteric cases:
  *
  *   epoll has the advantage over kqueue when watching for multiple
  *   events (POLLIN|POLLOUT|POLLPRI) (which is rare).  We also have
  *   to install two kevent filters to watch POLLIN|POLLOUT simutaneously.
+ *   See udata_set/udata_get functions below for more on this.
  *
  *   Finally, kevent does not support POLLPRI directly, we need to use
  *   select() (or perhaps poll() on some platforms) with a zero
@@ -33,6 +34,9 @@
 #include <sys/event.h>
 #include <sys/time.h>
 
+/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */
+#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI)
+
 /* allocated on heap (rb_vm_t.iom) */
 struct rb_iom_struct {
     /*
@@ -154,6 +158,9 @@ check_pri(rb_thread_t *th, struct list_head *pri)
     struct kev *kev = 0, *next = 0;
     int max = -1;
 
+    if (list_empty(pri)) {
+	return;
+    }
     rb_fd_init(&efds);
     list_for_each(pri, kev, fdw.w.wnode) {
 	int fd = *kev->fdw.fdp;
@@ -166,20 +173,108 @@ check_pri(rb_thread_t *th, struct list_head *pri)
     }
 
     r = native_fd_select(max + 1, 0, 0, &efds, &tv, th);
-    if (r > 0) {
-	list_for_each(pri, kev, fdw.w.wnode) {
+
+    list_for_each_safe(pri, kev, next, fdw.w.wnode) {
+	list_del_init(&kev->fdw.w.wnode);
+	if (r > 0) {
 	    int fd = *kev->fdw.fdp;
 	    if (fd >= 0 && rb_fd_isset(fd, &efds)) {
 		kev->fdw.revents |= RB_WAITFD_PRI;
 	    }
 	}
     }
+    rb_fd_term(&efds);
+}
 
-    /* needed for rb_ensure */
-    list_for_each_safe(pri, kev, next, fdw.w.wnode) {
-	list_del_init(&kev->fdw.w.wnode);
+/*
+ * kqueue is more complicated because we install separate filters for
+ * simultaneously watching read and write on the same FD, and now we
+ * must clear shared filters if the event from the other side comes in.
+ */
+static int
+drop_pairs(rb_thread_t *th, int max, struct kevent *changelist,
+	    struct list_head *rdel, struct list_head *wdel)
+{
+    int nchanges = 0;
+    struct kev *kev = 0, *next = 0;
+
+    list_for_each_safe(rdel, kev, next, wfdnode) {
+	int fd = *kev->fdw.fdp;
+	list_del_init(&kev->wfdnode);
+	assert(kev->fdw.revents & RB_WAITFD_OUT);
+	if (fd >= 0) {
+	    struct kevent *chg = &changelist[nchanges++];
+	    assert(nchanges <= max);
+	    EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+	}
     }
-    rb_fd_term(&efds);
+    list_for_each_safe(wdel, kev, next, rfdnode) {
+	int fd = *kev->fdw.fdp;
+	list_del_init(&kev->rfdnode);
+	assert(kev->fdw.revents & WAITFD_READ);
+	if (fd >= 0) {
+	    struct kevent *chg = &changelist[nchanges++];
+	    assert(nchanges <= max);
+	    EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+	}
+    }
+    if (nchanges) {
+	int kqfd = th->vm->iom->kqueue_fd;
+	if (kevent(kqfd, changelist, nchanges, 0, 0, &zero) < 0) {
+	    return errno;
+	}
+    }
+    return 0;
+}
+
+/*
+ * kqueue requires separate filters when simultaneously watching
+ * read and write events on the same FD.  Different filters mean
+ * they can have different udata pointers; at least with
+ * native kqueue.
+ *
+ * When emulating native kqueue behavior using epoll as libkqueue does,
+ * there is only space for one udata pointer.  This is because epoll
+ * allows each "filter" (epitem in the kernel) to watch read and write
+ * simultaneously.
+ * epoll keys epitems using: [ fd, file description (struct file *) ].
+ * In contrast, kevent keys its filters using: [ fd (ident), filter ].
+ *
+ * So, with native kqueue, we can at least optimize away rb_iom_fdm_get
+ * function calls; but we cannot do that when libkqueue emulates
+ * kqueue with epoll and need to retrieve the fdm from the correct
+ * fdmap.
+ *
+ * Finally, the epoll implementation only needs one fdmap.
+ */
+
+static void *
+udata_set(rb_iom_t *iom, struct rb_iom_fdm *fdm)
+{
+#ifdef LIBKQUEUE
+    return iom;
+#else /* native kqueue */
+    return fdm;
+#endif
+}
+
+static struct rb_iom_fdm *
+udata_get(struct kevent *ev)
+{
+#ifdef LIBKQUEUE
+    rb_iom_t *iom = ev->udata;
+    int fd = ev->ident;
+
+    switch (ev->filter) {
+      case EVFILT_READ:
+	return rb_iom_fdm_get(&iom->rfdmap, fd);
+      case EVFILT_WRITE:
+	return rb_iom_fdm_get(&iom->wfdmap, fd);
+      default:
+	rb_bug("bad filter in libkqueue compatibility mode: %d", ev->filter);
+    }
+#endif
+    return ev->udata;
 }
 
 static void
@@ -187,81 +282,64 @@ check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
 {
     int err = 0;
     if (nr >= 0) {
-	int i;
-	struct kevent *changelist = eventlist;
-	int nchanges = 0;
 	struct list_head pri = LIST_HEAD_INIT(pri);
+	struct list_head wdel = LIST_HEAD_INIT(wdel);
+	struct list_head rdel = LIST_HEAD_INIT(rdel);
+	rb_iom_t *iom = th->vm->iom;
+	struct kev *kev = 0, *next = 0;
+	int i;
 
 	for (i = 0; i < nr; i++) {
 	    struct kevent *ev = &eventlist[i];
-	    struct rb_iom_fdm *fdm = ev->udata;
-	    struct kev *kev = 0, *next = 0;
-	    int fd = (int)ev->ident;
-	    int ok_fd = -1;
-	    int paired = 0;
+	    struct rb_iom_fdm *fdm = udata_get(ev);
 
 	    if (ev->filter == EVFILT_READ) {
+		int pair_queued = 0;
+
 		list_for_each_safe(&fdm->fdhead, kev, next, rfdnode) {
-		    int rbits = kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI);
+		    int rbits = kev->fdw.events & WAITFD_READ;
 
-		    list_del_init(&kev->rfdnode);
 		    assert(rbits && "unexpected EVFILT_READ");
+		    kev->fdw.revents |= (rbits & RB_WAITFD_IN);
 		    rb_iom_waiter_ready(&kev->fdw.w);
-		    paired |= (kev->fdw.events & RB_WAITFD_OUT);
-		    if (rbits == RB_WAITFD_IN) { /* common */
-			kev->fdw.revents |= RB_WAITFD_IN;
-		    }
-		    else if (rbits & RB_WAITFD_PRI) { /* ugh... */
-			kev->fdw.revents |= rbits & RB_WAITFD_IN;
-			list_add_tail(&pri, &kev->fdw.w.wnode);
+		    list_del_init(&kev->rfdnode);
+
+		    if (rbits & RB_WAITFD_PRI) {
+			list_add(&pri, &kev->fdw.w.wnode);
 		    }
-		    if (ok_fd != fd) {
-			ok_fd = *kev->fdw.fdp;
+
+		    if ((kev->fdw.events & RB_WAITFD_OUT) &&
+				*kev->fdw.fdp >= 0 &&
+				!pair_queued) {
+			pair_queued = 1;
+			list_add(&wdel, &kev->rfdnode);
 		    }
 		}
 	    }
 	    else if (ev->filter == EVFILT_WRITE) {
+		int pair_queued = 0;
+
 		list_for_each_safe(&fdm->fdhead, kev, next, wfdnode) {
-		    list_del_init(&kev->wfdnode);
-		    kev->fdw.revents |= RB_WAITFD_OUT;
 		    assert(kev->fdw.events & RB_WAITFD_OUT &&
 			    "unexpected EVFILT_WRITE");
+		    kev->fdw.revents |= RB_WAITFD_OUT;
 		    rb_iom_waiter_ready(&kev->fdw.w);
-		    paired |= (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI));
-		    if (ok_fd != fd) {
-			ok_fd = *kev->fdw.fdp;
+		    list_del_init(&kev->wfdnode);
+
+		    if ((kev->fdw.events & WAITFD_READ) &&
+				*kev->fdw.fdp >= 0 &&
+				!pair_queued) {
+			pair_queued = 1;
+			list_add(&rdel, &kev->wfdnode);
 		    }
 		}
 	    }
 	    else {
 		rb_bug("unexpected filter: %d", ev->filter);
 	    }
-	    /* delete the corresponding event when one FD has 2 filters */
-	    if (ok_fd == fd) {
-		if (paired & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
-		    struct kevent *chg = &changelist[nchanges++];
-		    EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
-		}
-		if (paired & RB_WAITFD_OUT) {
-		    struct kevent *chg = &changelist[nchanges++];
-		    EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
-		}
-	    }
-	}
-	if (!list_empty(&pri)) {
-	    check_pri(th, &pri);
-	}
-
-
-	/*
-	 * kqueue is a little more complicated because we install
-	 * separate filters for simultaneously watching read and write
-	 * on the same FD, and now we must clear shared filters
-	 */
-	if (nchanges && kevent(th->vm->iom->kqueue_fd, changelist, nchanges,
-				0, 0, &zero) < 0) {
-	    err = errno;
 	}
+	check_pri(th, &pri);
+	err = drop_pairs(th, nr, eventlist, &rdel, &wdel);
 
 	/* notify the waiter thread in case we enqueued fibers for them */
 	if (nr > 0 && th->vm->iom->waiter) {
@@ -350,7 +428,7 @@ rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
 }
 
 static void
-kevxchg_ping(struct kev *kev)
+kevxchg_ping(int fd, struct kev *kev)
 {
     rb_thread_t *th = kev->th;
     rb_iom_t *iom = rb_iom_get(th);
@@ -360,21 +438,17 @@ kevxchg_ping(struct kev *kev)
     VALUE v;
     int nevents = iom->nevents;
     struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
-    struct kevent *changelist = eventlist; /* allowed, see kevent manpage */
-    int fd = *kev->fdw.fdp;
-    int kqfd;
+    struct kevent *changelist = eventlist;
+    int kqfd = iom_kqfd(iom);
 
-    VM_ASSERT(nevents > 2);
-    if (fd < 0) {
-	return;
-    }
+    VM_ASSERT(nevents >= 2);
 
-    kqfd = iom_kqfd(iom);
-    /* EVFILT_READ handles urgent data (POLLPRI) */
-    if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
+    /* EVFILT_READ handles urgent data (POLLPRI)... hopefully */
+    if (kev->fdw.events & WAITFD_READ) {
 	struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->rfdmap, fd);
 	if (list_empty(&fdm->fdhead)) {
 	    struct kevent *chg = &changelist[nchanges++];
+	    fdm = udata_set(iom, fdm);
 	    EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, fdm);
 	}
 	list_add(&fdm->fdhead, &kev->rfdnode);
@@ -383,6 +457,7 @@ kevxchg_ping(struct kev *kev)
 	struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->wfdmap, fd);
 	if (list_empty(&fdm->fdhead)) {
 	    struct kevent *chg = &changelist[nchanges++];
+	    fdm = udata_set(iom, fdm);
 	    EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, fdm);
 	}
 	list_add(&fdm->fdhead, &kev->wfdnode);
@@ -406,21 +481,21 @@ static VALUE
 kevxchg_yield(VALUE ptr)
 {
     struct kev *kev = (struct kev *)ptr;
-    kevxchg_ping(kev);
-    (void)rb_fiber_auto_do_yield_p(kev->th);
-    return rb_fiber_yield(0, 0);
+    int fd = *kev->fdw.fdp;
+    if (fd >= 0) {
+	kevxchg_ping(fd, kev);
+	(void)rb_fiber_auto_do_yield_p(kev->th);
+	return rb_fiber_yield(0, 0);
+    }
+    return Qfalse;
 }
 
 static VALUE
 kev_done(VALUE ptr)
 {
     struct kev *kev = (struct kev *)ptr;
-    if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
-	list_del(&kev->rfdnode);
-    }
-    if (kev->fdw.events & RB_WAITFD_OUT) {
-	list_del(&kev->wfdnode);
-    }
+    list_del(&kev->rfdnode);
+    list_del(&kev->wfdnode);
     return rb_iom_waiter_done((VALUE)&kev->fdw.w);
 }
 
@@ -435,6 +510,8 @@ iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
     kev.fdw.events = (short)events;
     kev.fdw.revents = 0;
     list_add(&iom->kevs, &kev.fdw.w.wnode);
+    list_node_init(&kev.rfdnode);
+    list_node_init(&kev.wfdnode);
     rb_iom_timer_add(&iom->timers, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
     rb_ensure(kevxchg_yield, (VALUE)&kev, kev_done, (VALUE)&kev);
 
-- 
EW
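
One way to exercise the shared-filter path from Ruby: two auto-Fibers
watching the same descriptor, one for readability and one for
writability, so EVFILT_READ and EVFILT_WRITE land on the same fd and
share the per-fd lists above.  A hedged sketch; the tests only use
wait_readable, so I am assuming wait_writable takes the same
rb_wait_for_single_fd path:

    require 'socket'
    require 'io/wait'
    a, b = UNIXSocket.pair
    rf = Fiber.start { a.wait_readable(1) } # EVFILT_READ filter
    wf = Fiber.start { a.wait_writable(1) } # EVFILT_WRITE, same fd
    b.write('x')   # `a` becomes readable; it was writable all along
    rf.value       # truthy once readable
    wf.value       # truthy, no waiting needed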



* [PATCH 6/7] wip
  2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
                   ` (3 preceding siblings ...)
  2017-05-30 19:03 ` [PATCH 5/7] kq-share Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
  2017-05-30 19:03 ` [PATCH 7/7] wip Eric Wong
  5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
  To: spew

---
 iom_kqueue.h                 | 14 +++++++-------
 test/ruby/test_fiber_auto.rb |  9 ++++++++-
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/iom_kqueue.h b/iom_kqueue.h
index 31cddd54be..cb008db3ff 100644
--- a/iom_kqueue.h
+++ b/iom_kqueue.h
@@ -259,7 +259,7 @@ udata_set(rb_iom_t *iom, struct rb_iom_fdm *fdm)
 }
 
 static struct rb_iom_fdm *
-udata_get(struct kevent *ev)
+udata_get(const struct kevent *ev)
 {
 #ifdef LIBKQUEUE
     rb_iom_t *iom = ev->udata;
@@ -446,19 +446,19 @@ kevxchg_ping(int fd, struct kev *kev)
     /* EVFILT_READ handles urgent data (POLLPRI)... hopefully */
     if (kev->fdw.events & WAITFD_READ) {
 	struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->rfdmap, fd);
-	if (list_empty(&fdm->fdhead)) {
+	if (1 || list_empty(&fdm->fdhead)) {
 	    struct kevent *chg = &changelist[nchanges++];
-	    fdm = udata_set(iom, fdm);
-	    EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, fdm);
+	    void *udata = udata_set(iom, fdm);
+	    EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, udata);
 	}
 	list_add(&fdm->fdhead, &kev->rfdnode);
     }
     if (kev->fdw.events & RB_WAITFD_OUT) {
 	struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->wfdmap, fd);
-	if (list_empty(&fdm->fdhead)) {
+	if (1 || list_empty(&fdm->fdhead)) {
 	    struct kevent *chg = &changelist[nchanges++];
-	    fdm = udata_set(iom, fdm);
-	    EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, fdm);
+	    void *udata = udata_set(iom, fdm);
+	    EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, udata);
 	}
 	list_add(&fdm->fdhead, &kev->wfdnode);
     }
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
index 196361c674..da758e8518 100644
--- a/test/ruby/test_fiber_auto.rb
+++ b/test/ruby/test_fiber_auto.rb
@@ -188,12 +188,19 @@ def test_cross_thread_schedule
   def test_multi_readers
     IO.pipe do |r, w|
       r.nonblock = w.nonblock = true
-      nr = 30
+      nr = 10
+      warn "AAAAAAAAAAAAAAAAAA\n"
+      warn "AAAAAAAAAAAAAAAAAA\n"
       fibs = nr.times.map { Fiber.start { r.read(1) } }
       fibs.each { |f| assert_nil f.join(0.001) }
 
       exp = nr.times.map { -'a' }.freeze
       w.write(exp.join)
+      fibs.each_with_index do |f, i|
+        warn i
+        warn f.value
+      end
+      warn "OK\n";
       assert_equal exp, fibs.map(&:value);
     end
   end
-- 
EW



* [PATCH 7/7] wip
  2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
                   ` (4 preceding siblings ...)
  2017-05-30 19:03 ` [PATCH 6/7] wip Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
  5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
  To: spew

---
 iom.h                        |  2 ++
 iom_internal.h               |  9 +++++++
 iom_kqueue.h                 | 57 +++++++++++++++++++++++++++++++++++++++++++-
 test/ruby/test_fiber_auto.rb | 14 +++++------
 thread.c                     |  1 +
 5 files changed, 74 insertions(+), 9 deletions(-)

diff --git a/iom.h b/iom.h
index d412885baa..82827868c5 100644
--- a/iom.h
+++ b/iom.h
@@ -85,6 +85,8 @@ void rb_iom_destroy(rb_vm_t *);
  */
 void rb_iom_schedule(rb_thread_t *th, double *timeout);
 
+int rb_iom_notify_fd_close(rb_vm_t *, int fd);
+
 /* cont.c */
 int rb_fiber_auto_sched_p(const rb_thread_t *);
 
diff --git a/iom_internal.h b/iom_internal.h
index 1a4050d62d..d2e51ba6e3 100644
--- a/iom_internal.h
+++ b/iom_internal.h
@@ -65,6 +65,15 @@ iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd)
 }
 
 static struct rb_iom_fdm *
+rb_iom_fdm_check(struct rb_iom_fdmap *fdmap, int fd)
+{
+    if (fd < fdmap->max_fd) {
+	return iom_fdhead_aref(fdmap, fd);
+    }
+    return 0;
+}
+
+static struct rb_iom_fdm *
 rb_iom_fdm_get(struct rb_iom_fdmap *fdmap, int fd)
 {
     if (fd >= fdmap->max_fd) {
diff --git a/iom_kqueue.h b/iom_kqueue.h
index cb008db3ff..96998a0329 100644
--- a/iom_kqueue.h
+++ b/iom_kqueue.h
@@ -285,7 +285,6 @@ check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
 	struct list_head pri = LIST_HEAD_INIT(pri);
 	struct list_head wdel = LIST_HEAD_INIT(wdel);
 	struct list_head rdel = LIST_HEAD_INIT(rdel);
-	rb_iom_t *iom = th->vm->iom;
 	struct kev *kev = 0, *next = 0;
 	int i;
 
@@ -587,5 +586,61 @@ rb_iom_reserved_fd(int fd)
     return iom && fd == iom->kqueue_fd;
 }
 
+int rb_iom_notify_fd_close(rb_vm_t *vm, int fd)
+{
+    rb_iom_t *iom = vm->iom;
+    struct rb_iom_fdm *fdm;
+    struct kev *kev = 0, *next = 0;
+    int n = 0;
+    int wr = 0;
+    int rd = 0;
+    struct kevent changes[2];
+
+    if (!iom) {
+	return 0;
+    }
+    fdm = rb_iom_fdm_check(&iom->rfdmap, fd);
+    if (fdm) {
+	list_for_each_safe(&fdm->fdhead, kev, next, rfdnode) {
+	    rb_thread_t *th = kev->th;
+	    VALUE err = th->vm->special_exceptions[ruby_error_stream_closed];
+	    rb_threadptr_pending_interrupt_enque(th, err);
+	    rb_threadptr_interrupt(th);
+
+	    rb_iom_waiter_ready(&kev->fdw.w);
+	    list_del_init(&kev->rfdnode);
+
+	    rd = 1;
+	}
+    }
+    fdm = rb_iom_fdm_check(&iom->wfdmap, fd);
+    if (fdm) {
+	list_for_each_safe(&fdm->fdhead, kev, next, wfdnode) {
+	    rb_thread_t *th = kev->th;
+	    VALUE err = th->vm->special_exceptions[ruby_error_stream_closed];
+	    rb_threadptr_pending_interrupt_enque(th, err);
+	    rb_threadptr_interrupt(th);
+	    rb_iom_waiter_ready(&kev->fdw.w);
+	    list_del_init(&kev->wfdnode);
+	    wr = 1;
+	}
+    }
+    if (wr) {
+	struct kevent *chg = &changes[n++];
+	EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+    }
+    if (rd) {
+	struct kevent *chg = &changes[n++];
+	EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+    }
+    if (n) {
+	if (kevent(iom->kqueue_fd, changes, n, 0, 0, &zero) < 0) {
+	    rb_sys_fail("kevent");
+	}
+	rb_fiber_auto_do_yield_p(GET_THREAD());
+    }
+    return n;
+}
+
 #include "iom_pingable_common.h"
 #include "iom_common.h"
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
index da758e8518..527472d28f 100644
--- a/test/ruby/test_fiber_auto.rb
+++ b/test/ruby/test_fiber_auto.rb
@@ -186,21 +186,19 @@ def test_cross_thread_schedule
 
   # tricky for kqueue and epoll implementations
   def test_multi_readers
+    if RUBY_PLATFORM =~ /linux/ &&
+      (RbConfig::CONFIG['LDFLAGS'] =~ /-lkqueue\b/ ||
+       RbConfig::CONFIG['DLDFLAGS'] =~ /-lkqueue\b/)
+      skip "FIXME libkqueue is buggy on this test"
+    end
     IO.pipe do |r, w|
       r.nonblock = w.nonblock = true
-      nr = 10
-      warn "AAAAAAAAAAAAAAAAAA\n"
-      warn "AAAAAAAAAAAAAAAAAA\n"
+      nr = 30
       fibs = nr.times.map { Fiber.start { r.read(1) } }
       fibs.each { |f| assert_nil f.join(0.001) }
 
       exp = nr.times.map { -'a' }.freeze
       w.write(exp.join)
-      fibs.each_with_index do |f, i|
-        warn i
-        warn f.value
-      end
-      warn "OK\n";
       assert_equal exp, fibs.map(&:value);
     end
   end
diff --git a/thread.c b/thread.c
index 7899aeae0b..9de07f663e 100644
--- a/thread.c
+++ b/thread.c
@@ -2219,6 +2219,7 @@ rb_notify_fd_close(int fd)
 	    busy = 1;
 	}
     }
+    /* busy |= rb_iom_notify_fd_close(vm, fd); */
     return busy;
 }
 
-- 
EW
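
What rb_iom_notify_fd_close is aiming for, once the commented-out hook
in rb_notify_fd_close is enabled: a hedged sketch of the intent only;
with the hook still disabled the join below simply times out, and
where exactly the stream-closed IOError surfaces is my assumption:

    require 'io/nonblock'
    r, w = IO.pipe
    r.nonblock = true
    f = Fiber.start { r.read(1) } # registers EVFILT_READ and yields
    r.close                       # rb_notify_fd_close would fire here
    f.join(1) # today: nil (times out); with the hook enabled, f should
              # be woken and see the usual stream-closed IOError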


