From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.1 (2015-04-28) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS16125 46.166.160.0/21 X-Spam-Status: No, score=0.1 required=3.0 tests=AWL,BAYES_00,RCVD_IN_MSPIKE_BL, RCVD_IN_MSPIKE_ZBI,RCVD_IN_XBL,RDNS_NONE,SPF_FAIL,SPF_HELO_FAIL, TO_EQ_FM_DOM_SPF_FAIL shortcircuit=no autolearn=no autolearn_force=no version=3.4.1 Received: from 80x24.org (unknown [46.166.162.53]) by dcvr.yhbt.net (Postfix) with ESMTP id 20C2F1F403 for ; Wed, 13 Jun 2018 00:35:25 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [WIP r63641] Threadlet: green threads implemented using fibers Date: Wed, 13 Jun 2018 00:35:24 +0000 Message-Id: <20180613003524.9256-1-e@80x24.org> List-Id: Threadlet is a subclass of Fiber which gives Thread-like behavior with implementation characteristics similar to the Thread class from Ruby 1.8 (but allowing kqueue and epoll on modern OSes). This adds the following scheduling points in the C-API: * rb_wait_for_single_fd * rb_thread_fd_select * rb_waitpid main Ruby API: Threadlet#start -> enable auto-scheduling and run Threadlet until it automatically yields (due to EAGAIN/EWOULDBLOCK) The following behave like their Thread counterparts: Threadlet.start - Threadlet.new + Threadlet#start (prelude.rb) Threadlet#join - run internal scheduler until Threadlet is terminated Threadlet#value - ditto Threadlet#run - like Threadlet#start (prelude.rb) cont.c - rb_threadlet_sched_p checks if the current execution context is a Threadlet with auto-scheduling enabled. Changes to existing functions are minimal. New files (all new structs and relations should be documented): iom.h - internal API for the rest of RubyVM (incomplete?) 
iom_internal.h - internal header for iom_(select|epoll|kqueue).h iom_epoll.h - epoll-specific pieces iom_kqueue.h - kqueue-specific pieces iom_select.h - select-specific pieces iom_pingable_common.h - common code for iom_(epoll|kqueue).h iom_common.h - common footer for iom_(select|epoll|kqueue).h Changes to existing data structures: rb_thread_t.afrunq - list of Threadlets to auto-resume rb_vm_t.iom - Ruby I/O Manager (rb_iom_t) :) Besides rb_iom_t, all the new structs are stack-only and rely extensively on ccan/list for branch-less, O(1) insert/delete. As usual, understanding the data structures first should help you understand the code. Right now, I reuse some static functions in thread.c, so thread.c includes iom_(select|epoll|kqueue).h TODO: Hijack other blocking functions (IO.select, ...) I am using "double" for timeout since it is more convenient for arithmetic like parts of thread.c. Most platforms have good FP, I think. Also, all "blocking" functions (rb_iom_wait*) will have timeout support. ./configure gains a new --with-iom=(select|epoll|kqueue) switch libkqueue: libkqueue support is incomplete; corner cases are not handled well: 1) multiple Threadlets waiting on the same FD 2) waiting for both read and write events on the same FD Bugfixes to libkqueue may be necessary to support all corner cases. Supporting these corner cases for native kqueue was challenging, even. See comments in iom_kqueue.h and iom_epoll.h for nuances. 
Limitations Test script I used to download a file from my server: ----8<--- require 'net/http' require 'uri' require 'digest/sha1' require 'fiber' url = 'https://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx' uri = URI(url) use_ssl = "https" == uri.scheme fibs = 10.times.map do Threadlet.start do cur = Threadlet.current.object_id # XXX getaddrinfo() and connect() are blocking # XXX resolv/replace + connect_nonblock Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http| req = Net::HTTP::Get.new(uri) http.request(req) do |res| dig = Digest::SHA1.new res.read_body do |buf| dig.update(buf) # warn "#{cur} #{buf.bytesize}\n" end warn "#{cur} #{dig.hexdigest}\n" end end warn "done\n" :done end end warn "joining #{Time.now}\n" fibs[-1].join(4) warn "joined #{Time.now}\n" all = fibs.dup warn "1 joined, wait for the rest\n" until fibs.empty? fibs.each(&:join) fibs.keep_if(&:alive?) warn fibs.inspect end p all.map(&:value) Threadlet.new do puts 'HI' end.run.join threadlet: non-native fiber support Non-native fibers require us to use heap allocation instead of stack allocation, so expose FIBER_USE_NATIVE via fiber.h to allow iom_*.h to fall back to heap allocation on certain platforms. 
--- common.mk | 9 + configure.ac | 32 + cont.c | 173 +++- fiber.h | 54 ++ include/ruby/io.h | 2 + internal.h | 5 + iom.h | 93 ++ iom_common.h | 244 +++++ iom_epoll.h | 753 ++++++++++++++ iom_internal.h | 391 ++++++++ iom_kqueue.h | 917 ++++++++++++++++++ iom_pingable_common.h | 49 + iom_select.h | 481 +++++++++ prelude.rb | 12 + process.c | 14 +- signal.c | 39 +- .../test_wait_for_single_fd.rb | 64 ++ test/lib/leakchecker.rb | 9 + test/ruby/test_threadlet.rb | 283 ++++++ thread.c | 145 ++- thread_pthread.c | 9 +- vm.c | 12 +- vm_core.h | 5 +- 23 files changed, 3679 insertions(+), 116 deletions(-) create mode 100644 fiber.h create mode 100644 iom.h create mode 100644 iom_common.h create mode 100644 iom_epoll.h create mode 100644 iom_internal.h create mode 100644 iom_kqueue.h create mode 100644 iom_pingable_common.h create mode 100644 iom_select.h create mode 100644 test/ruby/test_threadlet.rb diff --git a/common.mk b/common.mk index da9cab3c11..b4764475fb 100644 --- a/common.mk +++ b/common.mk @@ -1580,6 +1580,7 @@ cont.$(OBJEXT): {$(VPATH)}cont.c cont.$(OBJEXT): {$(VPATH)}defines.h cont.$(OBJEXT): {$(VPATH)}encoding.h cont.$(OBJEXT): {$(VPATH)}eval_intern.h +cont.$(OBJEXT): {$(VPATH)}fiber.h cont.$(OBJEXT): {$(VPATH)}gc.h cont.$(OBJEXT): {$(VPATH)}id.h cont.$(OBJEXT): {$(VPATH)}intern.h @@ -2772,11 +2773,19 @@ thread.$(OBJEXT): {$(VPATH)}defines.h thread.$(OBJEXT): {$(VPATH)}encoding.h thread.$(OBJEXT): {$(VPATH)}eval_intern.h thread.$(OBJEXT): {$(VPATH)}gc.h +thread.$(OBJEXT): {$(VPATH)}fiber.h thread.$(OBJEXT): {$(VPATH)}id.h thread.$(OBJEXT): {$(VPATH)}intern.h thread.$(OBJEXT): {$(VPATH)}internal.h thread.$(OBJEXT): {$(VPATH)}io.h thread.$(OBJEXT): {$(VPATH)}iseq.h +thread.$(OBJEXT): {$(VPATH)}iom.h +thread.$(OBJEXT): {$(VPATH)}iom_internal.h +thread.$(OBJEXT): {$(VPATH)}iom_common.h +thread.$(OBJEXT): {$(VPATH)}iom_epoll.h +thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h +thread.$(OBJEXT): {$(VPATH)}iom_pingable_common.h +thread.$(OBJEXT): {$(VPATH)}iom_select.h 
thread.$(OBJEXT): {$(VPATH)}method.h thread.$(OBJEXT): {$(VPATH)}missing.h thread.$(OBJEXT): {$(VPATH)}node.h diff --git a/configure.ac b/configure.ac index 13065140a6..28ab2c2efe 100644 --- a/configure.ac +++ b/configure.ac @@ -982,6 +982,8 @@ AC_CHECK_HEADERS(pwd.h) AC_CHECK_HEADERS(setjmpex.h) AC_CHECK_HEADERS(stdalign.h) AC_CHECK_HEADERS(sys/attr.h) +AC_CHECK_HEADERS(sys/epoll.h) +AC_CHECK_HEADERS(sys/event.h) AC_CHECK_HEADERS(sys/fcntl.h) AC_CHECK_HEADERS(sys/file.h) AC_CHECK_HEADERS(sys/id.h) @@ -1753,6 +1755,10 @@ AC_CHECK_FUNCS(dladdr) AC_CHECK_FUNCS(dup) AC_CHECK_FUNCS(dup3) AC_CHECK_FUNCS(eaccess) +AC_CHECK_FUNCS(epoll_create) +AC_CHECK_FUNCS(epoll_create1) +AC_CHECK_FUNCS(epoll_ctl) +AC_CHECK_FUNCS(epoll_wait) AC_CHECK_FUNCS(endgrent) AC_CHECK_FUNCS(fchmod) AC_CHECK_FUNCS(fchown) @@ -1786,7 +1792,9 @@ AC_CHECK_FUNCS(initgroups) AC_CHECK_FUNCS(ioctl) AC_CHECK_FUNCS(isfinite) AC_CHECK_FUNCS(issetugid) +AC_CHECK_FUNCS(kevent) AC_CHECK_FUNCS(killpg) +AC_CHECK_FUNCS(kqueue) AC_CHECK_FUNCS(lchmod) AC_CHECK_FUNCS(lchown) AC_CHECK_FUNCS(link) @@ -2751,6 +2759,29 @@ AC_ARG_WITH(valgrind, AS_IF([test x$with_valgrind != xno], [AC_CHECK_HEADERS(valgrind/memcheck.h)]) +AC_DEFINE_UNQUOTED(IOM_SELECT, 0) +AC_DEFINE_UNQUOTED(IOM_KQUEUE, 1) +AC_DEFINE_UNQUOTED(IOM_EPOLL, 2) + +iom_default=select +AS_CASE([$ac_cv_func_kqueue:$ac_cv_func_kevent:$ac_cv_header_sys_event_h], +[yes:yes:yes], [iom_default=kqueue], +[*], + [AS_CASE( + [$ac_cv_func_epoll_wait:$ac_cv_func_epoll_create:$ac_cv_header_sys_epoll_h], + [yes:yes:yes], [iom_default=epoll])] +) + +AC_ARG_WITH(iom, + AS_HELP_STRING([--with-iom=XXXXX], + [I/O manager (select|kqueue|epoll)]), + [with_iom="$withval"], [with_iom="$iom_default"]) +AS_CASE(["$with_iom"], + [select], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_SELECT)], + [kqueue], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_KQUEUE)], + [epoll], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_EPOLL)], + [AC_MSG_ERROR(unknown I/O manager: $with_iom)]) + dln_a_out_works=no AS_IF([test 
"$ac_cv_header_a_out_h" = yes], [ AS_IF([test "$with_dln_a_out" = yes || test "$rb_cv_dlopen" = unknown], [ @@ -3907,6 +3938,7 @@ config_summary "install doc" "$install_doc" config_summary "man page type" "$MANTYPE" config_summary "search path" "$search_path" config_summary "static-linked-ext" ${EXTSTATIC:+"yes"} +config_summary "I/O manager" ${with_iom} echo "" echo "---" ]) diff --git a/cont.c b/cont.c index 7c8982087d..a0dbfe0dbc 100644 --- a/cont.c +++ b/cont.c @@ -9,59 +9,14 @@ **********************************************************************/ +#include "iom.h" #include "internal.h" #include "vm_core.h" +#include "fiber.h" #include "gc.h" #include "eval_intern.h" #include "mjit.h" -/* FIBER_USE_NATIVE enables Fiber performance improvement using system - * dependent method such as make/setcontext on POSIX system or - * CreateFiber() API on Windows. - * This hack make Fiber context switch faster (x2 or more). - * However, it decrease maximum number of Fiber. For example, on the - * 32bit POSIX OS, ten or twenty thousands Fiber can be created. - * - * Details is reported in the paper "A Fast Fiber Implementation for Ruby 1.9" - * in Proc. of 51th Programming Symposium, pp.21--28 (2010) (in Japanese). - */ - -#if !defined(FIBER_USE_NATIVE) -# if defined(HAVE_GETCONTEXT) && defined(HAVE_SETCONTEXT) -# if 0 -# elif defined(__NetBSD__) -/* On our experience, NetBSD doesn't support using setcontext() and pthread - * simultaneously. This is because pthread_self(), TLS and other information - * are represented by stack pointer (higher bits of stack pointer). - * TODO: check such constraint on configure. - */ -# define FIBER_USE_NATIVE 0 -# elif defined(__sun) -/* On Solaris because resuming any Fiber caused SEGV, for some reason. - */ -# define FIBER_USE_NATIVE 0 -# elif defined(__ia64) -/* At least, Linux/ia64's getcontext(3) doesn't save register window. 
- */ -# define FIBER_USE_NATIVE 0 -# elif defined(__GNU__) -/* GNU/Hurd doesn't fully support getcontext, setcontext, makecontext - * and swapcontext functions. Disabling their usage till support is - * implemented. More info at - * http://darnassus.sceen.net/~hurd-web/open_issues/glibc/#getcontext - */ -# define FIBER_USE_NATIVE 0 -# else -# define FIBER_USE_NATIVE 1 -# endif -# elif defined(_WIN32) -# define FIBER_USE_NATIVE 1 -# endif -#endif -#if !defined(FIBER_USE_NATIVE) -#define FIBER_USE_NATIVE 0 -#endif - #if FIBER_USE_NATIVE #ifndef _WIN32 #include @@ -174,6 +129,7 @@ struct rb_fiber_struct { * You shouldn't mix "transfer" and "resume". */ int transferred; + unsigned int is_threadlet:1; #if FIBER_USE_NATIVE #ifdef _WIN32 @@ -1774,19 +1730,30 @@ rb_fiber_terminate(rb_fiber_t *fib, int need_interrupt) fiber_switch(ret_fib, 1, &value, 0); } -VALUE -rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv) +int +rb_fiber_resumable_p(const rb_thread_t *th, const rb_fiber_t *fib) { - rb_fiber_t *fib; - GetFiberPtr(fibval, fib); + return th->root_fiber != fib && !fib->prev; +} +static void +fiber_check_resume(const rb_fiber_t *fib) +{ if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) { rb_raise(rb_eFiberError, "double resume"); } if (fib->transferred != 0) { rb_raise(rb_eFiberError, "cannot resume transferred Fiber"); } +} +VALUE +rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv) +{ + rb_fiber_t *fib; + GetFiberPtr(fibval, fib); + + fiber_check_resume(fib); return fiber_switch(fib, argc, argv, 1); } @@ -1956,6 +1923,103 @@ fiber_to_s(VALUE fibval) return rb_block_to_s(fibval, &proc->block, status_info); } +/* Returns true if Threadlet is enabled for current fiber */ +int +rb_threadlet_sched_p(const rb_thread_t *th) +{ + const rb_fiber_t *cur = th->ec->fiber_ptr; + + return (cur && cur->is_threadlet && th->root_fiber != cur); +} + +/* + * start running a Threadlet + */ +static VALUE +rb_threadlet_start(int argc, VALUE *argv, VALUE self) +{ + 
rb_thread_t *th = GET_THREAD(); + rb_fiber_t *fib; + GetFiberPtr(self, fib); + + if (th->root_fiber == fib) { + rb_raise(rb_eFiberError, "Root fiber cannot #start"); + } + if (fib->is_threadlet) { + rb_raise(rb_eFiberError, "Threadlet already started"); + } + fib->is_threadlet = 1; + fiber_check_resume(fib); + return fiber_switch(fib, argc, argv, 1); +} + +rb_thread_t * +rb_fiber_owner_thread(VALUE self) +{ + rb_fiber_t *fib; + + GetFiberPtr(self, fib); + + return rb_thread_ptr(cont_thread_value(&fib->cont)); +} + +static void +threadlet_join(rb_fiber_t *fib, struct timespec *rel) +{ + rb_thread_t *th = GET_THREAD(); + rb_fiber_t *cur = fiber_current(); + + if (cur == fib) { + rb_raise(rb_eFiberError, "Target Threadlet must not be current fiber"); + } + if (th->root_fiber == fib) { + rb_raise(rb_eFiberError, "Target Threadlet must not be root fiber"); + } + if (cont_thread_value(&fib->cont) != th->self) { + rb_raise(rb_eFiberError, "Target Threadlet not owned by current thread"); + } + if (!fib->is_threadlet) { + rb_raise(rb_eFiberError, "Target is not a Threadlet"); + } + + if (rel) { + struct timespec end; + rb_getclockofday(&end); + rb_timespec_add(&end, rel); + while (fib->status != FIBER_TERMINATED) { + rb_iom_schedule(th, rel); + if (rb_timespec_update_expire(rel, &end)) + return; + } + } + else { + while (fib->status != FIBER_TERMINATED) { + rb_iom_schedule(th, 0); + } + } +} + +static VALUE +rb_threadlet_join(int argc, VALUE *argv, VALUE self) +{ + rb_fiber_t *fib; + struct timespec ts; + + GetFiberPtr(self, fib); + threadlet_join(fib, rb_join_interval(&ts, argc, argv)); + return fib->status == FIBER_TERMINATED ? 
fib->cont.self : Qnil; +} + +static VALUE +rb_threadlet_value(VALUE self) +{ + rb_fiber_t *fib; + GetFiberPtr(self, fib); + + threadlet_join(fib, 0); + return fib->cont.value; +} + /* * Document-class: FiberError * @@ -1972,6 +2036,8 @@ fiber_to_s(VALUE fibval) void Init_Cont(void) { + VALUE rb_cThreadlet; + #if FIBER_USE_NATIVE rb_thread_t *th = GET_THREAD(); @@ -1993,6 +2059,11 @@ Init_Cont(void) rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); rb_define_alias(rb_cFiber, "inspect", "to_s"); + + rb_cThreadlet = rb_define_class("Threadlet", rb_cFiber); + rb_define_method(rb_cThreadlet, "start", rb_threadlet_start, -1); + rb_define_method(rb_cThreadlet, "join", rb_threadlet_join, -1); + rb_define_method(rb_cThreadlet, "value", rb_threadlet_value, 0); } RUBY_SYMBOL_EXPORT_BEGIN diff --git a/fiber.h b/fiber.h new file mode 100644 index 0000000000..3f0f5d5500 --- /dev/null +++ b/fiber.h @@ -0,0 +1,54 @@ +#ifndef RUBY_FIBER_H +#define RUBY_FIBER_H + +#include "internal.h" +#include "vm_core.h" + +/* FIBER_USE_NATIVE enables Fiber performance improvement using system + * dependent method such as make/setcontext on POSIX system or + * CreateFiber() API on Windows. + * This hack make Fiber context switch faster (x2 or more). + * However, it decrease maximum number of Fiber. For example, on the + * 32bit POSIX OS, ten or twenty thousands Fiber can be created. + * + * Details is reported in the paper "A Fast Fiber Implementation for Ruby 1.9" + * in Proc. of 51th Programming Symposium, pp.21--28 (2010) (in Japanese). + */ + +#if !defined(FIBER_USE_NATIVE) +# if defined(HAVE_GETCONTEXT) && defined(HAVE_SETCONTEXT) +# if 0 +# elif defined(__NetBSD__) +/* On our experience, NetBSD doesn't support using setcontext() and pthread + * simultaneously. This is because pthread_self(), TLS and other information + * are represented by stack pointer (higher bits of stack pointer). 
+ * TODO: check such constraint on configure. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__sun) +/* On Solaris because resuming any Fiber caused SEGV, for some reason. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__ia64) +/* At least, Linux/ia64's getcontext(3) doesn't save register window. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__GNU__) +/* GNU/Hurd doesn't fully support getcontext, setcontext, makecontext + * and swapcontext functions. Disabling their usage till support is + * implemented. More info at + * http://darnassus.sceen.net/~hurd-web/open_issues/glibc/#getcontext + */ +# define FIBER_USE_NATIVE 0 +# else +# define FIBER_USE_NATIVE 1 +# endif +# elif defined(_WIN32) +# define FIBER_USE_NATIVE 1 +# endif +#endif +#if !defined(FIBER_USE_NATIVE) +#define FIBER_USE_NATIVE 0 +#endif + +#endif /* RUBY_FIBER_H */ diff --git a/include/ruby/io.h b/include/ruby/io.h index cde932ff32..6e2d086ba4 100644 --- a/include/ruby/io.h +++ b/include/ruby/io.h @@ -120,6 +120,8 @@ typedef struct rb_io_t { /* #define FMODE_UNIX 0x00200000 */ /* #define FMODE_INET 0x00400000 */ /* #define FMODE_INET6 0x00800000 */ +/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */ +/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */ #define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr) diff --git a/internal.h b/internal.h index 8388741c63..4b2041a7c5 100644 --- a/internal.h +++ b/internal.h @@ -1856,6 +1856,11 @@ int rb_thread_to_be_killed(VALUE thread); void rb_mutex_allow_trap(VALUE self, int val); VALUE rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data); VALUE rb_mutex_owned_p(VALUE self); +struct timespec *rb_join_interval(struct timespec *, int argc, VALUE *argv); +int rb_timespec_cmp(const struct timespec *, const struct timespec *); +void rb_getclockofday(struct timespec *); +void rb_timespec_add(struct timespec *, const struct timespec *); +int rb_timespec_update_expire(struct timespec *, const 
struct timespec *); /* thread_pthread.c, thread_win32.c */ int rb_divert_reserved_fd(int fd); diff --git a/iom.h b/iom.h new file mode 100644 index 0000000000..2d74c85f3b --- /dev/null +++ b/iom.h @@ -0,0 +1,93 @@ +/* + * iom -> I/O Manager for RubyVM (Threadlet-aware) + * + * On platforms with epoll or kqueue, this should be ready for multicore; + * even if the rest of the RubyVM is not. + * + * Some inspiration taken from Mio in GHC: + * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf + */ +#ifndef RUBY_IOM_H +#define RUBY_IOM_H +#include "ruby.h" +#include "ruby/io.h" +#include "ruby/intern.h" +#include "vm_core.h" + +typedef struct rb_iom_struct rb_iom_t; +typedef uint64_t rb_hrtime_t; /* nanosecond */ + +/* WARNING: unstable API, only for Ruby internal use */ + +/* + * Note: the first "rb_thread_t *" is a placeholder and may be replaced + * with "rb_execution_context_t *" in the future. + */ + +/* + * All functions with "wait" in it take an optional +struct timespec *+ + * argument specifying the timeout. If NULL, it can wait forever until + * the event happens (or the fiber is explicitly resumed). + */ + +/* + * Relinquish calling fiber while waiting for +events+ on the given + * +rb_io_t+ + * + * Multiple native threads can enter this function at the same time. + * + * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI + * + * Returns a mask of events. + */ + +int rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, + const struct timespec *); + +/* + * Identical to rb_iom_waitio, but takes a pointer to an integer file + * descriptor, instead of rb_io_t. Use rb_iom_waitio when possible, + * since it allows us to optimize epoll (and perhaps avoid kqueue + * portability bugs across different *BSDs). 
+ */ +int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, const struct timespec *); + +int rb_iom_select(rb_thread_t *, int maxfd, rb_fdset_t *r, + rb_fdset_t *w, rb_fdset_t *e, const struct timespec *); + +/* + * Relinquish calling fiber to wait for the given PID to change status. + * Multiple native threads can enter this function at the same time. + * If timeout is negative, wait forever. + */ +rb_pid_t rb_iom_waitpid(rb_thread_t *, rb_pid_t, int *status, int options, + const struct timespec *); + +/* + * Relinquish calling fiber for at least the duration of given timeout + * in seconds. If timeout is negative, wait forever (until explicitly + * resumed). + * Multiple native threads can enter this function at the same time. + */ +void rb_iom_sleep(rb_thread_t *, const struct timespec *); + +/* callback for SIGCHLD, needed to implemented for rb_iom_waitpid */ +void rb_iom_sigchld(rb_vm_t *); + +/* + * there is no public create function, creation is lazy to avoid incurring + * overhead for small scripts which do not need fibers, we only need this + * at VM destruction + */ +void rb_iom_destroy(rb_vm_t *); + +/* + * schedule + */ +void rb_iom_schedule(rb_thread_t *th, const struct timespec *rel); + +/* cont.c */ +int rb_threadlet_sched_p(const rb_thread_t *); +rb_thread_t *rb_fiber_owner_thread(VALUE); + +#endif /* RUBY_IOM_H */ diff --git a/iom_common.h b/iom_common.h new file mode 100644 index 0000000000..02142f65be --- /dev/null +++ b/iom_common.h @@ -0,0 +1,244 @@ +/* included by iom_(epoll|select|kqueue).h */ + +/* we lazily create this, small scripts may never need iom */ +static rb_iom_t * +rb_iom_new(rb_thread_t *th) +{ + rb_iom_t *iom = ALLOC(rb_iom_t); + rb_iom_init(iom); + return iom; +} + +static rb_iom_t * +rb_iom_get(rb_thread_t *th) +{ + VM_ASSERT(th && th->vm); + if (!th->vm->iom) { + th->vm->iom = rb_iom_new(th); + } + return th->vm->iom; +} + +static rb_hrtime_t +rb_iom_now(void) +{ + struct timespec ts; + + rb_getclockofday(&ts); + return 
rb_to_hrtime_t(&ts); +} + +/* check for expired timers */ +static void +rb_iom_timer_check(const rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + if (iom && !list_empty(&iom->timers)) { + struct rb_iom_timer *t = 0, *next = 0; + rb_hrtime_t now = rb_iom_now(); + + list_for_each_safe(&iom->timers, t, next, n.tnode) { + if (now >= t->expire_at) { + struct rb_iom_waiter *w = rb_iom_waiter_of(t); + VALUE fibval = rb_iom_timer_fibval(t); + + if (w) { + list_del_init(&w->wnode); + } + list_del_init(&t->n.tnode); + /* non-Thribers may set timer in rb_iom_schedule */ + if (fibval != Qfalse) { + rb_thread_t *owner = rb_fiber_owner_thread(fibval); + list_add_tail(&owner->runq, &t->n.rnode); + } + } + return; /* done, timers is a sorted list */ + } + } +} + +/* + * insert a new +timer+ into +timers+, maintain sort order by expire_at + * TODO: more efficient timer wheel + */ +static void +rb_iom_timer_add(rb_thread_t *th, struct rb_iom_timer *add, + const struct timespec *rel, int flags) +{ + add->_fibval = flags & IOM_FIB ? rb_fiber_current() : Qfalse; + add->_fibval |= flags & IOM_WAIT ? 
0 : IOM_FIBMASK; + rb_iom_timer_check(th); + + if (rel) { + rb_iom_t *iom = rb_iom_get(th); + struct rb_iom_timer *i = 0; + + add->expire_at = rb_to_hrtime_t(rel) + rb_iom_now(); + /* + * search backwards: assume typical projects have multiple objects + * sharing the same timeout values, so new timers will expire later + * than existing timers + */ + list_for_each_rev(&iom->timers, i, n.tnode) { + if (add->expire_at > i->expire_at) { + list_add_after(&iom->timers, &i->n.tnode, &add->n.tnode); + return; + } + } + list_add(&iom->timers, &add->n.tnode); + } + else { + /* not active, just allow list_del to function */ + list_node_init(&add->n.tnode); + } +} + +/* max == -1 : wake all */ +static void +rb_iom_blockers_notify(rb_iom_t *iom, int max) +{ + struct rb_iom_blocker *b = 0, *next = 0; + + list_for_each_safe(&iom->blockers, b, next, bnode) { + list_del_init(&b->bnode); + ubf_select(b->th); + if (--max == 0) { + break; + } + } +} + +/* + * TODO: consider EVFILT_PROC for kqueue and netlink+epoll on Linux; + * see the "god" RubyGem for usage examples. + * However, I doubt rb_waitpid scalability will be a problem and + * the simplicity of a single implementation for all is appealing. 
+ */ +#ifdef HAVE_SYS_TYPES_H +# include +#endif +#ifdef HAVE_SYS_WAIT_H +# include +#endif +#if defined(WNOHANG) && WNOHANG != 0 && \ + (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) + +static VALUE +iom_schedule_pid(VALUE ptr) +{ + struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr; + rb_thread_t *th = pw->th; + int nresume; + + rb_threadlet_do_yield_p(th, &nresume); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + + /* check ints above could've unlinked us and marked us ready */ + if (list_empty((struct list_head *)&pw->w.wnode)) { + list_del_init(&pw->w.timer.n.rnode); + } + else { /* pw fields will be set in another execution context */ + rb_fiber_yield(0, 0); + + if (pw->statusp) { + *pw->statusp = pw->status; + } + if (pw->pid > 0) { + rb_last_status_set(pw->status, pw->pid); + } + } + return (VALUE)pw->pid; +} + +static VALUE +iom_waitpid_done(VALUE ptr) +{ + struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr; + int errnum = pw->pid == -1 ? pw->errnum : 0; + + rb_iom_waiter_done(&pw->w); + + if (!FIBER_USE_NATIVE) { + xfree(pw); + } + if (errnum) { + errno = errnum; + } + return Qfalse; +} + +rb_pid_t +rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options, + const struct timespec *timeout) +{ + rb_iom_t *iom; + struct rb_iom_pid_waiter *pw; + rb_pid_t rpid; + + VM_ASSERT((options & WNOHANG) == 0 && + "WNOHANG should be handled in rb_waitpid"); + /* + * unlike rb_iom_waitfd, typically call *waitpid before + * trying with a non-blocking operation (because users + * typically do not) + */ + rpid = rb_waitpid(pid, status, options | WNOHANG); + if (rpid != 0) { + if (rpid > 0 && status) { + rb_last_status_set(*status, rpid); + } + return rpid; + } + + iom = rb_iom_get(th); + pw = FIBER_USE_NATIVE ? 
ALLOCA_N(struct rb_iom_pid_waiter, 1) + : ALLOC(struct rb_iom_pid_waiter); + pw->options = options | WNOHANG; + pw->statusp = status; + pw->th = th; + pw->pid = pid; + rb_iom_timer_add(th, &pw->w.timer, timeout, IOM_FIB|IOM_WAIT); + + /* LIFO, to match Linux wait4() blocking behavior */ + list_add(&iom->pids, &pw->w.wnode); + return (pid_t)rb_ensure(iom_schedule_pid, (VALUE)pw, + iom_waitpid_done, (VALUE)pw); +} + +void +rb_iom_sigchld(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + if (iom) { + struct rb_iom_waiter *w = 0, *next = 0; + size_t nr = 0; + + list_for_each_safe(&iom->pids, w, next, wnode) { + struct rb_iom_pid_waiter *pw; + pid_t r; + + pw = container_of(w, struct rb_iom_pid_waiter, w); + r = rb_waitpid(pw->pid, &pw->status, pw->options); + + if (r == 0) { + continue; + } + if (r == -1) { + pw->errnum = errno; + } + nr++; + pw->pid = r; + rb_iom_waiter_ready(&pw->w); + } + if (nr) + rb_iom_blockers_notify(iom, -1); + } +} +#else +rb_pid_t +rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options, + struct timespec *to) +{ + rb_bug("Should not get here, WNOHANG not implemented"); +} +#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */ diff --git a/iom_epoll.h b/iom_epoll.h new file mode 100644 index 0000000000..bdd43a4824 --- /dev/null +++ b/iom_epoll.h @@ -0,0 +1,753 @@ +/* + * Linux-only epoll-based implementation of I/O Manager for RubyVM + * + * Notes: + * + * TODO: epoll_wait only has millisecond resolution; if we need higher + * resolution we can use timerfd or ppoll on the epoll_fd itself. + * + * Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the + * same notification callback (struct file_operations)->poll. + * Unlike with kqueue across different *BSDs; we do not need to worry + * about inconsistencies between these syscalls. 
+ * + * See also notes in iom_kqueue.h + */ +#include "iom_internal.h" +#include +#include /* round() */ +#define FMODE_IOM_ADDED FMODE_IOM_PRIVATE1 + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + + /* we NEVER need to scan these, only insert + delete + empty check */ + struct list_head epws; /* -epw.w.wnode, order agnostic */ + struct list_head esets; /* -epw_set.fsw.w.wnode, order agnostic */ + + struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */ + struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */ + struct rb_iom_fdmap fdmap; /* maps each FD to multiple epw */ + + int epoll_fd; + int maxevents; /* auto-increases */ + struct list_head blockers; /* -rb_iom_blocker.bnode */ +}; + +struct epw_set; + +union epfdn_as { + struct list_node fdnode; + struct { + rb_thread_t *th; + struct rb_iom_fd *fdh; + } pre_ctl; +}; + +struct epfdn { + union epfdn_as as; + union { + int *flags; /* &fptr->mode */ + struct epw_set *set; + } owner; +}; + +/* + * Not using rb_iom_fd_waiter here, since we never need to reread the + * FD on this implementation. 
+ * Allocated on stack + */ +struct epw { + struct epfdn fdn; + struct rb_iom_waiter w; + int fd; /* no need for "int *", here, we never reread */ + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +/* allocated on-stack */ +struct epw_set { + struct rb_iom_fdset_waiter fsw; + st_table *tbl; /* after: fd -> epw_set_item */ + uint8_t idx; +}; + +/* + * per-FD in rb_thread_fd_select arg, value for epw_set.fdn.as.tbl, + * allocated on-heap + */ +struct epw_set_item { + struct epfdn fdn; + int fd; + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +static void +increase_maxevents(rb_iom_t *iom, int retries) +{ + /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */ + const int max_alloca = 1024 / sizeof(struct epoll_event); + const int max = max_alloca * 2; + + if (retries) { + iom->maxevents *= retries; + if (iom->maxevents > max || iom->maxevents <= 0) { + iom->maxevents = max; + } + } +} + +static int +timespec2msec(const struct timespec *ts) +{ + /* + * clamp timeout to workaround a Linux <= 2.6.37 bug, + * see epoll_wait(2) manpage + */ + const time_t max_sec = 35 * 60; + const int max_msec = max_sec * 1000; /* floor(35.79 minutes) */ + + if (!ts) { /* infinite */ + return -1; + } + else if (ts->tv_sec < 0) { /* bug of caller */ + return 0; + } + else { + long msec = ts->tv_sec * 1000; + + if (msec < ts->tv_sec || msec > max_msec) { + return max_msec; + } + + /* + * round up to avoid busy waiting, the rest of the code uses + * nanosecond resolution and we risk wasting cycles looping + * on epoll_wait(..., timeout=0) calls w/o rounding up, here: + */ + msec += (ts->tv_nsec + 500000) / 1000000L; + + return msec > max_msec ? 
max_msec : (int)msec; + } +} + +/* we can avoid branches when mapping RB_WAIT_* bits to EPOLL* bits */ +STATIC_ASSERT(epbits_matches_waitfd_bits, + RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT && + RB_WAITFD_PRI == EPOLLPRI); + +/* what goes into epoll_ctl... */ +static int +rb_events2ep(int events) +{ + return EPOLLONESHOT | events; +} + +/* ...what comes out of epoll_wait */ +static short +rb_ep2revents(int revents) +{ + return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI)); +} + +/* lazily create epoll FD, since not everybody waits on I/O */ +static int +iom_epfd(rb_iom_t *iom) +{ + if (iom->epoll_fd < 0) { +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->epoll_fd < 0) { + int err = errno; + if (rb_gc_for_fd(err)) { + iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->epoll_fd < 0) { + rb_sys_fail("epoll_create1"); + } + } + else if (err != ENOSYS) { + rb_syserr_fail(err, "epoll_create1"); + } + else { /* libc >= kernel || build-env > run-env */ +#endif /* HAVE_EPOLL_CREATE1 */ + iom->epoll_fd = epoll_create(1); + if (iom->epoll_fd < 0) { + if (rb_gc_for_fd(errno)) { + iom->epoll_fd = epoll_create(1); + } + } + if (iom->epoll_fd < 0) { + rb_sys_fail("epoll_create"); + } + rb_maygvl_fd_fix_cloexec(iom->epoll_fd); +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + } + } +#endif /* HAVE_EPOLL_CREATE1 */ + rb_update_max_fd(iom->epoll_fd); + } + return iom->epoll_fd; +} + +static void +rb_iom_init(rb_iom_t *iom) +{ + list_head_init(&iom->timers); + list_head_init(&iom->epws); + list_head_init(&iom->esets); + list_head_init(&iom->pids); + list_head_init(&iom->blockers); + iom->maxevents = 8; + iom->epoll_fd = -1; + rb_iom_fdmap_init(&iom->fdmap); +} + +static void +rearm_fd(rb_thread_t *th, struct rb_iom_fd *fdh) +{ + int fd = -1; + struct epoll_event ev; + struct epw_set_item *esi; + struct epw *epw; + union epfdn_as *as; + + ev.events = EPOLLONESHOT; + ev.data.ptr = fdh; + 
list_for_each(&fdh->fdhead, as, fdnode) { + struct epfdn *fdn = container_of(as, struct epfdn, as); + + if (fdn->owner.set) { + esi = container_of(fdn, struct epw_set_item, fdn); + fd = esi->fd; + ev.events |= rb_events2ep(esi->events); + } + else { + epw = container_of(fdn, struct epw, fdn); + fd = epw->fd; + ev.events |= rb_events2ep(epw->events); + } + } + if (epoll_ctl(th->vm->iom->epoll_fd, EPOLL_CTL_MOD, fd, &ev) != 0) { + rb_sys_fail("epoll_ctl"); + } +} + +static void +check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev) +{ + if (nr >= 0) { + int i; + + for (i = 0; i < nr; i++) { + struct rb_iom_fd *fdh = ev[i].data.ptr; + short revents = rb_ep2revents(ev[i].events); + struct epw_set_item *esi; + struct epw *epw; + union epfdn_as *as, *next; + + /* + * Typical list size is 1; only multiple fibers waiting + * on the same FD increases fdh list size + */ + list_for_each_safe(&fdh->fdhead, as, next, fdnode) { + struct epfdn *fdn = container_of(as, struct epfdn, as); + struct epw_set *eset = fdn->owner.set; + + if (eset) { + esi = container_of(fdn, struct epw_set_item, fdn); + esi->revents = esi->events & revents; + if (esi->revents) { + int ret = eset->fsw.ret++; + list_del_init(&fdn->as.fdnode); + if (!ret) { + rb_iom_waiter_ready(&eset->fsw.w); + } + } + } + else { + epw = container_of(fdn, struct epw, fdn); + epw->revents = epw->events & revents; + if (epw->revents) { + list_del_init(&fdn->as.fdnode); + rb_iom_waiter_ready(&epw->w); + } + } + } + if (RB_UNLIKELY(!list_empty(&fdh->fdhead))) { + rearm_fd(th, fdh); + } + } + + /* notify the waiter thread in case we enqueued fibers for them */ + if (nr > 0) { + rb_iom_blockers_notify(th->vm->iom, -1); + } + } + else { + int err = errno; + if (err != EINTR) { + rb_syserr_fail(err, "epoll_wait"); + } + } + rb_iom_timer_check(th); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); +} + +/* perform a non-blocking epoll_wait while holding GVL */ +static void +ping_events(rb_thread_t *th, struct epw_set *eset) +{ + 
rb_iom_t *iom = th->vm->iom; + int epfd = iom ? iom->epoll_fd : -1; + + if (epfd >= 0) { + VALUE v; + int nr; + int maxevents = iom->maxevents; + struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents); + int retries = 0; + + do { + nr = epoll_wait(epfd, ev, maxevents, 0); + check_epoll_wait(th, nr, ev); + } while (list_empty(&th->runq) && nr == maxevents && ++retries); + if (v) { + ALLOCV_END(v); + } + increase_maxevents(iom, retries); + } +} + +static int +ep_events_pending(const rb_iom_t *iom) +{ + if (!list_empty(&iom->epws)) return 1; + if (!list_empty(&iom->esets)) return 1; + if (!list_empty(&iom->pids)) return 1; + return 0; +} + +/* for iom_pingable_common.h */ +static void +rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + int maxevents = iom->maxevents; + int nr = maxevents; + struct timespec ts; + struct timespec *rel; + int msec; + + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + rel = list_empty(&th->runq) ? rb_iom_next_timeout(&ts, &iom->timers) : 0; + msec = timespec2msec(rel); + + if (msec != 0) { + VALUE v; + int epfd = iom_epfd(th->vm->iom); /* may raise */ + struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents); + struct rb_iom_blocker cur; + + VM_ASSERT(epfd >= 0); + cur.th = th; + list_add_tail(&iom->blockers, &cur.bnode); + nr = 0; + BLOCKING_REGION(th, { + nr = epoll_wait(epfd, ev, maxevents, msec); + }, ubf_select, th, TRUE); + list_del(&cur.bnode); + check_epoll_wait(th, nr, ev); + if (v) { + ALLOCV_END(v); + } + } + if (nr == maxevents && ep_events_pending(iom)) { /* || msec == 0 */ + ping_events(th, 0); + } +} + +static void +merge_events(struct epoll_event *ev, struct rb_iom_fd *fdh) +{ + union epfdn_as *as; + + list_for_each(&fdh->fdhead, as, fdnode) { + struct epfdn *fdn = container_of(as, struct epfdn, as); + + if (fdn->owner.set) { + struct epw_set_item *esi; + + esi = container_of(fdn, struct epw_set_item, fdn); + ev->events |= rb_events2ep(esi->events); + } + else { + struct epw *epw = container_of(fdn, struct 
epw, fdn); + ev->events |= rb_events2ep(epw->events); + } + } +} + +static void +epoll_ctl_or_raise(rb_thread_t *th, struct epw *epw) +{ + int e; + int epfd; + struct epoll_event ev; + int *flags = epw->fdn.owner.flags; + + epw->fdn.owner.flags = 0; /* ensure it's not confused with owner.set */ + + /* we cannot raise until list_add: */ + { + struct rb_iom_fd *fdh = epw->fdn.as.pre_ctl.fdh; + + ev.data.ptr = fdh; + ev.events = rb_events2ep(epw->events); + /* + * merge events from other threads/fibers waiting on the same + * [ descriptor (int fd), description (struct file *) ] tuple + */ + if (!list_empty(&fdh->fdhead)) { /* uncommon, I hope... */ + merge_events(&ev, fdh); + } + list_add(&fdh->fdhead, &epw->fdn.as.fdnode); + } + + epfd = iom_epfd(th->vm->iom); /* may raise */ + + /* we want to track if an FD is already being watched ourselves */ + if (flags) { + if (*flags & FMODE_IOM_ADDED) { /* ideal situation */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + else { + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + if (e == 0) { + *flags |= FMODE_IOM_ADDED; + } + else if (e < 0 && errno == EEXIST) { + /* + * possible EEXIST if several fptrs point to the same FD: + * f1 = Fiber.start { io1.read(1) } + * io2 = IO.for_fd(io1.fileno) + * f2 = Fiber.start { io2.read(1) } + */ + *flags |= FMODE_IOM_ADDED; + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + } + } + else { /* don't know if added or not, fall back to add on ENOENT */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + if (e < 0 && errno == ENOENT) { + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + } + } + if (e < 0) { + e = errno; + if (e == EPERM) { /* all ready if File or Dir */ + epw->revents = epw->events; + } + else { + rb_syserr_fail(e, "epoll_ctl"); + } + } +} + +static VALUE +epmod_yield(VALUE ptr) +{ + /* we must have no possibility of raising until list_add: */ + struct epw *epw = (struct epw *)ptr; + rb_thread_t *th = epw->fdn.as.pre_ctl.th; + int nresume; + + if 
(epw->fd < 0) { /* TODO: behave like poll(2) and sleep? */ + return (VALUE)0; + } + + /* may raise on OOM: */ + epw->fdn.as.pre_ctl.fdh = rb_iom_fd_get(&th->vm->iom->fdmap, epw->fd); + + epoll_ctl_or_raise(th, epw); + ping_events(th, 0); + (void)rb_threadlet_do_yield_p(th, &nresume); + if (epw->revents) { + list_del_init(&epw->w.timer.n.rnode); + } + else { /* revents will be set in another execution context */ + rb_fiber_yield(0, 0); + } + return (VALUE)epw->revents; /* may be zero if timed out */ +} + +static VALUE +epw_done(VALUE ptr) +{ + struct epw *epw = (struct epw *)ptr; + + list_del(&epw->fdn.as.fdnode); + rb_iom_waiter_done(&epw->w); + + if (!FIBER_USE_NATIVE) { + xfree(epw); + } + return Qfalse; +} + +static int +iom_waitfd(rb_thread_t *th, int fd, int *flags, int events, + const struct timespec *rel) +{ + rb_iom_t *iom = rb_iom_get(th); + struct epw *epw = FIBER_USE_NATIVE ? ALLOCA_N(struct epw, 1) + : ALLOC(struct epw); + + /* unlike kqueue or select, we never need to reread fd */ + epw->fd = fd; + epw->fdn.as.pre_ctl.th = th; + epw->fdn.owner.flags = flags; + /* + * if we did not have GVL, revents may be set immediately + * upon epoll_ctl by another thread running epoll_wait, + * so we must initialize it before epoll_ctl: + */ + epw->revents = 0; + epw->events = (short)events; + + list_add(&iom->epws, &epw->w.wnode); + rb_iom_timer_add(th, &epw->w.timer, rel, IOM_FIB|IOM_WAIT); + + return (int)rb_ensure(epmod_yield, (VALUE)epw, epw_done, (VALUE)epw); +} + +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, + const struct timespec *rel) +{ + return iom_waitfd(th, fptr->fd, &fptr->mode, events, rel); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, const struct timespec *rel) +{ + return iom_waitfd(th, *fdp, 0, events, rel); +} + +static const short idx2events[] = { EPOLLIN, EPOLLOUT, EPOLLPRI }; + +static int +eset_upd(st_data_t *key, st_data_t *v, st_data_t arg, int existing) +{ + int fd = (int)*key; + struct epw_set *eset = 
(struct epw_set *)arg; + short events = idx2events[eset->idx]; + struct epw_set_item **val = (struct epw_set_item **)v; + struct epw_set_item *esi; + struct epoll_event ev; + int epfd = iom_epfd(eset->fsw.th->vm->iom); /* may raise */ + struct rb_iom_fd *fdh = rb_iom_fd_get(&eset->fsw.th->vm->iom->fdmap, fd); + + ev.data.ptr = fdh; + ev.events = rb_events2ep(events); + if (existing) { + /* + * this happens when an FD is in two fdsets for a single + * rb_thread_fd_select call. Does not happen if different + * Fibers call rb_thread_fd_select on the same FD. + */ + esi = *val; + esi->events |= events; + merge_events(&ev, fdh); + } + else { + esi = ALLOC(struct epw_set_item); + *val = esi; + esi->fd = fd; + esi->revents = 0; + esi->events = events; + esi->fdn.owner.set = eset; + if (!list_empty(&fdh->fdhead)) { + /* + * happens when different Fibers call rb_thread_fd_select + * or rb_wait_for_single_fd on the same FD + */ + merge_events(&ev, fdh); + } + list_add(&fdh->fdhead, &esi->fdn.as.fdnode); + } + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) != 0) { + int e = errno; + + if (e == ENOENT) { + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0) { + return ST_CONTINUE; + } + e = errno; + } + if (e == EPERM) { + esi->revents = esi->events; + eset->fsw.ret++; + } + else { + *val = 0; + list_del_init(&esi->fdn.as.fdnode); + xfree(esi); + rb_syserr_fail(e, "epoll_ctl"); + } + } + return ST_CONTINUE; +} + +static VALUE +epfdset_yield(VALUE ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + int nresume; + eset->tbl = st_init_numtable(); /* may raise */ + + /* must not call epoll_wait while this loop is running: */ + for (eset->idx = 0; eset->idx < 3; eset->idx++) { + const rb_fdset_t *src = eset->fsw.in[eset->idx]; + + if (src) { + int max = rb_fd_max(src); + int fd; + + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, src)) { + st_update(eset->tbl, (st_data_t)fd, + eset_upd, (st_data_t)eset); + } + } + } + } + /* OK to call epoll_wait, now: */ + + 
ping_events(eset->fsw.th, 0); + (void)rb_threadlet_do_yield_p(eset->fsw.th, &nresume); + if (eset->fsw.ret) { + list_del_init(&eset->fsw.w.timer.n.rnode); + } + else { /* fsw.ret will be set in another execution context */ + rb_fiber_yield(0, 0); + } + return (VALUE)eset->fsw.ret; +} + +static rb_fdset_t * +fd_init_once(rb_fdset_t **dst, rb_fdset_t *src) +{ + if (!*dst) { + rb_fd_init(src); + *dst = src; + } + return *dst; +} + +static int +eset_term(st_data_t key, st_data_t val, st_data_t ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + int fd = (int)key; + struct epw_set_item *esi = (struct epw_set_item *)val; + + if (esi) { + struct rb_iom_fdset_waiter *fsw = &eset->fsw; + + list_del(&esi->fdn.as.fdnode); + for (eset->idx = 0; eset->idx < 3; eset->idx++) { + if (esi->revents & idx2events[eset->idx]) { + rb_fd_set(fd, + fd_init_once(&fsw->out[eset->idx], &fsw->sets[eset->idx])); + } + } + xfree(esi); + } + + return ST_DELETE; +} + +static VALUE +epfdset_done(VALUE ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + + if (eset->tbl) { + st_foreach(eset->tbl, eset_term, (VALUE)eset); + st_free_table(eset->tbl); + } + + list_del(&eset->fsw.w.timer.n.tnode); + list_del(&eset->fsw.w.wnode); + for (eset->idx = 0; eset->idx < 3; eset->idx++) { + rb_fdset_t *orig = eset->fsw.in[eset->idx]; + rb_fdset_t *res = eset->fsw.out[eset->idx]; + + if (res) { + rb_fd_dup(orig, res); + rb_fd_term(res); + } + } + if (!FIBER_USE_NATIVE) { + xfree(eset); + } + return Qfalse; +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const struct timespec *rel) +{ + rb_iom_t *iom = rb_iom_get(th); + struct epw_set *eset = FIBER_USE_NATIVE ? 
ALLOCA_N(struct epw_set, 1) + : ALLOC(struct epw_set); + eset->tbl = 0; + rb_iom_fdset_waiter_init(th, &eset->fsw, maxfd, r, w, e); + list_add(&iom->esets, &eset->fsw.w.wnode); + rb_iom_timer_add(th, &eset->fsw.w.timer, rel, IOM_FIB|IOM_WAIT); + + return (int)rb_ensure(epfdset_yield, (VALUE)eset, epfdset_done, (VALUE)eset); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + vm->iom = 0; + if (iom) { + /* + * it's possible; but crazy to share epoll FDs across processes + * (kqueue has a rather unique close-on-fork behavior) + */ + if (iom->epoll_fd >= 0) { + close(iom->epoll_fd); + } + rb_iom_fdmap_destroy(&iom->fdmap); + xfree(iom); + } +} + +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + rb_iom_t *iom = GET_VM()->iom; + + return iom && fd == iom->epoll_fd; +} + +#include "iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_internal.h b/iom_internal.h new file mode 100644 index 0000000000..b66009fca2 --- /dev/null +++ b/iom_internal.h @@ -0,0 +1,391 @@ +#ifndef RB_IOM_COMMON_H +#define RB_IOM_COMMON_H + +#include "internal.h" +#include "iom.h" + +/* + * FIBER_USE_NATIVE (see cont.c) allows us to read machine stacks of + * yielded (stopped) fibers. Without native fiber support, cont.c needs + * to copy fiber stacks around, so we must use slower heap allocation + * instead of stack allocation (and increase our chance of hitting + * memory leak bugs) + */ +#include "fiber.h" + +/* cont.c */ +void rb_threadlet_enqueue(VALUE fibval); + +#define FMODE_IOM_PRIVATE1 0x01000000 +#define FMODE_IOM_PRIVATE2 0x02000000 + +#define IOM_FIBMASK ((VALUE)0x1) +#define IOM_FIB (0x2) +#define IOM_WAIT (0x1) /* container_of(..., struct rb_iom_waiter, timer) */ + +/* + * fdmap is a singleton. + * + * It makes zero sense to have multiple fdmaps per-process; even less so + * than multiple ioms. 
The file descriptor table in POSIX is per-process; + * and POSIX requires syscalls to allocate the lowest available FD. + * This is also why we use an array instead of a hash table, as there will + * be no holes for big processes. + * + * If contention becomes a problem, we can pad (struct rb_iom_fd) to + * 64-bytes for cache alignment. + * + * Currently we use fdmap to deal with FD aliasing with epoll + * and kqueue interfaces.. FD aliasing happens when multiple + * Fibers wait on the same FD; but epoll/kqueue APIs only allow + * registering a single data pointer per FD. + * + * In the future, we may implement rb_notify_fd_close using fdmap. + */ + +/* linear growth */ +#define RB_IOM_FD_PER_HEAP 64 +/* on-heap and persistent for process lifetime keep as small as possible. */ +struct rb_iom_fd { + struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */ +}; + +/* singleton (per-rb_iom_t, or per process, if we ever need > 1 iom) */ +struct rb_iom_fdmap { + struct rb_iom_fd **map; + unsigned int heaps; + int max_fd; +}; + +/* allocated on stack */ +/* Every Threadlet has this on stack */ +struct rb_iom_timer { + union { + struct list_node rnode; /* <=> rb_thread_t.runq */ + struct list_node tnode; /* <=> rb_iom_struct.timers */ + } n; + rb_hrtime_t expire_at; /* absolute monotonic time */ + VALUE _fibval; +}; + +/* common waiter struct for waiting fds and pids */ +struct rb_iom_waiter { + struct rb_iom_timer timer; + struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */ +}; + +/* for rb_wait_for_single_fd: */ +struct rb_iom_fd_waiter { + struct rb_iom_waiter w; /* w.wnode - iom->fds */ +#if FIBER_USE_NATIVE + int *fdp; /* (ideally), a pointer fptr->fd to detect closed FDs */ +#else + int fd; /* need to do full copy with non-native fiber */ +#endif + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +/* for rb_thread_fd_select: */ +struct rb_iom_fdset_waiter { + struct rb_iom_waiter w; /* 
w.wnode - iom->fdsets */ + rb_thread_t *th; + int max; + int ret; + rb_fdset_t *in[3]; + + /* + * without native fibers, other threads and fibers cannot safely + * read the stacks of stopped fibers + */ +#if FIBER_USE_NATIVE == 0 && RUBYVM_IOM == IOM_SELECT + rb_fdset_t *onstack_dst[3]; +#endif + rb_fdset_t *out[3]; + rb_fdset_t sets[3]; +}; + +struct rb_iom_pid_waiter { + struct rb_iom_waiter w; /* w.wnode - iom->pids */ + rb_thread_t *th; + int *statusp; /* points to on-stack destination */ + /* same pid, status, options same as waitpid(2) */ + rb_pid_t pid; + int status; + int options; + int errnum; +}; + +/* threads sleeping in select, epoll_wait or kevent w/o GVL; on stack */ +struct rb_iom_blocker { + rb_thread_t *th; + struct list_node bnode; /* -iom->blockers */ +}; + +static const struct timespec zero; + +static rb_hrtime_t +rb_to_hrtime_t(const struct timespec *ts) +{ + return (rb_hrtime_t)ts->tv_sec * 1000000000UL + ts->tv_nsec; +} + +static struct timespec * +rb_to_timespec(struct timespec *ts, rb_hrtime_t nsec) +{ + ts->tv_sec = nsec / 1000000000UL; + ts->tv_nsec = nsec % 1000000000UL; + return ts; +} + +#if (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) +/* TODO: IOM_SELECT may use this for rb_notify_fd_close */ +static struct rb_iom_fd * +iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd) +{ + VM_ASSERT(fd >= 0); + return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP]; +} + +static struct rb_iom_fd * +rb_iom_fd_get(struct rb_iom_fdmap *fdmap, int fd) +{ + while (fd >= fdmap->max_fd) { + struct rb_iom_fd *base, *h; + unsigned n = fdmap->heaps + 1; + unsigned i; + + fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct rb_iom_fd *)); + base = h = ALLOC_N(struct rb_iom_fd, RB_IOM_FD_PER_HEAP); + for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) { + list_head_init(&h->fdhead); + h++; + } + fdmap->map[fdmap->heaps++] = base; + fdmap->max_fd += RB_IOM_FD_PER_HEAP; + } + return iom_fdhead_aref(fdmap, fd); +} + +static void 
+rb_iom_fdmap_init(struct rb_iom_fdmap *fdmap) +{ + fdmap->max_fd = 0; + fdmap->heaps = 0; + fdmap->map = 0; +} + +static void +rb_iom_fdmap_destroy(struct rb_iom_fdmap *fdmap) +{ + unsigned n; + + for (n = 0; n < fdmap->heaps; n++) { + xfree(fdmap->map[n]); + } + xfree(fdmap->map); + rb_iom_fdmap_init(fdmap); +} +#endif /* (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) */ + +static VALUE +rb_iom_timer_fibval(const struct rb_iom_timer *t) +{ + return t->_fibval & ~IOM_FIBMASK; +} + +static struct rb_iom_waiter * +rb_iom_waiter_of(struct rb_iom_timer *t) +{ + if (t->_fibval & IOM_FIBMASK) { + return 0; + } + return container_of(t, struct rb_iom_waiter, timer); +} + +/* + * returns the timespec interval to wait for the next pending event + * returns NULL if we may wait indefinitely + */ +static struct timespec * +rb_iom_next_timeout(struct timespec *rel, struct list_head *timers) +{ + struct rb_iom_timer *t = list_top(timers, struct rb_iom_timer, n.tnode); + + if (t) { + struct timespec ts; + + if (rb_timespec_update_expire(rel, rb_to_timespec(&ts, t->expire_at))) { + /* force an immediate return from epoll_wait/kevent if expired */ + *rel = zero; + } + return rel; + } + else { + return 0; + } +} + +static void rb_iom_timer_check(const rb_thread_t *); +static void rb_iom_timer_add(rb_thread_t *, struct rb_iom_timer *, + const struct timespec *to, int flags); + +static VALUE +rb_iom_timer_done(VALUE ptr) +{ + struct rb_iom_timer *t = (struct rb_iom_timer *)ptr; + + list_del(&t->n.tnode); + if (!FIBER_USE_NATIVE) { + xfree(t); + } + return Qfalse; +} + +static void +rb_iom_timer_do(rb_thread_t *th, const struct timespec *rel, + VALUE (*fn)(ANYARGS), void *arg) +{ + struct rb_iom_timer *t = FIBER_USE_NATIVE ? 
ALLOCA_N(struct rb_iom_timer, 1) + : ALLOC(struct rb_iom_timer); + rb_iom_timer_add(th, t, rel, 0); + rb_ensure(fn, (VALUE)arg, rb_iom_timer_done, (VALUE)t); +} + +static void +rb_iom_waiter_ready(struct rb_iom_waiter *w) +{ + VALUE fibval = rb_iom_timer_fibval(&w->timer); + + list_del_init(&w->wnode); + list_del_init(&w->timer.n.tnode); + if (fibval != Qfalse) { + rb_thread_t *owner = rb_fiber_owner_thread(fibval); + list_add_tail(&owner->runq, &w->timer.n.rnode); + } +} + +static void +rb_iom_waiter_done(struct rb_iom_waiter *w) +{ + list_del(&w->timer.n.tnode); + list_del(&w->wnode); +} + +/* cont.c */ +int rb_fiber_resumable_p(const rb_thread_t *, const rb_fiber_t *); + +/* + * resume all "ready" fibers belonging to a given thread + * stop when a fiber has not yielded, yet. + */ +static int +rb_threadlet_do_yield_p(rb_thread_t *th, int *nresume) +{ + rb_fiber_t *cur = rb_threadlet_sched_p(th) ? th->ec->fiber_ptr : 0; + struct rb_iom_timer *t = 0, *next = 0; + VALUE v; + int n = 0; + int ret = 0; + struct list_head *tmp; + + tmp = FIBER_USE_NATIVE ? 
ALLOCA_N(struct list_head, 1) : 0; + if (!tmp) { /* should be embed */ + v = rb_str_tmp_new((long)sizeof(struct list_head)); + tmp = (struct list_head *)RSTRING_PTR(v); + } + + list_head_init(tmp); + *nresume = 0; + + /* + * do not infinite loop as new fibers get added to + * th->runq, only work off a temporary list: + */ + list_append_list(tmp, &th->runq); + list_for_each_safe(tmp, t, next, n.rnode) { + VALUE fibval = rb_iom_timer_fibval(t); + rb_fiber_t *fib = RTYPEDDATA_DATA(fibval); + + if (fib == cur || !rb_fiber_resumable_p(th, fib)) { + /* tell the caller to yield */ + list_prepend_list(&th->runq, tmp); + ret = 1; + goto out; + } + n++; + rb_fiber_resume(fibval, 0, 0); + } +out: + if (!FIBER_USE_NATIVE) { + rb_str_resize(v, 0); + rb_gc_force_recycle(v); + } + *nresume = n; + return ret; +} + +static void +rb_iom_fdset_waiter_init(rb_thread_t *th, struct rb_iom_fdset_waiter *fsw, + int maxfd, rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e) +{ + fsw->th = th; + fsw->ret = 0; + fsw->max = maxfd; + assert(fsw->max > 0); + fsw->in[0] = r; + fsw->in[1] = w; + fsw->in[2] = e; + memset(fsw->out, 0, sizeof(fsw->out)); + +#if FIBER_USE_NATIVE == 0 && RUBYVM_IOM == IOM_SELECT + { + int i; + for (i = 0; i < 3; i++) { + if (fsw->in[i]) { + fsw->onstack_dst[i] = fsw->in[i]; + fsw->in[i] = ALLOC(rb_fdset_t); + rb_fd_init_copy(fsw->in[i], fsw->onstack_dst[i]); + } + } + } +#endif +} + +/* XXX: is this necessary? 
*/ +void +rb_iom_mark_runq(const rb_thread_t *th) +{ + struct rb_iom_timer *t = 0; + + list_for_each(&th->runq, t, n.rnode) { + rb_gc_mark(rb_iom_timer_fibval(t)); + } +} + +static inline int +rb_iom_fdw_get_fd(struct rb_iom_fd_waiter *fdw) +{ +#if FIBER_USE_NATIVE + return *fdw->fdp; +#else + return fdw->fd; +#endif +} + +static inline void +rb_iom_fdw_init(struct rb_iom_fd_waiter *fdw, int *fdp, int events) +{ +#if FIBER_USE_NATIVE + fdw->fdp = fdp; +#else + fdw->fd = *fdp; +#endif + fdw->events = (short)events; + fdw->revents = 0; +} + +static rb_iom_t *rb_iom_get(rb_thread_t *); +static void rb_iom_blockers_notify(rb_iom_t *, int max); + +#endif /* RB_IOM_COMMON_H */ diff --git a/iom_kqueue.h b/iom_kqueue.h new file mode 100644 index 0000000000..0ce8365813 --- /dev/null +++ b/iom_kqueue.h @@ -0,0 +1,917 @@ +/* + * kqueue-based implementation of I/O Manager for RubyVM on *BSD + * + * The kevent API has an advantage over epoll_ctl+epoll_wait since + * it can simultaneously add filters and check for events with one + * syscall. It also allows EV_ADD to be used idempotently for + * enabling filters, whereas epoll_ctl requires separate ADD and + * MOD operations. + * + * These are advantages in the common case... + * + * The epoll API has advantages in more esoteric cases: + * + * epoll has the advantage over kqueue when watching for multiple + * events (POLLIN|POLLOUT|POLLPRI) (which is rare). We also have + * to install two kevent filters to watch POLLIN|POLLOUT simultaneously. + * See udata_set/udata_get functions below for more on this. + * + * Also, kevent does not support POLLPRI directly; we need to use + * select() (or perhaps poll() on some platforms) with a zero + * timeout to check for POLLPRI after EVFILT_READ returns. + * + * Finally, several *BSDs implement kqueue; and the quality of each + * implementation may vary. Anecdotally, *BSDs are not known to even + * support poll() consistently across different types of files. 
+ * We will need to be selective and careful about injecting them into + * kevent(). + */ +#include "iom_internal.h" + +/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */ +#undef LIST_HEAD +#include +#include +#include + +/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */ +#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI) + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + + /* we NEVER need to scan kevs, only insert + delete + empty check */ + struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */ + struct list_head ksets; /* -kqw_set.fsw.w.wnode, order agnostic */ + + struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */ + struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */ + struct rb_iom_fdmap rfdmap; /* holds fdh for EVFILT_READ */ + struct rb_iom_fdmap wfdmap; /* holds fdh for EVFILT_WRITE */ + + int kqueue_fd; + int nevents; /* auto-increases */ + struct list_head blockers; /* -rb_iom_blocker.bnode */ +}; + +struct kqw_set; +struct kqfdn { + /* + * both rfdnode and wfdnode are overloaded for deleting paired + * filters when watching both EVFILT_READ and EVFILT_WRITE on + * a single FD + */ + struct list_node rfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_READ) */ + struct list_node wfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_WRITE) */ + union { + rb_thread_t *th; + struct kqw_set *set; + } owner; +}; + +/* allocated on stack */ +struct kev { + struct kqfdn fdn; + + /* fdw.w.wnode is overloaded for checking RB_WAITFD_PRI (see check_pri) */ + struct rb_iom_fd_waiter fdw; +}; + +/* allocated on-stack */ +struct kqw_set { + struct rb_iom_fdset_waiter fsw; + st_table *tbl; /* after: fd -> kqw_set_item */ + VALUE vchanges; + int nchanges; + uint8_t idx; +}; + +/* + * per-FD in rb_thread_fd_select arg, value for kqw_set.fdn.as.tbl, + * allocated on-heap + */ +struct 
kqw_set_item { + struct kqfdn fdn; + int fd; + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +static void +increase_nevents(rb_iom_t *iom, int retries) +{ + /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */ + const int max_alloca = 1024 / sizeof(struct kevent); + const int max = max_alloca * 2; + + if (retries) { + iom->nevents *= retries; + if (iom->nevents > max || iom->nevents <= 0) { + iom->nevents = max; + } + } +} + +static int +iom_kqfd(rb_iom_t *iom) +{ + if (iom->kqueue_fd >= 0) { + return iom->kqueue_fd; + } + iom->kqueue_fd = kqueue(); + if (iom->kqueue_fd < 0) { + if (rb_gc_for_fd(errno)) { + iom->kqueue_fd = kqueue(); + } + if (iom->kqueue_fd < 0) { + rb_sys_fail("kqueue"); + } + } + rb_fd_fix_cloexec(iom->kqueue_fd); + return iom->kqueue_fd; +} + +static void +rb_iom_init(rb_iom_t *iom) +{ + list_head_init(&iom->timers); + list_head_init(&iom->ksets); + list_head_init(&iom->kevs); + list_head_init(&iom->pids); + list_head_init(&iom->blockers); + iom->nevents = 8; + iom->kqueue_fd = -1; + rb_iom_fdmap_init(&iom->rfdmap); + rb_iom_fdmap_init(&iom->wfdmap); +} + +/* + * kevent does not have an EVFILT_* flag for (rarely-used) urgent data, + * at least not on FreeBSD 11.0, so we call select() separately after + * EVFILT_READ returns to set the RB_WAITFD_PRI bit: + */ +static void +check_pri(rb_thread_t *th, struct list_head *pri) +{ + rb_fdset_t efds; + int max, r; + int err = 0; + struct rb_iom_waiter *w, *next; + struct timeval tv; + + if (list_empty(pri)) { + return; + } + + r = 0; + max = -1; + rb_fd_init(&efds); + + list_for_each(pri, w, wnode) { + struct rb_iom_fd_waiter *fdw; + struct kev *kev; + int fd; + + fdw = container_of(w, struct rb_iom_fd_waiter, w); + kev = container_of(fdw, struct kev, fdw); + fd = rb_iom_fdw_get_fd(&kev->fdw); + if (fd >= 0) { + rb_fd_set(fd, &efds); + if (fd > max) { + max = fd; + } + } + } + if (max >= 0) { +again: + tv.tv_sec = 0; + tv.tv_usec = 0; + r = 
native_fd_select(max + 1, 0, 0, &efds, &tv, th); + if (r < 0) { + err = errno; + if (err == EINTR) { + goto again; + } + } + if (r >= 0) { + list_for_each_safe(pri, w, next, wnode) { + struct rb_iom_fd_waiter *fdw; + struct kev *kev; + int fd; + + fdw = container_of(w, struct rb_iom_fd_waiter, w); + kev = container_of(fdw, struct kev, fdw); + fd = rb_iom_fdw_get_fd(&kev->fdw); + + list_del_init(&kev->fdw.w.wnode); + /* + * favor spurious wakeup over empty revents: + * I have observed select(2) failing to detect urgent data + * readiness after EVFILT_READ fires for it on FreeBSD 11.0. + * test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb + * is relaxed on non-Linux for this reason. + * We could use a non-zero timeout for select(2), but + * we also don't want to release GVL and have th->runq + * processed before we set revents, so we favor spurious + * RB_WAITFD_PRI over an empty revents field + */ + if (fd >= 0 && (rb_fd_isset(fd, &efds) || !kev->fdw.revents)) { + kev->fdw.revents |= RB_WAITFD_PRI; + } + } + } + } + rb_fd_term(&efds); +} + +/* + * kqueue is more complicated than epoll for corner cases because we + * install separate filters for simultaneously watching read and write + * on the same FD, and now we must clear shared filters if only the + * event from the other side came in. 
+ */ +static void +drop_pairs(rb_thread_t *th, int max, struct kevent *changelist, + struct list_head *rdel, struct list_head *wdel) +{ + int nchanges = 0; + struct kqfdn *fdn, *next; + + list_for_each_safe(rdel, fdn, next, wfdnode) { + struct kev *kev = container_of(fdn, struct kev, fdn); + int fd = rb_iom_fdw_get_fd(&kev->fdw); + + list_del_init(&kev->fdn.wfdnode); /* delete from rdel */ + assert(kev->fdw.revents & RB_WAITFD_OUT); + + if (fd >= 0 && !(kev->fdw.revents & WAITFD_READ)) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&th->vm->iom->rfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + assert(nchanges <= max); + EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0); + } + } + } + list_for_each_safe(wdel, fdn, next, rfdnode) { + struct kev *kev = container_of(fdn, struct kev, fdn); + int fd = rb_iom_fdw_get_fd(&kev->fdw); + + list_del_init(&kev->fdn.rfdnode); /* delete from wdel */ + assert(kev->fdw.revents & WAITFD_READ); + + if (fd >= 0 && !(kev->fdw.revents & RB_WAITFD_OUT)) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&th->vm->iom->wfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + assert(nchanges <= max); + EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0); + } + } + } + if (nchanges) { + int kqfd = th->vm->iom->kqueue_fd; + if (kevent(kqfd, changelist, nchanges, 0, 0, &zero) < 0) { + rb_sys_fail("kevent (dropping paired FDs)"); + } + } +} + +/* + * kqueue requires separate filters when simultaneously + * watching read and write events on the same FD. Different filters + * mean they can have different udata pointers; at least with + * native kqueue. + * + * When emulating native kqueue behavior using epoll as libkqueue does, + * there is only space for one udata pointer. This is because epoll + * allows each "filter" (epitem in the kernel) to watch read and write + * simultaneously. + * epoll keys epitems using: [ fd, file description (struct file *) ]. 
+ * In contrast, kevent keys its filters using: [ fd (ident), filter ]. + * + * So, with native kqueue, we can at least optimize away rb_iom_fd_get + * function calls; but we cannot do that when libkqueue emulates + * kqueue with epoll and need to retrieve the fdh from the correct + * fdmap. + * + * Finally, the epoll implementation only needs one fdmap. + */ +static void * +udata_set(rb_iom_t *iom, struct rb_iom_fd *fdh) +{ +#ifdef LIBKQUEUE +# warning libkqueue support is incomplete and does not handle corner cases + return iom; +#else /* native kqueue */ + return fdh; +#endif +} + +static struct rb_iom_fd * +udata_get(const struct kevent *ev) +{ +#ifdef LIBKQUEUE + rb_iom_t *iom = ev->udata; + int fd = ev->ident; + + switch (ev->filter) { + case EVFILT_READ: + return rb_iom_fd_get(&iom->rfdmap, fd); + case EVFILT_WRITE: + return rb_iom_fd_get(&iom->wfdmap, fd); + default: + rb_bug("bad filter in libkqueue compatibility mode: %d", ev->filter); + } +#endif + return ev->udata; +} + +static void +check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist) +{ + int err = 0; + if (nr > 0) { + struct list_head pri, wdel, rdel; + struct kqfdn *fdn = 0, *next = 0; + int i; + + list_head_init(&pri); + list_head_init(&wdel); + list_head_init(&rdel); + + for (i = 0; i < nr; i++) { + struct kevent *ev = &eventlist[i]; + /* + * fdh is keyed to both FD and filter, so + * it's either in iom->rfdmap or iom->wfdmap + */ + struct rb_iom_fd *fdh = udata_get(ev); + + if (ev->filter == EVFILT_READ) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, fdn, next, rfdnode) { + struct kqw_set *kset = fdn->owner.set; + int rbits; + + if (kset) { + struct kqw_set_item *ksi; + + ksi = container_of(fdn, struct kqw_set_item, fdn); + rbits = ksi->events & WAITFD_READ; + assert(rbits && "unexpected EVFILT_READ"); + + /* + * The following may spuriously set RB_WAITFD_PRI: + * (see comments in check_pri for reasoning) + */ + ksi->revents |= rbits; + list_del_init(&fdn->rfdnode); + if 
(!kset->fsw.ret++) { + rb_iom_waiter_ready(&kset->fsw.w); + } + } + else { + struct kev *kev = container_of(fdn, struct kev, fdn); + + rbits = kev->fdw.events & WAITFD_READ; + assert(rbits && "unexpected EVFILT_READ"); + + if (!kev->fdw.revents) { + rb_iom_waiter_ready(&kev->fdw.w); + } + kev->fdw.revents |= (RB_WAITFD_IN & rbits); + list_del_init(&kev->fdn.rfdnode); + + if (rb_iom_fdw_get_fd(&kev->fdw) < 0) { + continue; + } + if (rbits & RB_WAITFD_PRI) { + list_add(&pri, &kev->fdw.w.wnode); + } + if ((kev->fdw.events & RB_WAITFD_OUT) && !pair_queued) { + pair_queued = 1; + list_add(&wdel, &kev->fdn.rfdnode); + } + } + } + } + else if (ev->filter == EVFILT_WRITE) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, fdn, next, wfdnode) { + struct kqw_set *kset = fdn->owner.set; + int rbits; + + if (kset) { + struct kqw_set_item *ksi; + + ksi = container_of(fdn, struct kqw_set_item, fdn); + rbits = ksi->events & RB_WAITFD_OUT; + assert(rbits && "unexpected EVFILT_WRITE"); + ksi->revents |= rbits; + list_del_init(&fdn->wfdnode); + if (!kset->fsw.ret++) { + rb_iom_waiter_ready(&kset->fsw.w); + } + } + else { + struct kev *kev = container_of(fdn, struct kev, fdn); + + if (!kev->fdw.revents) { + rb_iom_waiter_ready(&kev->fdw.w); + } + kev->fdw.revents |= RB_WAITFD_OUT; + list_del_init(&kev->fdn.wfdnode); + + if ((kev->fdw.events & WAITFD_READ) && + rb_iom_fdw_get_fd(&kev->fdw) >= 0 && + !pair_queued) { + pair_queued = 1; + list_add(&rdel, &kev->fdn.wfdnode); + } + } + } + } + else { + rb_bug("unexpected filter: %d", ev->filter); + } + } + check_pri(th, &pri); + drop_pairs(th, nr, eventlist, &rdel, &wdel); + + /* notify the waiter thread in case we enqueued fibers for them */ + rb_iom_blockers_notify(th->vm->iom, -1); + } + else if (nr < 0) { + err = errno; + } + if (err && err != EINTR) { + rb_syserr_fail(err, "kevent"); + } + rb_iom_timer_check(th); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); +} + +/* perform a non-blocking kevent check while holding GVL */ +static 
void +ping_events(rb_thread_t *th, struct kqw_set *kset) +{ + rb_iom_t *iom = th->vm->iom; + int kqfd = iom ? iom->kqueue_fd : -1; + + if (kqfd >= 0) { + VALUE v; + int nr; + int nevents = iom->nevents; + struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents); + int retries = 0; + + do { + struct kevent *changelist = 0; + int nchanges = 0; + if (kset) { + changelist = (struct kevent *)RSTRING_PTR(kset->vchanges); + nchanges = kset->nchanges; + kset = 0; + } + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + check_kevent(th, nr, eventlist); + } while (nr == nevents && ++retries); + if (v) { + ALLOCV_END(v); + } + increase_nevents(iom, retries); + } +} + +static int +kq_events_pending(const rb_iom_t *iom) +{ + if (!list_empty(&iom->kevs)) return 1; + if (!list_empty(&iom->ksets)) return 1; + if (!list_empty(&iom->pids)) return 1; + return 0; +} + +static int +need_sleep(const struct timespec *rel) +{ + if (!rel) return 1; + return rb_timespec_cmp(rel, &zero); +} + +/* for iom_pingable_common.h */ +static void +rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + int nevents = iom->nevents; + int nr = nevents; + struct timespec ts; + struct timespec *rel; + + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + rel = list_empty(&th->runq) ? 
rb_iom_next_timeout(&ts, &iom->timers) : 0; + + if (need_sleep(rel)) { + VALUE v; + int kqfd = iom_kqfd(iom); /* may raise */ + struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents); + struct rb_iom_blocker cur; + + VM_ASSERT(kqfd >= 0); + cur.th = th; + list_add_tail(&iom->blockers, &cur.bnode); + nr = 0; + BLOCKING_REGION(th, { + nr = kevent(kqfd, 0, 0, eventlist, nevents, rel); + }, ubf_select, th, TRUE); + list_del(&cur.bnode); + check_kevent(th, nr, eventlist); + if (v) { + ALLOCV_END(v); + } + } + if (nr == nevents && kq_events_pending(iom)) { + ping_events(th, 0); + } +} + +/* kqueue shines here because there's only one syscall in the common case */ +static void +kevxchg_ping(int fd, struct kev *kev) +{ + rb_thread_t *th = kev->fdn.owner.th; + rb_iom_t *iom = rb_iom_get(th); + int retries = 0; + int nchanges = 0; + int nr; + VALUE v; + int nevents = iom->nevents; + struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents); + struct kevent *changelist = eventlist; + int kqfd = iom_kqfd(iom); + + /* + * we must clear this before reading since we check for owner.set in + * check_kevent to distinguish between single and multi-FD (select(2)) + * callers + */ + kev->fdn.owner.th = 0; + VM_ASSERT(nevents >= 2); + + /* EVFILT_READ handles urgent data (POLLPRI)... 
hopefully */ + if (kev->fdw.events & WAITFD_READ) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&iom->rfdmap, fd); + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->fdn.rfdnode); + } + if (kev->fdw.events & RB_WAITFD_OUT) { + struct rb_iom_fd *fdh = rb_iom_fd_get(&iom->wfdmap, fd); + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->fdn.wfdnode); + } + do { + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + + /* ENOMEM on changelist: GC, then retry once ("continue" would exit the loop: nr < 0) */ + if (nr < 0 && nchanges && rb_gc_for_fd(errno)) { + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + } + check_kevent(th, nr, eventlist); + } while (nr == nevents && ++retries && (nchanges = 0) == 0); + if (v) { + ALLOCV_END(v); + } + increase_nevents(iom, retries); +} + +static VALUE +kevxchg_yield(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + int fd = rb_iom_fdw_get_fd(&kev->fdw); + rb_thread_t *th = kev->fdn.owner.th; + + if (fd >= 0) { + int nresume; + + kevxchg_ping(fd, kev); + (void)rb_threadlet_do_yield_p(th, &nresume); + if (kev->fdw.revents) { + list_del_init(&kev->fdw.w.timer.n.rnode); + } + else { + rb_fiber_yield(0, 0); + } + } + return (VALUE)kev->fdw.revents; /* may be zero if timed out */ +} + +static VALUE +kev_done(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + list_del(&kev->fdn.rfdnode); + list_del(&kev->fdn.wfdnode); + rb_iom_waiter_done(&kev->fdw.w); + if (!FIBER_USE_NATIVE) { + xfree(kev); + } + return Qfalse; +} + +static int +iom_waitfd(rb_thread_t *th, int *fdp, int events, const struct timespec *rel) +{ + rb_iom_t *iom = rb_iom_get(th); + struct kev *kev = FIBER_USE_NATIVE ? 
ALLOCA_N(struct kev, 1) + : ALLOC(struct kev); + + kev->fdn.owner.th = th; /* read and cleared ASAP in kevxchg_yield */ + rb_iom_fdw_init(&kev->fdw, fdp, events); + list_add(&iom->kevs, &kev->fdw.w.wnode); + list_node_init(&kev->fdn.rfdnode); + list_node_init(&kev->fdn.wfdnode); + rb_iom_timer_add(th, &kev->fdw.w.timer, rel, IOM_FIB|IOM_WAIT); + return (int)rb_ensure(kevxchg_yield, (VALUE)kev, kev_done, (VALUE)kev); +} + +/* + * XXX we may use fptr->mode to mark FDs which kqueue cannot handle, + * different BSDs may have different bugs which prevent certain + * FD types from being handled by kqueue... + */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, + const struct timespec *rel) +{ + return iom_waitfd(th, &fptr->fd, events, rel); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, const struct timespec *rel) +{ + return iom_waitfd(th, fdp, events, rel); +} + +static const short idx2events[] = { RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI }; + +static void +kset_add_event(struct kqw_set *kset, int fd, short filter, void *udata) +{ + static const long kevent_size = (long)sizeof(struct kevent); + VALUE dst = kset->vchanges; + long old_len = RSTRING_LEN(dst); + struct kevent *chg; + long new_len = old_len + kevent_size; + + rb_str_resize(dst, new_len); + chg = (struct kevent *)(RSTRING_PTR(dst) + old_len); + EV_SET(chg, fd, filter, EV_ADD|EV_ONESHOT, 0, 0, udata); + rb_str_set_len(dst, new_len); + kset->nchanges++; +} + +static int +kset_upd(st_data_t *key, st_data_t *v, st_data_t arg, int existing) +{ + int fd = (int)*key; + struct kqw_set *kset = (struct kqw_set *)arg; + rb_iom_t *iom = kset->fsw.th->vm->iom; + short events = idx2events[kset->idx]; + struct kqw_set_item **val = (struct kqw_set_item **)v; + struct kqw_set_item *ksi; + int nevents, oevents; + struct rb_iom_fd *fdh; + void *udata; + + if (existing) { + /* + * this happens when an FD is in two fdsets for a single + * rb_thread_fd_select call. 
Does not happen if different + * Fibers call rb_thread_fd_select on the same FD. + */ + ksi = *val; + nevents = ~ksi->events & events; + oevents = ksi->events; + ksi->events |= events; + } + else { + ksi = ALLOC(struct kqw_set_item); + *val = ksi; + ksi->fd = fd; + ksi->revents = 0; + nevents = ksi->events = events; + oevents = 0; + ksi->fdn.owner.set = kset; + list_node_init(&ksi->fdn.rfdnode); + list_node_init(&ksi->fdn.wfdnode); + } + + if ((nevents == RB_WAITFD_PRI && !(oevents & RB_WAITFD_IN)) || + (nevents & RB_WAITFD_IN)) { + fdh = rb_iom_fd_get(&iom->rfdmap, fd); + udata = udata_set(iom, fdh); + if (list_empty(&fdh->fdhead)) { + kset_add_event(kset, fd, EVFILT_READ, udata); + } + list_add(&fdh->fdhead, &ksi->fdn.rfdnode); + } + if (nevents & RB_WAITFD_OUT) { + fdh = rb_iom_fd_get(&iom->wfdmap, fd); + udata = udata_set(iom, fdh); + if (list_empty(&fdh->fdhead)) { + kset_add_event(kset, fd, EVFILT_WRITE, udata); + } + list_add(&fdh->fdhead, &ksi->fdn.wfdnode); + } + return ST_CONTINUE; +} + +static VALUE +kset_yield(VALUE ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + int nresume; + VALUE vchanges = kset->vchanges = rb_str_tmp_new(0); + + kset->tbl = st_init_numtable(); /* may raise */ + + /* must not call kevent to retry events while this loop is running: */ + for (kset->idx = 0; kset->idx < 3; kset->idx++) { + const rb_fdset_t *src = kset->fsw.in[kset->idx]; + + if (src) { + int max = rb_fd_max(src); + int fd; + + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, src)) { + st_update(kset->tbl, (st_data_t)fd, + kset_upd, (st_data_t)kset); + } + } + } + } + + /* OK to call kevent, now: */ + ping_events(kset->fsw.th, kset); + rb_str_resize(vchanges, 0); + rb_gc_force_recycle(vchanges); + (void)rb_threadlet_do_yield_p(kset->fsw.th, &nresume); + if (kset->fsw.ret) { + list_del_init(&kset->fsw.w.timer.n.rnode); + } + else { + rb_fiber_yield(0, 0); + } + return (VALUE)kset->fsw.ret; +} + +static rb_fdset_t * +fd_init_once(rb_fdset_t **dst, rb_fdset_t 
*src) +{ + if (!*dst) { + rb_fd_init(src); + *dst = src; + } + return *dst; +} + +static int +kset_term(st_data_t key, st_data_t val, st_data_t ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + int fd = (int)key; + struct kqw_set_item *ksi = (struct kqw_set_item *)val; + + if (ksi) { + struct rb_iom_fdset_waiter *fsw = &kset->fsw; + + list_del_init(&ksi->fdn.rfdnode); + list_del_init(&ksi->fdn.wfdnode); + for (kset->idx = 0; kset->idx < 3; kset->idx++) { + if (ksi->revents & idx2events[kset->idx]) { + rb_fd_set(fd, + fd_init_once(&fsw->out[kset->idx], &fsw->sets[kset->idx])); + } + } + xfree(ksi); + } + + return ST_DELETE; +} + +static VALUE +kset_done(VALUE ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + + if (kset->tbl) { + st_foreach(kset->tbl, kset_term, (VALUE)kset); + st_free_table(kset->tbl); + } + + list_del(&kset->fsw.w.timer.n.tnode); + list_del(&kset->fsw.w.wnode); + for (kset->idx = 0; kset->idx < 3; kset->idx++) { + rb_fdset_t *orig = kset->fsw.in[kset->idx]; + rb_fdset_t *res = kset->fsw.out[kset->idx]; + + if (res) { + rb_fd_dup(orig, res); + rb_fd_term(res); + } + } + if (!FIBER_USE_NATIVE) { + xfree(kset); + } + return Qfalse; +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const struct timespec *rel) +{ + rb_iom_t *iom = rb_iom_get(th); + struct kqw_set *kset = FIBER_USE_NATIVE ? 
ALLOCA_N(struct kqw_set, 1) + : ALLOC(struct kqw_set); + kset->nchanges = 0; + rb_iom_fdset_waiter_init(th, &kset->fsw, maxfd, r, w, e); + list_add(&iom->ksets, &kset->fsw.w.wnode); + rb_iom_timer_add(th, &kset->fsw.w.timer, rel, IOM_FIB|IOM_WAIT); + + return (int)rb_ensure(kset_yield, (VALUE)kset, kset_done, (VALUE)kset); +} + +static void +iom_free(rb_iom_t *iom) +{ + rb_iom_fdmap_destroy(&iom->rfdmap); + rb_iom_fdmap_destroy(&iom->wfdmap); + xfree(iom); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + vm->iom = 0; + if (iom) { + if (iom->kqueue_fd >= 0) { + close(iom->kqueue_fd); + } + iom_free(iom); + } +} + +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ +#ifdef LIBKQUEUE + /* + * libkqueue uses epoll, /dev/poll FD, or poll/select on a pipe, + * so there is an FD. + * I guess we're going to hit an error in case somebody uses + * libkqueue with a real native kqueue backend, but nobody does + * that in production, hopefully. 
+ */ + rb_iom_destroy(th->vm); +#else /* native kqueue is close-on-fork, so we do not close ourselves */ + rb_iom_t *iom = th->vm->iom; + if (iom) { + iom_free(iom); + th->vm->iom = 0; + } +#endif +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + rb_iom_t *iom = GET_VM()->iom; + + return iom && fd == iom->kqueue_fd; +} + +#include "iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_pingable_common.h b/iom_pingable_common.h new file mode 100644 index 0000000000..971295fd8e --- /dev/null +++ b/iom_pingable_common.h @@ -0,0 +1,49 @@ +/* shared between "pingable" implementations (iom_kqueue.h and iom_epoll.h) */ + +/* only for iom_kqueue.h and iom_epoll.h */ +static void rb_iom_do_wait(rb_thread_t *, rb_iom_t *); + +static VALUE +rb_iom_do_schedule(VALUE ptr) +{ + rb_thread_t *th = (rb_thread_t *)ptr; + rb_iom_t *iom = th->vm->iom; + int nresume; + + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + if (iom) { + if (nresume) { + ping_events(th, 0); + } + else { + rb_iom_do_wait(th, iom); + } + } + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + return Qfalse; +} + +void +rb_iom_schedule(rb_thread_t *th, const struct timespec *rel) +{ + if (rb_threadlet_sched_p(th)) { + int nresume; + + rb_iom_timer_check(th); + ping_events(th, 0); + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + } + else if (rel) { + rb_iom_timer_do(th, rel, rb_iom_do_schedule, th); + } + else { + rb_iom_timer_check(th); + rb_iom_do_schedule((VALUE)th); + } +} diff --git a/iom_select.h b/iom_select.h new file mode 100644 index 0000000000..8b9dddf218 --- /dev/null +++ b/iom_select.h @@ -0,0 +1,481 @@ +/* + * select()-based implementation of I/O Manager for RubyVM + * + * This is crippled and relies heavily on GVL compared to the + * epoll and kqueue versions. 
Every call to select requires + * scanning the entire iom->fds list; so it gets silly expensive + * with hundreds or thousands of FDs. + */ +#include "iom_internal.h" + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + struct list_head timers; /* -rb_iom_timer.n.tnode, sort by expire_at */ + struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */ + struct list_head fdsets; /* -rb_iom_fdset_waiter.w.wnode, unordered */ + struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */ + + /* + * generation counters for determining if we need to redo select() + * for newly registered FDs. kevent and epoll APIs do not require + * this, as events can be registered and retrieved at any time by + * different threads. + */ + rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */ + rb_serial_t select_gen; /* RDWR protected by GVL */ + + /* + * unlike in epoll/kqueue, order matters for blockers in this + * implementation: first blocker is running select; the rest + * are waiting for the first blocker + * Protected by GVL + */ + struct list_head blockers; /* -rb_iom_blocker.bnode */ +}; + +/* allocated on stack */ +struct select_do { + rb_thread_t *th; + int do_wait; +}; + +static void +rb_iom_init(rb_iom_t *iom) +{ + iom->select_start = 0; + iom->select_gen = 0; + list_head_init(&iom->timers); + list_head_init(&iom->fds); + list_head_init(&iom->fdsets); + list_head_init(&iom->pids); + list_head_init(&iom->blockers); +} + +static rb_fdset_t * +fd_init_once(rb_fdset_t **dst, rb_fdset_t *src) +{ + if (!*dst) { + rb_fd_init(src); + *dst = src; + } + return *dst; +} + +/* FIXME inefficient, but maybe not worth optimizing ... 
*/ +static void +fd_merge(rb_fdset_t *dst, const rb_fdset_t *src) +{ + int max = rb_fd_max(src); + int fd; + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, src)) { + rb_fd_set(fd, dst); + } + } +} + +static int +fd_intersect(struct rb_iom_fdset_waiter *fsw, size_t i, const rb_fdset_t *res) +{ + int ret = 0; + rb_fdset_t *orig = fsw->in[i]; + + if (orig && res) { /* res stays NULL if iom_select_wait never initialized that set */ + int max = rb_fd_max(orig); + int fd; + + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, orig) && rb_fd_isset(fd, res)) { + rb_fd_set(fd, fd_init_once(&fsw->out[i], &fsw->sets[i])); + ret++; + } + } + + /* copy results back to user-supplied fdset */ + if (ret) { + rb_fd_dup(orig, fsw->out[i]); + } + } + return ret; +} + +static void +iom_select_wait(struct select_do *sd) +{ + rb_thread_t *th = sd->th; + rb_iom_t *iom = th->vm->iom; + int max = -1; + struct timeval tv, *tvp = 0; + struct rb_iom_waiter *w = 0, *next = 0; + rb_fdset_t *rfds, *wfds, *efds; + rb_fdset_t fdsets[3]; + const struct timespec *rel = &zero; + struct timespec ts; + struct rb_iom_blocker cur; + int ret = 0; + + iom->select_start = iom->select_gen; + rfds = wfds = efds = 0; + + list_for_each_safe(&iom->fdsets, w, next, wnode) { + struct rb_iom_fdset_waiter *fsw; + + fsw = container_of(w, struct rb_iom_fdset_waiter, w); + if (fsw->max > max) { + max = fsw->max; + } + if (fsw->in[0]) { + fd_merge(fd_init_once(&rfds, &fdsets[0]), fsw->in[0]); + } + if (fsw->in[1]) { + fd_merge(fd_init_once(&wfds, &fdsets[1]), fsw->in[1]); + } + if (fsw->in[2]) { + fd_merge(fd_init_once(&efds, &fdsets[2]), fsw->in[2]); + } + } + + list_for_each_safe(&iom->fds, w, next, wnode) { + struct rb_iom_fd_waiter *fdw; + int fd; + + fdw = container_of(w, struct rb_iom_fd_waiter, w); + fd = rb_iom_fdw_get_fd(fdw); + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + rb_iom_waiter_ready(&fdw->w); + sd->do_wait = 0; + continue; + } + if (fd > max) { + max = fd; + } + if (fdw->events & RB_WAITFD_IN) { + rb_fd_set(fd, fd_init_once(&rfds, &fdsets[0])); + } + 
if (fdw->events & RB_WAITFD_OUT) { + rb_fd_set(fd, fd_init_once(&wfds, &fdsets[1])); + } + if (fdw->events & RB_WAITFD_PRI) { + rb_fd_set(fd, fd_init_once(&efds, &fdsets[2])); + } + } + + if (sd->do_wait && (max >= 0 || !list_empty(&iom->pids))) { + rel = rb_iom_next_timeout(&ts, &iom->timers); + } + tvp = timeval_for(&tv, rel); + cur.th = th; + VM_ASSERT(list_empty(&iom->blockers)); + list_add(&iom->blockers, &cur.bnode); + BLOCKING_REGION(th, { + ret = native_fd_select(max + 1, rfds, wfds, efds, tvp, th); + }, ubf_select, th, TRUE); + list_del(&cur.bnode); + + if (ret > 0) { + list_for_each_safe(&iom->fdsets, w, next, wnode) { + struct rb_iom_fdset_waiter *fsw; + + fsw = container_of(w, struct rb_iom_fdset_waiter, w); + fsw->ret += fd_intersect(fsw, 0, rfds); + fsw->ret += fd_intersect(fsw, 1, wfds); + fsw->ret += fd_intersect(fsw, 2, efds); + if (fsw->ret) { + rb_iom_waiter_ready(&fsw->w); + } + } + + list_for_each_safe(&iom->fds, w, next, wnode) { + struct rb_iom_fd_waiter *fdw; + int fd; + + fdw = container_of(w, struct rb_iom_fd_waiter, w); + fd = rb_iom_fdw_get_fd(fdw); + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + rb_iom_waiter_ready(&fdw->w); + continue; + } + if (rfds && rb_fd_isset(fd, rfds)) { + fdw->revents |= fdw->events & RB_WAITFD_IN; + } + if (wfds && rb_fd_isset(fd, wfds)) { + fdw->revents |= fdw->events & RB_WAITFD_OUT; + } + if (efds && rb_fd_isset(fd, efds)) { + fdw->revents |= fdw->events & RB_WAITFD_PRI; + } + + /* got revents? enqueue ourselves to be run! */ + if (fdw->revents) { + rb_iom_waiter_ready(&fdw->w); + } + } + } + + if (rfds) rb_fd_term(rfds); + if (wfds) rb_fd_term(wfds); + if (efds) rb_fd_term(efds); + rb_iom_blockers_notify(iom, -1); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); +} + +static int +retry_select_p(rb_iom_t *iom) +{ + /* + * if somebody changed iom->fds while we were inside select, + * rerun it with zero timeout to avoid a race condition. 
+ * This is not necessary for epoll or kqueue because the kernel + * is constantly monitoring the watched set. + */ + return (iom->select_start != iom->select_gen); +} + +static VALUE +iom_do_select(VALUE ptr) +{ + struct select_do *sd = (struct select_do *)ptr; + rb_thread_t *th = sd->th; + rb_iom_t *iom; + int retry = 0; + int nresume; + + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + if (nresume) { + sd->do_wait = 0; + } + iom = th->vm->iom; + if (!iom) { + return Qfalse; + } + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + do { + if (list_empty(&iom->blockers)) { + if (!list_empty(&th->runq)) { + sd->do_wait = 0; + } + iom_select_wait(sd); + rb_iom_timer_check(th); + retry = retry_select_p(iom); + } + else if (list_empty(&th->runq)) { + struct rb_iom_blocker cur; + + cur.th = th; + list_add_tail(&iom->blockers, &cur.bnode); + BLOCKING_REGION(th, { + native_fd_select(0, 0, 0, 0, 0, th); + }, ubf_select, th, TRUE); + list_del(&cur.bnode); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + } + rb_threadlet_do_yield_p(th, &nresume); + if (nresume) { + retry = 0; + sd->do_wait = 0; + } + } while (retry); + if (rb_threadlet_do_yield_p(th, &nresume)) { + rb_fiber_yield(0, 0); + } + return Qfalse; +} + +void +rb_iom_schedule(rb_thread_t *th, const struct timespec *rel) +{ + struct select_do sd; + + sd.th = th; + sd.do_wait = !rb_threadlet_sched_p(th); + + if (rel && sd.do_wait) { + rb_iom_timer_do(th, rel, iom_do_select, &sd); + } + else { + rb_iom_timer_check(th); + iom_do_select((VALUE)&sd); + } +} + +static void +iom_schedule_th(rb_thread_t *th) +{ + rb_iom_t *iom = rb_iom_get(th); + int nresume; + + (void)rb_threadlet_do_yield_p(th, &nresume); + if (list_empty(&iom->blockers)) { + struct select_do sd; + + sd.th = th; + sd.do_wait = 0; + iom_select_wait(&sd); + rb_iom_timer_check(th); + } + else { + /* + * kick the first select thread, retry_select_p will return true + * since caller bumped iom->select_gen + */ + rb_iom_blockers_notify(iom, 1); + } +} + 
+static VALUE +iom_schedule_fdw(VALUE ptr) +{ + rb_thread_t *th = GET_THREAD(); + struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr; + + iom_schedule_th(th); + if (fdw->revents) { + list_del_init(&fdw->w.timer.n.rnode); + } + else { + rb_fiber_yield(0, 0); + } + + return (VALUE)fdw->revents; /* may be zero if timed out */ +} + +/* only epoll takes advantage of this (kqueue may for portability bugs) */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, + const struct timespec *rel) +{ + return rb_iom_waitfd(th, &fptr->fd, events, rel); +} + +static VALUE +fdw_done(VALUE ptr) +{ + struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr; + + rb_iom_waiter_done(&fdw->w); + + if (!FIBER_USE_NATIVE) { + xfree(fdw); + } + return Qfalse; +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, const struct timespec *rel) +{ + rb_iom_t *iom = rb_iom_get(th); + struct rb_iom_fd_waiter *fdw; + + if (*fdp < 0) return 0; + + fdw = FIBER_USE_NATIVE ? ALLOCA_N(struct rb_iom_fd_waiter, 1) + : ALLOC(struct rb_iom_fd_waiter); + rb_iom_fdw_init(fdw, fdp, events); + + /* use FIFO order for fairness */ + list_add_tail(&iom->fds, &fdw->w.wnode); + rb_iom_timer_add(th, &fdw->w.timer, rel, IOM_FIB|IOM_WAIT); + iom->select_gen++; + return (int)rb_ensure(iom_schedule_fdw, (VALUE)fdw, fdw_done, (VALUE)fdw); +} + +static int fsw_on_heap(void) +{ + if (sizeof(struct rb_iom_fdset_waiter) > RUBY_ALLOCV_LIMIT) { + return 1; + } + return FIBER_USE_NATIVE ? 
0 : 1; +} + +static VALUE +rb_iom_select_done(VALUE ptr) +{ + struct rb_iom_fdset_waiter *fsw = (struct rb_iom_fdset_waiter *)ptr; + size_t i; + + list_del(&fsw->w.timer.n.tnode); + list_del(&fsw->w.wnode); + for (i = 0; i < 3; i++) { +#if !FIBER_USE_NATIVE + if (fsw->in[i]) { + rb_fd_dup(fsw->onstack_dst[i], fsw->in[i]); + rb_fd_term(fsw->in[i]); + xfree(fsw->in[i]); + } +#endif + if (fsw->out[i]) { + rb_fd_term(fsw->out[i]); + } + } + if (fsw_on_heap()) { + xfree(fsw); + } + + return Qfalse; +} + +static VALUE +iom_schedule_fsw(VALUE ptr) +{ + struct rb_iom_fdset_waiter *fsw = (struct rb_iom_fdset_waiter *)ptr; + + iom_schedule_th(fsw->th); + if (fsw->ret) { + list_del_init(&fsw->w.timer.n.rnode); + } + else { /* fsw->ret will be set in another execution context */ + rb_fiber_yield(0, 0); + } + return (VALUE)fsw->ret; +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const struct timespec *rel) +{ + rb_iom_t *iom = rb_iom_get(th); + struct rb_iom_fdset_waiter *fsw; + + fsw = fsw_on_heap() ? 
ALLOC(struct rb_iom_fdset_waiter) + : ALLOCA_N(struct rb_iom_fdset_waiter, 1); + rb_iom_fdset_waiter_init(th, fsw, maxfd, r, w, e); + list_add(&iom->fdsets, &fsw->w.wnode); + rb_iom_timer_add(th, &fsw->w.timer, rel, IOM_FIB|IOM_WAIT); + iom->select_gen++; + return (int)rb_ensure(iom_schedule_fsw, (VALUE)fsw, + rb_iom_select_done, (VALUE)fsw); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + if (vm->iom) { + xfree(vm->iom); + vm->iom = 0; + } +} + +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + return 0; +} + +#include "iom_common.h" diff --git a/prelude.rb b/prelude.rb index 9e94aadf9c..bcd2c183a2 100644 --- a/prelude.rb +++ b/prelude.rb @@ -13,6 +13,18 @@ def exclusive(&block) end if false end end +class Threadlet < Fiber + def self.start(*args, &block) + # loses Fiber#resume return value, but maybe people do not care + new(&block).run + end + + def run + start + self + end +end + class IO # call-seq: diff --git a/process.c b/process.c index ca172d3e56..f46f16b539 100644 --- a/process.c +++ b/process.c @@ -17,6 +17,7 @@ #include "ruby/thread.h" #include "ruby/util.h" #include "vm_core.h" +#include "iom.h" #include #include @@ -933,9 +934,15 @@ rb_waitpid(rb_pid_t pid, int *st, int flags) result = do_waitpid(pid, st, flags); } else { - while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 && - (errno == EINTR)) { - RUBY_VM_CHECK_INTS(GET_EC()); + rb_thread_t *th = GET_THREAD(); + if (rb_threadlet_sched_p(th) && WNOHANG) { + return rb_iom_waitpid(th, pid, st, flags, 0); + } + else { + while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 && + (errno == EINTR)) { + RUBY_VM_CHECK_INTS(th->ec); + } } } if (result > 0) { @@ -1185,6 +1192,7 @@ proc_detach(VALUE obj, VALUE pid) static void before_exec_async_signal_safe(void) { + rb_iom_destroy(GET_VM()); } static void diff --git a/signal.c b/signal.c index 
c781c38c62..17182db2be 100644 --- a/signal.c +++ b/signal.c @@ -11,6 +11,7 @@ **********************************************************************/ +#include "iom.h" #include "internal.h" #include "vm_core.h" #include @@ -1052,6 +1053,17 @@ rb_trap_exit(void) } } +static int +sig_is_chld(int sig) +{ +#if defined(SIGCLD) + return (sig == SIGCLD); +#elif defined(SIGCHLD) + return (sig == SIGCHLD); +#endif + return 0; +} + void rb_signal_exec(rb_thread_t *th, int sig) { @@ -1059,6 +1071,9 @@ rb_signal_exec(rb_thread_t *th, int sig) VALUE cmd = vm->trap_list.cmd[sig]; int safe = vm->trap_list.safe[sig]; + if (sig_is_chld(sig)) { + rb_iom_sigchld(vm); + } if (cmd == 0) { switch (sig) { case SIGINT: @@ -1117,6 +1132,11 @@ default_handler(int sig) #endif #ifdef SIGUSR2 case SIGUSR2: +#endif +#ifdef SIGCLD + case SIGCLD: +#elif defined(SIGCHLD) + case SIGCHLD: #endif func = sighandler; break; @@ -1155,6 +1175,9 @@ trap_handler(VALUE *cmd, int sig) VALUE command; if (NIL_P(*cmd)) { + if (sig_is_chld(sig)) { + goto sig_dfl; + } func = SIG_IGN; } else { @@ -1175,6 +1198,9 @@ trap_handler(VALUE *cmd, int sig) break; case 14: if (memcmp(cptr, "SYSTEM_DEFAULT", 14) == 0) { + if (sig_is_chld(sig)) { + goto sig_dfl; + } func = SIG_DFL; *cmd = 0; } @@ -1182,6 +1208,9 @@ trap_handler(VALUE *cmd, int sig) case 7: if (memcmp(cptr, "SIG_IGN", 7) == 0) { sig_ign: + if (sig_is_chld(sig)) { + goto sig_dfl; + } func = SIG_IGN; *cmd = Qtrue; } @@ -1418,15 +1447,13 @@ static int init_sigchld(int sig) { sighandler_t oldfunc; + sighandler_t func = sighandler; oldfunc = ruby_signal(sig, SIG_DFL); if (oldfunc == SIG_ERR) return -1; - if (oldfunc != SIG_DFL && oldfunc != SIG_IGN) { - ruby_signal(sig, oldfunc); - } - else { - GET_VM()->trap_list.cmd[sig] = 0; - } + ruby_signal(sig, func); + GET_VM()->trap_list.cmd[sig] = 0; + return 0; } diff --git a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb index 777e9d14dd..9a973c34f5 
100644 --- a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb +++ b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb @@ -1,5 +1,6 @@ # frozen_string_literal: false require 'test/unit' +require 'socket' class TestWaitForSingleFD < Test::Unit::TestCase require '-test-/wait_for_single_fd' @@ -46,4 +47,67 @@ def test_wait_for_kqueue skip 'no kqueue' unless IO.respond_to?(:kqueue_test_wait) assert_equal RB_WAITFD_IN, IO.kqueue_test_wait end + + def test_fiber_start_rw + host = '127.0.0.1' + TCPServer.open(host, 0) do |srv| + nbc = Socket.new(:INET, :STREAM, 0) + con = TCPSocket.new(host, srv.addr[1]) + acc = srv.accept + begin + # 2 events, but only one triggers: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal RB_WAITFD_OUT, f.value + + # trigger readability + con.write('a') + flags = RB_WAITFD_IN + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + # ensure both events trigger: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + # ensure the EVFILT_WRITE (kqueue) is disabled after previous test: + flags = RB_WAITFD_IN + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + skip 'missing MSG_OOB' unless Socket.const_defined?(:MSG_OOB) + + # make sure urgent data works with kqueue: + assert_equal 3, con.send('123', Socket::MSG_OOB) + flags = RB_WAITFD_PRI + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + flags = RB_WAITFD_IN|RB_WAITFD_OUT|RB_WAITFD_PRI + f = Threadlet.start { IO.wait_for_single_fd(acc.fileno, flags, nil) } + if RUBY_PLATFORM =~ /linux/ + assert_equal flags, f.value + else + kq_possible = [ RB_WAITFD_IN|RB_WAITFD_OUT, flags ] + assert_include kq_possible, f.value + end + + nbc.connect_nonblock(srv.local_address, 
exception: false) + fd = nbc.fileno + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f1 = Threadlet.start { IO.wait_for_single_fd(fd, flags, nil) } + f2 = Threadlet.start { IO.wait_for_single_fd(fd, RB_WAITFD_IN, nil) } + assert_equal RB_WAITFD_OUT, f1.value + abc = srv.accept + abc.write('!') + assert_equal RB_WAITFD_IN, f2.value + ensure + con.close + acc.close + nbc.close + abc&.close + end + end + end end diff --git a/test/lib/leakchecker.rb b/test/lib/leakchecker.rb index af9200bf77..625b07992c 100644 --- a/test/lib/leakchecker.rb +++ b/test/lib/leakchecker.rb @@ -41,6 +41,15 @@ def find_fds if d.respond_to? :fileno a -= [d.fileno] end + + # only place we use epoll is internally, do not count it against us + a.delete_if do |fd| + begin + File.readlink("#{fd_dir}/#{fd}").match?(/\beventpoll\b/) + rescue + false + end + end a } fds.sort diff --git a/test/ruby/test_threadlet.rb b/test/ruby/test_threadlet.rb new file mode 100644 index 0000000000..84cc4ec662 --- /dev/null +++ b/test/ruby/test_threadlet.rb @@ -0,0 +1,283 @@ +# frozen_string_literal: true +require 'test/unit' +require 'fiber' +require 'io/nonblock' +require 'io/wait' +require 'socket' + +class TestThreadlet < Test::Unit::TestCase + def test_value + nr = 0 + f = Threadlet.start { nr += 1 } + assert_equal 1, f.value, 'value returns as expected' + assert_equal 1, f.value, 'value is idempotent' + end + + def test_join + nr = 0 + f = Threadlet.start { nr += 1 } + assert_equal f, f.join + assert_equal 1, f.value + assert_equal f, f.join, 'join is idempotent' + assert_equal f, f.join(10), 'join is idempotent w/ timeout' + end + + def test_io_wait_read + tick = 0.1 # 100ms, TIME_QUANTUM_USEC in thread_pthread.c + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + rdr = Threadlet.start { r.read(1) } + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + assert_nil rdr.join(tick), 'join returns nil on timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick, 'Threadlet 
still running' + w.write('a') + assert_equal 'a', rdr.value, 'finished' + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + rwait = Threadlet.start { r.wait_readable(tick) } + assert_nil rwait.value, 'IO returned no events after timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick, 'Threadlet waited for timeout' + end + end + + # make sure we do not create extra FDs (from epoll/kqueue) + # for operations which do not need it + def test_fd_creation_and_fork + assert_separately(%w(-r io/nonblock), <<~'end;') # do + fdrange = (3..64) + reserved = -'The given fd is not accessible because RubyVM reserves it' + fdstats = lambda do + stats = Hash.new(0) + fdrange.map do |fd| + begin + IO.for_fd(fd, autoclose: false) + stats[:good] += 1 + rescue Errno::EBADF # ignore + rescue ArgumentError => e + raise if e.message != reserved + stats[:reserved] += 1 + end + end + stats.freeze + end + + before = fdstats.call + Threadlet.start { :fib }.join + assert_equal before, fdstats.call, + 'Threadlet.start + Threadlet#join does not create new FD' + + # now, epoll/kqueue implementations create FDs, ensure they're reserved + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + + before_io = fdstats.call + assert_equal before[:reserved], before_io[:reserved], + 'no reserved FDs created before I/O attempted' + + f1 = Threadlet.start { r.read(1) } + after_io = fdstats.call + assert_equal before_io[:good], after_io[:good], + 'no change in unreserved FDs during I/O wait' + + w.write('!') + assert_equal '!', f1.value, 'auto-Threadlet IO#read works' + assert_operator before_io[:reserved], :<=, after_io[:reserved], + 'a reserved FD may be created on some implementations' + + # ensure we do not share epoll/kqueue FDs across fork + if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) + pid = fork do + after_fork = fdstats.call + w.write(after_fork.inspect == before_io.inspect ? 
'1' : '0') + + # ensure auto-Threadlet works properly after forking + IO.pipe do |a, b| + a.nonblock = b.nonblock = true + f1 = Threadlet.start { a.read(1) } + f2 = Threadlet.start { b.write('!') } + assert_equal 1, f2.value + w.write(f1.value == '!' ? '!' : '?') + end + + exit!(0) + end + assert_equal '1', r.read(1), 'reserved FD cleared after fork' + assert_equal '!', r.read(1), 'auto-Threadlet works after forking' + _, status = Process.waitpid2(pid) + assert_predicate status, :success?, 'forked child exited properly' + end + end + end; + end + + def test_waitpid + assert_separately(%w(-r io/nonblock), <<~'end;') # do + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + pid = fork do + sleep 0.1 + exit!(0) + end + f = Threadlet.start { Process.waitpid2(pid) } + wpid, st = f.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.1, 'child did not return immediately' + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'process exited successfully' + + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + + # "blocking" Process.waitpid2 takes precedence over trap(:CHLD) + waited = [] + chld_called = false + trap(:CHLD) { chld_called = true; waited.concat(Process.waitall) } + pid = fork do + r.read(1) + exit!(0) + end + assert_nil Process.waitpid2(pid, Process::WNOHANG), 'child is waiting' + assert_nil(Threadlet.start do + Process.waitpid2(pid, Process::WNOHANG) + end.value, 'WNOHANG works normally in Threadlet') + f = Threadlet.start { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), 'woke child' + wpid, st = f.value + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'child exited successfully' + assert_empty waited, 'waitpid had predence' + assert chld_called, 'CHLD handler got called anyways' + + chld_called = false + [ 'DEFAULT', 'SIG_DFL', 'IGNORE', 'SYSTEM_DEFAULT', nil + ].each do |handler| + trap(:CHLD, handler) + pid = fork do + r.read(1) + 
exit!(0) + end + t = "trap(:CHLD, #{handler.inspect})" + f = Threadlet.start { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), "woke child for #{t}" + wpid, st = f.value + assert_predicate st, :success?, "child exited successfully for #{t}" + assert_equal wpid, pid, "return value correct for #{t}" + assert_equal false, chld_called, + "old CHLD handler did not fire for #{t}" + end + end + end; + end if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) && + Process.const_defined?(:WNOHANG) + + def test_cpu_usage + require 'benchmark' # avoid skewing assert_cpu_usage_low + disabled = GC.disable + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = fv = wr = nil + + # select() requires higher CPU% :< + assert_cpu_usage_low(pct: 0.50) do + t = Thread.new { Threadlet.start { r.read(1) }.value } + wr = Thread.new do + sleep 0.2 + w.write('..') + end + fv = Threadlet.start { r.read(1) }.value + end + assert_equal '.', fv + assert_equal '.', t.value + wr.join + end + + thrs = [] + pid = spawn(*%W(#{EnvUtil.rubybin} --disable=gems -e #{'sleep 0.1'})) + 2.times do + thrs << Thread.new { Threadlet.start { Process.waitall }.value } + end + assert_cpu_usage_low(pct: 0.44) do + thrs.map!(&:value) + end + ensure + GC.enable unless disabled + end + + # As usual, cannot resume/run Threadlets across threads, but the + # scheduler works across threads + def test_cross_thread_schedule + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = Thread.new { Threadlet.start { r.read(1) }.value } + assert_nil t.join(0.1) + f = Threadlet.start { w.write('a') } + assert_equal 'a', t.value + assert_equal 1, f.value + end + end + + # tricky for kqueue and epoll implementations + def test_multi_readers + if RUBY_PLATFORM =~ /linux/ && + (RbConfig::CONFIG['LDFLAGS'] =~ /-lkqueue\b/ || + RbConfig::CONFIG['DLDFLAGS'] =~ /-lkqueue\b/) + skip "FIXME libkqueue is buggy on this test" + end + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + nr = 30 + fibs = nr.times.map { 
Threadlet.start { r.read(1) } } + fibs.each { |f| assert_nil f.join(0.001) } + + exp = nr.times.map { -'a' }.freeze + w.write(exp.join) + assert_equal exp, fibs.map(&:value); + end + end + + def test_unix_rw + unless defined?(UNIXSocket) && UNIXSocket.respond_to?(:pair) + skip 'no UNIXSocket.pair' + end + # ensure simultaneous waits on the same FD works + UNIXSocket.pair do |a, b| + assert_predicate a, :wait_writable + f0 = Threadlet.start { a.wait_readable } # cold + f9 = Threadlet.start { a.wait_writable } # already armed + assert_equal a, f9.value + + f4 = Threadlet.start { a.wait_readable } + b.write('.') + f5 = Threadlet.start { a.wait_writable } + f6 = Threadlet.start { a.wait_readable } + assert_equal a, f4.value + assert_equal a, f5.value + assert_equal a, f6.value + assert_equal a, f0.value + end + end + + def test_io_select_pipe + IO.pipe do |r, w| + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + f1 = Threadlet.start { IO.select([r], nil, nil, 0.02) } + assert_nil f1.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.02 + assert_operator diff, :<, 0.2 + w.write '.' 
+ f2 = Threadlet.start { IO.select([r], [w]) } + assert_equal [ [r], [w], [] ], f2.value + end + end + + def test_reg_file + File.open(__FILE__) do |fp| + t = Threadlet.start { fp.wait_readable } + assert_same fp, t.value + t = Threadlet.start { IO.select([fp]) } + assert_equal [[fp],[],[]], t.value + end + end +end diff --git a/thread.c b/thread.c index 214df5b0df..f505982a09 100644 --- a/thread.c +++ b/thread.c @@ -74,6 +74,7 @@ #include "internal.h" #include "iseq.h" #include "vm_core.h" +#include "iom.h" #ifndef USE_NATIVE_THREAD_PRIORITY #define USE_NATIVE_THREAD_PRIORITY 0 @@ -99,10 +100,7 @@ static int rb_threadptr_dead(rb_thread_t *th); static void rb_check_deadlock(rb_vm_t *vm); static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th); static const char *thread_status_name(rb_thread_t *th, int detail); -static void timespec_add(struct timespec *, const struct timespec *); static void timespec_sub(struct timespec *, const struct timespec *); -static int timespec_update_expire(struct timespec *, const struct timespec *); -static void getclockofday(struct timespec *); #define eKillSignal INT2FIX(0) #define eTerminateSignal INT2FIX(1) @@ -915,8 +913,8 @@ thread_join_sleep(VALUE arg) struct timespec end; if (p->limit) { - getclockofday(&end); - timespec_add(&end, p->limit); + rb_getclockofday(&end); + rb_timespec_add(&end, p->limit); } while (target_th->status != THREAD_KILLED) { @@ -928,7 +926,7 @@ thread_join_sleep(VALUE arg) th->vm->sleeper--; } else { - if (timespec_update_expire(p->limit, &end)) { + if (rb_timespec_update_expire(p->limit, &end)) { thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", thread_id_str(target_th)); return Qfalse; @@ -1006,6 +1004,31 @@ thread_join(rb_thread_t *target_th, struct timespec *ts) static struct timespec *double2timespec(struct timespec *, double); +struct timespec * +rb_join_interval(struct timespec *ts, int argc, VALUE *argv) +{ + VALUE limit; + + rb_scan_args(argc, argv, "01", &limit); + + 
/* + * This supports INFINITY and negative values, so we can't use + * rb_time_interval right now... + */ + switch (TYPE(limit)) { + case T_NIL: return 0; + case T_FIXNUM: + ts->tv_sec = NUM2TIMET(limit); + if (ts->tv_sec < 0) + ts->tv_sec = 0; + ts->tv_nsec = 0; + break; + default: + ts = double2timespec(ts, rb_num2dbl(limit)); + } + return ts; +} + /* * call-seq: * thr.join -> thr @@ -1048,30 +1071,9 @@ static struct timespec *double2timespec(struct timespec *, double); static VALUE thread_join_m(int argc, VALUE *argv, VALUE self) { - VALUE limit; - struct timespec timespec; - struct timespec *ts = 0; - - rb_scan_args(argc, argv, "01", &limit); - - /* - * This supports INFINITY and negative values, so we can't use - * rb_time_interval right now... - */ - switch (TYPE(limit)) { - case T_NIL: break; - case T_FIXNUM: - timespec.tv_sec = NUM2TIMET(limit); - if (timespec.tv_sec < 0) - timespec.tv_sec = 0; - timespec.tv_nsec = 0; - ts = &timespec; - break; - default: - ts = double2timespec(&timespec, rb_num2dbl(limit)); - } + struct timespec ts; - return thread_join(rb_thread_ptr(self), ts); + return thread_join(rb_thread_ptr(self), rb_join_interval(&ts, argc, argv)); } /* @@ -1158,8 +1160,8 @@ sleep_forever(rb_thread_t *th, int deadlockable, int spurious_check) th->status = prev_status; } -static void -getclockofday(struct timespec *ts) +void +rb_getclockofday(struct timespec *ts) { #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) if (clock_gettime(CLOCK_MONOTONIC, ts) == 0) @@ -1168,8 +1170,8 @@ getclockofday(struct timespec *ts) rb_timespec_now(ts); } -static void -timespec_add(struct timespec *dst, const struct timespec *ts) +void +rb_timespec_add(struct timespec *dst, const struct timespec *ts) { if (TIMESPEC_SEC_MAX - ts->tv_sec < dst->tv_sec) dst->tv_sec = TIMESPEC_SEC_MAX; @@ -1196,8 +1198,8 @@ timespec_sub(struct timespec *dst, const struct timespec *tv) } } -static int -timespec_cmp(const struct timespec *a, const struct timespec *b) +int
+rb_timespec_cmp(const struct timespec *a, const struct timespec *b) { if (a->tv_sec > b->tv_sec) { return 1; @@ -1221,14 +1223,14 @@ timespec_cmp(const struct timespec *a, const struct timespec *b) * Returns true if @end has past * Updates @ts and returns false otherwise */ -static int -timespec_update_expire(struct timespec *ts, const struct timespec *end) +int +rb_timespec_update_expire(struct timespec *ts, const struct timespec *end) { struct timespec now; - getclockofday(&now); - if (timespec_cmp(&now, end) >= 0) return 1; - thread_debug("timespec_update_expire: " + rb_getclockofday(&now); + if (rb_timespec_cmp(&now, end) >= 0) return 1; + thread_debug("rb_timespec_update_expire: " "%"PRI_TIMET_PREFIX"d.%.6ld > %"PRI_TIMET_PREFIX"d.%.6ld\n", (time_t)end->tv_sec, (long)end->tv_nsec, (time_t)now.tv_sec, (long)now.tv_nsec); @@ -1243,14 +1245,14 @@ sleep_timespec(rb_thread_t *th, struct timespec ts, int spurious_check) struct timespec end; enum rb_thread_status prev_status = th->status; - getclockofday(&end); - timespec_add(&end, &ts); + rb_getclockofday(&end); + rb_timespec_add(&end, &ts); th->status = THREAD_STOPPED; RUBY_VM_CHECK_INTS_BLOCKING(th->ec); while (th->status == THREAD_STOPPED) { native_sleep(th, &ts); RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - if (timespec_update_expire(&ts, &end)) + if (rb_timespec_update_expire(&ts, &end)) break; if (!spurious_check) break; @@ -3794,7 +3796,7 @@ wait_retryable(int *result, int errnum, struct timespec *timeout, case ERESTART: #endif *result = 0; - if (timeout && timespec_update_expire(timeout, end)) { + if (timeout && rb_timespec_update_expire(timeout, end)) { timeout->tv_sec = 0; timeout->tv_nsec = 0; } @@ -3805,7 +3807,7 @@ wait_retryable(int *result, int errnum, struct timespec *timeout, else if (*result == 0) { /* check for spurious wakeup */ if (timeout) { - return !timespec_update_expire(timeout, end); + return !rb_timespec_update_expire(timeout, end); } return TRUE; } @@ -3840,9 +3842,8 @@ do_select(int n, 
rb_fdset_t *const readfds, rb_fdset_t *const writefds, TRUE) if (timeout) { - getclockofday(&end); - timespec_add(&end, timespec_for(&ts, timeout)); - tsp = &ts; + rb_getclockofday(&end); + rb_timespec_add(&end, tsp = timespec_for(&ts, timeout)); } #define fd_init_copy(f) \ @@ -3913,6 +3914,8 @@ int rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, struct timeval *timeout) { + rb_thread_t *th; + if (!read && !write && !except) { if (!timeout) { rb_thread_sleep_forever(); @@ -3931,7 +3934,16 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * if (except) { rb_fd_resize(max - 1, except); } - return do_select(max, read, write, except, timeout); + + th = GET_THREAD(); + if (rb_threadlet_sched_p(th)) { + struct timespec ts; + return rb_iom_select(th, max, read, write, except, + timespec_for(&ts, timeout)); + } + else { + return do_select(max, read, write, except, timeout); + } } #ifdef USE_POLL @@ -3987,10 +3999,13 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) struct timespec *tsp = 0; rb_thread_t *th = GET_THREAD(); + if (rb_threadlet_sched_p(th)) { + return rb_iom_waitfd(th, &fd, events, timespec_for(&ts, timeout)); + } + if (timeout) { - getclockofday(&end); - timespec_add(&end, timespec_for(&ts, timeout)); - tsp = &ts; + rb_getclockofday(&end); + rb_timespec_add(&end, tsp = timespec_for(&ts, timeout)); } fds.fd = fd; @@ -4099,7 +4114,12 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) struct select_args args; int r; VALUE ptr = (VALUE)&args; + rb_thread_t *th = GET_THREAD(); + if (rb_threadlet_sched_p(th)) { + struct timespec ts; + return rb_iom_waitfd(th, &fd, events, timespec_for(&ts, tv)); + } args.as.fd = fd; args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? 
init_set_fd(fd, &wfds) : NULL; @@ -4260,10 +4280,13 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th) } } +static void rb_iom_atfork_child(rb_thread_t *); + void rb_thread_atfork(void) { rb_thread_t *th = GET_THREAD(); + rb_iom_atfork_child(th); rb_thread_atfork_internal(th, terminate_atfork_i); th->join_list = NULL; rb_mutex_cleanup_keeping_mutexes(th); @@ -5299,3 +5322,21 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data) return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack); } + +#ifndef RUBYVM_IOM +# if defined(HAVE_SYS_EVENT_H) && defined(HAVE_KQUEUE) && defined(HAVE_KEVENT) +# define RUBYVM_IOM IOM_KQUEUE +# elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \ + defined(HAVE_EPOLL_CTL) && defined(HAVE_EPOLL_WAIT) +# define RUBYVM_IOM IOM_EPOLL +# else +# define RUBYVM_IOM IOM_SELECT +# endif +#endif +#if RUBYVM_IOM == IOM_KQUEUE +# include "iom_kqueue.h" +#elif RUBYVM_IOM == IOM_EPOLL +# include "iom_epoll.h" +#else +# include "iom_select.h" +#endif diff --git a/thread_pthread.c b/thread_pthread.c index 069c50ed7a..a311a561cb 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -349,12 +349,12 @@ native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel) struct timespec abs; if (condattr_monotonic) { - getclockofday(&abs); + rb_getclockofday(&abs); } else { rb_timespec_now(&abs); } - timespec_add(&abs, &timeout_rel); + rb_timespec_add(&abs, &timeout_rel); return abs; } @@ -1704,9 +1704,14 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) } #endif +static int rb_iom_reserved_fd(int); /* check kqueue or epoll FD */ + int rb_reserved_fd_p(int fd) { + if (rb_iom_reserved_fd(fd)) { + return 1; + } #if USE_SLEEPY_TIMER_THREAD if ((fd == timer_thread_pipe.normal[0] || fd == timer_thread_pipe.normal[1] || diff --git a/vm.c b/vm.c index 7a40e79d6c..308869762c 100644 --- a/vm.c +++ b/vm.c @@ -8,6 +8,7 @@ 
**********************************************************************/ +#include "iom.h" #include "internal.h" #include "ruby/vm.h" #include "ruby/st.h" @@ -2199,6 +2200,7 @@ ruby_vm_destruct(rb_vm_t *vm) rb_fiber_reset_root_local_storage(th->self); thread_free(th); } + rb_iom_destroy(vm); rb_vm_living_threads_init(vm); ruby_vm_run_at_exit_hooks(vm); if (vm->loading_table) { @@ -2368,6 +2370,9 @@ rb_thread_recycle_stack_release(VALUE *stack) ruby_xfree(stack); } +void rb_fiber_mark_self(rb_fiber_t *fib); +void rb_iom_mark_runq(const rb_thread_t *th); + void rb_execution_context_mark(const rb_execution_context_t *ec) { @@ -2420,7 +2425,6 @@ rb_execution_context_mark(const rb_execution_context_t *ec) RUBY_MARK_UNLESS_NULL(ec->local_storage_recursive_hash_for_trace); } -void rb_fiber_mark_self(rb_fiber_t *fib); void rb_threadptr_root_fiber_setup(rb_thread_t *th); void rb_threadptr_root_fiber_release(rb_thread_t *th); @@ -2442,6 +2446,11 @@ thread_mark(void *ptr) RUBY_MARK_UNLESS_NULL(th->top_self); RUBY_MARK_UNLESS_NULL(th->top_wrapper); if (th->root_fiber) rb_fiber_mark_self(th->root_fiber); + + /* th->runq may be 0 early on */ + if (th->runq.n.next && !list_empty(&th->runq)) { + rb_iom_mark_runq(th); + } RUBY_MARK_UNLESS_NULL(th->stat_insn_usage); RUBY_MARK_UNLESS_NULL(th->last_status); RUBY_MARK_UNLESS_NULL(th->locking_mutex); @@ -2527,6 +2536,7 @@ th_init(rb_thread_t *th, VALUE self) { th->self = self; rb_threadptr_root_fiber_setup(th); + list_head_init(&th->runq); { /* vm_stack_size is word number. 
diff --git a/vm_core.h b/vm_core.h index 0a185a6ceb..47aa7cee2a 100644 --- a/vm_core.h +++ b/vm_core.h @@ -540,6 +540,8 @@ typedef struct rb_hook_list_struct { int need_clean; } rb_hook_list_t; +struct rb_iom_struct; + typedef struct rb_vm_struct { VALUE self; @@ -551,7 +553,7 @@ typedef struct rb_vm_struct { #ifdef USE_SIGALTSTACK void *main_altstack; #endif - + struct rb_iom_struct *iom; rb_serial_t fork_gen; struct list_head waiting_fds; /* <=> struct waiting_fd */ struct list_head living_threads; @@ -891,6 +893,7 @@ typedef struct rb_thread_struct { /* fiber */ rb_fiber_t *root_fiber; rb_jmpbuf_t root_jmpbuf; + struct list_head runq; /* -rb_iom_timer.as.rnode */ /* misc */ unsigned int abort_on_exception: 1; -- EW