From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: ** X-Spam-ASN: AS34456 83.220.176.0/20 X-Spam-Status: No, score=2.8 required=3.0 tests=BAYES_00, RCVD_IN_BL_SPAMCOP_NET,RCVD_IN_BRBL_LASTEXT,RCVD_IN_MSPIKE_BL, RCVD_IN_MSPIKE_ZBI,RCVD_IN_RP_RNBL,RCVD_IN_SORBS_SPAM,RCVD_IN_XBL,RDNS_NONE, SPF_FAIL,SPF_HELO_FAIL,TO_EQ_FM_DOM_SPF_FAIL shortcircuit=no autolearn=no autolearn_force=no version=3.4.0 Received: from 80x24.org (unknown [83.220.185.87]) by dcvr.yhbt.net (Postfix) with ESMTP id 22CAD202B3 for ; Thu, 29 Jun 2017 04:35:13 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [PATCH v2] auto fiber schedule for rb_wait_for_single_fd and rb_waitpid Date: Thu, 29 Jun 2017 04:35:09 +0000 Message-Id: <20170629043509.14939-1-e@80x24.org> List-Id: Implement automatic Fiber yield and resume when running rb_wait_for_single_fd and rb_waitpid. The Ruby API changes for Fiber are named after existing Thread methods. main Ruby API: Fiber#start -> enable auto-scheduling and run Fiber until it automatically yields (due to EAGAIN/EWOULDBLOCK) The following behave like their Thread counterparts: Fiber.start - Fiber.new + Fiber#start (prelude.rb) Fiber#join - run internal scheduler until Fiber is terminated Fiber#value - ditto Fiber#run - like Fiber#start (prelude.rb) Right now, it takes over rb_wait_for_single_fd() and rb_waitpid() function if the running Fiber is auto-enabled (cont.c::rb_fiber_auto_sched_p) Changes to existing functions are minimal. New files (all new structs and relations should be documented): iom.h - internal API for the rest of RubyVM (incomplete?) iom_internal.h - internal header for iom_(select|epoll|kqueue).h iom_epoll.h - epoll-specific pieces iom_kqueue.h - kqueue-specific pieces iom_select.h - select-specific pieces iom_pingable_common.h - common code for iom_(epoll|kqueue).h iom_common.h - common footer for iom_(select|epoll|kqueue).h Changes to existing data structures: rb_thread_t.afrunq - list of fibers to auto-resume rb_vm_t.iom - Ruby I/O Manager (rb_iom_t) :) Besides rb_iom_t, all the new structs are stack-only and relies extensively on ccan/list for branch-less, O(1) insert/delete. As usual, understanding the data structures first should help you understand the code. Right now, I reuse some static functions in thread.c, so thread.c includes iom_(select|epoll|kqueue).h TODO: Hijack other blocking functions (IO.select, ...) I am using "double" for timeout since it is more convenient for arithmetic like parts of thread.c. Most platforms have good FP, I think. Also, all "blocking" functions (rb_iom_wait*) will have timeout support. ./configure gains a new --with-iom=(select|epoll|kqueue) switch libkqueue: libkqueue support is incomplete; corner cases are not handled well: 1) multiple fibers waiting on the same FD 2) waiting for both read and write events on the same FD Bugfixes to libkqueue may be necessary to support all corner cases. Supporting these corner cases for native kqueue was challenging, even. See comments on iom_kqueue.h and iom_epoll.h for nuances. Limitations Test script I used to download a file from my server: ----8<--- require 'net/http' require 'uri' require 'digest/sha1' require 'fiber' url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx' uri = URI(url) use_ssl = "https" == uri.scheme fibs = 10.times.map do Fiber.start do cur = Fiber.current.object_id # XXX getaddrinfo() and connect() are blocking # XXX resolv/replace + connect_nonblock Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http| req = Net::HTTP::Get.new(uri) http.request(req) do |res| dig = Digest::SHA1.new res.read_body do |buf| dig.update(buf) #warn "#{cur} #{buf.bytesize}\n" end warn "#{cur} #{dig.hexdigest}\n" end end warn "done\n" :done end end warn "joining #{Time.now}\n" fibs[-1].join(4) warn "joined #{Time.now}\n" all = fibs.dup warn "1 joined, wait for the rest\n" until fibs.empty? fibs.each(&:join) fibs.keep_if(&:alive?) warn fibs.inspect end p all.map(&:value) Fiber.new do puts 'HI' end.run.join --- common.mk | 7 + configure.in | 32 ++ cont.c | 117 +++- include/ruby/io.h | 2 + iom.h | 92 ++++ iom_common.h | 196 +++++++ iom_epoll.h | 423 +++++++++++++++ iom_internal.h | 251 +++++++++ iom_kqueue.h | 600 +++++++++++++++++++++ iom_pingable_common.h | 46 ++ iom_select.h | 306 +++++++++++ prelude.rb | 12 + process.c | 14 +- signal.c | 40 +- .../wait_for_single_fd/test_wait_for_single_fd.rb | 44 ++ test/lib/leakchecker.rb | 9 + test/ruby/test_fiber_auto.rb | 238 ++++++++ thread.c | 42 ++ thread_pthread.c | 5 + vm.c | 9 + vm_core.h | 4 + 21 files changed, 2475 insertions(+), 14 deletions(-) create mode 100644 iom.h create mode 100644 iom_common.h create mode 100644 iom_epoll.h create mode 100644 iom_internal.h create mode 100644 iom_kqueue.h create mode 100644 iom_pingable_common.h create mode 100644 iom_select.h create mode 100644 test/ruby/test_fiber_auto.rb diff --git a/common.mk b/common.mk index 1e4ae6e384..42737fae36 100644 --- a/common.mk +++ b/common.mk @@ -2613,6 +2613,13 @@ thread.$(OBJEXT): {$(VPATH)}id.h thread.$(OBJEXT): {$(VPATH)}intern.h thread.$(OBJEXT): {$(VPATH)}internal.h thread.$(OBJEXT): {$(VPATH)}io.h +thread.$(OBJEXT): {$(VPATH)}iom.h +thread.$(OBJEXT): {$(VPATH)}iom_internal.h +thread.$(OBJEXT): {$(VPATH)}iom_common.h +thread.$(OBJEXT): {$(VPATH)}iom_epoll.h +thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h +thread.$(OBJEXT): {$(VPATH)}iom_pingable_common.h +thread.$(OBJEXT): {$(VPATH)}iom_select.h thread.$(OBJEXT): {$(VPATH)}method.h thread.$(OBJEXT): {$(VPATH)}missing.h thread.$(OBJEXT): {$(VPATH)}node.h diff --git a/configure.in b/configure.in index e30ae6c9c9..e9e48816fa 100644 --- a/configure.in +++ b/configure.in @@ -1396,6 +1396,8 @@ AC_CHECK_HEADERS(process.h) AC_CHECK_HEADERS(pwd.h) AC_CHECK_HEADERS(setjmpex.h) AC_CHECK_HEADERS(sys/attr.h) +AC_CHECK_HEADERS(sys/epoll.h) +AC_CHECK_HEADERS(sys/event.h) AC_CHECK_HEADERS(sys/fcntl.h) AC_CHECK_HEADERS(sys/file.h) AC_CHECK_HEADERS(sys/id.h) @@ -2412,6 +2414,10 @@ AC_CHECK_FUNCS(dladdr) AC_CHECK_FUNCS(dup) AC_CHECK_FUNCS(dup3) AC_CHECK_FUNCS(eaccess) +AC_CHECK_FUNCS(epoll_create) +AC_CHECK_FUNCS(epoll_create1) +AC_CHECK_FUNCS(epoll_ctl) +AC_CHECK_FUNCS(epoll_wait) AC_CHECK_FUNCS(endgrent) AC_CHECK_FUNCS(fchmod) AC_CHECK_FUNCS(fchown) @@ -2445,7 +2451,9 @@ AC_CHECK_FUNCS(initgroups) AC_CHECK_FUNCS(ioctl) AC_CHECK_FUNCS(isfinite) AC_CHECK_FUNCS(issetugid) +AC_CHECK_FUNCS(kevent) AC_CHECK_FUNCS(killpg) +AC_CHECK_FUNCS(kqueue) AC_CHECK_FUNCS(lchmod) AC_CHECK_FUNCS(lchown) AC_CHECK_FUNCS(link) @@ -3597,6 +3605,29 @@ AC_ARG_WITH(valgrind, AS_IF([test x$with_valgrind != xno], [AC_CHECK_HEADERS(valgrind/memcheck.h)]) +AC_DEFINE_UNQUOTED(IOM_SELECT, 0) +AC_DEFINE_UNQUOTED(IOM_KQUEUE, 1) +AC_DEFINE_UNQUOTED(IOM_EPOLL, 2) + +iom_default=select +AS_CASE([$ac_cv_func_kqueue:$ac_cv_func_kevent:$ac_cv_header_sys_event_h], +[yes:yes:yes], [iom_default=kqueue], +[*], + [AS_CASE( + [$ac_cv_func_epoll_wait:$ac_cv_func_epoll_create:$ac_cv_header_sys_epoll_h], + [yes:yes:yes], [iom_default=epoll])] +) + +AC_ARG_WITH(iom, + AS_HELP_STRING([--with-iom=XXXXX], + [I/O manager (select|kqueue|epoll)]), + [with_iom="$withval"], [with_iom="$iom_default"]) +AS_CASE(["$with_iom"], + [select], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_SELECT)], + [kqueue], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_KQUEUE)], + [epoll], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_EPOLL)], + [AC_MSG_ERROR(unknown I/O manager: $with_iom)]) + dln_a_out_works=no AS_IF([test "$ac_cv_header_a_out_h" = yes], [ AS_IF([test "$with_dln_a_out" = yes || test "$rb_cv_dlopen" = unknown], [ @@ -4752,5 +4783,6 @@ config_summary "install doc" "$install_doc" config_summary "man page type" "$MANTYPE" config_summary "search path" "$search_path" config_summary "static-linked-ext" ${EXTSTATIC:+"yes"} +config_summary "I/O manager" ${with_iom} echo "" echo "---" diff --git a/cont.c b/cont.c index 095cc86767..57058f4ab7 100644 --- a/cont.c +++ b/cont.c @@ -13,6 +13,7 @@ #include "vm_core.h" #include "gc.h" #include "eval_intern.h" +#include "iom.h" /* FIBER_USE_NATIVE enables Fiber performance improvement using system * dependent method such as make/setcontext on POSIX system or @@ -133,6 +134,7 @@ struct rb_fiber_struct { * You shouldn't mix "transfer" and "resume". */ int transferred; + unsigned int auto_fiber:1; #if FIBER_USE_NATIVE #ifdef _WIN32 @@ -1477,19 +1479,30 @@ rb_fiber_terminate(rb_fiber_t *fib) fiber_switch(return_fiber(), 1, &value, 0); } -VALUE -rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv) +int +rb_fiber_resumable_p(const rb_thread_t *th, const rb_fiber_t *fib) { - rb_fiber_t *fib; - GetFiberPtr(fibval, fib); + return th->root_fiber != fib && !fib->prev; +} +static void +fiber_check_resume(const rb_fiber_t *fib) +{ if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) { rb_raise(rb_eFiberError, "double resume"); } if (fib->transferred != 0) { rb_raise(rb_eFiberError, "cannot resume transferred Fiber"); } +} +VALUE +rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv) +{ + rb_fiber_t *fib; + GetFiberPtr(fibval, fib); + + fiber_check_resume(fib); return fiber_switch(fib, argc, argv, 1); } @@ -1631,7 +1644,100 @@ rb_fiber_s_current(VALUE klass) return rb_fiber_current(); } +/* Returns true if auto-fiber is enabled for current fiber */ +int +rb_fiber_auto_sched_p(const rb_thread_t *th) +{ + const rb_fiber_t *cur = th->fiber; + + return (cur && cur->auto_fiber && th->root_fiber != cur); +} + +/* + * Enable auto-scheduling for the Fiber and resume it + */ +static VALUE +rb_fiber_auto_start(int argc, VALUE *argv, VALUE self) +{ + rb_thread_t *th = GET_THREAD(); + rb_fiber_t *fib; + GetFiberPtr(self, fib); + + if (th->root_fiber == fib) { + rb_raise(rb_eFiberError, "Root fiber cannot #start"); + } + if (fib->auto_fiber) { + rb_raise(rb_eFiberError, "Fiber already started"); + } + fib->auto_fiber = 1; + fiber_check_resume(fib); + return fiber_switch(fib, argc, argv, 1); +} + +rb_thread_t * +rb_fiber_owner_thread(VALUE self) +{ + rb_fiber_t *fib; + + GetFiberPtr(self, fib); + + return rb_thread_ptr(fib->cont.saved_thread.self); +} + +static void +fiber_auto_join(rb_fiber_t *fib, double *timeout) +{ + rb_thread_t *th = GET_THREAD(); + rb_fiber_t *cur = fiber_current(); + + if (cur == fib) { + rb_raise(rb_eFiberError, "Target fiber must not be current fiber"); + } + if (th->root_fiber == fib) { + rb_raise(rb_eFiberError, "Target fiber must not be root fiber"); + } + if (fib->cont.saved_thread.self != th->self) { + rb_raise(rb_eFiberError, "Target fiber not owned by current thread"); + } + if (!fib->auto_fiber) { + rb_raise(rb_eFiberError, "Target fiber is not an auto-fiber"); + } + + while (fib->status != FIBER_TERMINATED && (timeout == 0 || *timeout >= 0.0)) { + rb_iom_schedule(th, timeout); + } +} +static VALUE +rb_fiber_auto_join(int argc, VALUE *argv, VALUE self) +{ + rb_fiber_t *fib; + double timeout, *t; + VALUE limit; + + GetFiberPtr(self, fib); + rb_scan_args(argc, argv, "01", &limit); + + if (NIL_P(limit)) { + t = 0; + } else { + timeout = rb_num2dbl(limit); + t = &timeout; + } + + fiber_auto_join(fib, t); + return fib->status == FIBER_TERMINATED ? fib->cont.self : Qnil; +} + +static VALUE +rb_fiber_auto_value(VALUE self) +{ + rb_fiber_t *fib; + GetFiberPtr(self, fib); + + fiber_auto_join(fib, 0); + return fib->cont.value; +} /* * Document-class: FiberError @@ -1668,6 +1774,9 @@ Init_Cont(void) rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0); rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); + rb_define_method(rb_cFiber, "start", rb_fiber_auto_start, -1); + rb_define_method(rb_cFiber, "join", rb_fiber_auto_join, -1); + rb_define_method(rb_cFiber, "value", rb_fiber_auto_value, 0); } RUBY_SYMBOL_EXPORT_BEGIN diff --git a/include/ruby/io.h b/include/ruby/io.h index 60d6f6d32e..3bd6f06cf3 100644 --- a/include/ruby/io.h +++ b/include/ruby/io.h @@ -116,6 +116,8 @@ typedef struct rb_io_t { /* #define FMODE_UNIX 0x00200000 */ /* #define FMODE_INET 0x00400000 */ /* #define FMODE_INET6 0x00800000 */ +/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */ +/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */ #define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr) diff --git a/iom.h b/iom.h new file mode 100644 index 0000000000..0397800edf --- /dev/null +++ b/iom.h @@ -0,0 +1,92 @@ +/* + * iom -> I/O Manager for RubyVM (auto-Fiber-aware) + * + * On platforms with epoll or kqueue, this should be ready for multicore; + * even if the rest of the RubyVM is not. + * + * Some inspiration taken from Mio in GHC: + * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf + */ +#ifndef RUBY_IOM_H +#define RUBY_IOM_H +#include "ruby.h" +#include "ruby/io.h" +#include "ruby/intern.h" +#include "vm_core.h" + +typedef struct rb_iom_struct rb_iom_t; + +/* WARNING: unstable API, only for Ruby internal use */ + +/* + * Note: the first "rb_thread_t *" is a placeholder and may be replaced + * with "rb_execution_context_t *" in the future. + */ + +/* + * All functions with "wait" in it take an optional double * +timeout+ + * argument specifying the timeout in seconds. If NULL, it can wait + * forever until the event happens (or the fiber is explicitly resumed). + * + * (maybe) TODO: If non-NULL, the timeout will be updated to the + * remaining time upon returning. Not sure if useful, could just be + * a a waste of cycles; so not implemented, yet. + */ + +/* + * Relinquish calling fiber while waiting for +events+ on the given + * +rb_io_t+ + * + * Multiple native threads can enter this function at the same time. + * + * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI + * + * Returns a mask of events. + */ + +int rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, double *timeout); + +/* + * Identical to rb_iom_waitio, but takes a pointer to an integer file + * descriptor, instead of rb_io_t. Use rb_iom_waitio when possible, + * since it allows us to optimize epoll (and perhaps avoid kqueue + * portability bugs across different *BSDs). + */ +int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout); + +/* + * Relinquish calling fiber to wait for the given PID to change status. + * Multiple native threads can enter this function at the same time. + * If timeout is negative, wait forever. + */ +rb_pid_t rb_iom_waitpid(rb_thread_t *, + rb_pid_t, int *status, int options, double *timeout); + +/* + * Relinquish calling fiber for at least the duration of given timeout + * in seconds. If timeout is negative, wait forever (until explicitly + * resumed). + * Multiple native threads can enter this function at the same time. + */ +void rb_iom_sleep(rb_thread_t *, double *timeout); + +/* callback for SIGCHLD, needed to implemented for rb_iom_waitpid */ +void rb_iom_sigchld(rb_vm_t *); + +/* + * there is no public create function, creation is lazy to avoid incurring + * overhead for small scripts which do not need fibers, we only need this + * at VM destruction + */ +void rb_iom_destroy(rb_vm_t *); + +/* + * schedule + */ +void rb_iom_schedule(rb_thread_t *th, double *timeout); + +/* cont.c */ +int rb_fiber_auto_sched_p(const rb_thread_t *); +rb_thread_t *rb_fiber_owner_thread(VALUE); + +#endif /* RUBY_IOM_H */ diff --git a/iom_common.h b/iom_common.h new file mode 100644 index 0000000000..13570054b0 --- /dev/null +++ b/iom_common.h @@ -0,0 +1,196 @@ +/* included by iom_(epoll|select|kqueue).h */ + +/* we lazily create this, small scripts may never need iom */ +static rb_iom_t * +rb_iom_new(rb_thread_t *th) +{ + rb_iom_t *iom = ALLOC(rb_iom_t); + rb_iom_init(iom); + return iom; +} + +static rb_iom_t * +rb_iom_get(rb_thread_t *th) +{ + VM_ASSERT(th && th->vm); + if (!th->vm->iom) { + th->vm->iom = rb_iom_new(th); + } + return th->vm->iom; +} + +/* check for expired timers */ +static void +rb_iom_timer_check(const rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + if (iom) { + struct rb_iom_timer *t = 0, *next = 0; + double now = timeofday(); + + list_for_each_safe(&iom->timers, t, next, n.tnode) { + if (t->expires_at <= now) { + struct rb_iom_waiter *w = rb_iom_waiter_of(t); + VALUE fibval = rb_iom_timer_fibval(t); + + if (w) { + list_del_init(&w->wnode); + } + list_del_init(&t->n.tnode); + /* non-auto-fibers may set timer in rb_iom_schedule */ + if (fibval != Qfalse) { + rb_thread_t *owner = rb_fiber_owner_thread(fibval); + list_add_tail(&owner->afrunq, &t->n.rnode); + } + } + return; /* done, timers is a sorted list */ + } + } +} + +/* insert a new +timer+ into +timers+, maintain sort order by expires_at */ +static void +rb_iom_timer_add(rb_thread_t *th, struct rb_iom_timer *add, + const double *timeout, int flags) +{ + add->_fibval = flags & IOM_FIB ? rb_fiber_current() : Qfalse; + add->_fibval |= flags & IOM_WAIT ? 0 : IOM_FIBMASK; + rb_iom_timer_check(th); + + if (timeout) { + rb_iom_t *iom = rb_iom_get(th); + struct rb_iom_timer *i = 0; + add->expires_at = timeofday() + *timeout; + + /* + * search backwards: assume typical projects have multiple objects + * sharing the same timeout values, so new timers will expire later + * than existing timers + */ + list_for_each_rev(&iom->timers, i, n.tnode) { + if (add->expires_at > i->expires_at) { + list_add_after(&iom->timers, &i->n.tnode, &add->n.tnode); + return; + } + } + list_add(&iom->timers, &add->n.tnode); + } + else { + /* not active, just allow list_del to function */ + list_node_init(&add->n.tnode); + } +} + +/* max == -1 : wake all */ +static void +rb_iom_blockers_notify(rb_iom_t *iom, int max) +{ + struct rb_iom_blocker *b = 0, *next = 0; + + list_for_each_safe(&iom->blockers, b, next, bnode) { + list_del_init(&b->bnode); + ubf_select(b->th); + if (--max == 0) { + break; + } + } +} + +/* + * TODO: consider EVFILT_PROC for kqueue and netlink+epoll on Linux; + * see the "god" RubyGem for usage examples. + * However, I doubt rb_waitpid scalability will be a problem and + * the simplicity of a single implementation for all is appealing. + */ +#ifdef HAVE_SYS_TYPES_H +# include +#endif +#ifdef HAVE_SYS_WAIT_H +# include +#endif +#if defined(WNOHANG) && WNOHANG != 0 && \ + (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) + +static VALUE +iom_schedule_pid(VALUE ptr) +{ + struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr; + rb_thread_t *th = pw->th; + + rb_fiber_auto_do_yield_p(th); + RUBY_VM_CHECK_INTS_BLOCKING(th); + return rb_fiber_yield(0, 0); +} + +rb_pid_t +rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options, + double *timeout) +{ + struct rb_iom_pid_waiter pw; + + pw.options = options; + VM_ASSERT((options & WNOHANG) == 0 && + "WNOHANG should be handled in rb_waitpid"); + + /* + * unlike rb_iom_waitfd, we typically call *waitpid before + * trying with a non-blocking operation + */ + pw.pid = rb_waitpid(pid, &pw.status, pw.options | WNOHANG); + + if (pw.pid == 0) { + rb_iom_t *iom = rb_iom_get(th); + + pw.th = th; + pw.pid = pid; + rb_iom_timer_add(th, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT); + + /* LIFO, to match Linux wait4() blocking behavior */ + list_add(&iom->pids, &pw.w.wnode); + rb_ensure(iom_schedule_pid, (VALUE)&pw, + rb_iom_waiter_done, (VALUE)&pw.w); + if (pw.pid == -1) { + errno = pw.errnum; + } + } + if (status) { + *status = pw.status; + } + if (pw.pid > 0) { + rb_last_status_set(pw.status, pw.pid); + } + return pw.pid; +} + +void +rb_iom_sigchld(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + if (iom) { + struct rb_iom_pid_waiter *pw = 0, *next = 0; + size_t nr = 0; + + list_for_each_safe(&iom->pids, pw, next, w.wnode) { + pid_t r = rb_waitpid(pw->pid, &pw->status, pw->options | WNOHANG); + + if (r == 0) { + continue; + } + if (r == -1) { + pw->errnum = errno; + } + nr++; + pw->pid = r; + rb_iom_waiter_ready(&pw->w); + } + rb_iom_blockers_notify(iom, -1); + } +} +#else +rb_pid_t +rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options, + double *timeout) +{ + rb_bug("Should not get here, WNOHANG not implemented"); +} +#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */ diff --git a/iom_epoll.h b/iom_epoll.h new file mode 100644 index 0000000000..1da8d422dd --- /dev/null +++ b/iom_epoll.h @@ -0,0 +1,423 @@ +/* + * Linux-only epoll-based implementation of I/O Manager for RubyVM + * + * Notes: + * + * TODO: epoll_wait only has millisecond resolution; if we need higher + * resolution we can use timerfd or ppoll on the epoll_fd itself. + * + * Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the + * same notification callback (struct file_operations)->poll. + * Unlike with kqueue across different *BSDs; we do not need to worry + * about inconsistencies between these syscalls. + * + * See also notes in iom_kqueue.h + */ +#include "iom_internal.h" +#include +#include /* round() */ +#define FMODE_IOM_ADDED FMODE_IOM_PRIVATE1 + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + + /* we NEVER need to scan epws, only insert + delete + empty check */ + struct list_head epws; /* -epw.w.wnode, order agnostic */ + + struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */ + struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */ + struct rb_iom_fdmap fdmap; /* maps each FD to multiple epw */ + + int epoll_fd; + int maxevents; /* auto-increases */ + struct list_head blockers; /* -rb_iom_blocker.bnode */ +}; + +/* + * Not using rb_iom_fd_waiter here, since we never need to reread the + * FD on this implementation. + * Allocated on stack + */ +struct epw { + struct rb_iom_waiter w; + union { + struct list_node fdnode; + struct { + rb_thread_t *th; + struct rb_iom_fd *fdh; + } pre_ctl; + } as; + int fd; /* no need for "int *", here, we never reread */ + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ + int *flags; /* &fptr->mode */ +}; + +static void +increase_maxevents(rb_iom_t *iom, int retries) +{ + /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */ + const int max_alloca = 1024 / sizeof(struct epoll_event); + const int max = max_alloca * 2; + + if (retries) { + iom->maxevents *= retries; + if (iom->maxevents > max || iom->maxevents <= 0) { + iom->maxevents = max; + } + } +} + +static int +double2msec(double sec) +{ + /* + * clamp timeout to workaround a Linux <= 2.6.37 bug, + * see epoll_wait(2) manpage + */ + const int max_msec = 35 * 60 * 1000; /* floor(35.79 minutes) */ + if (sec < 0) { + return -1; + } + else { + double msec = round(sec * 1000); + + if (msec < (double)max_msec) { + int ret = (int)msec; + return ret < 0 ? 0 : ret; + } + return max_msec; + } +} + +/* we can avoid branches when mapping RB_WAIT_* bits to EPOLL* bits */ +STATIC_ASSERT(epbits_matches_waitfd_bits, + RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT && + RB_WAITFD_PRI == EPOLLPRI); + +/* what goes into epoll_ctl... */ +static int +rb_events2ep(int events) +{ + return EPOLLONESHOT | events; +} + +/* ...what comes out of epoll_wait */ +static short +rb_ep2revents(int revents) +{ + return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI)); +} + +/* lazily create epoll FD, since not everybody waits on I/O */ +static int +iom_epfd(rb_iom_t *iom) +{ + if (iom->epoll_fd < 0) { +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->epoll_fd < 0) { + int err = errno; + if (rb_gc_for_fd(err)) { + iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->epoll_fd < 0) { + rb_sys_fail("epoll_create1"); + } + } + else if (err != ENOSYS) { + rb_syserr_fail(err, "epoll_create1"); + } + else { /* libc >= kernel || build-env > run-env */ +#endif /* HAVE_EPOLL_CREATE1 */ + iom->epoll_fd = epoll_create(1); + if (iom->epoll_fd < 0) { + if (rb_gc_for_fd(errno)) { + iom->epoll_fd = epoll_create(1); + } + } + if (iom->epoll_fd < 0) { + rb_sys_fail("epoll_create"); + } + rb_maygvl_fd_fix_cloexec(iom->epoll_fd); +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + } + } +#endif /* HAVE_EPOLL_CREATE1 */ + rb_update_max_fd(iom->epoll_fd); + } + return iom->epoll_fd; +} + +static void +rb_iom_init(rb_iom_t *iom) +{ + list_head_init(&iom->timers); + list_head_init(&iom->epws); + list_head_init(&iom->pids); + list_head_init(&iom->blockers); + iom->maxevents = 8; + iom->epoll_fd = -1; + rb_iom_fdmap_init(&iom->fdmap); +} + +static void +check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev) +{ + if (nr >= 0) { + int i; + + for (i = 0; i < nr; i++) { + struct rb_iom_fd *fdh = ev[i].data.ptr; + struct epw *epw = 0, *next = 0; + short revents = rb_ep2revents(ev[i].events); + + /* + * Typical list size is 1; only multiple fibers waiting + * on the same FD increases fdh list size + */ + list_for_each_safe(&fdh->fdhead, epw, next, as.fdnode) { + epw->revents = epw->events & revents; + list_del_init(&epw->as.fdnode); + rb_iom_waiter_ready(&epw->w); + } + } + + /* notify the waiter thread in case we enqueued fibers for them */ + if (nr > 0) { + rb_iom_blockers_notify(th->vm->iom, -1); + } + } + else { + int err = errno; + if (err != EINTR) { + rb_syserr_fail(err, "epoll_wait"); + } + } + rb_iom_timer_check(th); + RUBY_VM_CHECK_INTS_BLOCKING(th); +} + +/* perform a non-blocking epoll_wait while holding GVL */ +static void +ping_events(rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + int epfd = iom ? iom->epoll_fd : -1; + + if (epfd >= 0) { + VALUE v; + int nr; + int maxevents = iom->maxevents; + struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents); + int retries = 0; + + do { + nr = epoll_wait(epfd, ev, maxevents, 0); + check_epoll_wait(th, nr, ev); + } while (nr == maxevents && ++retries); + if (v) { + ALLOCV_END(v); + } + increase_maxevents(iom, retries); + } +} + +/* for iom_pingable_common.h */ +static void +rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + int maxevents = iom->maxevents; + int nr = maxevents; + double timeout; + + RUBY_VM_CHECK_INTS_BLOCKING(th); + timeout = rb_iom_next_timeout(&iom->timers); + + if (timeout != 0 && (!list_empty(&iom->epws) || !list_empty(&iom->pids))) { + VALUE v; + int epfd = iom_epfd(th->vm->iom); /* may raise */ + struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents); + int msec = double2msec(timeout); + struct rb_iom_blocker cur; + + VM_ASSERT(epfd >= 0); + cur.th = th; + list_add_tail(&iom->blockers, &cur.bnode); + BLOCKING_REGION({ + nr = epoll_wait(epfd, ev, maxevents, msec); + }, ubf_select, th, FALSE); + list_del(&cur.bnode); + check_epoll_wait(th, nr, ev); + if (v) { + ALLOCV_END(v); + } + } + if (nr == maxevents) { /* || timeout == 0 */ + ping_events(th); + } +} + +static void +epoll_ctl_or_raise(rb_thread_t *th, struct epw *epw) +{ + int e; + int epfd; + struct epoll_event ev; + + /* we cannot raise until list_add: */ + { + struct rb_iom_fd *fdh = epw->as.pre_ctl.fdh; + + ev.data.ptr = fdh; + ev.events = rb_events2ep(epw->events); + /* + * merge events from other threads/fibers waiting on the same + * [ descriptor (int fd), description (struct file *) ] tuplet + */ + if (!list_empty(&fdh->fdhead)) { /* uncommon, I hope... */ + struct epw *cur; + list_for_each(&fdh->fdhead, cur, as.fdnode) { + ev.events |= rb_events2ep(cur->events); + } + } + list_add(&fdh->fdhead, &epw->as.fdnode); + } + + epfd = iom_epfd(th->vm->iom); /* may raise */ + + /* we want to track if an FD is already being watched ourselves */ + if (epw->flags) { + if (*epw->flags & FMODE_IOM_ADDED) { /* ideal situation */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + else { + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + if (e == 0) { + *epw->flags |= FMODE_IOM_ADDED; + } + else if (e < 0 && errno == EEXIST) { + /* + * possible EEXIST if several fptrs point to the same FD: + * f1 = Fiber.start { io1.read(1) } + * io2 = IO.for_fd(io1.fileno) + * f2 = Fiber.start { io2.read(1) } + */ + *epw->flags |= FMODE_IOM_ADDED; + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + } + } + else { /* don't know if added or not, fall back to add on ENOENT */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + if (e < 0 && errno == ENOENT) { + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + } + } + if (e < 0) { + rb_sys_fail("epoll_ctl"); + } +} + +static VALUE +epmod_yield(VALUE ptr) +{ + /* we must have no posibility of raising until list_add: */ + struct epw *epw = (struct epw *)ptr; + rb_thread_t *th = epw->as.pre_ctl.th; + epoll_ctl_or_raise(th, epw); + ping_events(th); + (void)rb_fiber_auto_do_yield_p(th); + return rb_fiber_yield(0, 0); +} + +static VALUE +epw_done(VALUE ptr) +{ + struct epw *epw = (struct epw *)ptr; + list_del(&epw->as.fdnode); + return rb_iom_waiter_done((VALUE)&epw->w); +} + +static int +iom_waitfd(rb_thread_t *th, int fd, int *flags, int events, double *timeout) +{ + rb_iom_t *iom = rb_iom_get(th); + struct epw epw; + + /* unlike kqueue or select, we never need to reread fd */ + epw.fd = fd; + if (epw.fd < 0) { /* TODO: behave like poll(2) and sleep? */ + return 0; + } + + /* may raise on OOM: */ + epw.as.pre_ctl.fdh = rb_iom_fd_get(&iom->fdmap, epw.fd); + epw.as.pre_ctl.th = th; + epw.flags = flags; + /* + * if we did not have GVL, revents may be set immediately + * upon epoll_ctl by another thread running epoll_wait, + * so we must initialize it before epoll_ctl: + */ + epw.revents = 0; + epw.events = (short)events; + + list_add(&iom->epws, &epw.w.wnode); + rb_iom_timer_add(th, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT); + rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw); + + return (int)epw.revents; /* may be zero if timed out */ +} + +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout) +{ + return iom_waitfd(th, fptr->fd, &fptr->mode, events, timeout); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) +{ + return iom_waitfd(th, *fdp, 0, events, timeout); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + vm->iom = 0; + if (iom) { + /* + * it's possible; but crazy to share epoll FDs across processes + * (kqueue has a rather unique close-on-fork behavior) + */ + if (iom->epoll_fd >= 0) { + close(iom->epoll_fd); + } + rb_iom_fdmap_destroy(&iom->fdmap); + xfree(iom); + } +} + +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + rb_iom_t *iom = GET_VM()->iom; + + return iom && fd == iom->epoll_fd; +} + +#include "iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_internal.h b/iom_internal.h new file mode 100644 index 0000000000..3f9709693e --- /dev/null +++ b/iom_internal.h @@ -0,0 +1,251 @@ +#ifndef RB_IOM_COMMON_H +#define RB_IOM_COMMON_H + +#include "internal.h" +#include "iom.h" + +/* cont.c */ +void rb_fiber_auto_enqueue(VALUE fibval); + +#define FMODE_IOM_PRIVATE1 0x01000000 +#define FMODE_IOM_PRIVATE2 0x02000000 + +#define IOM_FIBMASK ((VALUE)0x1) +#define IOM_FIB (0x2) +#define IOM_WAIT (0x1) /* container_of(..., struct rb_iom_waiter, timer) */ + +/* + * fdmap is a singleton. + * + * It makes zero sense to have multiple fdmaps per-process; even less so + * than multiple ioms. The file descriptor table in POSIX is per-process; + * and POSIX requires syscalls to allocate the lowest available FD. + * This is also why we use an array instead of a hash table, as there will + * be no holes for big processes. + * + * If contention becomes a problem, we can pad (struct rb_iom_fd) to + * 64-bytes for cache alignment. + * + * Currently we use fdmap to deal with FD aliasing with epoll + * and kqueue interfaces.. FD aliasing happens when multiple + * Fibers wait on the same FD; but epoll/kqueue APIs only allow + * registering a single data pointer per FD. + * + * In the future, we may implement rb_notify_fd_close using fdmap. + */ + +/* linear growth based on power-of-two */ +#define RB_IOM_FD_PER_HEAP 64 +/* on-heap and persistent for process lifetime keep as small as possible. */ +struct rb_iom_fd { + struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */ +}; + +/* singleton (per-rb_iom_t, or per process, if we ever need > 1 iom) */ +struct rb_iom_fdmap { + struct rb_iom_fd **map; + unsigned int heaps; + int max_fd; +}; + +/* allocated on stack */ +/* Every auto-yielded fiber has this on stack */ +struct rb_iom_timer { + union { + struct list_node rnode; /* <=> rb_thread_t.afrunq */ + struct list_node tnode; /* <=> rb_iom_struct.timers */ + } n; + double expires_at; /* absolute monotonic time */ + VALUE _fibval; +}; + +/* common waiter struct for waiting fds and pids */ +struct rb_iom_waiter { + struct rb_iom_timer timer; + struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */ +}; + +struct rb_iom_fd_waiter { + struct rb_iom_waiter w; /* w.wnode - iom->fds */ + int *fdp; /* (ideally), a pointer fptr->fd to detect closed FDs */ + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +struct rb_iom_pid_waiter { + struct rb_iom_waiter w; /* w.wnode - iom->pids */ + rb_thread_t *th; + /* same pid, status, options same as waitpid(2) */ + rb_pid_t pid; + int status; + int options; + int errnum; +}; + +/* threads sleeping in select, epoll_wait or kevent w/o GVL; on stack */ +struct rb_iom_blocker { + rb_thread_t *th; + struct list_node bnode; /* -iom->blockers */ +}; + +#if (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) +/* TODO: IOM_SELECT may use this for rb_notify_fd_close */ +static struct rb_iom_fd * +iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd) +{ + VM_ASSERT(fd >= 0); + return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP]; +} + +static struct rb_iom_fd * +rb_iom_fd_get(struct rb_iom_fdmap *fdmap, int fd) +{ + if (fd >= fdmap->max_fd) { + struct rb_iom_fd *base, *h; + unsigned n = fdmap->heaps + 1; + unsigned i; + + fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct rb_iom_fd *)); + base = h = ALLOC_N(struct rb_iom_fd, RB_IOM_FD_PER_HEAP); + for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) { + list_head_init(&h->fdhead); + h++; + } + fdmap->map[fdmap->heaps] = base; + fdmap->max_fd += RB_IOM_FD_PER_HEAP; + } + return iom_fdhead_aref(fdmap, fd); +} + +static void +rb_iom_fdmap_init(struct rb_iom_fdmap *fdmap) +{ + fdmap->max_fd = 0; + fdmap->heaps = 0; + fdmap->map = 0; +} + +static void +rb_iom_fdmap_destroy(struct rb_iom_fdmap *fdmap) +{ + unsigned n; + + for (n = 0; n < fdmap->heaps; n++) { + xfree(fdmap->map[n]); + } + xfree(fdmap->map); + rb_iom_fdmap_init(fdmap); +} +#endif /* (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) */ + +static VALUE +rb_iom_timer_fibval(const struct rb_iom_timer *t) +{ + return t->_fibval & ~IOM_FIBMASK; +} + +static struct rb_iom_waiter * +rb_iom_waiter_of(struct rb_iom_timer *t) +{ + if (t->_fibval & IOM_FIBMASK) { + return 0; + } + return container_of(t, struct rb_iom_waiter, timer); +} + +static double +rb_iom_next_timeout(struct list_head *timers) +{ + struct rb_iom_timer *t = list_top(timers, struct rb_iom_timer, n.tnode); + + if (t) { + double diff = t->expires_at - timeofday(); + return diff <= 0.0 ? 0 : diff; + } + else { + return -1; + } +} + +static void rb_iom_timer_check(const rb_thread_t *); +static void rb_iom_timer_add(rb_thread_t *, struct rb_iom_timer *, + const double *timeout, int flags); + +static VALUE +rb_iom_timer_done(VALUE ptr) +{ + struct rb_iom_timer *t = (struct rb_iom_timer *)ptr; + list_del(&t->n.tnode); + return Qfalse; +} + +static void +rb_iom_waiter_ready(struct rb_iom_waiter *w) +{ + VALUE fibval = rb_iom_timer_fibval(&w->timer); + + list_del_init(&w->wnode); + list_del_init(&w->timer.n.tnode); + if (fibval != Qfalse) { + rb_thread_t *owner = rb_fiber_owner_thread(fibval); + list_add_tail(&owner->afrunq, &w->timer.n.rnode); + } +} + +static VALUE +rb_iom_waiter_done(VALUE ptr) +{ + struct rb_iom_waiter *w = (struct rb_iom_waiter *)ptr; + list_del(&w->timer.n.tnode); + list_del(&w->wnode); + return Qfalse; +} + +/* cont.c */ +int rb_fiber_resumable_p(const rb_thread_t *, const rb_fiber_t *); + +/* + * resume all "ready" fibers belonging to a given thread + * stop when a fiber has not yielded, yet. + */ +static int +rb_fiber_auto_do_yield_p(rb_thread_t *th) +{ + rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0; + struct rb_iom_timer *t = 0, *next = 0; + LIST_HEAD(tmp); + + /* + * do not infinite loop as new fibers get added to + * th->afrunq, only work off a temporary list: + */ + list_append_list(&tmp, &th->afrunq); + list_for_each_safe(&tmp, t, next, n.rnode) { + VALUE fibval = rb_iom_timer_fibval(t); + rb_fiber_t *fib = RTYPEDDATA_DATA(fibval); + + if (fib == current_auto || !rb_fiber_resumable_p(th, fib)) { + /* tell the caller to yield */ + list_prepend_list(&th->afrunq, &tmp); + return 1; + } + rb_fiber_resume(fibval, 0, 0); + } + return 0; +} + +/* XXX: is this necessary? */ +void +rb_iom_mark_runq(const rb_thread_t *th) +{ + struct rb_iom_timer *t = 0; + + list_for_each(&th->afrunq, t, n.rnode) { + rb_gc_mark(rb_iom_timer_fibval(t)); + } +} + +static rb_iom_t *rb_iom_get(rb_thread_t *); +static void rb_iom_blockers_notify(rb_iom_t *, int max); + +#endif /* IOM_COMMON_H */ diff --git a/iom_kqueue.h b/iom_kqueue.h new file mode 100644 index 0000000000..10161a7561 --- /dev/null +++ b/iom_kqueue.h @@ -0,0 +1,600 @@ +/* + * kqueue-based implementation of I/O Manager for RubyVM on *BSD + * + * The kevent API has an advantage over epoll_ctl+epoll_wait since + * it can simultaneously add filters and check for events with one + * syscall. It also allows EV_ADD to be used idempotently for + * enabling filters, where as epoll_ctl requires separate ADD and + * MOD operations. + * + * These are advantages in the common case... + * + * The epoll API has advantages in more esoteric cases: + * + * epoll has the advantage over kqueue when watching for multiple + * events (POLLIN|POLLOUT|POLLPRI) (which is rare). We also have + * to install two kevent filters to watch POLLIN|POLLOUT simutaneously. + * See udata_set/udata_get functions below for more on this. + * + * Finally, kevent does not support POLLPRI directly, we need to use + * select() (or perhaps poll() on some platforms) with a zero + * timeout to check for POLLPRI after EVFILT_READ returns. + * + * Finally, several *BSDs implement kqueue; and the quality of each + * implementation may vary. Anecdotally, *BSDs are not known to even + * support poll() consistently across different types of files. + * We will need to selective and careful about injecting them into + * kevent(). + */ +#include "iom_internal.h" + +/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */ +#undef LIST_HEAD +#include +#include +#include + +/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */ +#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI) + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + + /* we NEVER need to scan kevs , only insert + delete + empty check */ + struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */ + + struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */ + struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */ + struct rb_iom_fdmap rfdmap; /* holds fdh for EVFILT_READ */ + struct rb_iom_fdmap wfdmap; /* holds fdh for EVFILT_WRITE */ + + int kqueue_fd; + int nevents; /* auto-increases */ + struct list_head blockers; /* -rb_iom_blocker.bnode */ +}; + +/* allocated on stack */ +struct kev { + /* fdw.w.wnode is overloaded for checking RB_WAITFD_PRI (seee check_pri) */ + struct rb_iom_fd_waiter fdw; + rb_thread_t *th; + + /* + * both rfdnode and wfdnode are overloaded for deleting paired + * filters when watching both EVFILT_READ and EVFILT_WRITE on + * a single FD + */ + struct list_node rfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_READ) */ + struct list_node wfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_WRITE) */ +}; + +/* + * like our epoll implementation, we "ping" using kevent with zero-timeout + * and can do so on any thread. + */ +static const struct timespec zero; + +static void +increase_nevents(rb_iom_t *iom, int retries) +{ + /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */ + const int max_alloca = 1024 / sizeof(struct kevent); + const int max = max_alloca * 2; + + if (retries) { + iom->nevents *= retries; + if (iom->nevents > max || iom->nevents <= 0) { + iom->nevents = max; + } + } +} + +static int +iom_kqfd(rb_iom_t *iom) +{ + if (iom->kqueue_fd >= 0) { + return iom->kqueue_fd; + } + iom->kqueue_fd = kqueue(); + if (iom->kqueue_fd < 0) { + if (rb_gc_for_fd(errno)) { + iom->kqueue_fd = kqueue(); + } + if (iom->kqueue_fd < 0) { + rb_sys_fail("kqueue"); + } + } + rb_fd_fix_cloexec(iom->kqueue_fd); + return iom->kqueue_fd; +} + +static void +rb_iom_init(rb_iom_t *iom) +{ + list_head_init(&iom->timers); + list_head_init(&iom->kevs); + list_head_init(&iom->pids); + list_head_init(&iom->blockers); + iom->nevents = 8; + iom->kqueue_fd = -1; + rb_iom_fdmap_init(&iom->rfdmap); + rb_iom_fdmap_init(&iom->wfdmap); +} + +/* + * kevent does not have an EVFILT_* flag for (rarely-used) urgent data, + * at least not on FreeBSD 11.0, so we call select() separately after + * EVFILT_READ returns to set the RB_WAITFD_PRI bit: + */ +static void +check_pri(rb_thread_t *th, struct list_head *pri) +{ + rb_fdset_t efds; + int r; + struct timeval tv = { 0 }; + struct kev *kev = 0, *next = 0; + int max = -1; + + if (list_empty(pri)) { + return; + } + rb_fd_init(&efds); + list_for_each(pri, kev, fdw.w.wnode) { + int fd = *kev->fdw.fdp; + if (fd >= 0) { + rb_fd_set(fd, &efds); + if (fd > max) { + max = fd; + } + } + } + + r = native_fd_select(max + 1, 0, 0, &efds, &tv, th); + if (r < 0) { + max = errno; + } + + list_for_each_safe(pri, kev, next, fdw.w.wnode) { + list_del_init(&kev->fdw.w.wnode); + if (r > 0) { + int fd = *kev->fdw.fdp; + if (fd >= 0 && rb_fd_isset(fd, &efds)) { + kev->fdw.revents |= RB_WAITFD_PRI; + } + } + } + rb_fd_term(&efds); + if (r < 0) { + rb_syserr_fail(max, "select for RB_WAITFD_PRI check failed"); + } +} + +/* + * kqueue is a more complicated than epoll for corner cases because we + * install separate filters for simultaneously watching read and write + * on the same FD, and now we must clear shared filters if only the + * event from the other side came in. + */ +static void +drop_pairs(rb_thread_t *th, int max, struct kevent *changelist, + struct list_head *rdel, struct list_head *wdel) +{ + int nchanges = 0; + struct kev *kev = 0, *next = 0; + + list_for_each_safe(rdel, kev, next, wfdnode) { + int fd = *kev->fdw.fdp; + list_del_init(&kev->wfdnode); + assert(kev->fdw.revents & RB_WAITFD_OUT); + + if (fd >= 0 && !(kev->fdw.revents & WAITFD_READ)) { + struct kevent *chg = &changelist[nchanges++]; + assert(nchanges <= max); + EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0); + } + } + list_for_each_safe(wdel, kev, next, rfdnode) { + int fd = *kev->fdw.fdp; + list_del_init(&kev->rfdnode); + assert(kev->fdw.revents & WAITFD_READ); + if (fd >= 0 && !(kev->fdw.revents & RB_WAITFD_OUT)) { + struct kevent *chg = &changelist[nchanges++]; + assert(nchanges <= max); + EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0); + } + } + if (nchanges) { + int kqfd = th->vm->iom->kqueue_fd; + if (kevent(kqfd, changelist, nchanges, 0, 0, &zero) < 0) { + rb_sys_fail("kevent (dropping paired FDs)"); + } + } +} + +/* + * kqueue requires separate filters when watching for simultaneously + * watching read and write events on the same FD. Different filters + * means they can have different udata pointers; at least with + * native kqueue. + * + * When emulating native kqueue behavior using epoll as libkqueue does, + * there is only space for one udata pointer. This is because epoll + * allows each "filter" (epitem in the kernel) to watch read and write + * simultaneously. + * epoll keys epitems using: [ fd, file description (struct file *) ]. + * In contrast, kevent keys its filters using: [ fd (ident), filter ]. + * + * So, with native kqueue, we can at least optimize away rb_iom_fd_get + * function calls; but we cannot do that when libkqueue emulates + * kqueue with epoll and need to retrieve the fdh from the correct + * fdmap. + * + * Finally, the epoll implementation only needs one fdmap. + */ +static void * +udata_set(rb_iom_t *iom, struct rb_iom_fd *fdh) +{ +#ifdef LIBKQUEUE +# warning libkqueue support is incomplete and does not handle corner cases + return iom; +#else /* native kqueue */ + return fdh; +#endif +} + +static struct rb_iom_fd * +udata_get(const struct kevent *ev) +{ +#ifdef LIBKQUEUE + rb_iom_t *iom = ev->udata; + int fd = ev->ident; + + switch (ev->filter) { + case EVFILT_READ: + return rb_iom_fd_get(&iom->rfdmap, fd); + case EVFILT_WRITE: + return rb_iom_fd_get(&iom->wfdmap, fd); + default: + rb_bug("bad filter in libkqueue compatibility mode: %d", ev->filter); + } +#endif + return ev->udata; +} + +/* + * careful, we must not call rb_iom_waiter_ready twice on the same kev + * since we overload fdw.w.wnode for RB_WAITFD_PRI checking + */ +static void +kev_waiter_ready_once(struct kev *kev) +{ + if (!kev->fdw.revents) { + rb_iom_waiter_ready(&kev->fdw.w); + } +} + +static void +check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist) +{ + int err = 0; + if (nr >= 0) { + struct list_head pri = LIST_HEAD_INIT(pri); + struct list_head wdel = LIST_HEAD_INIT(wdel); + struct list_head rdel = LIST_HEAD_INIT(rdel); + struct kev *kev = 0, *next = 0; + int i; + + for (i = 0; i < nr; i++) { + struct kevent *ev = &eventlist[i]; + struct rb_iom_fd *fdh = udata_get(ev); + + if (ev->filter == EVFILT_READ) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, kev, next, rfdnode) { + int rbits = kev->fdw.events & WAITFD_READ; + + assert(rbits && "unexpected EVFILT_READ"); + + kev_waiter_ready_once(kev); + kev->fdw.revents |= (rbits & RB_WAITFD_IN); + list_del_init(&kev->rfdnode); + + if (rbits & RB_WAITFD_PRI) { + list_add(&pri, &kev->fdw.w.wnode); + } + + if ((kev->fdw.events & RB_WAITFD_OUT) && + *kev->fdw.fdp >= 0 && + !pair_queued) { + pair_queued = 1; + list_add(&wdel, &kev->rfdnode); + } + } + } + else if (ev->filter == EVFILT_WRITE) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, kev, next, wfdnode) { + assert(kev->fdw.events & RB_WAITFD_OUT && + "unexpected EVFILT_WRITE"); + + kev_waiter_ready_once(kev); + kev->fdw.revents |= RB_WAITFD_OUT; + list_del_init(&kev->wfdnode); + + if ((kev->fdw.events & WAITFD_READ) && + *kev->fdw.fdp >= 0 && + !pair_queued) { + pair_queued = 1; + list_add(&rdel, &kev->wfdnode); + } + } + } + else { + rb_bug("unexpected filter: %d", ev->filter); + } + } + check_pri(th, &pri); + drop_pairs(th, nr, eventlist, &rdel, &wdel); + + /* notify the waiter thread in case we enqueued fibers for them */ + if (nr > 0) { + rb_iom_blockers_notify(th->vm->iom, -1); + } + } + else { + err = errno; + } + if (err && err != EINTR) { + rb_syserr_fail(err, "kevent"); + } + rb_iom_timer_check(th); + RUBY_VM_CHECK_INTS_BLOCKING(th); +} + +/* perform a non-blocking kevent check while holding GVL */ +static void +ping_events(rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + int kqfd = iom ? iom->kqueue_fd : -1; + + if (kqfd >= 0) { + VALUE v; + int nr; + int nevents = iom->nevents; + struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents); + int retries = 0; + + do { + nr = kevent(kqfd, 0, 0, eventlist, nevents, &zero); + check_kevent(th, nr, eventlist); + } while (nr == nevents && ++retries); + if (v) { + ALLOCV_END(v); + } + increase_nevents(iom, retries); + } +} + +static struct timespec * +double2timespec(double timeout, struct timespec *ts) +{ + if (timeout >= 0) { + ts->tv_sec = timeout >= LONG_MAX ? LONG_MAX : (long)timeout; + ts->tv_nsec = (long)((timeout - (double)ts->tv_sec) * 1e9); + if (ts->tv_sec < 0) { + ts->tv_sec = 0; + } + if (ts->tv_nsec < 0) { + ts->tv_nsec = 0; + } + return ts; + } + return 0; /* infinity */ +} + +/* for iom_pingable_common.h */ +static void +rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + int nevents = iom->nevents; + int nr = nevents; + double timeout; + + RUBY_VM_CHECK_INTS_BLOCKING(th); + timeout = rb_iom_next_timeout(&iom->timers); + + if (timeout != 0 && (!list_empty(&iom->kevs) || !list_empty(&iom->pids))) { + VALUE v; + int kqfd = iom_kqfd(iom); /* may raise */ + struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents); + struct timespec ts; + const struct timespec *tsp = double2timespec(timeout, &ts); + struct rb_iom_blocker cur; + + VM_ASSERT(kqfd >= 0); + cur.th = th; + list_add_tail(&iom->blockers, &cur.bnode); + BLOCKING_REGION({ + nr = kevent(kqfd, 0, 0, eventlist, nevents, tsp); + }, ubf_select, th, FALSE); + list_del(&cur.bnode); + check_kevent(th, nr, eventlist); + if (v) { + ALLOCV_END(v); + } + } + if (nr == nevents) { + ping_events(th); + } +} + +/* kqueue shines here because there's only one syscall in the common case */ +static void +kevxchg_ping(int fd, struct kev *kev) +{ + rb_thread_t *th = kev->th; + rb_iom_t *iom = rb_iom_get(th); + int retries = 0; + int nchanges = 0; + int nr; + VALUE v; + int nevents = iom->nevents; + struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents); + struct kevent *changelist = eventlist; + int kqfd = iom_kqfd(iom); + + VM_ASSERT(nevents >= 2); + + /* EVFILT_READ handles urgent data (POLLPRI)... hopefully */ + if (kev->fdw.events & WAITFD_READ) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&iom->rfdmap, fd); + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->rfdnode); + } + if (kev->fdw.events & RB_WAITFD_OUT) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&iom->wfdmap, fd); + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->wfdnode); + } + do { + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + + /* kevent may fail with ENOMEM when processing changelist */ + if (nr < 0 && nchanges && rb_gc_for_fd(errno)) { + continue; + } + check_kevent(th, nr, eventlist); + } while (nr == nevents && ++retries && (nchanges = 0) == 0); + if (v) { + ALLOCV_END(v); + } + increase_nevents(iom, retries); +} + +static VALUE +kevxchg_yield(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + int fd = *kev->fdw.fdp; + if (fd >= 0) { + kevxchg_ping(fd, kev); + (void)rb_fiber_auto_do_yield_p(kev->th); + return rb_fiber_yield(0, 0); + } + return Qfalse; +} + +static VALUE +kev_done(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + list_del(&kev->rfdnode); + list_del(&kev->wfdnode); + return rb_iom_waiter_done((VALUE)&kev->fdw.w); +} + +static int +iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) +{ + rb_iom_t *iom = rb_iom_get(th); + struct kev kev; + + kev.th = th; + kev.fdw.fdp = fdp; + kev.fdw.events = (short)events; + kev.fdw.revents = 0; + list_add(&iom->kevs, &kev.fdw.w.wnode); + list_node_init(&kev.rfdnode); + list_node_init(&kev.wfdnode); + rb_iom_timer_add(th, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT); + rb_ensure(kevxchg_yield, (VALUE)&kev, kev_done, (VALUE)&kev); + + return kev.fdw.revents; /* may be zero if timed out */ +} + +/* + * XXX we may use fptr->mode to mark FDs which kqueue cannot handle, + * different BSDs may have different bugs which prevent certain + * FD types from being handled by kqueue... + */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout) +{ + return iom_waitfd(th, &fptr->fd, events, timeout); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) +{ + return iom_waitfd(th, fdp, events, timeout); +} + +static void +iom_free(rb_iom_t *iom) +{ + rb_iom_fdmap_destroy(&iom->rfdmap); + rb_iom_fdmap_destroy(&iom->wfdmap); + xfree(iom); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + vm->iom = 0; + if (iom) { + if (iom->kqueue_fd >= 0) { + close(iom->kqueue_fd); + } + iom_free(iom); + } +} + +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ +#ifdef LIBKQUEUE + /* + * libkqueue uses epoll, /dev/poll FD, or poll/select on a pipe, + * so there is an FD. + * I guess we're going to hit an error in case somebody uses + * libkqueue with a real native kqueue backend, but nobody does + * that in production, hopefully. + */ + rb_iom_destroy(th->vm); +#else /* native kqueue is close-on-fork, so we do not close ourselves */ + rb_iom_t *iom = th->vm->iom; + if (iom) { + iom_free(iom); + th->vm->iom = 0; + } +#endif +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + rb_iom_t *iom = GET_VM()->iom; + + return iom && fd == iom->kqueue_fd; +} + +#include "iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_pingable_common.h b/iom_pingable_common.h new file mode 100644 index 0000000000..fb992da0b5 --- /dev/null +++ b/iom_pingable_common.h @@ -0,0 +1,46 @@ +/* shared between "pingable" implementations (iom_kqueue.h and iom_epoll.h) */ + +/* only for iom_kqueue.h and iom_epoll.h */ +static void rb_iom_do_wait(rb_thread_t *, rb_iom_t *); + +static VALUE +rb_iom_do_schedule(VALUE ptr) +{ + rb_thread_t *th = (rb_thread_t *)ptr; + rb_iom_t *iom = th->vm->iom; + + if (rb_fiber_auto_do_yield_p(th)) { + rb_fiber_yield(0, 0); + } + if (iom) { + rb_iom_do_wait(th, iom); + } + if (rb_fiber_auto_do_yield_p(th)) { + rb_fiber_yield(0, 0); + } + return Qfalse; +} + +void +rb_iom_schedule(rb_thread_t *th, double *timeout) +{ + if (rb_fiber_auto_sched_p(th)) { + rb_iom_timer_check(th); + ping_events(th); + if (rb_fiber_auto_do_yield_p(th)) { + rb_fiber_yield(0, 0); + } + } + else if (timeout) { + double t0 = timeofday(); + struct rb_iom_timer t; + + rb_iom_timer_add(th, &t, timeout, 0); + rb_ensure(rb_iom_do_schedule, (VALUE)th, rb_iom_timer_done, (VALUE)&t); + *timeout -= timeofday() - t0; + } + else { + rb_iom_timer_check(th); + rb_iom_do_schedule((VALUE)th); + } +} diff --git a/iom_select.h b/iom_select.h new file mode 100644 index 0000000000..6c102877e9 --- /dev/null +++ b/iom_select.h @@ -0,0 +1,306 @@ +/* + * select()-based implementation of I/O Manager for RubyVM + * + * This is crippled and relies heavily on GVL compared to the + * epoll and kqueue versions. Every call to select requires + * scanning the entire iom->fds list; so it gets silly expensive + * with hundreds or thousands of FDs. + */ +#include "iom_internal.h" + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */ + struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */ + struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */ + + /* + * generation counters for determining if we need to redo select() + * for newly registered FDs. kevent and epoll APIs do not require + * this, as events can be registered and retrieved at any time by + * different threads. + */ + rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */ + rb_serial_t select_gen; /* RDWR protected by GVL */ + + /* these could be on stack, but they're huge on some platforms: */ + rb_fdset_t rfds; + rb_fdset_t wfds; + rb_fdset_t efds; + + /* + * unlike in epoll/kqueue, order matters for blockers in this + * implementation: first blocker is running select; the reset + * are waiting for the first blocker + */ + struct list_head blockers; /* -rb_iom_blocker.bnode */ +}; + +/* allocated on stack */ +struct select_do { + rb_thread_t *th; + int do_wait; +}; + +static void +rb_iom_init(rb_iom_t *iom) +{ + iom->select_start = 0; + iom->select_gen = 0; + list_head_init(&iom->timers); + list_head_init(&iom->fds); + list_head_init(&iom->pids); + list_head_init(&iom->blockers); +} + +static void +iom_select_wait(struct select_do *sd) +{ + rb_thread_t *th = sd->th; + rb_iom_t *iom = th->vm->iom; + int max = -1; + struct timeval tv, *tvp; + struct rb_iom_fd_waiter *fdw = 0, *next = 0; + rb_fdset_t *rfds, *wfds, *efds; + double timeout = 0; + struct rb_iom_blocker cur; + int ret; + + iom->select_start = iom->select_gen; + rfds = wfds = efds = 0; + rb_fd_init(&iom->rfds); + rb_fd_init(&iom->wfds); + rb_fd_init(&iom->efds); + + list_for_each_safe(&iom->fds, fdw, next, w.wnode) { + int fd = *fdw->fdp; + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + rb_iom_waiter_ready(&fdw->w); + sd->do_wait = 0; + continue; + } + if (fd > max) { + max = fd; + } + if (fdw->events & RB_WAITFD_IN) { + rb_fd_set(fd, rfds = &iom->rfds); + } + if (fdw->events & RB_WAITFD_OUT) { + rb_fd_set(fd, wfds = &iom->wfds); + } + if (fdw->events & RB_WAITFD_PRI) { + rb_fd_set(fd, efds = &iom->efds); + } + } + + if (sd->do_wait && (max >= 0 || !list_empty(&iom->pids))) { + timeout = rb_iom_next_timeout(&iom->timers); + } + if (timeout >= 0) { + tv = double2timeval(timeout); + tvp = &tv; + } + else { + tvp = 0; + } + cur.th = th; + list_add(&iom->blockers, &cur.bnode); + BLOCKING_REGION({ + ret = native_fd_select(max + 1, rfds, wfds, efds, tvp, th); + }, ubf_select, th, FALSE); + list_del(&cur.bnode); + + if (ret > 0) { + list_for_each_safe(&iom->fds, fdw, next, w.wnode) { + int fd = *fdw->fdp; + + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + rb_iom_waiter_ready(&fdw->w); + continue; + } + if (fdw->events & RB_WAITFD_IN && rb_fd_isset(fd, &iom->rfds)) { + fdw->revents |= RB_WAITFD_IN; + } + if (fdw->events & RB_WAITFD_OUT && rb_fd_isset(fd, &iom->wfds)) { + fdw->revents |= RB_WAITFD_OUT; + } + if (fdw->events & RB_WAITFD_PRI && rb_fd_isset(fd, &iom->efds)) { + fdw->revents |= RB_WAITFD_PRI; + } + + /* got revents? enqueue ourselves to be run! */ + if (fdw->revents) { + rb_iom_waiter_ready(&fdw->w); + } + } + } + + rb_fd_term(&iom->rfds); + rb_fd_term(&iom->wfds); + rb_fd_term(&iom->efds); + rb_iom_blockers_notify(iom, -1); + RUBY_VM_CHECK_INTS_BLOCKING(th); +} + +static int +retry_select_p(rb_iom_t *iom) +{ + /* + * if somebody changed iom->fds while we were inside select, + * rerun it with zero timeout to avoid a race condition. + * This is not necessary for epoll or kqueue because the kernel + * is constantly monitoring the watched set. + */ + return (iom->select_start != iom->select_gen); +} + +static VALUE +iom_do_select(VALUE ptr) +{ + struct select_do *sd = (struct select_do *)ptr; + rb_thread_t *th = sd->th; + rb_iom_t *iom; + int retry = 0; + + if (rb_fiber_auto_do_yield_p(th)) { + rb_fiber_yield(0, 0); + } + iom = th->vm->iom; + if (!iom) { + return Qfalse; + } + RUBY_VM_CHECK_INTS_BLOCKING(th); + do { + if (list_empty(&iom->blockers)) { + iom_select_wait(sd); + rb_iom_timer_check(th); + retry = retry_select_p(iom); + } + else { + struct rb_iom_blocker cur; + + cur.th = th; + list_add_tail(&iom->blockers, &cur.bnode); + BLOCKING_REGION({ + native_fd_select(0, 0, 0, 0, 0, th); + }, ubf_select, th, FALSE); + list_del(&cur.bnode); + RUBY_VM_CHECK_INTS_BLOCKING(th); + } + rb_fiber_auto_do_yield_p(th); + } while (retry); + if (rb_fiber_auto_do_yield_p(th)) { + rb_fiber_yield(0, 0); + } + return Qfalse; +} + +void +rb_iom_schedule(rb_thread_t *th, double *timeout) +{ + struct select_do sd; + + sd.th = th; + sd.do_wait = !rb_fiber_auto_sched_p(th); + + if (timeout && sd.do_wait) { + double t0 = timeofday(); + struct rb_iom_timer t; + + rb_iom_timer_add(th, &t, timeout, 0); + rb_ensure(iom_do_select, (VALUE)&sd, rb_iom_timer_done, (VALUE)&t); + *timeout -= timeofday() - t0; + } + else { + rb_iom_timer_check(th); + iom_do_select((VALUE)&sd); + } +} + +static VALUE +iom_schedule_fd(VALUE ptr) +{ + rb_thread_t *th = (rb_thread_t *)ptr; + rb_iom_t *iom = rb_iom_get(th); + + if (rb_fiber_auto_do_yield_p(th)) { + return rb_fiber_yield(0, 0); + } + + if (list_empty(&iom->blockers)) { + struct select_do sd; + + sd.th = th; + sd.do_wait = 0; + iom_select_wait(&sd); + rb_iom_timer_check(th); + (void)rb_fiber_auto_do_yield_p(th); + } + else { + /* + * kick the first select thread, retry_select_p will return true + * since caller bumped iom->select_gen + */ + rb_iom_blockers_notify(iom, 1); + } + return rb_fiber_yield(0, 0); +} + + +/* only epoll takes advantage of this (kqueue may for portability bugs) */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout) +{ + return rb_iom_waitfd(th, &fptr->fd, events, timeout); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) +{ + rb_iom_t *iom = rb_iom_get(th); + struct rb_iom_fd_waiter fdw; + + if (*fdp < 0) return 0; + fdw.fdp = fdp; + fdw.events = (short)events; + fdw.revents = 0; + + /* use FIFO order for fairness */ + list_add_tail(&iom->fds, &fdw.w.wnode); + rb_iom_timer_add(th, &fdw.w.timer, timeout, IOM_FIB|IOM_WAIT); + iom->select_gen++; + rb_ensure(iom_schedule_fd, (VALUE)th, rb_iom_waiter_done, (VALUE)&fdw.w); + + return (int)fdw.revents; /* may be zero if timed out */ +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + if (vm->iom) { + xfree(vm->iom); + vm->iom = 0; + } +} + +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + return 0; +} + +#include "iom_common.h" diff --git a/prelude.rb b/prelude.rb index 7b98e28285..1e71a3f168 100644 --- a/prelude.rb +++ b/prelude.rb @@ -16,6 +16,18 @@ def self.exclusive end end +class Fiber + def self.start(*args, &block) + # loses Fiber#resume return value, but maybe people do not care + new(&block).run + end + + def run + start + self + end +end + class IO # call-seq: diff --git a/process.c b/process.c index f45ff2f5c5..b2ac7d1939 100644 --- a/process.c +++ b/process.c @@ -16,6 +16,7 @@ #include "ruby/thread.h" #include "ruby/util.h" #include "vm_core.h" +#include "iom.h" #include #include @@ -907,10 +908,15 @@ rb_waitpid(rb_pid_t pid, int *st, int flags) result = do_waitpid(pid, st, flags); } else { - while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 && - (errno == EINTR)) { - rb_thread_t *th = GET_THREAD(); - RUBY_VM_CHECK_INTS(th); + rb_thread_t *th = GET_THREAD(); + if (rb_fiber_auto_sched_p(th) && WNOHANG) { + return rb_iom_waitpid(th, pid, st, flags, 0); + } + else { + while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 && + (errno == EINTR)) { + RUBY_VM_CHECK_INTS(th); + } } } if (result > 0) { diff --git a/signal.c b/signal.c index 2e69cf08ac..617210840f 100644 --- a/signal.c +++ b/signal.c @@ -13,6 +13,7 @@ #include "internal.h" #include "vm_core.h" +#include "iom.h" #include #include #include @@ -1030,6 +1031,17 @@ rb_trap_exit(void) } } +static int +sig_is_chld(int sig) +{ +#if defined(SIGCLD) + return (sig == SIGCLD); +#elif defined(SIGCHLD) + return (sig == SIGCHLD); +#endif + return 0; +} + void rb_signal_exec(rb_thread_t *th, int sig) { @@ -1037,6 +1049,9 @@ rb_signal_exec(rb_thread_t *th, int sig) VALUE cmd = vm->trap_list[sig].cmd; int safe = vm->trap_list[sig].safe; + if (sig_is_chld(sig)) { + rb_iom_sigchld(vm); + } if (cmd == 0) { switch (sig) { case SIGINT: @@ -1096,6 +1111,11 @@ default_handler(int sig) #ifdef SIGUSR2 case SIGUSR2: #endif +#ifdef SIGCLD + case SIGCLD: +#elif defined(SIGCHLD) + case SIGCHLD: +#endif func = sighandler; break; #ifdef SIGBUS @@ -1133,6 +1153,9 @@ trap_handler(VALUE *cmd, int sig) VALUE command; if (NIL_P(*cmd)) { + if (sig_is_chld(sig)) { + goto sig_dfl; + } func = SIG_IGN; } else { @@ -1153,6 +1176,9 @@ trap_handler(VALUE *cmd, int sig) break; case 14: if (memcmp(cptr, "SYSTEM_DEFAULT", 14) == 0) { + if (sig_is_chld(sig)) { + goto sig_dfl; + } func = SIG_DFL; *cmd = 0; } @@ -1160,6 +1186,9 @@ trap_handler(VALUE *cmd, int sig) case 7: if (memcmp(cptr, "SIG_IGN", 7) == 0) { sig_ign: + if (sig_is_chld(sig)) { + goto sig_dfl; + } func = SIG_IGN; *cmd = Qtrue; } @@ -1415,15 +1444,14 @@ static int init_sigchld(int sig) { sighandler_t oldfunc; + sighandler_t func = sighandler; oldfunc = ruby_signal(sig, SIG_DFL); if (oldfunc == SIG_ERR) return -1; - if (oldfunc != SIG_DFL && oldfunc != SIG_IGN) { - ruby_signal(sig, oldfunc); - } - else { - GET_VM()->trap_list[sig].cmd = 0; - } + + ruby_signal(sig, func); + GET_VM()->trap_list[sig].cmd = 0; + return 0; } # ifndef __native_client__ diff --git a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb index 1141dd317c..9e5bcb419d 100644 --- a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb +++ b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb @@ -1,5 +1,6 @@ # frozen_string_literal: false require 'test/unit' +require 'socket' class TestWaitForSingleFD < Test::Unit::TestCase require '-test-/wait_for_single_fd' @@ -49,5 +50,48 @@ def test_wait_for_closed_pipe end end + def test_fiber_start_rw + host = '127.0.0.1' + TCPServer.open(host, 0) do |srv| + con = TCPSocket.new(host, srv.addr[1]) + acc = srv.accept + begin + # 2 events, but only one triggers: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Fiber.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal RB_WAITFD_OUT, f.value + # trigger readability + con.write('a') + flags = RB_WAITFD_IN + f = Fiber.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + # ensure both events trigger: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Fiber.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + # ensure the EVFILT_WRITE (kqueue) is disabled after previous test: + flags = RB_WAITFD_IN + f = Fiber.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + skip 'missing MSG_OOB' unless Socket.const_defined?(:MSG_OOB) + + # make sure urgent data works with kqueue: + assert_equal 3, con.send('123', Socket::MSG_OOB) + flags = RB_WAITFD_PRI + f = Fiber.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + flags = RB_WAITFD_IN|RB_WAITFD_OUT|RB_WAITFD_PRI + f = Fiber.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + ensure + con.close + acc.close + end + end + end end diff --git a/test/lib/leakchecker.rb b/test/lib/leakchecker.rb index be4b83681f..cca72b4eb5 100644 --- a/test/lib/leakchecker.rb +++ b/test/lib/leakchecker.rb @@ -28,6 +28,15 @@ def find_fds if d.respond_to? :fileno a -= [d.fileno] end + + # only place we use epoll is internally, do not count it against us + a.delete_if do |fd| + begin + File.readlink("#{fd_dir}/#{fd}").match?(/\beventpoll\b/) + rescue + false + end + end a } fds.sort diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb new file mode 100644 index 0000000000..739770b9c2 --- /dev/null +++ b/test/ruby/test_fiber_auto.rb @@ -0,0 +1,238 @@ +# frozen_string_literal: true +require 'test/unit' +require 'fiber' +require 'io/nonblock' +require 'io/wait' + +class TestFiberAuto < Test::Unit::TestCase + def test_value + nr = 0 + f = Fiber.start { nr += 1 } + assert_equal 1, f.value, 'value returns as expected' + assert_equal 1, f.value, 'value is idempotent' + end + + def test_join + nr = 0 + f = Fiber.start { nr += 1 } + assert_equal f, f.join + assert_equal 1, f.value + assert_equal f, f.join, 'join is idempotent' + assert_equal f, f.join(10), 'join is idempotent w/ timeout' + end + + def test_io_wait_read + tick = 0.1 # 100ms, TIME_QUANTUM_USEC in thread_pthread.c + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + rdr = Fiber.start { r.read(1) } + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + assert_nil rdr.join(tick), 'join returns nil on timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick, 'Fiber still running' + w.write('a') + assert_equal 'a', rdr.value, 'finished' + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + rwait = Fiber.start { r.wait_readable(tick) } + assert_nil rwait.value, 'IO returned no events after timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick, 'Fiber waited for timeout' + end + end + + # make sure we do not create extra FDs (from epoll/kqueue) + # for operations which do not need it + def test_fd_creation_and_fork + assert_separately(%w(-r io/nonblock), <<~'end;') # do + fdrange = (3..64) + reserved = -'The given fd is not accessible because RubyVM reserves it' + fdstats = lambda do + stats = Hash.new(0) + fdrange.map do |fd| + begin + IO.for_fd(fd, autoclose: false) + stats[:good] += 1 + rescue Errno::EBADF # ignore + rescue ArgumentError => e + raise if e.message != reserved + stats[:reserved] += 1 + end + end + stats.freeze + end + + before = fdstats.call + Fiber.start { :fib }.join + assert_equal before, fdstats.call, + 'Fiber.start + Fiber#join does not create new FD' + + # now, epoll/kqueue implementations create FDs, ensure they're reserved + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + + before_io = fdstats.call + assert_equal before[:reserved], before_io[:reserved], + 'no reserved FDs created before I/O attempted' + + f1 = Fiber.start { r.read(1) } + after_io = fdstats.call + assert_equal before_io[:good], after_io[:good], + 'no change in unreserved FDs during I/O wait' + + w.write('!') + assert_equal '!', f1.value, 'auto-Fiber IO#read works' + assert_operator before_io[:reserved], :<=, after_io[:reserved], + 'a reserved FD may be created on some implementations' + + # ensure we do not share epoll/kqueue FDs across fork + if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) + pid = fork do + after_fork = fdstats.call + w.write(after_fork.inspect == before_io.inspect ? '1' : '0') + + # ensure auto-Fiber works properly after forking + IO.pipe do |a, b| + a.nonblock = b.nonblock = true + f1 = Fiber.start { a.read(1) } + f2 = Fiber.start { b.write('!') } + assert_equal 1, f2.value + w.write(f1.value == '!' ? '!' : '?') + end + + exit!(0) + end + assert_equal '1', r.read(1), 'reserved FD cleared after fork' + assert_equal '!', r.read(1), 'auto-fiber works after forking' + _, status = Process.waitpid2(pid) + assert_predicate status, :success?, 'forked child exited properly' + end + end + end; + end + + def test_waitpid + assert_separately(%w(-r io/nonblock), <<~'end;') # do + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + pid = fork do + sleep 0.1 + exit!(0) + end + f = Fiber.start { Process.waitpid2(pid) } + wpid, st = f.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.1, 'child did not return immediately' + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'process exited successfully' + + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + + # "blocking" Process.waitpid2 takes precedence over trap(:CHLD) + waited = [] + chld_called = false + trap(:CHLD) { chld_called = true; waited.concat(Process.waitall) } + pid = fork do + r.read(1) + exit!(0) + end + assert_nil Process.waitpid2(pid, Process::WNOHANG), 'child is waiting' + assert_nil(Fiber.start do + Process.waitpid2(pid, Process::WNOHANG) + end.value, 'WNOHANG works normally in fiber') + f = Fiber.start { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), 'woke child' + wpid, st = f.value + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'child exited successfully' + assert_empty waited, 'waitpid had predence' + assert chld_called, 'CHLD handler got called anyways' + + chld_called = false + [ 'DEFAULT', 'SIG_DFL', 'IGNORE', 'SYSTEM_DEFAULT', nil + ].each do |handler| + trap(:CHLD, handler) + pid = fork do + r.read(1) + exit!(0) + end + t = "trap(:CHLD, #{handler.inspect})" + f = Fiber.start { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), "woke child for #{t}" + wpid, st = f.value + assert_predicate st, :success?, "child exited successfully for #{t}" + assert_equal wpid, pid, "return value correct for #{t}" + assert_equal false, chld_called, + "old CHLD handler did not fire for #{t}" + end + end + end; + end if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) && + Process.const_defined?(:WNOHANG) + + def test_cpu_usage + require 'benchmark' # avoid skewing assert_cpu_usage_low + disabled = GC.disable + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = fv = wr = nil + + # select() requires higher CPU% :< + assert_cpu_usage_low(pct: 0.33) do + t = Thread.new { Fiber.start { r.read(1) }.value } + wr = Thread.new do + sleep 0.1 + w.write('..') + end + fv = Fiber.start { r.read(1) }.value + end + assert_equal '.', fv + assert_equal '.', t.value + wr.join + end + + thrs = [] + pid = spawn(*%W(#{EnvUtil.rubybin} --disable=gems -e #{'sleep 0.1'})) + 2.times do + thrs << Thread.new { Fiber.start { Process.waitall }.value } + end + assert_cpu_usage_low(pct: 0.22) do + thrs.map!(&:value) + end + thrs.compact! + ensure + GC.enable unless disabled + end + + # As usual, cannot resume/run fibers across threads, but the + # scheduler works across threads + def test_cross_thread_schedule + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = Thread.new { Fiber.start { r.read(1) }.value } + assert_nil t.join(0.1) + f = Fiber.start { w.write('a') } + assert_equal 'a', t.value + assert_equal 1, f.value + end + end + + # tricky for kqueue and epoll implementations + def test_multi_readers + if RUBY_PLATFORM =~ /linux/ && + (RbConfig::CONFIG['LDFLAGS'] =~ /-lkqueue\b/ || + RbConfig::CONFIG['DLDFLAGS'] =~ /-lkqueue\b/) + skip "FIXME libkqueue is buggy on this test" + end + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + nr = 30 + fibs = nr.times.map { Fiber.start { r.read(1) } } + fibs.each { |f| assert_nil f.join(0.001) } + + exp = nr.times.map { -'a' }.freeze + w.write(exp.join) + assert_equal exp, fibs.map(&:value); + end + end +end diff --git a/thread.c b/thread.c index b7ee1d8d9b..e41c1b9360 100644 --- a/thread.c +++ b/thread.c @@ -70,6 +70,7 @@ #include "ruby/thread.h" #include "ruby/thread_native.h" #include "internal.h" +#include "iom.h" #ifndef USE_NATIVE_THREAD_PRIORITY #define USE_NATIVE_THREAD_PRIORITY 0 @@ -3454,6 +3455,18 @@ rb_thread_priority_set(VALUE thread, VALUE prio) /* for IO */ +static int +rb_iom_waitfd_tv(rb_thread_t *th, int *fdp, int events, struct timeval *tv) +{ + double *to = NULL; + double tout; + if (tv) { + tout = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6; + to = &tout; + } + return rb_iom_waitfd(th, fdp, events, to); +} + #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT) /* @@ -3889,6 +3902,10 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) struct timespec *timeout = NULL; rb_thread_t *th = GET_THREAD(); + if (rb_fiber_auto_sched_p(th)) { + return rb_iom_waitfd_tv(th, &fd, events, tv); + } + #define poll_update() \ (update_timespec(timeout, limit), \ TRUE) @@ -4000,7 +4017,11 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) struct select_args args; int r; VALUE ptr = (VALUE)&args; + rb_thread_t *th = GET_THREAD(); + if (rb_fiber_auto_sched_p(th)) { + return rb_iom_waitfd_tv(th, &fd, events, tv); + } args.as.fd = fd; args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; @@ -4147,10 +4168,13 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th) } } +static void rb_iom_atfork_child(rb_thread_t *); + void rb_thread_atfork(void) { rb_thread_t *th = GET_THREAD(); + rb_iom_atfork_child(th); rb_thread_atfork_internal(th, terminate_atfork_i); th->join_list = NULL; @@ -5037,3 +5061,21 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data) return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack); } + +#ifndef RUBYVM_IOM +# if defined(HAVE_SYS_EVENT_H) && defined(HAVE_KQUEUE) && defined(HAVE_KEVENT) +# define RUBYVM_IOM IOM_KQUEUE +# elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \ + defined(HAVE_EPOLL_CTL) && defined(HAVE_EPOLL_WAIT) +# define RUBYVM_IOM IOM_EPOLL +# else +# define RUBYVM_IOM IOM_SELECT +# endif +#endif +#if RUBYVM_IOM == IOM_KQUEUE +# include "iom_kqueue.h" +#elif RUBYVM_IOM == IOM_EPOLL +# include "iom_epoll.h" +#else +# include "iom_select.h" +#endif diff --git a/thread_pthread.c b/thread_pthread.c index f652429637..35e97f6a09 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -1754,9 +1754,14 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) } #endif +static int rb_iom_reserved_fd(int); /* check kqueue or epoll FD */ + int rb_reserved_fd_p(int fd) { + if (rb_iom_reserved_fd(fd)) { + return 1; + } #if USE_SLEEPY_TIMER_THREAD if ((fd == timer_thread_pipe.normal[0] || fd == timer_thread_pipe.normal[1] || diff --git a/vm.c b/vm.c index 6ff1d234a3..b2a4f82de8 100644 --- a/vm.c +++ b/vm.c @@ -11,6 +11,7 @@ #include "internal.h" #include "ruby/vm.h" #include "ruby/st.h" +#include "iom.h" #include "gc.h" #include "vm_core.h" @@ -2195,6 +2196,7 @@ ruby_vm_destruct(rb_vm_t *vm) rb_fiber_reset_root_local_storage(th->self); thread_free(th); } + rb_iom_destroy(vm); rb_vm_living_threads_init(vm); ruby_vm_run_at_exit_hooks(vm); if (vm->loading_table) { @@ -2361,6 +2363,7 @@ rb_thread_recycle_stack_release(VALUE *stack) } void rb_fiber_mark_self(rb_fiber_t *fib); +void rb_iom_mark_runq(const rb_thread_t *th); void rb_thread_mark(void *ptr) @@ -2412,6 +2415,11 @@ rb_thread_mark(void *ptr) RUBY_MARK_UNLESS_NULL(th->top_wrapper); rb_fiber_mark_self(th->fiber); rb_fiber_mark_self(th->root_fiber); + + /* th->afrunq may be 0 early on */ + if (th->afrunq.n.next && !list_empty(&th->afrunq)) { + rb_iom_mark_runq(th); + } RUBY_MARK_UNLESS_NULL(th->stat_insn_usage); RUBY_MARK_UNLESS_NULL(th->last_status); @@ -2517,6 +2525,7 @@ static void th_init(rb_thread_t *th, VALUE self) { th->self = self; + list_head_init(&th->afrunq); /* allocate thread stack */ #ifdef USE_SIGALTSTACK diff --git a/vm_core.h b/vm_core.h index 679512390d..c1eac3751f 100644 --- a/vm_core.h +++ b/vm_core.h @@ -510,6 +510,8 @@ typedef struct rb_hook_list_struct { int need_clean; } rb_hook_list_t; +struct rb_iom_struct; + typedef struct rb_vm_struct { VALUE self; @@ -518,6 +520,7 @@ typedef struct rb_vm_struct { struct rb_thread_struct *main_thread; struct rb_thread_struct *running_thread; + struct rb_iom_struct *iom; struct list_head waiting_fds; /* <=> struct waiting_fd */ struct list_head living_threads; @@ -848,6 +851,7 @@ typedef struct rb_thread_struct { rb_fiber_t *fiber; rb_fiber_t *root_fiber; rb_jmpbuf_t root_jmpbuf; + struct list_head afrunq; /* -rb_iom_timer.as.rnode */ /* misc */ enum method_missing_reason method_missing_reason: 8; -- EW