From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS57858 46.29.248.0/23 X-Spam-Status: No, score=-1.7 required=3.0 tests=AWL,BAYES_00,RCVD_IN_SBL, RCVD_IN_XBL,RDNS_NONE,SPF_FAIL,SPF_HELO_FAIL shortcircuit=no autolearn=no autolearn_force=no version=3.4.0 Received: from 80x24.org (unknown [46.29.248.238]) by dcvr.yhbt.net (Postfix) with ESMTP id 6B9481FAE2 for ; Sun, 28 Jan 2018 10:39:24 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [PATCH 1/2] Threadlet: green threads implemented using fibers Date: Sun, 28 Jan 2018 10:39:06 +0000 Message-Id: <20180128103907.12069-2-e@80x24.org> In-Reply-To: <20180128103907.12069-1-e@80x24.org> References: <20180128103907.12069-1-e@80x24.org> List-Id: Threadlet is a subclass of Fiber which gives Thread-like behavior with implementation characteristics similar to the Thread class from Ruby 1.8 (but allowing kqueue and epoll on modern OSes). This adds the following scheduling points in the C-API: * rb_wait_for_single_fd * rb_thread_fd_select * rb_waitpid Main Ruby API: Threadlet#start -> enable auto-scheduling and run Threadlet until it automatically yields (due to EAGAIN/EWOULDBLOCK) The following behave like their Thread counterparts: Threadlet.start - Threadlet.new + Threadlet#start (prelude.rb) Threadlet#join - run internal scheduler until Threadlet is terminated Threadlet#value - ditto Threadlet#run - like Threadlet#start (prelude.rb) cont.c - rb_threadlet_sched_p checks if the current execution context is a Threadlet with auto-scheduling enabled. Changes to existing functions are minimal. New files (all new structs and relations should be documented): iom.h - internal API for the rest of RubyVM (incomplete?) 
iom_internal.h - internal header for iom_(select|epoll|kqueue).h iom_epoll.h - epoll-specific pieces iom_kqueue.h - kqueue-specific pieces iom_select.h - select-specific pieces iom_pingable_common.h - common code for iom_(epoll|kqueue).h iom_common.h - common footer for iom_(select|epoll|kqueue).h Changes to existing data structures: rb_thread_t.afrunq - list of Threadlets to auto-resume rb_vm_t.iom - Ruby I/O Manager (rb_iom_t) :) Besides rb_iom_t, all the new structs are stack-only and rely extensively on ccan/list for branch-less, O(1) insert/delete. As usual, understanding the data structures first should help you understand the code. Right now, I reuse some static functions in thread.c, so thread.c includes iom_(select|epoll|kqueue).h TODO: Hijack other blocking functions (IO.select, ...) I am using "double" for timeout since it is more convenient for arithmetic, like parts of thread.c. Most platforms have good FP, I think. Also, all "blocking" functions (rb_iom_wait*) will have timeout support. ./configure gains a new --with-iom=(select|epoll|kqueue) switch libkqueue: libkqueue support is incomplete; corner cases are not handled well: 1) multiple Threadlets waiting on the same FD 2) waiting for both read and write events on the same FD Bugfixes to libkqueue may be necessary to support all corner cases. Even supporting these corner cases for native kqueue was challenging. See comments in iom_kqueue.h and iom_epoll.h for nuances. 
Limitations Test script I used to download a file from my server: ----8<--- require 'net/http' require 'uri' require 'digest/sha1' require 'fiber' url = 'https://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx' uri = URI(url) use_ssl = "https" == uri.scheme fibs = 10.times.map do Threadlet.start do cur = Threadlet.current.object_id # XXX getaddrinfo() and connect() are blocking # XXX resolv/replace + connect_nonblock Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http| req = Net::HTTP::Get.new(uri) http.request(req) do |res| dig = Digest::SHA1.new res.read_body do |buf| dig.update(buf) # warn "#{cur} #{buf.bytesize}\n" end warn "#{cur} #{dig.hexdigest}\n" end end warn "done\n" :done end end warn "joining #{Time.now}\n" fibs[-1].join(4) warn "joined #{Time.now}\n" all = fibs.dup warn "1 joined, wait for the rest\n" until fibs.empty? fibs.each(&:join) fibs.keep_if(&:alive?) warn fibs.inspect end p all.map(&:value) Threadlet.new do puts 'HI' end.run.join --- common.mk | 7 + configure.ac | 32 + cont.c | 123 ++- include/ruby/io.h | 2 + iom.h | 95 +++ iom_common.h | 204 +++++ iom_epoll.h | 709 ++++++++++++++++ iom_internal.h | 280 +++++++ iom_kqueue.h | 899 +++++++++++++++++++++ iom_pingable_common.h | 54 ++ iom_select.h | 448 ++++++++++ prelude.rb | 12 + process.c | 14 +- signal.c | 39 +- .../wait_for_single_fd/test_wait_for_single_fd.rb | 62 ++ test/lib/leakchecker.rb | 9 + test/ruby/test_threadlet.rb | 283 +++++++ thread.c | 76 +- thread_pthread.c | 5 + vm.c | 12 +- vm_core.h | 4 + 21 files changed, 3347 insertions(+), 22 deletions(-) create mode 100644 iom.h create mode 100644 iom_common.h create mode 100644 iom_epoll.h create mode 100644 iom_internal.h create mode 100644 iom_kqueue.h create mode 100644 iom_pingable_common.h create mode 100644 iom_select.h create mode 100644 test/ruby/test_threadlet.rb diff --git a/common.mk b/common.mk index c888815395..850e3e3c77 100644 --- a/common.mk +++ b/common.mk @@ 
-2681,6 +2681,13 @@ thread.$(OBJEXT): {$(VPATH)}intern.h thread.$(OBJEXT): {$(VPATH)}internal.h thread.$(OBJEXT): {$(VPATH)}io.h thread.$(OBJEXT): {$(VPATH)}iseq.h +thread.$(OBJEXT): {$(VPATH)}iom.h +thread.$(OBJEXT): {$(VPATH)}iom_internal.h +thread.$(OBJEXT): {$(VPATH)}iom_common.h +thread.$(OBJEXT): {$(VPATH)}iom_epoll.h +thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h +thread.$(OBJEXT): {$(VPATH)}iom_pingable_common.h +thread.$(OBJEXT): {$(VPATH)}iom_select.h thread.$(OBJEXT): {$(VPATH)}method.h thread.$(OBJEXT): {$(VPATH)}missing.h thread.$(OBJEXT): {$(VPATH)}node.h diff --git a/configure.ac b/configure.ac index 352a8140ca..737ebe6654 100644 --- a/configure.ac +++ b/configure.ac @@ -981,6 +981,8 @@ AC_CHECK_HEADERS(process.h) AC_CHECK_HEADERS(pwd.h) AC_CHECK_HEADERS(setjmpex.h) AC_CHECK_HEADERS(sys/attr.h) +AC_CHECK_HEADERS(sys/epoll.h) +AC_CHECK_HEADERS(sys/event.h) AC_CHECK_HEADERS(sys/fcntl.h) AC_CHECK_HEADERS(sys/file.h) AC_CHECK_HEADERS(sys/id.h) @@ -1739,6 +1741,10 @@ AC_CHECK_FUNCS(dladdr) AC_CHECK_FUNCS(dup) AC_CHECK_FUNCS(dup3) AC_CHECK_FUNCS(eaccess) +AC_CHECK_FUNCS(epoll_create) +AC_CHECK_FUNCS(epoll_create1) +AC_CHECK_FUNCS(epoll_ctl) +AC_CHECK_FUNCS(epoll_wait) AC_CHECK_FUNCS(endgrent) AC_CHECK_FUNCS(fchmod) AC_CHECK_FUNCS(fchown) @@ -1772,7 +1778,9 @@ AC_CHECK_FUNCS(initgroups) AC_CHECK_FUNCS(ioctl) AC_CHECK_FUNCS(isfinite) AC_CHECK_FUNCS(issetugid) +AC_CHECK_FUNCS(kevent) AC_CHECK_FUNCS(killpg) +AC_CHECK_FUNCS(kqueue) AC_CHECK_FUNCS(lchmod) AC_CHECK_FUNCS(lchown) AC_CHECK_FUNCS(link) @@ -2735,6 +2743,29 @@ AC_ARG_WITH(valgrind, AS_IF([test x$with_valgrind != xno], [AC_CHECK_HEADERS(valgrind/memcheck.h)]) +AC_DEFINE_UNQUOTED(IOM_SELECT, 0) +AC_DEFINE_UNQUOTED(IOM_KQUEUE, 1) +AC_DEFINE_UNQUOTED(IOM_EPOLL, 2) + +iom_default=select +AS_CASE([$ac_cv_func_kqueue:$ac_cv_func_kevent:$ac_cv_header_sys_event_h], +[yes:yes:yes], [iom_default=kqueue], +[*], + [AS_CASE( + [$ac_cv_func_epoll_wait:$ac_cv_func_epoll_create:$ac_cv_header_sys_epoll_h], + [yes:yes:yes], 
[iom_default=epoll])] +) + +AC_ARG_WITH(iom, + AS_HELP_STRING([--with-iom=XXXXX], + [I/O manager (select|kqueue|epoll)]), + [with_iom="$withval"], [with_iom="$iom_default"]) +AS_CASE(["$with_iom"], + [select], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_SELECT)], + [kqueue], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_KQUEUE)], + [epoll], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_EPOLL)], + [AC_MSG_ERROR(unknown I/O manager: $with_iom)]) + dln_a_out_works=no AS_IF([test "$ac_cv_header_a_out_h" = yes], [ AS_IF([test "$with_dln_a_out" = yes || test "$rb_cv_dlopen" = unknown], [ @@ -3886,5 +3917,6 @@ config_summary "install doc" "$install_doc" config_summary "man page type" "$MANTYPE" config_summary "search path" "$search_path" config_summary "static-linked-ext" ${EXTSTATIC:+"yes"} +config_summary "I/O manager" ${with_iom} echo "" echo "---" diff --git a/cont.c b/cont.c index f48fd2e52b..7772fcb7d0 100644 --- a/cont.c +++ b/cont.c @@ -9,6 +9,7 @@ **********************************************************************/ +#include "iom.h" #include "internal.h" #include "vm_core.h" #include "gc.h" @@ -159,6 +160,7 @@ struct rb_fiber_struct { * You shouldn't mix "transfer" and "resume". 
*/ int transferred; + unsigned int is_threadlet:1; #if FIBER_USE_NATIVE #ifdef _WIN32 @@ -1745,19 +1747,30 @@ rb_fiber_terminate(rb_fiber_t *fib, int need_interrupt) fiber_switch(ret_fib, 1, &value, 0); } -VALUE -rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv) +int +rb_fiber_resumable_p(const rb_thread_t *th, const rb_fiber_t *fib) { - rb_fiber_t *fib; - GetFiberPtr(fibval, fib); + return th->root_fiber != fib && !fib->prev; +} +static void +fiber_check_resume(const rb_fiber_t *fib) +{ if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) { rb_raise(rb_eFiberError, "double resume"); } if (fib->transferred != 0) { rb_raise(rb_eFiberError, "cannot resume transferred Fiber"); } +} +VALUE +rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv) +{ + rb_fiber_t *fib; + GetFiberPtr(fibval, fib); + + fiber_check_resume(fib); return fiber_switch(fib, argc, argv, 1); } @@ -1927,6 +1940,101 @@ fiber_to_s(VALUE fibval) return rb_block_to_s(fibval, &proc->block, status_info); } +/* Returns true if Threadlet is enabled for current fiber */ +int +rb_threadlet_sched_p(const rb_thread_t *th) +{ + const rb_fiber_t *cur = th->ec->fiber_ptr; + + return (cur && cur->is_threadlet && th->root_fiber != cur); +} + +/* + * start running a Threadlet + */ +static VALUE +rb_threadlet_start(int argc, VALUE *argv, VALUE self) +{ + rb_thread_t *th = GET_THREAD(); + rb_fiber_t *fib; + GetFiberPtr(self, fib); + + if (th->root_fiber == fib) { + rb_raise(rb_eFiberError, "Root fiber cannot #start"); + } + if (fib->is_threadlet) { + rb_raise(rb_eFiberError, "Threadlet already started"); + } + fib->is_threadlet = 1; + fiber_check_resume(fib); + return fiber_switch(fib, argc, argv, 1); +} + +rb_thread_t * +rb_fiber_owner_thread(VALUE self) +{ + rb_fiber_t *fib; + + GetFiberPtr(self, fib); + + return rb_thread_ptr(cont_thread_value(&fib->cont)); +} + +static void +threadlet_join(rb_fiber_t *fib, double *timeout) +{ + rb_thread_t *th = GET_THREAD(); + rb_fiber_t *cur = fiber_current(); 
+ + if (cur == fib) { + rb_raise(rb_eFiberError, "Target Threadlet must not be current fiber"); + } + if (th->root_fiber == fib) { + rb_raise(rb_eFiberError, "Target Threadlet must not be root fiber"); + } + if (cont_thread_value(&fib->cont) != th->self) { + rb_raise(rb_eFiberError, "Target Threadlet not owned by current thread"); + } + if (!fib->is_threadlet) { + rb_raise(rb_eFiberError, "Target is not a Threadlet"); + } + + while (fib->status != FIBER_TERMINATED && (!timeout || *timeout >= 0.0)) { + rb_iom_schedule(th, timeout); + } +} + +static VALUE +rb_threadlet_join(int argc, VALUE *argv, VALUE self) +{ + rb_fiber_t *fib; + double timeout, *t; + VALUE limit; + + GetFiberPtr(self, fib); + rb_scan_args(argc, argv, "01", &limit); + + if (NIL_P(limit)) { + t = 0; + } else { + timeout = rb_num2dbl(limit); + t = &timeout; + } + + threadlet_join(fib, t); + return fib->status == FIBER_TERMINATED ? fib->cont.self : Qnil; +} + +static VALUE +rb_threadlet_value(VALUE self) +{ + rb_fiber_t *fib; + GetFiberPtr(self, fib); + + threadlet_join(fib, 0); + return fib->cont.value; +} + /* * Document-class: FiberError * @@ -1943,6 +2051,8 @@ fiber_to_s(VALUE fibval) void Init_Cont(void) { + VALUE rb_cThreadlet; + #if FIBER_USE_NATIVE rb_thread_t *th = GET_THREAD(); @@ -1964,6 +2074,11 @@ Init_Cont(void) rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); rb_define_alias(rb_cFiber, "inspect", "to_s"); + + rb_cThreadlet = rb_define_class("Threadlet", rb_cFiber); + rb_define_method(rb_cThreadlet, "start", rb_threadlet_start, -1); + rb_define_method(rb_cThreadlet, "join", rb_threadlet_join, -1); + rb_define_method(rb_cThreadlet, "value", rb_threadlet_value, 0); } RUBY_SYMBOL_EXPORT_BEGIN diff --git a/include/ruby/io.h b/include/ruby/io.h index cde932ff32..6e2d086ba4 100644 --- a/include/ruby/io.h +++ b/include/ruby/io.h @@ -120,6 +120,8 @@ typedef struct rb_io_t { /* #define FMODE_UNIX 0x00200000 */ /* #define FMODE_INET 
0x00400000 */ /* #define FMODE_INET6 0x00800000 */ +/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */ +/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */ #define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr) diff --git a/iom.h b/iom.h new file mode 100644 index 0000000000..7007ed97fa --- /dev/null +++ b/iom.h @@ -0,0 +1,95 @@ +/* + * iom -> I/O Manager for RubyVM (Threadlet-aware) + * + * On platforms with epoll or kqueue, this should be ready for multicore; + * even if the rest of the RubyVM is not. + * + * Some inspiration taken from Mio in GHC: + * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf + */ +#ifndef RUBY_IOM_H +#define RUBY_IOM_H +#include "ruby.h" +#include "ruby/io.h" +#include "ruby/intern.h" +#include "vm_core.h" + +typedef struct rb_iom_struct rb_iom_t; + +/* WARNING: unstable API, only for Ruby internal use */ + +/* + * Note: the first "rb_thread_t *" is a placeholder and may be replaced + * with "rb_execution_context_t *" in the future. + */ + +/* + * All functions with "wait" in their name take an optional double * +timeout+ + * argument specifying the timeout in seconds. If NULL, it can wait + * forever until the event happens (or the fiber is explicitly resumed). + * + * (maybe) TODO: If non-NULL, the timeout will be updated to the + * remaining time upon returning. Not sure if useful, could just be + * a waste of cycles; so not implemented, yet. + */ + +/* + * Relinquish calling fiber while waiting for +events+ on the given + * +rb_io_t+ + * + * Multiple native threads can enter this function at the same time. + * + * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI + * + * Returns a mask of events. + */ + +int rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, double *timeout); + +/* + * Identical to rb_iom_waitio, but takes a pointer to an integer file + * descriptor, instead of rb_io_t. 
Use rb_iom_waitio when possible, + * since it allows us to optimize epoll (and perhaps avoid kqueue + * portability bugs across different *BSDs). + */ +int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout); + +int rb_iom_select(rb_thread_t *, int maxfd, rb_fdset_t *r, + rb_fdset_t *w, rb_fdset_t *e, double *timeout); + +/* + * Relinquish calling fiber to wait for the given PID to change status. + * Multiple native threads can enter this function at the same time. + * If timeout is negative, wait forever. + */ +rb_pid_t rb_iom_waitpid(rb_thread_t *, + rb_pid_t, int *status, int options, double *timeout); + +/* + * Relinquish calling fiber for at least the duration of the given timeout + * in seconds. If timeout is negative, wait forever (until explicitly + * resumed). + * Multiple native threads can enter this function at the same time. + */ +void rb_iom_sleep(rb_thread_t *, double *timeout); + +/* callback for SIGCHLD, needed to implement rb_iom_waitpid */ +void rb_iom_sigchld(rb_vm_t *); + +/* + * there is no public create function, creation is lazy to avoid incurring + * overhead for small scripts which do not need fibers, we only need this + * at VM destruction + */ +void rb_iom_destroy(rb_vm_t *); + +/* + * schedule + */ +void rb_iom_schedule(rb_thread_t *th, double *timeout); + +/* cont.c */ +int rb_threadlet_sched_p(const rb_thread_t *); +rb_thread_t *rb_fiber_owner_thread(VALUE); + +#endif /* RUBY_IOM_H */ diff --git a/iom_common.h b/iom_common.h new file mode 100644 index 0000000000..5334604117 --- /dev/null +++ b/iom_common.h @@ -0,0 +1,204 @@ +/* included by iom_(epoll|select|kqueue).h */ + +/* we lazily create this, small scripts may never need iom */ +static rb_iom_t * +rb_iom_new(rb_thread_t *th) +{ + rb_iom_t *iom = ALLOC(rb_iom_t); + rb_iom_init(iom); + return iom; +} + +static rb_iom_t * +rb_iom_get(rb_thread_t *th) +{ + VM_ASSERT(th && th->vm); + if (!th->vm->iom) { + th->vm->iom = rb_iom_new(th); + } + return th->vm->iom; +} + 
+/* check for expired timers */ +static void +rb_iom_timer_check(const rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + if (iom && !list_empty(&iom->timers)) { + struct rb_iom_timer *t = 0, *next = 0; + double now = timeofday(); + + list_for_each_safe(&iom->timers, t, next, n.tnode) { + if (t->expires_at <= now) { + struct rb_iom_waiter *w = rb_iom_waiter_of(t); + VALUE fibval = rb_iom_timer_fibval(t); + + if (w) { + list_del_init(&w->wnode); + } + list_del_init(&t->n.tnode); + /* non-Threadlets may set timer in rb_iom_schedule */ + if (fibval != Qfalse) { + rb_thread_t *owner = rb_fiber_owner_thread(fibval); + list_add_tail(&owner->afrunq, &t->n.rnode); + } + } + return; /* done, timers is a sorted list */ + } + } +} + +/* insert a new +timer+ into +timers+, maintain sort order by expires_at */ +static void +rb_iom_timer_add(rb_thread_t *th, struct rb_iom_timer *add, + const double *timeout, int flags) +{ + add->_fibval = flags & IOM_FIB ? rb_fiber_current() : Qfalse; + add->_fibval |= flags & IOM_WAIT ? 
0 : IOM_FIBMASK; + rb_iom_timer_check(th); + + if (timeout) { + rb_iom_t *iom = rb_iom_get(th); + struct rb_iom_timer *i = 0; + add->expires_at = timeofday() + *timeout; + + /* + * search backwards: assume typical projects have multiple objects + * sharing the same timeout values, so new timers will expire later + * than existing timers + */ + list_for_each_rev(&iom->timers, i, n.tnode) { + if (add->expires_at > i->expires_at) { + list_add_after(&iom->timers, &i->n.tnode, &add->n.tnode); + return; + } + } + list_add(&iom->timers, &add->n.tnode); + } + else { + /* not active, just allow list_del to function */ + list_node_init(&add->n.tnode); + } +} + +/* max == -1 : wake all */ +static void +rb_iom_blockers_notify(rb_iom_t *iom, int max) +{ + struct rb_iom_blocker *b = 0, *next = 0; + + list_for_each_safe(&iom->blockers, b, next, bnode) { + list_del_init(&b->bnode); + ubf_select(b->th); + if (--max == 0) { + break; + } + } +} + +/* + * TODO: consider EVFILT_PROC for kqueue and netlink+epoll on Linux; + * see the "god" RubyGem for usage examples. + * However, I doubt rb_waitpid scalability will be a problem and + * the simplicity of a single implementation for all is appealing. 
+ */ +#ifdef HAVE_SYS_TYPES_H +# include +#endif +#ifdef HAVE_SYS_WAIT_H +# include +#endif +#if defined(WNOHANG) && WNOHANG != 0 && \ + (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) + +static VALUE +iom_schedule_pid(VALUE ptr) +{ + struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr; + rb_thread_t *th = pw->th; + size_t nresume; + + rb_threadlet_do_yield_p(th, &nresume); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + + /* check ints above could've unlinked us and marked us ready */ + if (list_empty((struct list_head *)&pw->w.wnode)) { + list_del_init(&pw->w.timer.n.rnode); + return Qfalse; + } + return rb_fiber_yield(0, 0); +} + +rb_pid_t +rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options, + double *timeout) +{ + struct rb_iom_pid_waiter pw; + + pw.options = options; + VM_ASSERT((options & WNOHANG) == 0 && + "WNOHANG should be handled in rb_waitpid"); + + /* + * unlike rb_iom_waitfd, we typically call *waitpid before + * trying with a non-blocking operation + */ + pw.pid = rb_waitpid(pid, &pw.status, pw.options | WNOHANG); + + if (pw.pid == 0) { + rb_iom_t *iom = rb_iom_get(th); + + pw.th = th; + pw.pid = pid; + rb_iom_timer_add(th, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT); + + /* LIFO, to match Linux wait4() blocking behavior */ + list_add(&iom->pids, &pw.w.wnode); + rb_ensure(iom_schedule_pid, (VALUE)&pw, + rb_iom_waiter_done, (VALUE)&pw.w); + if (pw.pid == -1) { + errno = pw.errnum; + } + } + if (status) { + *status = pw.status; + } + if (pw.pid > 0) { + rb_last_status_set(pw.status, pw.pid); + } + return pw.pid; +} + +void +rb_iom_sigchld(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + if (iom) { + struct rb_iom_pid_waiter *pw = 0, *next = 0; + size_t nr = 0; + + list_for_each_safe(&iom->pids, pw, next, w.wnode) { + pid_t r = rb_waitpid(pw->pid, &pw->status, pw->options | WNOHANG); + + if (r == 0) { + continue; + } + if (r == -1) { + pw->errnum = errno; + } + nr++; + pw->pid = r; + rb_iom_waiter_ready(&pw->w); + } + if (nr) + 
rb_iom_blockers_notify(iom, -1); + } +} +#else +rb_pid_t +rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options, + double *timeout) +{ + rb_bug("Should not get here, WNOHANG not implemented"); +} +#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */ diff --git a/iom_epoll.h b/iom_epoll.h new file mode 100644 index 0000000000..c38b418c2c --- /dev/null +++ b/iom_epoll.h @@ -0,0 +1,709 @@ +/* + * Linux-only epoll-based implementation of I/O Manager for RubyVM + * + * Notes: + * + * TODO: epoll_wait only has millisecond resolution; if we need higher + * resolution we can use timerfd or ppoll on the epoll_fd itself. + * + * Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the + * same notification callback (struct file_operations)->poll. + * Unlike with kqueue across different *BSDs; we do not need to worry + * about inconsistencies between these syscalls. + * + * See also notes in iom_kqueue.h + */ +#include "iom_internal.h" +#include +#include /* round() */ +#define FMODE_IOM_ADDED FMODE_IOM_PRIVATE1 + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + + /* we NEVER need to scan these, only insert + delete + empty check */ + struct list_head epws; /* -epw.w.wnode, order agnostic */ + struct list_head esets; /* -epw_set.fsw.w.wnode, order agnostic */ + + struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */ + struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */ + struct rb_iom_fdmap fdmap; /* maps each FD to multiple epw */ + + int epoll_fd; + int maxevents; /* auto-increases */ + struct list_head blockers; /* -rb_iom_blocker.bnode */ +}; + +struct epw_set; + +struct epfdn { + union { + struct list_node fdnode; + struct { + rb_thread_t *th; + struct rb_iom_fd *fdh; + } pre_ctl; + } as; + union { + int *flags; /* &fptr->mode */ + struct epw_set *set; + 
} owner; +}; + +/* + * Not using rb_iom_fd_waiter here, since we never need to reread the + * FD on this implementation. + * Allocated on stack + */ +struct epw { + struct epfdn fdn; + struct rb_iom_waiter w; + int fd; /* no need for "int *", here, we never reread */ + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +/* allocated on-stack */ +struct epw_set { + struct rb_iom_fdset_waiter fsw; + st_table *tbl; /* after: fd -> epw_set_item */ + uint8_t idx; +}; + +/* + * per-FD in rb_thread_fd_select arg, value for epw_set.fdn.as.tbl, + * allocated on-heap + */ +struct epw_set_item { + struct epfdn fdn; + int fd; + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +static void +increase_maxevents(rb_iom_t *iom, int retries) +{ + /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */ + const int max_alloca = 1024 / sizeof(struct epoll_event); + const int max = max_alloca * 2; + + if (retries) { + iom->maxevents *= retries; + if (iom->maxevents > max || iom->maxevents <= 0) { + iom->maxevents = max; + } + } +} + +static int +double2msec(double sec) +{ + /* + * clamp timeout to workaround a Linux <= 2.6.37 bug, + * see epoll_wait(2) manpage + */ + const int max_msec = 35 * 60 * 1000; /* floor(35.79 minutes) */ + if (sec < 0) { + return -1; + } + else { + double msec = round(sec * 1000); + + if (msec < (double)max_msec) { + int ret = (int)msec; + return ret < 0 ? 0 : ret; + } + return max_msec; + } +} + +/* we can avoid branches when mapping RB_WAIT_* bits to EPOLL* bits */ +STATIC_ASSERT(epbits_matches_waitfd_bits, + RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT && + RB_WAITFD_PRI == EPOLLPRI); + +/* what goes into epoll_ctl... 
*/ +static int +rb_events2ep(int events) +{ + return EPOLLONESHOT | events; +} + +/* ...what comes out of epoll_wait */ +static short +rb_ep2revents(int revents) +{ + return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI)); +} + +/* lazily create epoll FD, since not everybody waits on I/O */ +static int +iom_epfd(rb_iom_t *iom) +{ + if (iom->epoll_fd < 0) { +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->epoll_fd < 0) { + int err = errno; + if (rb_gc_for_fd(err)) { + iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->epoll_fd < 0) { + rb_sys_fail("epoll_create1"); + } + } + else if (err != ENOSYS) { + rb_syserr_fail(err, "epoll_create1"); + } + else { /* libc >= kernel || build-env > run-env */ +#endif /* HAVE_EPOLL_CREATE1 */ + iom->epoll_fd = epoll_create(1); + if (iom->epoll_fd < 0) { + if (rb_gc_for_fd(errno)) { + iom->epoll_fd = epoll_create(1); + } + } + if (iom->epoll_fd < 0) { + rb_sys_fail("epoll_create"); + } + rb_maygvl_fd_fix_cloexec(iom->epoll_fd); +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + } + } +#endif /* HAVE_EPOLL_CREATE1 */ + rb_update_max_fd(iom->epoll_fd); + } + return iom->epoll_fd; +} + +static void +rb_iom_init(rb_iom_t *iom) +{ + list_head_init(&iom->timers); + list_head_init(&iom->epws); + list_head_init(&iom->esets); + list_head_init(&iom->pids); + list_head_init(&iom->blockers); + iom->maxevents = 8; + iom->epoll_fd = -1; + rb_iom_fdmap_init(&iom->fdmap); +} + +static void +rearm_fd(rb_thread_t *th, struct rb_iom_fd *fdh) +{ + int fd = -1; + struct epoll_event ev; + struct epw_set_item *esi; + struct epw *epw; + struct epfdn *fdn; + + ev.events = EPOLLONESHOT; + ev.data.ptr = fdh; + list_for_each(&fdh->fdhead, fdn, as.fdnode) { + if (fdn->owner.set) { + esi = container_of(fdn, struct epw_set_item, fdn); + fd = esi->fd; + ev.events |= rb_events2ep(esi->events); + } + else { + epw = container_of(fdn, struct epw, fdn); + fd = epw->fd; + ev.events |= 
rb_events2ep(epw->events); + } + } + if (epoll_ctl(th->vm->iom->epoll_fd, EPOLL_CTL_MOD, fd, &ev) != 0) { + rb_sys_fail("epoll_ctl"); + } +} + +static void +check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev) +{ + if (nr >= 0) { + int i; + + for (i = 0; i < nr; i++) { + struct rb_iom_fd *fdh = ev[i].data.ptr; + struct epfdn *fdn, *next; + short revents = rb_ep2revents(ev[i].events); + struct epw_set_item *esi; + struct epw *epw; + + /* + * Typical list size is 1; only multiple fibers waiting + * on the same FD increases fdh list size + */ + list_for_each_safe(&fdh->fdhead, fdn, next, as.fdnode) { + struct epw_set *eset = fdn->owner.set; + + if (eset) { + esi = container_of(fdn, struct epw_set_item, fdn); + esi->revents = esi->events & revents; + if (esi->revents) { + int ret = eset->fsw.ret++; + list_del_init(&fdn->as.fdnode); + if (!ret) { + rb_iom_waiter_ready(&eset->fsw.w); + } + } + } + else { + epw = container_of(fdn, struct epw, fdn); + epw->revents = epw->events & revents; + if (epw->revents) { + list_del_init(&fdn->as.fdnode); + rb_iom_waiter_ready(&epw->w); + } + } + } + if (RB_UNLIKELY(!list_empty(&fdh->fdhead))) { + rearm_fd(th, fdh); + } + } + + /* notify the waiter thread in case we enqueued fibers for them */ + if (nr > 0) { + rb_iom_blockers_notify(th->vm->iom, -1); + } + } + else { + int err = errno; + if (err != EINTR) { + rb_syserr_fail(err, "epoll_wait"); + } + } + rb_iom_timer_check(th); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); +} + +/* perform a non-blocking epoll_wait while holding GVL */ +static void +ping_events(rb_thread_t *th, struct epw_set *eset) +{ + rb_iom_t *iom = th->vm->iom; + int epfd = iom ? 
iom->epoll_fd : -1; + + if (epfd >= 0) { + VALUE v; + int nr; + int maxevents = iom->maxevents; + struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents); + int retries = 0; + + do { + nr = epoll_wait(epfd, ev, maxevents, 0); + check_epoll_wait(th, nr, ev); + } while (list_empty(&th->afrunq) && nr == maxevents && ++retries); + if (v) { + ALLOCV_END(v); + } + increase_maxevents(iom, retries); + } +} + +/* for iom_pingable_common.h */ +static void +rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + int maxevents = iom->maxevents; + int nr = maxevents; + double timeout; + + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + timeout = list_empty(&th->afrunq) ? rb_iom_next_timeout(&iom->timers) : 0; + + if (timeout != 0 && (!list_empty(&iom->epws) || !list_empty(&iom->pids) + || !list_empty(&iom->esets))) { + VALUE v; + int epfd = iom_epfd(th->vm->iom); /* may raise */ + struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents); + int msec = double2msec(timeout); + struct rb_iom_blocker cur; + + VM_ASSERT(epfd >= 0); + cur.th = th; + list_add_tail(&iom->blockers, &cur.bnode); + nr = 0; + BLOCKING_REGION({ + nr = epoll_wait(epfd, ev, maxevents, msec); + }, ubf_select, th, TRUE); + list_del(&cur.bnode); + check_epoll_wait(th, nr, ev); + if (v) { + ALLOCV_END(v); + } + } + if (nr == maxevents) { /* || timeout == 0 */ + ping_events(th, 0); + } +} + +static void +merge_events(struct epoll_event *ev, struct rb_iom_fd *fdh) +{ + struct epfdn *fdn; + + list_for_each(&fdh->fdhead, fdn, as.fdnode) { + if (fdn->owner.set) { + struct epw_set_item *esi; + + esi = container_of(fdn, struct epw_set_item, fdn); + ev->events |= rb_events2ep(esi->events); + } + else { + struct epw *epw = container_of(fdn, struct epw, fdn); + ev->events |= rb_events2ep(epw->events); + } + } +} + +static void +epoll_ctl_or_raise(rb_thread_t *th, struct epw *epw) +{ + int e; + int epfd; + struct epoll_event ev; + int *flags = epw->fdn.owner.flags; + + epw->fdn.owner.flags = 0; /* ensure it's not 
confused with owner.set */ + + /* we cannot raise until list_add: */ + { + struct rb_iom_fd *fdh = epw->fdn.as.pre_ctl.fdh; + + ev.data.ptr = fdh; + ev.events = rb_events2ep(epw->events); + /* + * merge events from other threads/fibers waiting on the same + * [ descriptor (int fd), description (struct file *) ] tuple + */ + if (!list_empty(&fdh->fdhead)) { /* uncommon, I hope... */ + merge_events(&ev, fdh); + } + list_add(&fdh->fdhead, &epw->fdn.as.fdnode); + } + + epfd = iom_epfd(th->vm->iom); /* may raise */ + + /* we want to track if an FD is already being watched ourselves */ + if (flags) { + if (*flags & FMODE_IOM_ADDED) { /* ideal situation */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + else { + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + if (e == 0) { + *flags |= FMODE_IOM_ADDED; + } + else if (e < 0 && errno == EEXIST) { + /* + * possible EEXIST if several fptrs point to the same FD: + * f1 = Fiber.start { io1.read(1) } + * io2 = IO.for_fd(io1.fileno) + * f2 = Fiber.start { io2.read(1) } + */ + *flags |= FMODE_IOM_ADDED; + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + } + } + else { /* don't know if added or not, fall back to add on ENOENT */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + if (e < 0 && errno == ENOENT) { + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + } + } + if (e < 0) { + e = errno; + if (e == EPERM) { /* all ready if File or Dir */ + epw->revents = epw->events; + } + else { + rb_syserr_fail(e, "epoll_ctl"); + } + } +} + +static VALUE +epmod_yield(VALUE ptr) +{ + /* we must have no possibility of raising until list_add: */ + struct epw *epw = (struct epw *)ptr; + rb_thread_t *th = epw->fdn.as.pre_ctl.th; + size_t nresume; + + epoll_ctl_or_raise(th, epw); + ping_events(th, 0); + (void)rb_threadlet_do_yield_p(th, &nresume); + if (epw->revents) { + list_del_init(&epw->w.timer.n.rnode); + return Qfalse; + } + return rb_fiber_yield(0, 0); +} + +static VALUE +epw_done(VALUE ptr) +{ + struct epw *epw = 
(struct epw *)ptr; + list_del(&epw->fdn.as.fdnode); + return rb_iom_waiter_done((VALUE)&epw->w); +} + +static int +iom_waitfd(rb_thread_t *th, int fd, int *flags, int events, double *timeout) +{ + rb_iom_t *iom = rb_iom_get(th); + struct epw epw; + + /* unlike kqueue or select, we never need to reread fd */ + epw.fd = fd; + if (epw.fd < 0) { /* TODO: behave like poll(2) and sleep? */ + return 0; + } + + /* may raise on OOM: */ + epw.fdn.as.pre_ctl.fdh = rb_iom_fd_get(&iom->fdmap, epw.fd); + epw.fdn.as.pre_ctl.th = th; + epw.fdn.owner.flags = flags; + /* + * if we did not have GVL, revents may be set immediately + * upon epoll_ctl by another thread running epoll_wait, + * so we must initialize it before epoll_ctl: + */ + epw.revents = 0; + epw.events = (short)events; + + list_add(&iom->epws, &epw.w.wnode); + rb_iom_timer_add(th, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT); + rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw); + + return (int)epw.revents; /* may be zero if timed out */ +} + +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout) +{ + return iom_waitfd(th, fptr->fd, &fptr->mode, events, timeout); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) +{ + return iom_waitfd(th, *fdp, 0, events, timeout); +} + +static const short idx2events[] = { EPOLLIN, EPOLLOUT, EPOLLPRI }; + +static int +eset_upd(st_data_t *key, st_data_t *v, st_data_t arg, int existing) +{ + int fd = (int)*key; + struct epw_set *eset = (struct epw_set *)arg; + short events = idx2events[eset->idx]; + struct epw_set_item **val = (struct epw_set_item **)v; + struct epw_set_item *esi; + struct epoll_event ev; + int epfd = iom_epfd(eset->fsw.th->vm->iom); /* may raise */ + struct rb_iom_fd *fdh = rb_iom_fd_get(&eset->fsw.th->vm->iom->fdmap, fd); + + ev.data.ptr = fdh; + ev.events = rb_events2ep(events); + if (existing) { + /* + * this happens when an FD is in two fdsets for a single + * rb_thread_fd_select call. 
Does not happen if different + * Fibers call rb_thread_fd_select on the same FD. + */ + esi = *val; + esi->events |= events; + merge_events(&ev, fdh); + } + else { + esi = ALLOC(struct epw_set_item); + *val = esi; + esi->fd = fd; + esi->revents = 0; + esi->events = events; + esi->fdn.owner.set = eset; + if (!list_empty(&fdh->fdhead)) { + /* + * happens when different Fibers call rb_thread_fd_select + * or rb_wait_for_single_fd on the same FD + */ + merge_events(&ev, fdh); + } + list_add(&fdh->fdhead, &esi->fdn.as.fdnode); + } + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) != 0) { + int e = errno; + + if (e == ENOENT) { + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0) { + return ST_CONTINUE; + } + e = errno; + } + if (e == EPERM) { + esi->revents = esi->events; + eset->fsw.ret++; + } + else { + *val = 0; + list_del_init(&esi->fdn.as.fdnode); + xfree(esi); + rb_syserr_fail(e, "epoll_ctl"); + } + } + return ST_CONTINUE; +} + +static VALUE +epfdset_yield(VALUE ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + size_t nresume; + eset->tbl = st_init_numtable(); /* may raise */ + + /* must not call epoll_wait while this loop is running: */ + for (eset->idx = 0; eset->idx < 3; eset->idx++) { + const rb_fdset_t *src = eset->fsw.in[eset->idx]; + + if (src) { + int max = rb_fd_max(src); + int fd; + + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, src)) { + st_update(eset->tbl, (st_data_t)fd, + eset_upd, (st_data_t)eset); + } + } + } + } + /* OK to call epoll_wait, now: */ + + ping_events(eset->fsw.th, 0); + (void)rb_threadlet_do_yield_p(eset->fsw.th, &nresume); + if (eset->fsw.ret) { + list_del_init(&eset->fsw.w.timer.n.rnode); + return Qfalse; + } + return rb_fiber_yield(0, 0); +} + +static rb_fdset_t * +fd_init_once(rb_fdset_t **dst, rb_fdset_t *src) +{ + if (!*dst) { + rb_fd_init(src); + *dst = src; + } + return *dst; +} + +static int +eset_term(st_data_t key, st_data_t val, st_data_t ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + int fd = 
(int)key; + struct epw_set_item *esi = (struct epw_set_item *)val; + + if (esi) { + struct rb_iom_fdset_waiter *fsw = &eset->fsw; + + list_del(&esi->fdn.as.fdnode); + for (eset->idx = 0; eset->idx < 3; eset->idx++) { + if (esi->revents & idx2events[eset->idx]) { + rb_fd_set(fd, + fd_init_once(&fsw->out[eset->idx], &fsw->sets[eset->idx])); + } + } + xfree(esi); + } + + return ST_DELETE; +} + +static VALUE +epfdset_done(VALUE ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + + if (eset->tbl) { + st_foreach(eset->tbl, eset_term, (VALUE)eset); + st_free_table(eset->tbl); + } + + list_del(&eset->fsw.w.timer.n.tnode); + list_del(&eset->fsw.w.wnode); + for (eset->idx = 0; eset->idx < 3; eset->idx++) { + rb_fdset_t *orig = eset->fsw.in[eset->idx]; + rb_fdset_t *res = eset->fsw.out[eset->idx]; + + if (res) { + rb_fd_dup(orig, res); + rb_fd_term(res); + } + } + return Qfalse; +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + double *timeout) +{ + rb_iom_t *iom = rb_iom_get(th); + struct epw_set eset; + + eset.tbl = 0; + rb_iom_fdset_waiter_init(th, &eset.fsw, maxfd, r, w, e); + list_add(&iom->esets, &eset.fsw.w.wnode); + rb_iom_timer_add(th, &eset.fsw.w.timer, timeout, IOM_FIB|IOM_WAIT); + rb_ensure(epfdset_yield, (VALUE)&eset, epfdset_done, (VALUE)&eset); + + return eset.fsw.ret; +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + vm->iom = 0; + if (iom) { + /* + * it's possible; but crazy to share epoll FDs across processes + * (kqueue has a rather unique close-on-fork behavior) + */ + if (iom->epoll_fd >= 0) { + close(iom->epoll_fd); + } + rb_iom_fdmap_destroy(&iom->fdmap); + xfree(iom); + } +} + +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + rb_iom_t *iom = GET_VM()->iom; + + return iom && fd == iom->epoll_fd; +} + +#include 
"iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_internal.h b/iom_internal.h new file mode 100644 index 0000000000..bd41c7841e --- /dev/null +++ b/iom_internal.h @@ -0,0 +1,280 @@ +#ifndef RB_IOM_COMMON_H +#define RB_IOM_COMMON_H + +#include "internal.h" +#include "iom.h" + +/* cont.c */ +void rb_threadlet_enqueue(VALUE fibval); + +#define FMODE_IOM_PRIVATE1 0x01000000 +#define FMODE_IOM_PRIVATE2 0x02000000 + +#define IOM_FIBMASK ((VALUE)0x1) +#define IOM_FIB (0x2) +#define IOM_WAIT (0x1) /* container_of(..., struct rb_iom_waiter, timer) */ + +/* + * fdmap is a singleton. + * + * It makes zero sense to have multiple fdmaps per-process; even less so + * than multiple ioms. The file descriptor table in POSIX is per-process; + * and POSIX requires syscalls to allocate the lowest available FD. + * This is also why we use an array instead of a hash table, as there will + * be no holes for big processes. + * + * If contention becomes a problem, we can pad (struct rb_iom_fd) to + * 64-bytes for cache alignment. + * + * Currently we use fdmap to deal with FD aliasing with epoll + * and kqueue interfaces.. FD aliasing happens when multiple + * Fibers wait on the same FD; but epoll/kqueue APIs only allow + * registering a single data pointer per FD. + * + * In the future, we may implement rb_notify_fd_close using fdmap. + */ + +/* linear growth based on power-of-two */ +#define RB_IOM_FD_PER_HEAP 64 +/* on-heap and persistent for process lifetime keep as small as possible. 
*/ +struct rb_iom_fd { + struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */ +}; + +/* singleton (per-rb_iom_t, or per process, if we ever need > 1 iom) */ +struct rb_iom_fdmap { + struct rb_iom_fd **map; + unsigned int heaps; + int max_fd; +}; + +/* allocated on stack */ +/* Every Thiber has this on stack */ +struct rb_iom_timer { + union { + struct list_node rnode; /* <=> rb_thread_t.afrunq */ + struct list_node tnode; /* <=> rb_iom_struct.timers */ + } n; + double expires_at; /* absolute monotonic time */ + VALUE _fibval; +}; + +/* common waiter struct for waiting fds and pids */ +struct rb_iom_waiter { + struct rb_iom_timer timer; + struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */ +}; + +/* for rb_wait_for_single_fd: */ +struct rb_iom_fd_waiter { + struct rb_iom_waiter w; /* w.wnode - iom->fds */ + int *fdp; /* (ideally), a pointer fptr->fd to detect closed FDs */ + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +/* for rb_thread_fd_select: */ +struct rb_iom_fdset_waiter { + struct rb_iom_waiter w; /* w.wnode - iom->fdsets */ + rb_thread_t *th; + int max; + int ret; + rb_fdset_t *in[3]; + rb_fdset_t *out[3]; + rb_fdset_t sets[3]; +}; + +struct rb_iom_pid_waiter { + struct rb_iom_waiter w; /* w.wnode - iom->pids */ + rb_thread_t *th; + /* same pid, status, options same as waitpid(2) */ + rb_pid_t pid; + int status; + int options; + int errnum; +}; + +/* threads sleeping in select, epoll_wait or kevent w/o GVL; on stack */ +struct rb_iom_blocker { + rb_thread_t *th; + struct list_node bnode; /* -iom->blockers */ +}; + +#if (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) +/* TODO: IOM_SELECT may use this for rb_notify_fd_close */ +static struct rb_iom_fd * +iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd) +{ + VM_ASSERT(fd >= 0); + return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP]; +} + +static struct rb_iom_fd * +rb_iom_fd_get(struct rb_iom_fdmap 
*fdmap, int fd) +{ + while (fd >= fdmap->max_fd) { /* fd may be several heaps past the end, grow until covered */ + struct rb_iom_fd *base, *h; + unsigned n = fdmap->heaps + 1; + unsigned i; + + fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct rb_iom_fd *)); + base = h = ALLOC_N(struct rb_iom_fd, RB_IOM_FD_PER_HEAP); + for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) { + list_head_init(&h->fdhead); + h++; + } + fdmap->map[fdmap->heaps++] = base; /* heaps must count the map slots in use, rb_iom_fdmap_destroy relies on it */ + fdmap->max_fd += RB_IOM_FD_PER_HEAP; + } + return iom_fdhead_aref(fdmap, fd); +} + +static void +rb_iom_fdmap_init(struct rb_iom_fdmap *fdmap) +{ + fdmap->max_fd = 0; + fdmap->heaps = 0; + fdmap->map = 0; +} + +static void +rb_iom_fdmap_destroy(struct rb_iom_fdmap *fdmap) +{ + unsigned n; + + for (n = 0; n < fdmap->heaps; n++) { + xfree(fdmap->map[n]); + } + xfree(fdmap->map); + rb_iom_fdmap_init(fdmap); +} +#endif /* (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) */ + +static VALUE +rb_iom_timer_fibval(const struct rb_iom_timer *t) +{ + return t->_fibval & ~IOM_FIBMASK; +} + +static struct rb_iom_waiter * +rb_iom_waiter_of(struct rb_iom_timer *t) +{ + if (t->_fibval & IOM_FIBMASK) { + return 0; + } + return container_of(t, struct rb_iom_waiter, timer); +} + +static double +rb_iom_next_timeout(struct list_head *timers) +{ + struct rb_iom_timer *t = list_top(timers, struct rb_iom_timer, n.tnode); + + if (t) { + double diff = t->expires_at - timeofday(); + return diff <= 0.0 ? 
0 : diff; + } + else { + return -1; + } +} + +static void rb_iom_timer_check(const rb_thread_t *); +static void rb_iom_timer_add(rb_thread_t *, struct rb_iom_timer *, + const double *timeout, int flags); + +static VALUE +rb_iom_timer_done(VALUE ptr) +{ + struct rb_iom_timer *t = (struct rb_iom_timer *)ptr; + list_del(&t->n.tnode); + return Qfalse; +} + +static void +rb_iom_waiter_ready(struct rb_iom_waiter *w) +{ + VALUE fibval = rb_iom_timer_fibval(&w->timer); + + list_del_init(&w->wnode); + list_del_init(&w->timer.n.tnode); + if (fibval != Qfalse) { + rb_thread_t *owner = rb_fiber_owner_thread(fibval); + list_add_tail(&owner->afrunq, &w->timer.n.rnode); + } +} + +static VALUE +rb_iom_waiter_done(VALUE ptr) +{ + struct rb_iom_waiter *w = (struct rb_iom_waiter *)ptr; + list_del(&w->timer.n.tnode); + list_del(&w->wnode); + return Qfalse; +} + +/* cont.c */ +int rb_fiber_resumable_p(const rb_thread_t *, const rb_fiber_t *); + +/* + * resume all "ready" fibers belonging to a given thread + * stop when a fiber has not yielded, yet. + */ +static int +rb_threadlet_do_yield_p(rb_thread_t *th, size_t *nresume) +{ + rb_fiber_t *cur_threadlet = rb_threadlet_sched_p(th) ? 
th->ec->fiber_ptr : 0; + struct rb_iom_timer *t = 0, *next = 0; + LIST_HEAD(tmp); + + *nresume = 0; + + /* + * do not infinite loop as new fibers get added to + * th->afrunq, only work off a temporary list: + */ + list_append_list(&tmp, &th->afrunq); + list_for_each_safe(&tmp, t, next, n.rnode) { + VALUE fibval = rb_iom_timer_fibval(t); + rb_fiber_t *fib = RTYPEDDATA_DATA(fibval); + + if (fib == cur_threadlet || !rb_fiber_resumable_p(th, fib)) { + /* tell the caller to yield */ + list_prepend_list(&th->afrunq, &tmp); + return 1; + } + (*nresume)++; + rb_fiber_resume(fibval, 0, 0); + } + return 0; +} + +static void +rb_iom_fdset_waiter_init(rb_thread_t *th, struct rb_iom_fdset_waiter *fsw, + int maxfd, rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e) +{ + fsw->th = th; + fsw->ret = 0; + fsw->max = maxfd; + assert(fsw->max > 0); + fsw->in[0] = r; + fsw->in[1] = w; + fsw->in[2] = e; + memset(fsw->out, 0, sizeof(fsw->out)); +} + +/* XXX: is this necessary? */ +void +rb_iom_mark_runq(const rb_thread_t *th) +{ + struct rb_iom_timer *t = 0; + + list_for_each(&th->afrunq, t, n.rnode) { + rb_gc_mark(rb_iom_timer_fibval(t)); + } +} + +static rb_iom_t *rb_iom_get(rb_thread_t *); +static void rb_iom_blockers_notify(rb_iom_t *, int max); + +#endif /* IOM_COMMON_H */ diff --git a/iom_kqueue.h b/iom_kqueue.h new file mode 100644 index 0000000000..42c542c202 --- /dev/null +++ b/iom_kqueue.h @@ -0,0 +1,899 @@ +/* + * kqueue-based implementation of I/O Manager for RubyVM on *BSD + * + * The kevent API has an advantage over epoll_ctl+epoll_wait since + * it can simultaneously add filters and check for events with one + * syscall. It also allows EV_ADD to be used idempotently for + * enabling filters, where as epoll_ctl requires separate ADD and + * MOD operations. + * + * These are advantages in the common case... 
+ * + * The epoll API has advantages in more esoteric cases: + * + * epoll has the advantage over kqueue when watching for multiple + * events (POLLIN|POLLOUT|POLLPRI) (which is rare). We also have + * to install two kevent filters to watch POLLIN|POLLOUT simultaneously. + * See udata_set/udata_get functions below for more on this. + * + * Finally, kevent does not support POLLPRI directly, we need to use + * select() (or perhaps poll() on some platforms) with a zero + * timeout to check for POLLPRI after EVFILT_READ returns. + * + * Finally, several *BSDs implement kqueue; and the quality of each + * implementation may vary. Anecdotally, *BSDs are not known to even + * support poll() consistently across different types of files. + * We will need to be selective and careful about injecting them into + * kevent(). + */ +#include "iom_internal.h" + +/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */ +#undef LIST_HEAD +#include +#include +#include + +/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */ +#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI) + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + + /* we NEVER need to scan kevs, only insert + delete + empty check */ + struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */ + struct list_head ksets; /* -kqw_set.fsw.w.wnode, order agnostic */ + + struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expires_at */ + struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */ + struct rb_iom_fdmap rfdmap; /* holds fdh for EVFILT_READ */ + struct rb_iom_fdmap wfdmap; /* holds fdh for EVFILT_WRITE */ + + int kqueue_fd; + int nevents; /* auto-increases */ + struct list_head blockers; /* -rb_iom_blocker.bnode */ +}; + +struct kqw_set; +struct kqfdn { + /* + * both rfdnode and wfdnode are overloaded for deleting paired + * filters when watching both 
EVFILT_READ and EVFILT_WRITE on + * a single FD + */ + struct list_node rfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_READ) */ + struct list_node wfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_WRITE) */ + union { + rb_thread_t *th; + struct kqw_set *set; + } owner; +}; + +/* allocated on stack */ +struct kev { + struct kqfdn fdn; + + /* fdw.w.wnode is overloaded for checking RB_WAITFD_PRI (see check_pri) */ + struct rb_iom_fd_waiter fdw; +}; + +/* allocated on-stack */ +struct kqw_set { + struct rb_iom_fdset_waiter fsw; + st_table *tbl; /* after: fd -> kqw_set_item */ + VALUE vchanges; + int nchanges; + uint8_t idx; +}; + +/* + * per-FD in rb_thread_fd_select arg, value for kqw_set.fdn.as.tbl, + * allocated on-heap + */ +struct kqw_set_item { + struct kqfdn fdn; + int fd; + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +/* + * like our epoll implementation, we "ping" using kevent with zero-timeout + * and can do so on any thread. 
+ */ +static const struct timespec zero; + +static void +increase_nevents(rb_iom_t *iom, int retries) +{ + /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */ + const int max_alloca = 1024 / sizeof(struct kevent); + const int max = max_alloca * 2; + + if (retries) { + iom->nevents *= retries; + if (iom->nevents > max || iom->nevents <= 0) { + iom->nevents = max; + } + } +} + +static int +iom_kqfd(rb_iom_t *iom) +{ + if (iom->kqueue_fd >= 0) { + return iom->kqueue_fd; + } + iom->kqueue_fd = kqueue(); + if (iom->kqueue_fd < 0) { + if (rb_gc_for_fd(errno)) { + iom->kqueue_fd = kqueue(); + } + if (iom->kqueue_fd < 0) { + rb_sys_fail("kqueue"); + } + } + rb_fd_fix_cloexec(iom->kqueue_fd); + return iom->kqueue_fd; +} + +static void +rb_iom_init(rb_iom_t *iom) +{ + list_head_init(&iom->timers); + list_head_init(&iom->ksets); + list_head_init(&iom->kevs); + list_head_init(&iom->pids); + list_head_init(&iom->blockers); + iom->nevents = 8; + iom->kqueue_fd = -1; + rb_iom_fdmap_init(&iom->rfdmap); + rb_iom_fdmap_init(&iom->wfdmap); +} + +/* + * kevent does not have an EVFILT_* flag for (rarely-used) urgent data, + * at least not on FreeBSD 11.0, so we call select() separately after + * EVFILT_READ returns to set the RB_WAITFD_PRI bit: + */ +static void +check_pri(rb_thread_t *th, struct list_head *pri) +{ + rb_fdset_t efds; + int max, r; + int err = 0; + struct kev *kev = 0, *next = 0; + struct timeval tv; + + if (list_empty(pri)) { + return; + } + + r = 0; + max = -1; + rb_fd_init(&efds); + + list_for_each(pri, kev, fdw.w.wnode) { + int fd = *kev->fdw.fdp; + if (fd >= 0) { + rb_fd_set(fd, &efds); + if (fd > max) { + max = fd; + } + } + } + if (max >= 0) { +again: + tv.tv_sec = 0; + tv.tv_usec = 0; + r = native_fd_select(max + 1, 0, 0, &efds, &tv, th); + if (r < 0) { + err = errno; + if (err == EINTR) { + goto again; + } + } + if (r >= 0) { + list_for_each_safe(pri, kev, next, fdw.w.wnode) { + int fd = *kev->fdw.fdp; + + list_del_init(&kev->fdw.w.wnode); + /* + * favor 
spurious wakeup over empty revents: + * I have observed select(2) failing to detect urgent data + * readiness after EVFILT_READ fires for it on FreeBSD 11.0. + * test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb + * is relaxed on non-Linux for this reason. + * We could use a non-zero timeout for select(2), but + * we also don't want to release GVL and have th->afrunq + * processed before we set revents, so we favor spurious + * RB_WAITFD_PRI over an empty revents field + */ + if (fd >= 0 && (rb_fd_isset(fd, &efds) || !kev->fdw.revents)) { + kev->fdw.revents |= RB_WAITFD_PRI; + } + } + } + } + rb_fd_term(&efds); +} + +/* + * kqueue is a more complicated than epoll for corner cases because we + * install separate filters for simultaneously watching read and write + * on the same FD, and now we must clear shared filters if only the + * event from the other side came in. + */ +static void +drop_pairs(rb_thread_t *th, int max, struct kevent *changelist, + struct list_head *rdel, struct list_head *wdel) +{ + int nchanges = 0; + struct kev *kev = 0, *next = 0; + + list_for_each_safe(rdel, kev, next, fdn.wfdnode) { + int fd = *kev->fdw.fdp; + list_del_init(&kev->fdn.wfdnode); /* delete from rdel */ + assert(kev->fdw.revents & RB_WAITFD_OUT); + + if (fd >= 0 && !(kev->fdw.revents & WAITFD_READ)) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&th->vm->iom->rfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + assert(nchanges <= max); + EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0); + } + } + } + list_for_each_safe(wdel, kev, next, fdn.rfdnode) { + int fd = *kev->fdw.fdp; + list_del_init(&kev->fdn.rfdnode); /* delete from wdel */ + assert(kev->fdw.revents & WAITFD_READ); + + if (fd >= 0 && !(kev->fdw.revents & RB_WAITFD_OUT)) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&th->vm->iom->wfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + assert(nchanges <= max); + EV_SET(chg, fd, 
EVFILT_WRITE, EV_DELETE, 0, 0, 0); + } + } + } + if (nchanges) { + int kqfd = th->vm->iom->kqueue_fd; + if (kevent(kqfd, changelist, nchanges, 0, 0, &zero) < 0) { + rb_sys_fail("kevent (dropping paired FDs)"); + } + } +} + +/* + * kqueue requires separate filters when watching for simultaneously + * watching read and write events on the same FD. Different filters + * means they can have different udata pointers; at least with + * native kqueue. + * + * When emulating native kqueue behavior using epoll as libkqueue does, + * there is only space for one udata pointer. This is because epoll + * allows each "filter" (epitem in the kernel) to watch read and write + * simultaneously. + * epoll keys epitems using: [ fd, file description (struct file *) ]. + * In contrast, kevent keys its filters using: [ fd (ident), filter ]. + * + * So, with native kqueue, we can at least optimize away rb_iom_fd_get + * function calls; but we cannot do that when libkqueue emulates + * kqueue with epoll and need to retrieve the fdh from the correct + * fdmap. + * + * Finally, the epoll implementation only needs one fdmap. 
+ */ +static void * +udata_set(rb_iom_t *iom, struct rb_iom_fd *fdh) +{ +#ifdef LIBKQUEUE +# warning libkqueue support is incomplete and does not handle corner cases + return iom; +#else /* native kqueue */ + return fdh; +#endif +} + +static struct rb_iom_fd * +udata_get(const struct kevent *ev) +{ +#ifdef LIBKQUEUE + rb_iom_t *iom = ev->udata; + int fd = ev->ident; + + switch (ev->filter) { + case EVFILT_READ: + return rb_iom_fd_get(&iom->rfdmap, fd); + case EVFILT_WRITE: + return rb_iom_fd_get(&iom->wfdmap, fd); + default: + rb_bug("bad filter in libkqueue compatibility mode: %d", ev->filter); + } +#endif + return ev->udata; +} + +static void +check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist) +{ + int err = 0; + if (nr > 0) { + struct list_head pri = LIST_HEAD_INIT(pri); + struct list_head wdel = LIST_HEAD_INIT(wdel); + struct list_head rdel = LIST_HEAD_INIT(rdel); + struct kqfdn *fdn = 0, *next = 0; + int i; + + for (i = 0; i < nr; i++) { + struct kevent *ev = &eventlist[i]; + /* + * fdh is keyed to both FD and filter, so + * it's either in iom->rfdmap or iom->wfdmap + */ + struct rb_iom_fd *fdh = udata_get(ev); + + if (ev->filter == EVFILT_READ) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, fdn, next, rfdnode) { + struct kqw_set *kset = fdn->owner.set; + int rbits; + + if (kset) { + struct kqw_set_item *ksi; + + ksi = container_of(fdn, struct kqw_set_item, fdn); + rbits = ksi->events & WAITFD_READ; + assert(rbits && "unexpected EVFILT_READ"); + + /* + * The following may spuriously set RB_WAITFD_PRI: + * (see comments in check_pri for reasoning) + */ + ksi->revents |= rbits; + list_del_init(&fdn->rfdnode); + if (!kset->fsw.ret++) { + rb_iom_waiter_ready(&kset->fsw.w); + } + } + else { + struct kev *kev = container_of(fdn, struct kev, fdn); + + rbits = kev->fdw.events & WAITFD_READ; + assert(rbits && "unexpected EVFILT_READ"); + + if (!kev->fdw.revents) { + rb_iom_waiter_ready(&kev->fdw.w); + } + kev->fdw.revents |= (RB_WAITFD_IN & 
rbits); + list_del_init(&kev->fdn.rfdnode); + + if (*kev->fdw.fdp < 0) { + continue; + } + if (rbits & RB_WAITFD_PRI) { + list_add(&pri, &kev->fdw.w.wnode); + } + if ((kev->fdw.events & RB_WAITFD_OUT) && !pair_queued) { + pair_queued = 1; + list_add(&wdel, &kev->fdn.rfdnode); + } + } + } + } + else if (ev->filter == EVFILT_WRITE) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, fdn, next, wfdnode) { + struct kqw_set *kset = fdn->owner.set; + int rbits; + + if (kset) { + struct kqw_set_item *ksi; + + ksi = container_of(fdn, struct kqw_set_item, fdn); + rbits = ksi->events & RB_WAITFD_OUT; + assert(rbits && "unexpected EVFILT_WRITE"); + ksi->revents |= rbits; + list_del_init(&fdn->wfdnode); + if (!kset->fsw.ret++) { + rb_iom_waiter_ready(&kset->fsw.w); + } + } + else { + struct kev *kev = container_of(fdn, struct kev, fdn); + + if (!kev->fdw.revents) { + rb_iom_waiter_ready(&kev->fdw.w); + } + kev->fdw.revents |= RB_WAITFD_OUT; + list_del_init(&kev->fdn.wfdnode); + + if ((kev->fdw.events & WAITFD_READ) && + *kev->fdw.fdp >= 0 && + !pair_queued) { + pair_queued = 1; + list_add(&rdel, &kev->fdn.wfdnode); + } + } + } + } + else { + rb_bug("unexpected filter: %d", ev->filter); + } + } + check_pri(th, &pri); + drop_pairs(th, nr, eventlist, &rdel, &wdel); + + /* notify the waiter thread in case we enqueued fibers for them */ + rb_iom_blockers_notify(th->vm->iom, -1); + } + else if (nr < 0) { + err = errno; + } + if (err && err != EINTR) { + rb_syserr_fail(err, "kevent"); + } + rb_iom_timer_check(th); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); +} + +/* perform a non-blocking kevent check while holding GVL */ +static void +ping_events(rb_thread_t *th, struct kqw_set *kset) +{ + rb_iom_t *iom = th->vm->iom; + int kqfd = iom ? 
iom->kqueue_fd : -1; + + if (kqfd >= 0) { + VALUE v; + int nr; + int nevents = iom->nevents; + struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents); + int retries = 0; + + do { + struct kevent *changelist = 0; + int nchanges = 0; + if (kset) { + changelist = (struct kevent *)RSTRING_PTR(kset->vchanges); + nchanges = kset->nchanges; + kset = 0; + } + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + check_kevent(th, nr, eventlist); + } while (nr == nevents && ++retries); + if (v) { + ALLOCV_END(v); + } + increase_nevents(iom, retries); + } +} + +static struct timespec * +double2timespec(double timeout, struct timespec *ts) +{ + if (timeout >= 0) { + ts->tv_sec = timeout >= LONG_MAX ? LONG_MAX : (long)timeout; + ts->tv_nsec = (long)((timeout - (double)ts->tv_sec) * 1e9); + if (ts->tv_sec < 0) { + ts->tv_sec = 0; + } + if (ts->tv_nsec < 0) { + ts->tv_nsec = 0; + } + return ts; + } + return 0; /* infinity */ +} + +/* for iom_pingable_common.h */ +static void +rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + int nevents = iom->nevents; + int nr = nevents; + double timeout; + + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + timeout = list_empty(&th->afrunq) ? 
rb_iom_next_timeout(&iom->timers) : 0; + + if (timeout != 0 && (!list_empty(&iom->kevs) || !list_empty(&iom->pids) + || !list_empty(&iom->ksets))) { + VALUE v; + int kqfd = iom_kqfd(iom); /* may raise */ + struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents); + struct timespec ts; + const struct timespec *tsp = double2timespec(timeout, &ts); + struct rb_iom_blocker cur; + + VM_ASSERT(kqfd >= 0); + cur.th = th; + list_add_tail(&iom->blockers, &cur.bnode); + nr = 0; + BLOCKING_REGION({ + nr = kevent(kqfd, 0, 0, eventlist, nevents, tsp); + }, ubf_select, th, TRUE); + list_del(&cur.bnode); + check_kevent(th, nr, eventlist); + if (v) { + ALLOCV_END(v); + } + } + if (nr == nevents) { + ping_events(th, 0); + } +} + +/* kqueue shines here because there's only one syscall in the common case */ +static void +kevxchg_ping(int fd, struct kev *kev) +{ + rb_thread_t *th = kev->fdn.owner.th; + rb_iom_t *iom = rb_iom_get(th); + int retries = 0; + int nchanges = 0; + int nr; + VALUE v; + int nevents = iom->nevents; + struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents); + struct kevent *changelist = eventlist; + int kqfd = iom_kqfd(iom); + + /* + * we must clear this before reading since we check for owner.set in + * check_kevent to distinguish between single and multi-FD (select(2)) + * callers + */ + kev->fdn.owner.th = 0; + VM_ASSERT(nevents >= 2); + + /* EVFILT_READ handles urgent data (POLLPRI)... 
hopefully */ + if (kev->fdw.events & WAITFD_READ) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&iom->rfdmap, fd); + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->fdn.rfdnode); + } + if (kev->fdw.events & RB_WAITFD_OUT) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&iom->wfdmap, fd); + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->fdn.wfdnode); + } + do { + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + + /* kevent may fail with ENOMEM when processing changelist */ + if (nr < 0 && nchanges && rb_gc_for_fd(errno)) { + continue; + } + check_kevent(th, nr, eventlist); + } while (nr == nevents && ++retries && (nchanges = 0) == 0); + if (v) { + ALLOCV_END(v); + } + increase_nevents(iom, retries); +} + +static VALUE +kevxchg_yield(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + int fd = *kev->fdw.fdp; + rb_thread_t *th = kev->fdn.owner.th; + + if (fd >= 0) { + size_t nresume; + + kevxchg_ping(fd, kev); + (void)rb_threadlet_do_yield_p(th, &nresume); + if (kev->fdw.revents) { + list_del_init(&kev->fdw.w.timer.n.rnode); + return Qfalse; + } + return rb_fiber_yield(0, 0); + } + return Qfalse; +} + +static VALUE +kev_done(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + list_del(&kev->fdn.rfdnode); + list_del(&kev->fdn.wfdnode); + return rb_iom_waiter_done((VALUE)&kev->fdw.w); +} + +static int +iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) +{ + rb_iom_t *iom = rb_iom_get(th); + struct kev kev; + + kev.fdn.owner.th = th; /* read and cleared ASAP in kevxchg_yield */ + kev.fdw.fdp = fdp; + kev.fdw.events = (short)events; + kev.fdw.revents = 0; + list_add(&iom->kevs, &kev.fdw.w.wnode); + 
list_node_init(&kev.fdn.rfdnode); + list_node_init(&kev.fdn.wfdnode); + rb_iom_timer_add(th, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT); + rb_ensure(kevxchg_yield, (VALUE)&kev, kev_done, (VALUE)&kev); + + return kev.fdw.revents; /* may be zero if timed out */ +} + +/* + * XXX we may use fptr->mode to mark FDs which kqueue cannot handle, + * different BSDs may have different bugs which prevent certain + * FD types from being handled by kqueue... + */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout) +{ + return iom_waitfd(th, &fptr->fd, events, timeout); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) +{ + return iom_waitfd(th, fdp, events, timeout); +} + +static const short idx2events[] = { RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI }; + +static void +kset_add_event(struct kqw_set *kset, int fd, short filter, void *udata) +{ + static const long kevent_size = (long)sizeof(struct kevent); + VALUE dst = kset->vchanges; + long old_len = RSTRING_LEN(dst); + struct kevent *chg; + long new_len = old_len + kevent_size; + + rb_str_resize(dst, new_len); + chg = (struct kevent *)(RSTRING_PTR(dst) + old_len); + EV_SET(chg, fd, filter, EV_ADD|EV_ONESHOT, 0, 0, udata); + rb_str_set_len(dst, new_len); + kset->nchanges++; +} + +static int +kset_upd(st_data_t *key, st_data_t *v, st_data_t arg, int existing) +{ + int fd = (int)*key; + struct kqw_set *kset = (struct kqw_set *)arg; + rb_iom_t *iom = kset->fsw.th->vm->iom; + short events = idx2events[kset->idx]; + struct kqw_set_item **val = (struct kqw_set_item **)v; + struct kqw_set_item *ksi; + int nevents, oevents; + struct rb_iom_fd *fdh; + void *udata; + + if (existing) { + /* + * this happens when an FD is in two fdsets for a single + * rb_thread_fd_select call. Does not happen if different + * Fibers call rb_thread_fd_select on the same FD. 
+ */ + ksi = *val; + nevents = ~ksi->events & events; + oevents = ksi->events; + ksi->events |= events; + } + else { + ksi = ALLOC(struct kqw_set_item); + *val = ksi; + ksi->fd = fd; + ksi->revents = 0; + nevents = ksi->events = events; + oevents = 0; + ksi->fdn.owner.set = kset; + list_node_init(&ksi->fdn.rfdnode); + list_node_init(&ksi->fdn.wfdnode); + } + + if ((nevents == RB_WAITFD_PRI && !(oevents & RB_WAITFD_IN)) || + (nevents & RB_WAITFD_IN)) { + fdh = rb_iom_fd_get(&iom->rfdmap, fd); + udata = udata_set(iom, fdh); + if (list_empty(&fdh->fdhead)) { + kset_add_event(kset, fd, EVFILT_READ, udata); + } + list_add(&fdh->fdhead, &ksi->fdn.rfdnode); + } + if (nevents & RB_WAITFD_OUT) { + fdh = rb_iom_fd_get(&iom->wfdmap, fd); + udata = udata_set(iom, fdh); + if (list_empty(&fdh->fdhead)) { + kset_add_event(kset, fd, EVFILT_WRITE, udata); + } + list_add(&fdh->fdhead, &ksi->fdn.wfdnode); + } + return ST_CONTINUE; +} + +static VALUE +kset_yield(VALUE ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + size_t nresume; + kset->tbl = st_init_numtable(); /* may raise */ + + /* must not call kevent to retry events while this loop is running: */ + for (kset->idx = 0; kset->idx < 3; kset->idx++) { + const rb_fdset_t *src = kset->fsw.in[kset->idx]; + + if (src) { + int max = rb_fd_max(src); + int fd; + + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, src)) { + st_update(kset->tbl, (st_data_t)fd, + kset_upd, (st_data_t)kset); + } + } + } + } + + /* OK to call kevent, now: */ + ping_events(kset->fsw.th, kset); + rb_str_resize(kset->vchanges, 0); + rb_gc_force_recycle(kset->vchanges); + (void)rb_threadlet_do_yield_p(kset->fsw.th, &nresume); + if (kset->fsw.ret) { + list_del_init(&kset->fsw.w.timer.n.rnode); + return Qfalse; + } + return rb_fiber_yield(0, 0); +} + +static rb_fdset_t * +fd_init_once(rb_fdset_t **dst, rb_fdset_t *src) +{ + if (!*dst) { + rb_fd_init(src); + *dst = src; + } + return *dst; +} + +static int +kset_term(st_data_t key, st_data_t val, 
st_data_t ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + int fd = (int)key; + struct kqw_set_item *ksi = (struct kqw_set_item *)val; + + if (ksi) { + struct rb_iom_fdset_waiter *fsw = &kset->fsw; + + list_del_init(&ksi->fdn.rfdnode); + list_del_init(&ksi->fdn.wfdnode); + for (kset->idx = 0; kset->idx < 3; kset->idx++) { + if (ksi->revents & idx2events[kset->idx]) { + rb_fd_set(fd, + fd_init_once(&fsw->out[kset->idx], &fsw->sets[kset->idx])); + } + } + xfree(ksi); + } + + return ST_DELETE; +} + +static VALUE +kset_done(VALUE ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + + if (kset->tbl) { + st_foreach(kset->tbl, kset_term, (VALUE)kset); + st_free_table(kset->tbl); + } + + list_del(&kset->fsw.w.timer.n.tnode); + list_del(&kset->fsw.w.wnode); + for (kset->idx = 0; kset->idx < 3; kset->idx++) { + rb_fdset_t *orig = kset->fsw.in[kset->idx]; + rb_fdset_t *res = kset->fsw.out[kset->idx]; + + if (res) { + rb_fd_dup(orig, res); + rb_fd_term(res); + } + } + return Qfalse; +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + double *timeout) +{ + rb_iom_t *iom = rb_iom_get(th); + struct kqw_set kset = { 0 }; + + kset.vchanges = rb_str_tmp_new(0); + rb_iom_fdset_waiter_init(th, &kset.fsw, maxfd, r, w, e); + list_add(&iom->ksets, &kset.fsw.w.wnode); + rb_iom_timer_add(th, &kset.fsw.w.timer, timeout, IOM_FIB|IOM_WAIT); + rb_ensure(kset_yield, (VALUE)&kset, kset_done, (VALUE)&kset); + + return kset.fsw.ret; +} + +static void +iom_free(rb_iom_t *iom) +{ + rb_iom_fdmap_destroy(&iom->rfdmap); + rb_iom_fdmap_destroy(&iom->wfdmap); + xfree(iom); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + vm->iom = 0; + if (iom) { + if (iom->kqueue_fd >= 0) { + close(iom->kqueue_fd); + } + iom_free(iom); + } +} + +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ +#ifdef LIBKQUEUE + /* + * libkqueue uses epoll, /dev/poll FD, or poll/select on a pipe, + * so 
there is an FD. + * I guess we're going to hit an error in case somebody uses + * libkqueue with a real native kqueue backend, but nobody does + * that in production, hopefully. + */ + rb_iom_destroy(th->vm); +#else /* native kqueue is close-on-fork, so we do not close ourselves */ + rb_iom_t *iom = th->vm->iom; + if (iom) { + iom_free(iom); + th->vm->iom = 0; + } +#endif +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + rb_iom_t *iom = GET_VM()->iom; + + return iom && fd == iom->kqueue_fd; +} + +#include "iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_pingable_common.h b/iom_pingable_common.h new file mode 100644 index 0000000000..0058a9cfd0 --- /dev/null +++ b/iom_pingable_common.h @@ -0,0 +1,54 @@ +/* shared between "pingable" implementations (iom_kqueue.h and iom_epoll.h) */ + +/* only for iom_kqueue.h and iom_epoll.h */ +static void rb_iom_do_wait(rb_thread_t *, rb_iom_t *); + +static VALUE +rb_iom_do_schedule(VALUE ptr) +{ + rb_thread_t *th = (rb_thread_t *)ptr; + rb_iom_t *iom = th->vm->iom; + size_t nresume; + + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + if (iom) { + if (nresume) { + ping_events(th, 0); + } + else { + rb_iom_do_wait(th, iom); + } + } + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + return Qfalse; +} + +void +rb_iom_schedule(rb_thread_t *th, double *timeout) +{ + if (rb_threadlet_sched_p(th)) { + size_t nresume; + + rb_iom_timer_check(th); + ping_events(th, 0); + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + } + else if (timeout) { + double t0 = timeofday(); + struct rb_iom_timer t; + + rb_iom_timer_add(th, &t, timeout, 0); + rb_ensure(rb_iom_do_schedule, (VALUE)th, rb_iom_timer_done, (VALUE)&t); + *timeout -= timeofday() - t0; + } + else { + rb_iom_timer_check(th); + rb_iom_do_schedule((VALUE)th); + } +} diff --git a/iom_select.h b/iom_select.h new file mode 100644 index 0000000000..f49b78e774 --- 
/dev/null +++ b/iom_select.h @@ -0,0 +1,448 @@ +/* + * select()-based implementation of I/O Manager for RubyVM + * + * This is crippled and relies heavily on GVL compared to the + * epoll and kqueue versions. Every call to select requires + * scanning the entire iom->fds list; so it gets silly expensive + * with hundreds or thousands of FDs. + */ +#include "iom_internal.h" + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */ + struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */ + struct list_head fdsets; /* -rb_iom_fdset_waiter.w.wnode, unordered */ + struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */ + + /* + * generation counters for determining if we need to redo select() + * for newly registered FDs. kevent and epoll APIs do not require + * this, as events can be registered and retrieved at any time by + * different threads. 
+ */ + rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */ + rb_serial_t select_gen; /* RDWR protected by GVL */ + + /* + * unlike in epoll/kqueue, order matters for blockers in this + * implementation: first blocker is running select; the rest + * are waiting for the first blocker + * Protected by GVL + */ + struct list_head blockers; /* -rb_iom_blocker.bnode */ +}; + +/* allocated on stack */ +struct select_do { + rb_thread_t *th; + int do_wait; +}; + +static void +rb_iom_init(rb_iom_t *iom) +{ + iom->select_start = 0; + iom->select_gen = 0; + list_head_init(&iom->timers); + list_head_init(&iom->fds); + list_head_init(&iom->fdsets); + list_head_init(&iom->pids); + list_head_init(&iom->blockers); +} + +static rb_fdset_t * +fd_init_once(rb_fdset_t **dst, rb_fdset_t *src) +{ + if (!*dst) { + rb_fd_init(src); + *dst = src; + } + return *dst; +} + +/* FIXME inefficient, but maybe not worth optimizing ... */ +static void +fd_merge(rb_fdset_t *dst, const rb_fdset_t *src) +{ + int max = rb_fd_max(src); + int fd; + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, src)) { + rb_fd_set(fd, dst); + } + } +} + +static int +fd_intersect(struct rb_iom_fdset_waiter *fsw, size_t i, const rb_fdset_t *res) +{ + int ret = 0; + rb_fdset_t *orig = fsw->in[i]; + + if (orig) { + int max = rb_fd_max(orig); + int fd; + + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, orig) && rb_fd_isset(fd, res)) { + rb_fd_set(fd, fd_init_once(&fsw->out[i], &fsw->sets[i])); + ret++; + } + } + + /* copy results back to user-supplied fdset */ + if (ret) { + rb_fd_dup(orig, fsw->out[i]); + } + } + return ret; +} + +static void +iom_select_wait(struct select_do *sd) +{ + rb_thread_t *th = sd->th; + rb_iom_t *iom = th->vm->iom; + int max = -1; + struct timeval tv, *tvp; + struct rb_iom_fd_waiter *fdw = 0, *next = 0; + struct rb_iom_fdset_waiter *fsw = 0, *nxt = 0; + rb_fdset_t *rfds, *wfds, *efds; + rb_fdset_t fdsets[3]; + double timeout = 0; + struct rb_iom_blocker cur; + 
int ret = 0; + + iom->select_start = iom->select_gen; + rfds = wfds = efds = 0; + + list_for_each_safe(&iom->fdsets, fsw, nxt, w.wnode) { + if (fsw->max > max) { + max = fsw->max; + } + if (fsw->in[0]) { + fd_merge(fd_init_once(&rfds, &fdsets[0]), fsw->in[0]); + } + if (fsw->in[1]) { + fd_merge(fd_init_once(&wfds, &fdsets[1]), fsw->in[1]); + } + if (fsw->in[2]) { + fd_merge(fd_init_once(&efds, &fdsets[2]), fsw->in[2]); + } + } + + list_for_each_safe(&iom->fds, fdw, next, w.wnode) { + int fd = *fdw->fdp; + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + rb_iom_waiter_ready(&fdw->w); + sd->do_wait = 0; + continue; + } + if (fd > max) { + max = fd; + } + if (fdw->events & RB_WAITFD_IN) { + rb_fd_set(fd, fd_init_once(&rfds, &fdsets[0])); + } + if (fdw->events & RB_WAITFD_OUT) { + rb_fd_set(fd, fd_init_once(&wfds, &fdsets[1])); + } + if (fdw->events & RB_WAITFD_PRI) { + rb_fd_set(fd, fd_init_once(&efds, &fdsets[2])); + } + } + + if (sd->do_wait && (max >= 0 || !list_empty(&iom->pids))) { + timeout = rb_iom_next_timeout(&iom->timers); + } + if (timeout >= 0) { + tv = double2timeval(timeout); + tvp = &tv; + } + else { + tvp = 0; + } + cur.th = th; + VM_ASSERT(list_empty(&iom->blockers)); + list_add(&iom->blockers, &cur.bnode); + BLOCKING_REGION({ + ret = native_fd_select(max + 1, rfds, wfds, efds, tvp, th); + }, ubf_select, th, TRUE); + list_del(&cur.bnode); + + if (ret > 0) { + list_for_each_safe(&iom->fdsets, fsw, nxt, w.wnode) { + fsw->ret += fd_intersect(fsw, 0, rfds); + fsw->ret += fd_intersect(fsw, 1, wfds); + fsw->ret += fd_intersect(fsw, 2, efds); + if (fsw->ret) { + rb_iom_waiter_ready(&fsw->w); + } + } + + list_for_each_safe(&iom->fds, fdw, next, w.wnode) { + int fd = *fdw->fdp; + + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + rb_iom_waiter_ready(&fdw->w); + continue; + } + if (rfds && rb_fd_isset(fd, rfds)) { + fdw->revents |= fdw->events & RB_WAITFD_IN; + } + if (wfds && rb_fd_isset(fd, wfds)) { + fdw->revents |= fdw->events & 
RB_WAITFD_OUT; + } + if (efds && rb_fd_isset(fd, efds)) { + fdw->revents |= fdw->events & RB_WAITFD_PRI; + } + + /* got revents? enqueue ourselves to be run! */ + if (fdw->revents) { + rb_iom_waiter_ready(&fdw->w); + } + } + } + + if (rfds) rb_fd_term(rfds); + if (wfds) rb_fd_term(wfds); + if (efds) rb_fd_term(efds); + rb_iom_blockers_notify(iom, -1); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); +} + +static int +retry_select_p(rb_iom_t *iom) +{ + /* + * if somebody changed iom->fds while we were inside select, + * rerun it with zero timeout to avoid a race condition. + * This is not necessary for epoll or kqueue because the kernel + * is constantly monitoring the watched set. + */ + return (iom->select_start != iom->select_gen); +} + +static VALUE +iom_do_select(VALUE ptr) +{ + struct select_do *sd = (struct select_do *)ptr; + rb_thread_t *th = sd->th; + rb_iom_t *iom; + int retry = 0; + size_t nresume; + + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + if (nresume) { + sd->do_wait = 0; + } + iom = th->vm->iom; + if (!iom) { + return Qfalse; + } + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + do { + if (list_empty(&iom->blockers)) { + if (!list_empty(&th->afrunq)) { + sd->do_wait = 0; + } + iom_select_wait(sd); + rb_iom_timer_check(th); + retry = retry_select_p(iom); + } + else if (list_empty(&th->afrunq)) { + struct rb_iom_blocker cur; + + cur.th = th; + list_add_tail(&iom->blockers, &cur.bnode); + BLOCKING_REGION({ + native_fd_select(0, 0, 0, 0, 0, th); + }, ubf_select, th, TRUE); + list_del(&cur.bnode); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + } + rb_threadlet_do_yield_p(th, &nresume); + if (nresume) { + retry = 0; + sd->do_wait = 0; + } + } while (retry); + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + return Qfalse; +} + +void +rb_iom_schedule(rb_thread_t *th, double *timeout) +{ + struct select_do sd; + + sd.th = th; + sd.do_wait = !rb_threadlet_sched_p(th); + + if (timeout && sd.do_wait) { + double t0 = 
timeofday(); + struct rb_iom_timer t; + + rb_iom_timer_add(th, &t, timeout, 0); + rb_ensure(iom_do_select, (VALUE)&sd, rb_iom_timer_done, (VALUE)&t); + *timeout -= timeofday() - t0; + } + else { + rb_iom_timer_check(th); + iom_do_select((VALUE)&sd); + } +} + +static void +iom_schedule_th(rb_thread_t *th) +{ + rb_iom_t *iom = rb_iom_get(th); + size_t nresume; + + (void)rb_threadlet_do_yield_p(th, &nresume); + if (list_empty(&iom->blockers)) { + struct select_do sd; + + sd.th = th; + sd.do_wait = 0; + iom_select_wait(&sd); + rb_iom_timer_check(th); + } + else { + /* + * kick the first select thread, retry_select_p will return true + * since caller bumped iom->select_gen + */ + rb_iom_blockers_notify(iom, 1); + } +} + +static VALUE +iom_schedule_fdw(VALUE ptr) +{ + rb_thread_t *th = GET_THREAD(); + struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr; + + iom_schedule_th(th); + if (fdw->revents) { + list_del_init(&fdw->w.timer.n.rnode); + return Qfalse; + } + return rb_fiber_yield(0, 0); +} + +/* only epoll takes advantage of this (kqueue may too, to work around portability bugs) */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout) +{ + return rb_iom_waitfd(th, &fptr->fd, events, timeout); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) +{ + rb_iom_t *iom = rb_iom_get(th); + struct rb_iom_fd_waiter fdw; + + if (*fdp < 0) return 0; + fdw.fdp = fdp; + fdw.events = (short)events; + fdw.revents = 0; + + /* use FIFO order for fairness */ + list_add_tail(&iom->fds, &fdw.w.wnode); + rb_iom_timer_add(th, &fdw.w.timer, timeout, IOM_FIB|IOM_WAIT); + iom->select_gen++; + rb_ensure(iom_schedule_fdw, (VALUE)&fdw, rb_iom_waiter_done, (VALUE)&fdw.w); + + return (int)fdw.revents; /* may be zero if timed out */ +} + +static VALUE +rb_iom_select_done(VALUE ptr) +{ + struct rb_iom_fdset_waiter *fsw = (struct rb_iom_fdset_waiter *)ptr; + size_t i; + + list_del(&fsw->w.timer.n.tnode); + list_del(&fsw->w.wnode); + for (i = 0; i < 3; 
i++) { + if (fsw->out[i]) { + rb_fd_term(fsw->out[i]); + } + } + + return Qfalse; +} + +static VALUE +iom_schedule_fsw(VALUE ptr) +{ + struct rb_iom_fdset_waiter *fsw = (struct rb_iom_fdset_waiter *)ptr; + + iom_schedule_th(fsw->th); + if (fsw->ret) { + list_del_init(&fsw->w.timer.n.rnode); + return Qfalse; + } + return rb_fiber_yield(0, 0); +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + double *timeout) +{ + rb_iom_t *iom = rb_iom_get(th); + VALUE v; + struct rb_iom_fdset_waiter *fsw; + int ret; + + fsw = ALLOCV_N(struct rb_iom_fdset_waiter, v, 1); + rb_iom_fdset_waiter_init(th, fsw, maxfd, r, w, e); + list_add(&iom->fdsets, &fsw->w.wnode); + rb_iom_timer_add(th, &fsw->w.timer, timeout, IOM_FIB|IOM_WAIT); + iom->select_gen++; + rb_ensure(iom_schedule_fsw, (VALUE)fsw, rb_iom_select_done, (VALUE)fsw); + ret = fsw->ret; + if (v) { + ALLOCV_END(v); + } + return ret; +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + if (vm->iom) { + xfree(vm->iom); + vm->iom = 0; + } +} + +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + return 0; +} + +#include "iom_common.h" diff --git a/prelude.rb b/prelude.rb index 9e94aadf9c..bcd2c183a2 100644 --- a/prelude.rb +++ b/prelude.rb @@ -13,6 +13,18 @@ def exclusive(&block) end if false end end +class Threadlet < Fiber + def self.start(*args, &block) + # loses Fiber#resume return value, but maybe people do not care + new(&block).run + end + + def run + start + self + end +end + class IO # call-seq: diff --git a/process.c b/process.c index 8b8268e9f1..8c285f8a96 100644 --- a/process.c +++ b/process.c @@ -17,6 +17,7 @@ #include "ruby/thread.h" #include "ruby/util.h" #include "vm_core.h" +#include "iom.h" #include #include @@ -918,9 +919,15 @@ rb_waitpid(rb_pid_t pid, int *st, int flags) result = do_waitpid(pid, st, flags); } else { - while 
((result = do_waitpid_nonblocking(pid, st, flags)) < 0 && - (errno == EINTR)) { - RUBY_VM_CHECK_INTS(GET_EC()); + rb_thread_t *th = GET_THREAD(); + if (rb_threadlet_sched_p(th) && WNOHANG) { + return rb_iom_waitpid(th, pid, st, flags, 0); + } + else { + while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 && + (errno == EINTR)) { + RUBY_VM_CHECK_INTS(th->ec); + } } } if (result > 0) { @@ -1170,6 +1177,7 @@ proc_detach(VALUE obj, VALUE pid) static void before_exec_async_signal_safe(void) { + rb_iom_destroy(GET_VM()); } static void diff --git a/signal.c b/signal.c index d71f757ca5..df5eb41b83 100644 --- a/signal.c +++ b/signal.c @@ -11,6 +11,7 @@ **********************************************************************/ +#include "iom.h" #include "internal.h" #include "vm_core.h" #include @@ -1036,6 +1037,17 @@ rb_trap_exit(void) } } +static int +sig_is_chld(int sig) +{ +#if defined(SIGCLD) + return (sig == SIGCLD); +#elif defined(SIGCHLD) + return (sig == SIGCHLD); +#endif + return 0; +} + void rb_signal_exec(rb_thread_t *th, int sig) { @@ -1043,6 +1055,9 @@ rb_signal_exec(rb_thread_t *th, int sig) VALUE cmd = vm->trap_list.cmd[sig]; int safe = vm->trap_list.safe[sig]; + if (sig_is_chld(sig)) { + rb_iom_sigchld(vm); + } if (cmd == 0) { switch (sig) { case SIGINT: @@ -1101,6 +1116,11 @@ default_handler(int sig) #endif #ifdef SIGUSR2 case SIGUSR2: +#endif +#ifdef SIGCLD + case SIGCLD: +#elif defined(SIGCHLD) + case SIGCHLD: #endif func = sighandler; break; @@ -1139,6 +1159,9 @@ trap_handler(VALUE *cmd, int sig) VALUE command; if (NIL_P(*cmd)) { + if (sig_is_chld(sig)) { + goto sig_dfl; + } func = SIG_IGN; } else { @@ -1159,6 +1182,9 @@ trap_handler(VALUE *cmd, int sig) break; case 14: if (memcmp(cptr, "SYSTEM_DEFAULT", 14) == 0) { + if (sig_is_chld(sig)) { + goto sig_dfl; + } func = SIG_DFL; *cmd = 0; } @@ -1166,6 +1192,9 @@ trap_handler(VALUE *cmd, int sig) case 7: if (memcmp(cptr, "SIG_IGN", 7) == 0) { sig_ign: + if (sig_is_chld(sig)) { + goto sig_dfl; + } func 
= SIG_IGN; *cmd = Qtrue; } @@ -1420,15 +1449,13 @@ static int init_sigchld(int sig) { sighandler_t oldfunc; + sighandler_t func = sighandler; oldfunc = ruby_signal(sig, SIG_DFL); if (oldfunc == SIG_ERR) return -1; - if (oldfunc != SIG_DFL && oldfunc != SIG_IGN) { - ruby_signal(sig, oldfunc); - } - else { - GET_VM()->trap_list.cmd[sig] = 0; - } + ruby_signal(sig, func); + GET_VM()->trap_list.cmd[sig] = 0; + return 0; } diff --git a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb index 1141dd317c..3a4c714094 100644 --- a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb +++ b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb @@ -1,5 +1,6 @@ # frozen_string_literal: false require 'test/unit' +require 'socket' class TestWaitForSingleFD < Test::Unit::TestCase require '-test-/wait_for_single_fd' @@ -49,5 +50,66 @@ def test_wait_for_closed_pipe end end + def test_fiber_start_rw + host = '127.0.0.1' + TCPServer.open(host, 0) do |srv| + nbc = Socket.new(:INET, :STREAM, 0) + con = TCPSocket.new(host, srv.addr[1]) + acc = srv.accept + begin + # 2 events, but only one triggers: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal RB_WAITFD_OUT, f.value + # trigger readability + con.write('a') + flags = RB_WAITFD_IN + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + # ensure both events trigger: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + # ensure the EVFILT_WRITE (kqueue) is disabled after previous test: + flags = RB_WAITFD_IN + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + skip 'missing MSG_OOB' unless Socket.const_defined?(:MSG_OOB) + + # make sure urgent data works with kqueue: + assert_equal 3, con.send('123', 
Socket::MSG_OOB) + flags = RB_WAITFD_PRI + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + flags = RB_WAITFD_IN|RB_WAITFD_OUT|RB_WAITFD_PRI + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + if RUBY_PLATFORM =~ /linux/ + assert_equal flags, f.value + else + kq_possible = [ RB_WAITFD_IN|RB_WAITFD_OUT, flags ] + assert_include kq_possible, f.value + end + + nbc.connect_nonblock(srv.local_address, exception: false) + fd = nbc.fileno + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f1 = Threadlet.start { IO.wait_for_single_fd(fd, flags, nil) } + f2 = Threadlet.start { IO.wait_for_single_fd(fd, RB_WAITFD_IN, nil) } + assert_equal RB_WAITFD_OUT, f1.value + abc = srv.accept + abc.write('!') + assert_equal RB_WAITFD_IN, f2.value + ensure + con.close + acc.close + nbc.close + abc&.close + end + end + end end diff --git a/test/lib/leakchecker.rb b/test/lib/leakchecker.rb index dbe6f5cdbb..5a99b57687 100644 --- a/test/lib/leakchecker.rb +++ b/test/lib/leakchecker.rb @@ -35,6 +35,15 @@ def find_fds if d.respond_to? 
:fileno a -= [d.fileno] end + + # only place we use epoll is internally, do not count it against us + a.delete_if do |fd| + begin + File.readlink("#{fd_dir}/#{fd}").match?(/\beventpoll\b/) + rescue + false + end + end a } fds.sort diff --git a/test/ruby/test_threadlet.rb b/test/ruby/test_threadlet.rb new file mode 100644 index 0000000000..84cc4ec662 --- /dev/null +++ b/test/ruby/test_threadlet.rb @@ -0,0 +1,283 @@ +# frozen_string_literal: true +require 'test/unit' +require 'fiber' +require 'io/nonblock' +require 'io/wait' +require 'socket' + +class TestThreadlet < Test::Unit::TestCase + def test_value + nr = 0 + f = Threadlet.start { nr += 1 } + assert_equal 1, f.value, 'value returns as expected' + assert_equal 1, f.value, 'value is idempotent' + end + + def test_join + nr = 0 + f = Threadlet.start { nr += 1 } + assert_equal f, f.join + assert_equal 1, f.value + assert_equal f, f.join, 'join is idempotent' + assert_equal f, f.join(10), 'join is idempotent w/ timeout' + end + + def test_io_wait_read + tick = 0.1 # 100ms, TIME_QUANTUM_USEC in thread_pthread.c + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + rdr = Threadlet.start { r.read(1) } + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + assert_nil rdr.join(tick), 'join returns nil on timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick, 'Threadlet still running' + w.write('a') + assert_equal 'a', rdr.value, 'finished' + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + rwait = Threadlet.start { r.wait_readable(tick) } + assert_nil rwait.value, 'IO returned no events after timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick, 'Threadlet waited for timeout' + end + end + + # make sure we do not create extra FDs (from epoll/kqueue) + # for operations which do not need it + def test_fd_creation_and_fork + assert_separately(%w(-r io/nonblock), <<~'end;') # do + fdrange = (3..64) + reserved = 
-'The given fd is not accessible because RubyVM reserves it' + fdstats = lambda do + stats = Hash.new(0) + fdrange.map do |fd| + begin + IO.for_fd(fd, autoclose: false) + stats[:good] += 1 + rescue Errno::EBADF # ignore + rescue ArgumentError => e + raise if e.message != reserved + stats[:reserved] += 1 + end + end + stats.freeze + end + + before = fdstats.call + Threadlet.start { :fib }.join + assert_equal before, fdstats.call, + 'Threadlet.start + Threadlet#join does not create new FD' + + # now, epoll/kqueue implementations create FDs, ensure they're reserved + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + + before_io = fdstats.call + assert_equal before[:reserved], before_io[:reserved], + 'no reserved FDs created before I/O attempted' + + f1 = Threadlet.start { r.read(1) } + after_io = fdstats.call + assert_equal before_io[:good], after_io[:good], + 'no change in unreserved FDs during I/O wait' + + w.write('!') + assert_equal '!', f1.value, 'auto-Threadlet IO#read works' + assert_operator before_io[:reserved], :<=, after_io[:reserved], + 'a reserved FD may be created on some implementations' + + # ensure we do not share epoll/kqueue FDs across fork + if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) + pid = fork do + after_fork = fdstats.call + w.write(after_fork.inspect == before_io.inspect ? '1' : '0') + + # ensure auto-Threadlet works properly after forking + IO.pipe do |a, b| + a.nonblock = b.nonblock = true + f1 = Threadlet.start { a.read(1) } + f2 = Threadlet.start { b.write('!') } + assert_equal 1, f2.value + w.write(f1.value == '!' ? '!' 
: '?') + end + + exit!(0) + end + assert_equal '1', r.read(1), 'reserved FD cleared after fork' + assert_equal '!', r.read(1), 'auto-Threadlet works after forking' + _, status = Process.waitpid2(pid) + assert_predicate status, :success?, 'forked child exited properly' + end + end + end; + end + + def test_waitpid + assert_separately(%w(-r io/nonblock), <<~'end;') # do + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + pid = fork do + sleep 0.1 + exit!(0) + end + f = Threadlet.start { Process.waitpid2(pid) } + wpid, st = f.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.1, 'child did not return immediately' + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'process exited successfully' + + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + + # "blocking" Process.waitpid2 takes precedence over trap(:CHLD) + waited = [] + chld_called = false + trap(:CHLD) { chld_called = true; waited.concat(Process.waitall) } + pid = fork do + r.read(1) + exit!(0) + end + assert_nil Process.waitpid2(pid, Process::WNOHANG), 'child is waiting' + assert_nil(Threadlet.start do + Process.waitpid2(pid, Process::WNOHANG) + end.value, 'WNOHANG works normally in Threadlet') + f = Threadlet.start { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), 'woke child' + wpid, st = f.value + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'child exited successfully' + assert_empty waited, 'waitpid had predence' + assert chld_called, 'CHLD handler got called anyways' + + chld_called = false + [ 'DEFAULT', 'SIG_DFL', 'IGNORE', 'SYSTEM_DEFAULT', nil + ].each do |handler| + trap(:CHLD, handler) + pid = fork do + r.read(1) + exit!(0) + end + t = "trap(:CHLD, #{handler.inspect})" + f = Threadlet.start { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), "woke child for #{t}" + wpid, st = f.value + assert_predicate st, :success?, "child exited successfully for #{t}" + assert_equal 
wpid, pid, "return value correct for #{t}" + assert_equal false, chld_called, + "old CHLD handler did not fire for #{t}" + end + end + end; + end if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) && + Process.const_defined?(:WNOHANG) + + def test_cpu_usage + require 'benchmark' # avoid skewing assert_cpu_usage_low + disabled = GC.disable + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = fv = wr = nil + + # select() requires higher CPU% :< + assert_cpu_usage_low(pct: 0.50) do + t = Thread.new { Threadlet.start { r.read(1) }.value } + wr = Thread.new do + sleep 0.2 + w.write('..') + end + fv = Threadlet.start { r.read(1) }.value + end + assert_equal '.', fv + assert_equal '.', t.value + wr.join + end + + thrs = [] + pid = spawn(*%W(#{EnvUtil.rubybin} --disable=gems -e #{'sleep 0.1'})) + 2.times do + thrs << Thread.new { Threadlet.start { Process.waitall }.value } + end + assert_cpu_usage_low(pct: 0.44) do + thrs.map!(&:value) + end + ensure + GC.enable unless disabled + end + + # As usual, cannot resume/run Threadlets across threads, but the + # scheduler works across threads + def test_cross_thread_schedule + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = Thread.new { Threadlet.start { r.read(1) }.value } + assert_nil t.join(0.1) + f = Threadlet.start { w.write('a') } + assert_equal 'a', t.value + assert_equal 1, f.value + end + end + + # tricky for kqueue and epoll implementations + def test_multi_readers + if RUBY_PLATFORM =~ /linux/ && + (RbConfig::CONFIG['LDFLAGS'] =~ /-lkqueue\b/ || + RbConfig::CONFIG['DLDFLAGS'] =~ /-lkqueue\b/) + skip "FIXME libkqueue is buggy on this test" + end + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + nr = 30 + fibs = nr.times.map { Threadlet.start { r.read(1) } } + fibs.each { |f| assert_nil f.join(0.001) } + + exp = nr.times.map { -'a' }.freeze + w.write(exp.join) + assert_equal exp, fibs.map(&:value); + end + end + + def test_unix_rw + unless defined?(UNIXSocket) && 
UNIXSocket.respond_to?(:pair) + skip 'no UNIXSocket.pair' + end + # ensure simultaneous waits on the same FD works + UNIXSocket.pair do |a, b| + assert_predicate a, :wait_writable + f0 = Threadlet.start { a.wait_readable } # cold + f9 = Threadlet.start { a.wait_writable } # already armed + assert_equal a, f9.value + + f4 = Threadlet.start { a.wait_readable } + b.write('.') + f5 = Threadlet.start { a.wait_writable } + f6 = Threadlet.start { a.wait_readable } + assert_equal a, f4.value + assert_equal a, f5.value + assert_equal a, f6.value + assert_equal a, f0.value + end + end + + def test_io_select_pipe + IO.pipe do |r, w| + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + f1 = Threadlet.start { IO.select([r], nil, nil, 0.02) } + assert_nil f1.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.02 + assert_operator diff, :<, 0.2 + w.write '.' + f2 = Threadlet.start { IO.select([r], [w]) } + assert_equal [ [r], [w], [] ], f2.value + end + end + + def test_reg_file + File.open(__FILE__) do |fp| + t = Threadlet.start { fp.wait_readable } + assert_same fp, t.value + t = Threadlet.start { IO.select([fp]) } + assert_equal [[fp],[],[]], t.value + end + end +end diff --git a/thread.c b/thread.c index acb53354fd..3a18b79e14 100644 --- a/thread.c +++ b/thread.c @@ -73,6 +73,7 @@ #include "ruby/debug.h" #include "internal.h" #include "iseq.h" +#include "iom.h" #ifndef USE_NATIVE_THREAD_PRIORITY #define USE_NATIVE_THREAD_PRIORITY 0 @@ -3507,6 +3508,24 @@ rb_thread_priority_set(VALUE thread, VALUE prio) /* for IO */ +static double * +timeval2double(double *d, const struct timeval *tv) +{ + if (tv) { + *d = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6; + return d; + } + return NULL; +} + +static int +rb_iom_waitfd_tv(rb_thread_t *th, int *fdp, int events, struct timeval *tv) +{ + double d; + + return rb_iom_waitfd(th, fdp, events, timeval2double(&d, tv)); +} + #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT) /* @@ -3773,13 
+3792,6 @@ do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds, update_timeval(timeout, limit), \ TRUE) - if (timeout) { - limit = timeofday(); - limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6; - wait_rest = *timeout; - timeout = &wait_rest; - } - #define fd_init_copy(f) \ (f##fds) ? rb_fd_init_copy(&orig_##f, f##fds) : rb_fd_no_init(&orig_##f) fd_init_copy(read); @@ -3787,6 +3799,14 @@ do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds, fd_init_copy(except); #undef fd_init_copy + + if (timeout) { + limit = timeofday(); + limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6; + wait_rest = *timeout; + timeout = &wait_rest; + } + do { lerrno = 0; @@ -3845,6 +3865,8 @@ int rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, struct timeval *timeout) { + rb_thread_t *th; + if (!read && !write && !except) { if (!timeout) { rb_thread_sleep_forever(); @@ -3863,7 +3885,16 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * if (except) { rb_fd_resize(max - 1, except); } - return do_select(max, read, write, except, timeout); + + th = GET_THREAD(); + if (rb_threadlet_sched_p(th)) { + double d; + return rb_iom_select(th, max, read, write, except, + timeval2double(&d, timeout)); + } + else { + return do_select(max, read, write, except, timeout); + } } /* @@ -3937,6 +3968,10 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) struct timespec *timeout = NULL; rb_thread_t *th = GET_THREAD(); + if (rb_threadlet_sched_p(th)) { + return rb_iom_waitfd_tv(th, &fd, events, tv); + } + #define poll_update() \ (update_timespec(timeout, limit), \ TRUE) @@ -4048,7 +4083,11 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) struct select_args args; int r; VALUE ptr = (VALUE)&args; + rb_thread_t *th = GET_THREAD(); + if (rb_threadlet_sched_p(th)) { + return rb_iom_waitfd_tv(th, &fd, events, tv); + } args.as.fd = fd; args.read = (events 
& RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; @@ -4207,10 +4246,13 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th) } } +static void rb_iom_atfork_child(rb_thread_t *); + void rb_thread_atfork(void) { rb_thread_t *th = GET_THREAD(); + rb_iom_atfork_child(th); rb_thread_atfork_internal(th, terminate_atfork_i); th->join_list = NULL; @@ -5245,3 +5287,21 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data) return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack); } + +#ifndef RUBYVM_IOM +# if defined(HAVE_SYS_EVENT_H) && defined(HAVE_KQUEUE) && defined(HAVE_KEVENT) +# define RUBYVM_IOM IOM_KQUEUE +# elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \ + defined(HAVE_EPOLL_CTL) && defined(HAVE_EPOLL_WAIT) +# define RUBYVM_IOM IOM_EPOLL +# else +# define RUBYVM_IOM IOM_SELECT +# endif +#endif +#if RUBYVM_IOM == IOM_KQUEUE +# include "iom_kqueue.h" +#elif RUBYVM_IOM == IOM_EPOLL +# include "iom_epoll.h" +#else +# include "iom_select.h" +#endif diff --git a/thread_pthread.c b/thread_pthread.c index f54e5f04d8..04dff67575 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -1747,9 +1747,14 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) } #endif +static int rb_iom_reserved_fd(int); /* check kqueue or epoll FD */ + int rb_reserved_fd_p(int fd) { + if (rb_iom_reserved_fd(fd)) { + return 1; + } #if USE_SLEEPY_TIMER_THREAD if ((fd == timer_thread_pipe.normal[0] || fd == timer_thread_pipe.normal[1] || diff --git a/vm.c b/vm.c index 6c1c63e37e..cded7f9fff 100644 --- a/vm.c +++ b/vm.c @@ -8,6 +8,7 @@ **********************************************************************/ +#include "iom.h" #include "internal.h" #include "ruby/vm.h" #include "ruby/st.h" @@ -2169,6 +2170,7 @@ ruby_vm_destruct(rb_vm_t *vm) rb_fiber_reset_root_local_storage(th->self); thread_free(th); } + rb_iom_destroy(vm); rb_vm_living_threads_init(vm); 
ruby_vm_run_at_exit_hooks(vm); if (vm->loading_table) { @@ -2337,6 +2339,9 @@ rb_thread_recycle_stack_release(VALUE *stack) ruby_xfree(stack); } +void rb_fiber_mark_self(rb_fiber_t *fib); +void rb_iom_mark_runq(const rb_thread_t *th); + void rb_execution_context_mark(const rb_execution_context_t *ec) { @@ -2389,7 +2394,6 @@ rb_execution_context_mark(const rb_execution_context_t *ec) RUBY_MARK_UNLESS_NULL(ec->local_storage_recursive_hash_for_trace); } -void rb_fiber_mark_self(rb_fiber_t *fib); void rb_threadptr_root_fiber_setup(rb_thread_t *th); void rb_threadptr_root_fiber_release(rb_thread_t *th); @@ -2411,6 +2415,11 @@ thread_mark(void *ptr) RUBY_MARK_UNLESS_NULL(th->top_self); RUBY_MARK_UNLESS_NULL(th->top_wrapper); if (th->root_fiber) rb_fiber_mark_self(th->root_fiber); + + /* th->afrunq may be 0 early on */ + if (th->afrunq.n.next && !list_empty(&th->afrunq)) { + rb_iom_mark_runq(th); + } RUBY_MARK_UNLESS_NULL(th->stat_insn_usage); RUBY_MARK_UNLESS_NULL(th->last_status); RUBY_MARK_UNLESS_NULL(th->locking_mutex); @@ -2499,6 +2508,7 @@ th_init(rb_thread_t *th, VALUE self) { th->self = self; rb_threadptr_root_fiber_setup(th); + list_head_init(&th->afrunq); /* allocate thread stack */ #ifdef USE_SIGALTSTACK diff --git a/vm_core.h b/vm_core.h index 8bce7892b5..eaec350088 100644 --- a/vm_core.h +++ b/vm_core.h @@ -515,6 +515,8 @@ typedef struct rb_hook_list_struct { int need_clean; } rb_hook_list_t; +struct rb_iom_struct; + typedef struct rb_vm_struct { VALUE self; @@ -523,6 +525,7 @@ typedef struct rb_vm_struct { struct rb_thread_struct *main_thread; struct rb_thread_struct *running_thread; + struct rb_iom_struct *iom; struct list_head waiting_fds; /* <=> struct waiting_fd */ struct list_head living_threads; @@ -862,6 +865,7 @@ typedef struct rb_thread_struct { /* fiber */ rb_fiber_t *root_fiber; rb_jmpbuf_t root_jmpbuf; + struct list_head afrunq; /* -rb_iom_timer.as.rnode */ /* misc */ unsigned int abort_on_exception: 1; -- EW