From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
X-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on dcvr.yhbt.net
X-Spam-Level:
X-Spam-ASN:
X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00,
 HEADER_FROM_DIFFERENT_DOMAINS shortcircuit=no autolearn=ham
 autolearn_force=no version=3.4.2
Received: from localhost (dcvr.yhbt.net [127.0.0.1])
 by dcvr.yhbt.net (Postfix) with ESMTP id 166EE1F97F;
 Tue, 20 Nov 2018 08:34:17 +0000 (UTC)
From: Eric Wong
To: spew@80x24.org
Cc: Eric Wong
Subject: [PATCH r65832 2/2] Thread::Light: green threads implemented using fibers
Date: Tue, 20 Nov 2018 08:34:13 +0000
Message-Id: <20181120083413.43523-3-e@80x24.org>
In-Reply-To: <20181120083413.43523-1-e@80x24.org>
References: <20181120083413.43523-1-e@80x24.org>
List-Id:

Thread::Light is a type of Fiber which gives Thread-like behavior
with implementation characteristics similar to the Thread class
from Ruby 1.8 (but allowing kqueue and epoll on modern OSes).

This adds the following scheduling points:

C-API scheduling points:

* rb_wait_for_single_fd
* rb_thread_fd_select
* rb_waitpid (SIGCHLD platforms only)

These three functions are used by many Ruby methods, which all
now become scheduling points.

Additional Ruby API scheduling points:

* Kernel#sleep
* Thread.pass
* IO.copy_stream
* {Queue,SizedQueue}#{push,pop}

FIXME: the "select"-based implementation still has some
missed-event problems. Fortunately, relevant production systems
use kqueue or epoll, which have no known problems at the moment
(tested FreeBSD 11.2 and Linux 4.19.2)

Thread::Light local storage is implemented (#[], #[]=, #fetch,
#key?, and #keys) just like normal Fibers.

Thread::Light#stop? and Thread::Light#status are analogous to
their regular Thread methods.

Thread::Light#run and Thread::Light#wakeup are supported for
waking up from Kernel#sleep.

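A minimal sketch of how the Ruby-level scheduling points listed above
might be exercised. It assumes a Ruby built with this patch; on any
other Ruby it falls back to Thread (the same trick the test script
further down uses), so it only illustrates the intended usage and is
not part of the patch. The names `queue`, `log`, `producer` and
`consumer` are made up for illustration.

```ruby
# Assumption: Thread::Light only exists on a Ruby built with this patch;
# fall back to native Thread elsewhere so the sketch still runs.
cls = defined?(Thread::Light) ? Thread::Light : Thread

queue = Queue.new # work items passed from producer to consumer
log   = Queue.new # collects messages (safe under the Thread fallback too)

consumer = cls.new do
  # Queue#pop is a scheduling point: the consumer gives up control
  # here until the producer has pushed something.
  3.times { log << "got #{queue.pop}" }
  :consumer_done
end

producer = cls.new do
  3.times do |i|
    sleep 0.01 # Kernel#sleep is a scheduling point
    queue << i # Queue#push is a scheduling point
  end
  :producer_done
end

# #value joins each one and returns the block's result
results  = [producer, consumer].map(&:value)
messages = Array.new(log.size) { log.pop }
```

Neither block yields explicitly; switching is expected to happen
only inside the listed scheduling points.
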
Mutex and ConditionVariable are NOT scheduling points for
Thread::Light switching; however they may process signal
handling and handle I/O dispatch for other native threads.

Thread::Light.list does not exist, yet (needed?).

Implementation changes along the way:

- Fiber#transfer used for switching
  Thanks to funny.falcon for suggestion [ruby-core:87776],
  it made the code better, even :)

- improve IO.select mapping in kqueue/epoll (st_table => ccan/list),
  since there is no need for lookup, only scan

- sync to use MJIT-friendly rb_waitpid (added th->nogvl_runq since
  waitpid(2) can run without GVL)

- "ensure" support. Only for Thread::Light, Fiber has no ensure
  (see [Bug #595]). "ensure" is REQUIRED for auto-scheduling
  safety. I am not sure if regular Fiber can support "ensure"
  in a way which is both performant and backwards-compatible.
  With Thread::Light being a new feature, there is no backwards
  compatibility to worry about, so the "ensure" support adds
  practically no overhead

- more code sharing between iom_{select,kqueue,epoll}.h

- switch to rb_hrtime_t for timeouts

- extract timeout API, so non-timeout-arg users can benefit
  from reduced. This will make Timeout-in-VM support easier
  and more orthogonal to this one.

The main Ruby API is similar to Thread, except with
"Thread::Light.new" in place of "Thread.new".

The following behave like their Thread counterparts:

  Thread::Light.new - Thread.new (TODO: parameter passing)
  Thread::Light#join - run internal scheduler until target coro is done
  Thread::Light#value - ditto, with return value

Changes to existing functions are minimal.

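To illustrate the Thread-like API described above, here is a short
sketch of #join (with and without a timeout), #value, #status and the
local-storage methods. It is an assumption-laden example, not taken
from the patch: it falls back to Thread when Thread::Light is not
defined, and the `worker` name, the :label key and the timings are
invented for the example.

```ruby
# Assumption: fall back to Thread when this patch is not applied.
cls = defined?(Thread::Light) ? Thread::Light : Thread

worker = cls.new do
  sleep 0.2 # long enough that the timed join below expires first
  42
end

# #join with a timeout returns nil if the target has not finished
timed_out = worker.join(0.01)
alive_mid = worker.alive?

# local storage, accessed like Thread#[] and friends
worker[:label] = "demo"
has_label = worker.key?(:label)
fallback  = worker.fetch(:missing, :default)

# #join without a timeout waits for completion and returns the receiver
joined = worker.join
answer = worker.value  # return value of the block
status = worker.status # false after normal termination
```

With Thread::Light, the waiting in #join and #value is expected to run
the internal scheduler on the calling native thread rather than block
on another native thread.
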
New files (all new structs and relations should be documented):

  iom.h - internal API for the rest of RubyVM (more features to follow)
  iom_internal.h - internal header for iom_(select|epoll|kqueue).h
  iom_epoll.h - epoll-specific pieces
  iom_kqueue.h - kqueue-specific pieces
  iom_select.h - select-specific pieces
  iom_pingable_common.h - common code for iom_(epoll|kqueue).h
  iom_common.h - common footer for iom_(select|epoll|kqueue).h

[Feature #13618]

Changes to existing data structures:

* rb_thread_t
  .runq - list of Thread::Light instances to auto-resume
  .nogvl_runq - same as above for out-of-GVL use
  .coros - list of running Thread::Light for that native Thread

  For auto-scheduling, we must have ensure support because
  Fiber does not yet support ensure (adding "ensure" support
  to existing Fibers will also cause memory leaks in programs
  which rely on the old behavior).

* rb_execution_context_t
  .enode - link to rb_thread_t coros

* rb_vm_t.iom - Ruby I/O Manager (rb_iom_t) :)

As usual, understanding the data structures first should help
you understand the code.

Right now, I reuse some static functions in thread.c,
so thread.c includes iom_(select|epoll|kqueue).h

./configure gains a new --with-iom=(select|epoll|kqueue) switch

* libkqueue (kqueue emulation for non-*BSD) has caveats:

  libkqueue support is incomplete; corner cases are not handled well:

  1) multiple Thread::Light instances waiting on the same FD
  2) waiting for both read and write events on the same FD

  Bugfixes to libkqueue may be necessary to support all corner cases.
  Supporting these corner cases for native kqueue was challenging,
  even. See comments on iom_kqueue.h and iom_epoll.h for
  nuances.

  Native BSD kqueue has none of these limitations (tested FreeBSD 11.2)

Test script I used to download a file from my server:
----8<---
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber' # for Fiber#alive?
url = 'https://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx'
cls = defined?(Thread::Light) ? Thread::Light : Thread
uri = URI(url)
use_ssl = "https" == uri.scheme
coros = 10.times.map do
  cls.start do
    cur = Thread.current.object_id
    # XXX getaddrinfo() and connect() are blocking
    # XXX resolv/replace + connect_nonblock
    Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http|
      req = Net::HTTP::Get.new(uri)
      http.request(req) do |res|
        dig = Digest::SHA1.new
        res.read_body do |buf|
          dig.update(buf)
          # warn "#{cur} #{buf.bytesize}\n"
        end
        warn "#{cur} #{dig.hexdigest}\n"
      end
    end
    warn "done\n"
    :done
  end
end

warn "joining #{Time.now}\n"
coros[-1].join
warn "joined #{Time.now}\n"
all = coros.dup

warn "1 joined, wait for the rest\n"
until coros.empty?
  coros.each(&:join)
  coros.keep_if(&:alive?)
  warn coros.inspect
end

p all.map(&:value)

cls.new do
  puts 'HI'
end.join
---
 common.mk                      |  15 +
 configure.ac                   |  32 +
 cont.c                         | 436 +++++++--
 eval.c                         |   1 +
 fiber.h                        |  66 ++
 hrtime.h                       |   8 +
 include/ruby/io.h              |   2 +
 io.c                           |  36 +-
 iom.h                          | 109 +++
 iom_common.h                   | 253 ++++++
 iom_epoll.h                    | 652 ++++++++++++++
 iom_internal.h                 | 596 +++++++++++++
 iom_kqueue.h                   | 804 +++++++++++++++++
 iom_pingable_common.h          | 224 +++++
 iom_select.h                   | 569 ++++++++++++
 process.c                      | 138 +--
 .../test_wait_for_single_fd.rb |  69 ++
 test/lib/leakchecker.rb        |   9 +
 test/net/http/test_http.rb     |   2 +-
 test/ruby/test_thread_light.rb | 828 ++++++++++++++++++
 test/ruby/test_thread_queue.rb |  85 +-
 thread.c                       | 472 ++++++----
 thread_pthread.c               |  80 +-
 thread_sync.c                  |  81 +-
 thread_win32.c                 |  16 +-
 vm.c                           |  10 +
 vm_core.h                      |  55 +-
 27 files changed, 5275 insertions(+), 373 deletions(-)
 create mode 100644 fiber.h
 create mode 100644 iom.h
 create mode 100644 iom_common.h
 create mode 100644 iom_epoll.h
 create mode 100644 iom_internal.h
 create mode 100644 iom_kqueue.h
 create mode 100644 iom_pingable_common.h
 create mode 100644 iom_select.h
 create mode 100644 test/ruby/test_thread_light.rb

diff --git a/common.mk
b/common.mk index b1798c116d..ba65e090aa 100644 --- a/common.mk +++ b/common.mk @@ -1621,11 +1621,14 @@ cont.$(OBJEXT): {$(VPATH)}cont.c cont.$(OBJEXT): {$(VPATH)}defines.h cont.$(OBJEXT): {$(VPATH)}encoding.h cont.$(OBJEXT): {$(VPATH)}eval_intern.h +cont.$(OBJEXT): {$(VPATH)}fiber.h cont.$(OBJEXT): {$(VPATH)}gc.h +cont.$(OBJEXT): {$(VPATH)}hrtime.h cont.$(OBJEXT): {$(VPATH)}id.h cont.$(OBJEXT): {$(VPATH)}intern.h cont.$(OBJEXT): {$(VPATH)}internal.h cont.$(OBJEXT): {$(VPATH)}io.h +cont.$(OBJEXT): {$(VPATH)}iom.h cont.$(OBJEXT): {$(VPATH)}method.h cont.$(OBJEXT): {$(VPATH)}missing.h cont.$(OBJEXT): {$(VPATH)}mjit.h @@ -2462,10 +2465,12 @@ process.$(OBJEXT): {$(VPATH)}config.h process.$(OBJEXT): {$(VPATH)}defines.h process.$(OBJEXT): {$(VPATH)}dln.h process.$(OBJEXT): {$(VPATH)}encoding.h +process.$(OBJEXT): {$(VPATH)}hrtime.h process.$(OBJEXT): {$(VPATH)}id.h process.$(OBJEXT): {$(VPATH)}intern.h process.$(OBJEXT): {$(VPATH)}internal.h process.$(OBJEXT): {$(VPATH)}io.h +process.$(OBJEXT): {$(VPATH)}iom.h process.$(OBJEXT): {$(VPATH)}method.h process.$(OBJEXT): {$(VPATH)}missing.h process.$(OBJEXT): {$(VPATH)}node.h @@ -2708,10 +2713,12 @@ signal.$(OBJEXT): {$(VPATH)}config.h signal.$(OBJEXT): {$(VPATH)}defines.h signal.$(OBJEXT): {$(VPATH)}encoding.h signal.$(OBJEXT): {$(VPATH)}eval_intern.h +signal.$(OBJEXT): {$(VPATH)}hrtime.h signal.$(OBJEXT): {$(VPATH)}id.h signal.$(OBJEXT): {$(VPATH)}intern.h signal.$(OBJEXT): {$(VPATH)}internal.h signal.$(OBJEXT): {$(VPATH)}io.h +signal.$(OBJEXT): {$(VPATH)}iom.h signal.$(OBJEXT): {$(VPATH)}method.h signal.$(OBJEXT): {$(VPATH)}missing.h signal.$(OBJEXT): {$(VPATH)}node.h @@ -2872,6 +2879,7 @@ thread.$(OBJEXT): {$(VPATH)}debug.h thread.$(OBJEXT): {$(VPATH)}defines.h thread.$(OBJEXT): {$(VPATH)}encoding.h thread.$(OBJEXT): {$(VPATH)}eval_intern.h +thread.$(OBJEXT): {$(VPATH)}fiber.h thread.$(OBJEXT): {$(VPATH)}gc.h thread.$(OBJEXT): {$(VPATH)}hrtime.h thread.$(OBJEXT): {$(VPATH)}id.h @@ -2879,6 +2887,13 @@ thread.$(OBJEXT): 
{$(VPATH)}intern.h thread.$(OBJEXT): {$(VPATH)}internal.h thread.$(OBJEXT): {$(VPATH)}io.h thread.$(OBJEXT): {$(VPATH)}iseq.h +thread.$(OBJEXT): {$(VPATH)}iom.h +thread.$(OBJEXT): {$(VPATH)}iom_internal.h +thread.$(OBJEXT): {$(VPATH)}iom_common.h +thread.$(OBJEXT): {$(VPATH)}iom_epoll.h +thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h +thread.$(OBJEXT): {$(VPATH)}iom_pingable_common.h +thread.$(OBJEXT): {$(VPATH)}iom_select.h thread.$(OBJEXT): {$(VPATH)}method.h thread.$(OBJEXT): {$(VPATH)}missing.h thread.$(OBJEXT): {$(VPATH)}mjit.h diff --git a/configure.ac b/configure.ac index f4be92039d..a9fb5c2ba4 100644 --- a/configure.ac +++ b/configure.ac @@ -1043,6 +1043,8 @@ AC_CHECK_HEADERS(sanitizer/msan_interface.h) AC_CHECK_HEADERS(setjmpex.h) AC_CHECK_HEADERS(stdalign.h) AC_CHECK_HEADERS(sys/attr.h) +AC_CHECK_HEADERS(sys/epoll.h) +AC_CHECK_HEADERS(sys/event.h) AC_CHECK_HEADERS(sys/eventfd.h) AC_CHECK_HEADERS(sys/fcntl.h) AC_CHECK_HEADERS(sys/file.h) @@ -1824,6 +1826,10 @@ AC_CHECK_FUNCS(dladdr) AC_CHECK_FUNCS(dup) AC_CHECK_FUNCS(dup3) AC_CHECK_FUNCS(eaccess) +AC_CHECK_FUNCS(epoll_create) +AC_CHECK_FUNCS(epoll_create1) +AC_CHECK_FUNCS(epoll_ctl) +AC_CHECK_FUNCS(epoll_wait) AC_CHECK_FUNCS(endgrent) AC_CHECK_FUNCS(eventfd) AC_CHECK_FUNCS(fchmod) @@ -1859,7 +1865,9 @@ AC_CHECK_FUNCS(initgroups) AC_CHECK_FUNCS(ioctl) AC_CHECK_FUNCS(isfinite) AC_CHECK_FUNCS(issetugid) +AC_CHECK_FUNCS(kevent) AC_CHECK_FUNCS(killpg) +AC_CHECK_FUNCS(kqueue) AC_CHECK_FUNCS(lchmod) AC_CHECK_FUNCS(lchown) AC_CHECK_FUNCS(link) @@ -2845,6 +2853,29 @@ AC_ARG_WITH(valgrind, AS_IF([test x$with_valgrind != xno], [AC_CHECK_HEADERS(valgrind/memcheck.h)]) +AC_DEFINE_UNQUOTED(IOM_SELECT, 0) +AC_DEFINE_UNQUOTED(IOM_KQUEUE, 1) +AC_DEFINE_UNQUOTED(IOM_EPOLL, 2) + +iom_default=select +AS_CASE([$ac_cv_func_kqueue:$ac_cv_func_kevent:$ac_cv_header_sys_event_h], +[yes:yes:yes], [iom_default=kqueue], +[*], + [AS_CASE( + [$ac_cv_func_epoll_wait:$ac_cv_func_epoll_create:$ac_cv_header_sys_epoll_h], + [yes:yes:yes], 
[iom_default=epoll])] +) + +AC_ARG_WITH(iom, + AS_HELP_STRING([--with-iom=XXXXX], + [I/O manager (select|kqueue|epoll)]), + [with_iom="$withval"], [with_iom="$iom_default"]) +AS_CASE(["$with_iom"], + [select], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_SELECT)], + [kqueue], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_KQUEUE)], + [epoll], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_EPOLL)], + [AC_MSG_ERROR(unknown I/O manager: $with_iom)]) + dln_a_out_works=no AS_IF([test "$ac_cv_header_a_out_h" = yes], [ AS_IF([test "$with_dln_a_out" = yes || test "$rb_cv_dlopen" = unknown], [ @@ -4025,6 +4056,7 @@ config_summary "JIT support" "$MJIT_SUPPORT" config_summary "man page type" "$MANTYPE" config_summary "search path" "$search_path" config_summary "static-linked-ext" ${EXTSTATIC:+"yes"} +config_summary "I/O manager" ${with_iom} echo "" echo "---" ]) diff --git a/cont.c b/cont.c index 6387118f4a..1d73302eb1 100644 --- a/cont.c +++ b/cont.c @@ -9,59 +9,14 @@ **********************************************************************/ +#include "iom.h" #include "internal.h" #include "vm_core.h" +#include "fiber.h" #include "gc.h" #include "eval_intern.h" #include "mjit.h" -/* FIBER_USE_NATIVE enables Fiber performance improvement using system - * dependent method such as make/setcontext on POSIX system or - * CreateFiber() API on Windows. - * This hack make Fiber context switch faster (x2 or more). - * However, it decrease maximum number of Fiber. For example, on the - * 32bit POSIX OS, ten or twenty thousands Fiber can be created. - * - * Details is reported in the paper "A Fast Fiber Implementation for Ruby 1.9" - * in Proc. of 51th Programming Symposium, pp.21--28 (2010) (in Japanese). - */ - -#if !defined(FIBER_USE_NATIVE) -# if defined(HAVE_GETCONTEXT) && defined(HAVE_SETCONTEXT) -# if 0 -# elif defined(__NetBSD__) -/* On our experience, NetBSD doesn't support using setcontext() and pthread - * simultaneously. 
This is because pthread_self(), TLS and other information - * are represented by stack pointer (higher bits of stack pointer). - * TODO: check such constraint on configure. - */ -# define FIBER_USE_NATIVE 0 -# elif defined(__sun) -/* On Solaris because resuming any Fiber caused SEGV, for some reason. - */ -# define FIBER_USE_NATIVE 0 -# elif defined(__ia64) -/* At least, Linux/ia64's getcontext(3) doesn't save register window. - */ -# define FIBER_USE_NATIVE 0 -# elif defined(__GNU__) -/* GNU/Hurd doesn't fully support getcontext, setcontext, makecontext - * and swapcontext functions. Disabling their usage till support is - * implemented. More info at - * http://darnassus.sceen.net/~hurd-web/open_issues/glibc/#getcontext - */ -# define FIBER_USE_NATIVE 0 -# else -# define FIBER_USE_NATIVE 1 -# endif -# elif defined(_WIN32) -# define FIBER_USE_NATIVE 1 -# endif -#endif -#if !defined(FIBER_USE_NATIVE) -#define FIBER_USE_NATIVE 0 -#endif - #if FIBER_USE_NATIVE #ifndef _WIN32 #include @@ -138,6 +93,7 @@ enum fiber_status { #define FIBER_SUSPENDED_P(fib) ((fib)->status == FIBER_SUSPENDED) #define FIBER_TERMINATED_P(fib) ((fib)->status == FIBER_TERMINATED) #define FIBER_RUNNABLE_P(fib) (FIBER_CREATED_P(fib) || FIBER_SUSPENDED_P(fib)) +#define FIBER_ENSURE -2 #if FIBER_USE_NATIVE && !defined(_WIN32) static inline int @@ -300,6 +256,8 @@ fiber_ptr(VALUE obj) return fib; } +#define TL_ERR(msg) rb_raise(rb_eThreadError,msg) + NOINLINE(static VALUE cont_capture(volatile int *volatile stat)); #define THREAD_MUST_BE_RUNNING(th) do { \ @@ -1162,6 +1120,7 @@ static VALUE make_passing_arg(int argc, const VALUE *argv) { switch (argc) { + case FIBER_ENSURE: case 0: return Qnil; case 1: @@ -1468,12 +1427,35 @@ rb_fiber_new(VALUE (*func)(ANYARGS), VALUE obj) static void rb_fiber_terminate(rb_fiber_t *fib, int need_interrupt); +static VALUE +fiber_start_i(VALUE tag, VALUE p) +{ + rb_context_t *cont = (rb_context_t *)p; + int argc = cont->argc; + const VALUE *argv, args = cont->value; + 
rb_execution_context_t *ec = &cont->saved_ec; + rb_fiber_t *fib = ec->fiber_ptr; + rb_thread_t *th = rb_ec_thread_ptr(ec); + rb_proc_t *proc; + + GetProcPtr(fib->first_proc, proc); + argv = argc > 1 ? RARRAY_CONST_PTR(args) : &args; + cont->value = Qnil; + ec->errinfo = Qnil; + ec->root_lep = rb_vm_proc_local_ep(fib->first_proc); + ec->root_svar = Qfalse; + + EXEC_EVENT_HOOK(ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil); + cont->value = rb_vm_invoke_proc(ec, proc, argc, argv, VM_BLOCK_HANDLER_NONE); + + return Qfalse; +} + void rb_fiber_start(void) { rb_thread_t * volatile th = GET_THREAD(); rb_fiber_t *fib = th->ec->fiber_ptr; - rb_proc_t *proc; enum ruby_tag_type state; int need_interrupt = TRUE; @@ -1483,17 +1465,7 @@ rb_fiber_start(void) EC_PUSH_TAG(th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { rb_context_t *cont = &VAR_FROM_MEMORY(fib)->cont; - int argc; - const VALUE *argv, args = cont->value; - GetProcPtr(fib->first_proc, proc); - argv = (argc = cont->argc) > 1 ? RARRAY_CONST_PTR(args) : &args; - cont->value = Qnil; - th->ec->errinfo = Qnil; - th->ec->root_lep = rb_vm_proc_local_ep(fib->first_proc); - th->ec->root_svar = Qfalse; - - EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil); - cont->value = rb_vm_invoke_proc(th->ec, proc, argc, argv, VM_BLOCK_HANDLER_NONE); + rb_catch_obj(FIBER_ENSURE, fiber_start_i, (VALUE)cont); } EC_POP_TAG(); @@ -1556,6 +1528,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th) fib->cont.type = FIBER_CONTEXT; fib->cont.saved_ec.fiber_ptr = fib; fib->cont.saved_ec.thread_ptr = th; + list_node_init(&fib->cont.saved_ec.enode); fiber_status_set(fib, FIBER_RESUMED); /* skip CREATED */ th->ec = &fib->cont.saved_ec; @@ -1619,6 +1592,15 @@ rb_fiber_current(void) return fiber_current()->cont.self; } +static void +fiber_check_jump(const rb_fiber_t *fib) +{ + switch (fib->cont.argc) { + case -1: rb_exc_raise(fib->cont.value); + case FIBER_ENSURE: rb_throw_obj(FIBER_ENSURE, Qnil); + } +} + static inline 
VALUE fiber_store(rb_fiber_t *next_fib, rb_thread_t *th) { @@ -1671,14 +1653,14 @@ fiber_store(rb_fiber_t *next_fib, rb_thread_t *th) } #endif /* not _WIN32 */ fib = th->ec->fiber_ptr; - if (fib->cont.argc == -1) rb_exc_raise(fib->cont.value); + fiber_check_jump(fib); return fib->cont.value; #else /* FIBER_USE_NATIVE */ if (ruby_setjmp(fib->cont.jmpbuf)) { /* restored */ fib = th->ec->fiber_ptr; - if (fib->cont.argc == -1) rb_exc_raise(fib->cont.value); + fiber_check_jump(fib); if (next_fib->cont.value == Qundef) { cont_restore_0(&next_fib->cont, &next_fib->cont.value); VM_UNREACHABLE(fiber_store); @@ -1770,6 +1752,7 @@ rb_fiber_close(rb_fiber_t *fib) VALUE *vm_stack = ec->vm_stack; size_t stack_bytes = ec->vm_stack_size * sizeof(VALUE); + list_del_init(&ec->enode); fiber_status_set(fib, FIBER_TERMINATED); if (stack_bytes == rb_ec_vm_ptr(ec)->default_params.thread_vm_stack_size) { rb_thread_recycle_stack_release(vm_stack); @@ -1811,6 +1794,77 @@ rb_fiber_terminate(rb_fiber_t *fib, int need_interrupt) fiber_switch(ret_fib, 1, &value, 0); } +/* thread_{pthread,win32}.c */ +void rb_native_mutex_lock(rb_nativethread_lock_t *); +void rb_native_mutex_unlock(rb_nativethread_lock_t *); + +/* + * SIGCHLD-based rb_waitpid may enqueue events for this thread + * outside of GVL. Merge them into the GVL-protected th->runq here: + */ +void +rb_tl_merge_runq(rb_thread_t *th) +{ + /* + * no need to allocate, this function is called by rb_iom_mark_runq + * when GC marking runs. + */ + rb_fiber_t *fib = th->ec ? th->ec->fiber_ptr : 0; + + if (fib && !FIBER_TERMINATED_P(fib)) { + rb_native_mutex_lock(&th->interrupt_lock); + list_append_list(&th->runq, &th->nogvl_runq); + rb_native_mutex_unlock(&th->interrupt_lock); + } +} + +/* + * Runs any ready Thread::Light in th->runq + * returns TRUE if @ec is a Thread::Light and we are done scheduling + * ourselves; or if any progress was made. 
+ */ +int +rb_tl_schedule_ready_p(rb_execution_context_t *ec) +{ + rb_thread_t *th = rb_ec_thread_ptr(ec); + struct rb_sched_waiter *sw; + + rb_tl_merge_runq(th); + + if (rb_tl_sched_p(ec) && ec->in_runq) { + ec->in_runq = 0; + return TRUE; + } + if (!rb_tl_switchable(ec)) { + return FALSE; + } + + sw = rb_sched_waiter_dequeue(th); + if (sw) { + /* we are done if we switch; or see ourselves in the runq: */ + if (ec != sw->ec) + fiber_switch(sw->ec->fiber_ptr, 0, 0, 0); + return TRUE; + } + + if (th->root_fiber == ec->fiber_ptr) + return FALSE; /* we are the root fiber and did nothing */ + + /* Nothing to run, let root_fiber do other stuff... */ + if (th->root_fiber) { + fiber_switch(th->root_fiber, 0, 0, 0); + /* root_fiber will switch back to us when we're ready */ + return TRUE; + } + return FALSE; +} + +VALUE +rb_tl_switch(VALUE p) +{ + return fiber_switch((rb_fiber_t *)p, 0, 0, 0); +} + VALUE rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv) { @@ -1996,9 +2050,248 @@ rb_fiber_atfork(rb_thread_t *th) } th->root_fiber->prev = 0; } + th->ec->is_tl = 0; + th->ec->in_runq = 0; + list_head_init(&th->runq); + list_head_init(&th->coros); + list_head_init(&th->nogvl_runq); } #endif +void +rb_tl_finish_i(rb_execution_context_t *ec, rb_fiber_t *fib) +{ + VM_ASSERT(rb_tl_sched_p(&fib->cont.saved_ec)); + + switch (fib->status) { + case FIBER_CREATED: VM_UNREACHABLE(rb_tl_finish_i_CREATED); + case FIBER_RESUMED: VM_UNREACHABLE(rb_tl_finish_i_RESUMED); + break; + case FIBER_SUSPENDED: + fib->prev = ec->fiber_ptr; + EC_PUSH_TAG(ec); + if (EC_EXEC_TAG() == TAG_NONE) + fiber_switch(fib, FIBER_ENSURE, 0, 0); + EC_POP_TAG(); + break; + case FIBER_TERMINATED: + /* XXX does this happen? 
*/ + rb_bug("unexpected FIBER_TERMINATED"); + break; + default: + VM_UNREACHABLE(rb_tl_finish_i_DEFAULT); + } +} + +/* :nodoc: */ +static VALUE +tl_init(int argc, VALUE *argv, VALUE tl) +{ + return fiber_init(tl, rb_block_proc()); +} + +static VALUE +tl_start(VALUE self, int argc, const VALUE *argv) +{ + rb_fiber_t *fib = fiber_ptr(self); + rb_execution_context_t *ec = &fib->cont.saved_ec; + + ec->is_tl = 1; + list_add_tail(&rb_ec_thread_ptr(ec)->coros, &ec->enode); + fiber_switch(fib, argc, argv, 0); + + return self; +} + +/* + * start a Thread::Light (similar to Thread.new) + */ +static VALUE +tl_s_new(int argc, VALUE *argv, VALUE klass) +{ + VALUE self = fiber_alloc(klass); + + rb_obj_call_init(self, argc, argv); + + return tl_start(self, argc, argv); +} + +/* + * start a Thread::Light, but do not call subclass initializer + * (similar to Thread.start) + */ +static VALUE +tl_s_start(int argc, VALUE *argv, VALUE klass) +{ + return tl_start(tl_init(argc, argv, fiber_alloc(klass)), argc, argv); +} + +static void +do_tl_join(rb_fiber_t *fib, rb_hrtime_t *rel) +{ + rb_thread_t *th = GET_THREAD(); + rb_fiber_t *cur = fiber_current(); + + if (cur == fib) + TL_ERR("Target Thread::Light must not be current fiber"); + if (th->root_fiber == fib) + TL_ERR("Target Thread::Light must not be root fiber"); + if (cont_thread_value(&fib->cont) != th->self) + TL_ERR("Target Thread::Light not owned by current thread"); + if (!rb_tl_sched_p(&fib->cont.saved_ec)) + TL_ERR("Target is not a Thread::Light"); + + if (rel) { + const rb_hrtime_t end = rb_hrtime_add(rb_hrtime_now(), *rel); + + while (fib->status != FIBER_TERMINATED) { + rb_iom_schedule(th, rel); + if (rb_hrtime_update_expire(rel, end)) + return; + } + } + else { + while (fib->status != FIBER_TERMINATED) + rb_iom_schedule(th, 0); + } +} + +/* see Thread#join */ +static VALUE +tl_join(int argc, VALUE *argv, VALUE self) +{ + rb_fiber_t *fib = fiber_ptr(self); + rb_hrtime_t rel; + + do_tl_join(fib, rb_join_interval(&rel, argc, 
argv)); + return fib->status == FIBER_TERMINATED ? fib->cont.self : Qnil; +} + +/* see Thread#value */ +static VALUE +tl_value(VALUE self) +{ + rb_fiber_t *fib = fiber_ptr(self); + + do_tl_join(fib, 0); + return fib->cont.value; +} + +/* thread.c */ +VALUE rb_ec_local_aref(rb_execution_context_t *, ID, VALUE default_value); +VALUE rb_ec_local_aset(rb_execution_context_t *, ID, VALUE); +VALUE rb_ec_local_fetch(rb_execution_context_t *, int, VALUE *, VALUE self); +VALUE rb_ec_local_key_p(rb_execution_context_t *, VALUE key); +VALUE rb_ec_local_keys(rb_execution_context_t *); + +/* see Thread#[] */ +static VALUE +tl_aref(VALUE self, VALUE key) +{ + rb_fiber_t *fib = fiber_ptr(self); + ID id = rb_check_id(&key); /* avoid ID pinning */ + + return id ? rb_ec_local_aref(&fib->cont.saved_ec, id, Qnil) : Qnil; +} + +/* see Thread#[]= */ +static VALUE +tl_aset(VALUE self, VALUE key, VALUE val) +{ + rb_fiber_t *fib = fiber_ptr(self); + + if (OBJ_FROZEN(self)) { + rb_error_frozen("Thread::Light locals"); + } + return rb_ec_local_aset(&fib->cont.saved_ec, rb_to_id(key), val); +} + +/* see Thread#fetch */ +static VALUE +tl_fetch(int argc, VALUE *argv, VALUE self) +{ + rb_fiber_t *fib = fiber_ptr(self); + + return rb_ec_local_fetch(&fib->cont.saved_ec, argc, argv, self); +} + +/* see Thread#key? */ +static VALUE +tl_key_p(VALUE self, VALUE key) +{ + rb_fiber_t *fib = fiber_ptr(self); + + return rb_ec_local_key_p(&fib->cont.saved_ec, key); +} + +/* see Thread#keys */ +static VALUE +tl_keys(VALUE self) +{ + rb_fiber_t *fib = fiber_ptr(self); + + return rb_ec_local_keys(&fib->cont.saved_ec); +} + +/* see Thread#stop? */ +static VALUE +tl_stop_p(VALUE self) +{ + const rb_fiber_t *fib = fiber_ptr(self); + + switch (fib->status) { + case FIBER_RESUMED: return Qfalse; + case FIBER_SUSPENDED: return fib->cont.saved_ec.in_runq ? 
Qfalse : Qtrue; + case FIBER_TERMINATED: return Qtrue; + default: VM_UNREACHABLE(tl_stop_p); + } + return Qfalse; +} + +/* see Thread#status */ +static VALUE +tl_status(VALUE self) +{ + const rb_fiber_t *fib = fiber_ptr(self); + const rb_execution_context_t *ec = &fib->cont.saved_ec; + + switch (fib->status) { + case FIBER_RESUMED: return rb_str_new2("run"); + case FIBER_SUSPENDED: return rb_str_new2(ec->in_runq ? "run" : "sleep"); + case FIBER_TERMINATED: + /* see thread.c::rb_thread_status */ + if (!NIL_P(ec->errinfo) && !FIXNUM_P(ec->errinfo)) { + return Qnil; + } + return Qfalse; + default: VM_UNREACHABLE(tl_status); + } + return Qfalse; +} + +/* + * Enqueues the sleeping Thread::Light instance to be run. + * May not be called across native threads + */ +static VALUE +tl_run(VALUE self) +{ + rb_fiber_t *fib = fiber_ptr(self); + rb_execution_context_t *target = &fib->cont.saved_ec; + rb_execution_context_t *current = GET_EC(); + + if (current != target) { + if (rb_ec_thread_ptr(current) == rb_ec_thread_ptr(target)) { + rb_iom_run(target); + } + else { + TL_ERR("Thread::Light#run across threads"); + } + } + + return self; +} + /* * Document-class: FiberError * @@ -2015,6 +2308,8 @@ rb_fiber_atfork(rb_thread_t *th) void Init_Cont(void) { + VALUE rb_cThreadLight; + #if FIBER_USE_NATIVE rb_thread_t *th = GET_THREAD(); @@ -2036,6 +2331,25 @@ Init_Cont(void) rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); rb_define_alias(rb_cFiber, "inspect", "to_s"); + + rb_cThreadLight = rb_define_class_under(rb_cThread, "Light", rb_cObject); + rb_define_alloc_func(rb_cThreadLight, fiber_alloc); + rb_define_method(rb_cThreadLight, "initialize", tl_init, -1); + rb_define_singleton_method(rb_cThreadLight, "new", tl_s_new, -1); + rb_define_singleton_method(rb_cThreadLight, "start", tl_s_start, -1); + rb_define_singleton_method(rb_cThreadLight, "fork", tl_s_start, -1); + rb_define_method(rb_cThreadLight, "join", tl_join, 
-1); + rb_define_method(rb_cThreadLight, "value", tl_value, 0); + rb_define_method(rb_cThreadLight, "[]", tl_aref, 1); + rb_define_method(rb_cThreadLight, "[]=", tl_aset, 2); + rb_define_method(rb_cThreadLight, "fetch", tl_fetch, -1); + rb_define_method(rb_cThreadLight, "key?", tl_key_p, 1); + rb_define_method(rb_cThreadLight, "keys", tl_keys, 0); + rb_define_method(rb_cThreadLight, "alive?", rb_fiber_alive_p, 0); + rb_define_method(rb_cThreadLight, "stop?", tl_stop_p, 0); + rb_define_method(rb_cThreadLight, "status", tl_status, 0); + rb_define_method(rb_cThreadLight, "run", tl_run, 0); + rb_define_method(rb_cThreadLight, "wakeup", tl_run, 0); } RUBY_SYMBOL_EXPORT_BEGIN diff --git a/eval.c b/eval.c index f3a30dd159..4feda0f910 100644 --- a/eval.c +++ b/eval.c @@ -199,6 +199,7 @@ ruby_cleanup(volatile int ex) errs[0] = th->ec->errinfo; SAVE_ROOT_JMPBUF(th, rb_thread_terminate_all()); + rb_tl_finish(th); } else { switch (step) { diff --git a/fiber.h b/fiber.h new file mode 100644 index 0000000000..63526087ff --- /dev/null +++ b/fiber.h @@ -0,0 +1,66 @@ +#ifndef RUBY_FIBER_H +#define RUBY_FIBER_H + +#include "internal.h" +#include "vm_core.h" + +/* FIBER_USE_NATIVE enables Fiber performance improvement using system + * dependent method such as make/setcontext on POSIX system or + * CreateFiber() API on Windows. + * This hack make Fiber context switch faster (x2 or more). + * However, it decrease maximum number of Fiber. For example, on the + * 32bit POSIX OS, ten or twenty thousands Fiber can be created. + * + * Details is reported in the paper "A Fast Fiber Implementation for Ruby 1.9" + * in Proc. of 51th Programming Symposium, pp.21--28 (2010) (in Japanese). + */ + +#if !defined(FIBER_USE_NATIVE) +# if defined(HAVE_GETCONTEXT) && defined(HAVE_SETCONTEXT) +# if 0 +# elif defined(__NetBSD__) +/* On our experience, NetBSD doesn't support using setcontext() and pthread + * simultaneously. 
This is because pthread_self(), TLS and other information + * are represented by stack pointer (higher bits of stack pointer). + * TODO: check such constraint on configure. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__sun) +/* On Solaris because resuming any Fiber caused SEGV, for some reason. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__ia64) +/* At least, Linux/ia64's getcontext(3) doesn't save register window. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__GNU__) +/* GNU/Hurd doesn't fully support getcontext, setcontext, makecontext + * and swapcontext functions. Disabling their usage till support is + * implemented. More info at + * http://darnassus.sceen.net/~hurd-web/open_issues/glibc/#getcontext + */ +# define FIBER_USE_NATIVE 0 +# else +# define FIBER_USE_NATIVE 1 +# endif +# elif defined(_WIN32) +# define FIBER_USE_NATIVE 1 +# endif +#endif +#if !defined(FIBER_USE_NATIVE) +#define FIBER_USE_NATIVE 0 +#endif + +/* + * conditional alloca for FIBER_USE_NATIVE or on-heap fallback for + * FIBER_USE_NATIVE==0 + */ +#if FIBER_USE_NATIVE == 1 +# define RB_FIBER_ALLOCA(v,T) RB_ALLOCV_N(T,v,1) +# define RB_FIBER_ALLOCA_END(v) RB_ALLOCV_END(v) +#else +# define RB_FIBER_ALLOCA(v,T) rb_alloc_tmp_buffer(&(v),sizeof(T)) +# define RB_FIBER_ALLOCA_END(v) rb_free_tmp_buffer(&(v)) +#endif + +#endif /* RUBY_FIBER_H */ diff --git a/hrtime.h b/hrtime.h index f133bdb1ac..66604a6288 100644 --- a/hrtime.h +++ b/hrtime.h @@ -50,6 +50,7 @@ typedef uint64_t rb_hrtime_t; /* thread.c */ /* returns the value of the monotonic clock (if available) */ rb_hrtime_t rb_hrtime_now(void); +rb_hrtime_t *rb_join_interval(rb_hrtime_t *, int argc, VALUE *argv); /* * multiply @a and @b with overflow check and return the @@ -165,4 +166,11 @@ rb_hrtime2timeval(struct timeval *tv, const rb_hrtime_t *hrt) } return 0; } + +/* + * @end is the absolute time when @timeout is set to expire + * Returns true if @end has past + * Updates @timeout and returns false otherwise + */ +int 
rb_hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end); #endif /* RB_HRTIME_H */ diff --git a/include/ruby/io.h b/include/ruby/io.h index 6cacd8a710..6ee7f7e966 100644 --- a/include/ruby/io.h +++ b/include/ruby/io.h @@ -121,6 +121,8 @@ typedef struct rb_io_t { /* #define FMODE_UNIX 0x00200000 */ /* #define FMODE_INET 0x00400000 */ /* #define FMODE_INET6 0x00800000 */ +/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */ +/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */ #define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr) diff --git a/io.c b/io.c index 9e0b1b9844..9edb30fcc5 100644 --- a/io.c +++ b/io.c @@ -22,6 +22,7 @@ #include #include "ruby_atomic.h" #include "ccan/list/list.h" +#include "vm_core.h" #undef free #define free(x) xfree(x) @@ -10744,6 +10745,37 @@ nogvl_wait_for_single_fd(int fd, short events) } #endif /* !USE_POLL */ +struct waitfd_with_gvl { + int fd; + short events; +}; + +static void * +waitfd_with_gvl(void *p) +{ + struct waitfd_with_gvl *x = p; + + return (void *)(long)rb_wait_for_single_fd(x->fd, x->events, 0); +} + +static int +nogvl_copy_stream_wait(VALUE thval, int fd, short events) +{ + rb_thread_t *th = rb_thread_ptr(thval); + + /* allow scheduling Thread::Light while we wait */ + if (!list_empty(&th->coros)) { + struct waitfd_with_gvl x; + + x.fd = fd; + x.events = events; + + return (int)(long)rb_thread_call_with_gvl(waitfd_with_gvl, &x); + } + + return nogvl_wait_for_single_fd(fd, events); +} + static int maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp) { @@ -10754,7 +10786,7 @@ maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp) ret = rb_wait_for_single_fd(stp->src_fd, RB_WAITFD_IN, NULL); } else { - ret = nogvl_wait_for_single_fd(stp->src_fd, RB_WAITFD_IN); + ret = nogvl_copy_stream_wait(stp->th, stp->src_fd, RB_WAITFD_IN); } } while (ret == -1 && maygvl_copy_stream_continue_p(has_gvl, stp)); @@ -10772,7 
+10804,7 @@ nogvl_copy_stream_wait_write(struct copy_stream_struct *stp) int ret; do { - ret = nogvl_wait_for_single_fd(stp->dst_fd, RB_WAITFD_OUT); + ret = nogvl_copy_stream_wait(stp->th, stp->dst_fd, RB_WAITFD_OUT); } while (ret == -1 && maygvl_copy_stream_continue_p(0, stp)); if (ret == -1) { diff --git a/iom.h b/iom.h new file mode 100644 index 0000000000..516b531b5b --- /dev/null +++ b/iom.h @@ -0,0 +1,109 @@ +/* + * iom -> I/O Manager for RubyVM (Thread::Light-aware) + * + * On platforms with epoll or kqueue, this should be ready for multicore; + * even if the rest of the RubyVM is not. + * + * Some inspiration taken from Mio in GHC: + * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf + */ +#ifndef RUBY_IOM_H +#define RUBY_IOM_H +#include "ruby.h" +#include "ruby/io.h" +#include "ruby/intern.h" +#include "vm_core.h" +#include "hrtime.h" + +typedef struct rb_iom_struct rb_iom_t; + +/* WARNING: unstable API, only for Ruby internal use */ + +/* + * Relinquish calling fiber while waiting for +events+ on the given + * +rb_io_t+ + * + * Multiple native threads can enter this function at the same time. + * + * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI + * + * Returns a mask of events. + */ +int rb_iom_waitio(rb_thread_t *, rb_io_t *, int, const rb_hrtime_t *); + +/* + * Identical to rb_iom_waitio, but takes a pointer to an integer file + * descriptor, instead of rb_io_t. Use rb_iom_waitio when possible, + * since it allows us to optimize epoll (and perhaps avoid kqueue + * portability bugs across different *BSDs). + */ +int rb_iom_waitfd(rb_thread_t *, int *fdp, int, const rb_hrtime_t *); + +int rb_iom_select(rb_thread_t *, int maxfd, rb_fdset_t *r, + rb_fdset_t *w, rb_fdset_t *e, const rb_hrtime_t *); + +/* + * Relinquish calling fiber for at least the duration of the given + * rb_hrtime_t timeout. If the timeout pointer is NULL, wait forever (until explicitly + * resumed). + * Multiple native threads can enter this function at the same time. 
+ */ +void rb_iom_sleep(rb_thread_t *, const rb_hrtime_t *); + +/* + * Like rb_iom_sleep, but wait on the rb_sched_waiter struct + * Multiple native threads can enter this function at the same time. + */ +void rb_sched_waiter_sleep(struct rb_sched_waiter *, const rb_hrtime_t *); + +/* + * there is no public create function, creation is lazy to avoid incurring + * overhead for small scripts which do not need fibers, we only need this + * at VM destruction + */ +void rb_iom_destroy(rb_vm_t *); + +/* runs the scheduler */ +void rb_iom_schedule(rb_thread_t *, const rb_hrtime_t *); + +/* + * Wake up a rb_sched_waiter; may be called outside of GVL. + * @enqueue_tl enqueues the @sw to be run by its owner thread + * into rb_thread_t.nogvl_runq + * @enqueue_tl TRUE may only be used once per rb_sched_waiter + */ +void ruby_iom_wake_signal(struct rb_sched_waiter *, int enqueue_tl); + +/* cont.c */ +/* + * Resume all "ready" fibers belonging to a given thread. + * Returns TRUE when we are done, FALSE otherwise. 
+ */ +int rb_tl_schedule_ready_p(rb_execution_context_t *); + +void rb_iom_mark_thread(rb_thread_t *); +void rb_iom_mark(rb_iom_t *); +void rb_iom_pass(rb_execution_context_t *); + +static inline void +rb_sched_waiter_ready(struct list_head *runq, struct rb_sched_waiter *sw) +{ + sw->ec->in_runq = 1; + list_add_tail(runq, &sw->wnode); +} + +static inline struct rb_sched_waiter * +rb_sched_waiter_dequeue(rb_thread_t *th) +{ + struct list_head *runq = &th->runq; + struct rb_sched_waiter *sw = list_pop(runq, struct rb_sched_waiter, wnode); + + if (sw) { + sw->ec->in_runq = 0; + list_node_init(&sw->wnode); /* for idempotent list_del in ensures */ + } + return sw; +} + +void rb_iom_run(rb_execution_context_t *target); +#endif /* RUBY_IOM_H */ diff --git a/iom_common.h b/iom_common.h new file mode 100644 index 0000000000..607dded510 --- /dev/null +++ b/iom_common.h @@ -0,0 +1,253 @@ +/* included by iom_(epoll|select|kqueue).h */ + +/* we lazily create this, small scripts may never need iom */ +static rb_iom_t * +iom_new(void) +{ + rb_iom_t *iom = ALLOC(rb_iom_t); + iom_init(iom); + + return iom; +} + +static rb_iom_t * +iom_get(const rb_thread_t *th) +{ + rb_vm_t *vm = th->vm; + + if (!vm->iom) + vm->iom = iom_new(); + + return vm->iom; +} + +/* + * check for expired timers, must be called with GVL. Expired + * entries are removed from the timer list; returns nothing. 
+ */ +static void +iom_timer_check(const rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + rb_hrtime_t now; + struct rb_sched_waiter *sw = 0, *next; + + if (!iom || list_empty(&iom->c.timers)) return; + + now = rb_hrtime_now(); + list_for_each_safe(&iom->c.timers, sw, next, wnode) { + struct iom_timer *t = container_of(sw, struct iom_timer, sw); + + if (now >= t->expire_at) { + rb_execution_context_t *ec = sw->ec; + rb_thread_t *owner = rb_ec_thread_ptr(ec); + + if (rb_tl_sched_p(ec)) { + list_del(&sw->wnode); + rb_sched_waiter_ready(&owner->runq, sw); + } + else { /* native thread */ + list_del_init(&sw->wnode); + if (owner != th) { + rb_threadptr_interrupt(owner); + } + } + } + else { /* sorted by expire_at, so we can break out early */ + return; + } + } +} + +/* + * insert a new +timer+ into +timers+, maintain sort order by expire_at + */ +static void +iom_timer_add(rb_thread_t *th, struct iom_timer *add, const rb_hrtime_t *rel) +{ + struct rb_sched_waiter *sw = 0; + rb_iom_t *iom = iom_get(th); + + add->sw.ec = th->ec; + add->expire_at = rb_hrtime_add(rb_hrtime_now(), *rel); + /* + * search backwards: assume typical projects have multiple objects + * sharing the same timeout values, so new timers will expire later + * than existing timers + */ + list_for_each_rev(&iom->c.timers, sw, wnode) { + struct iom_timer *i = container_of(sw, struct iom_timer, sw); + + if (add->expire_at > i->expire_at) { + list_add_after(&iom->c.timers, &i->sw.wnode, &add->sw.wnode); + return; + } + } + list_add(&iom->c.timers, &add->sw.wnode); +} + +/* + * Register an iom_waiter, allows "ensure" to run. Each call must + * be paired with iom_waiter_ensure, and may not nest. + * + * This is NOT allowed: + * + * iom_waiter_add + * iom_waiter_add + * iom_waiter_ensure + * iom_waiter_ensure + * + * This is OK: + * + * iom_waiter_add + * iom_waiter_ensure + * iom_waiter_add + * iom_waiter_ensure + * ... 
+ * + */ +static void +iom_waiter_add(rb_thread_t *th, struct list_head *h, struct iom_waiter *w) +{ + w->sw.ec = th->ec; + /* use FIFO order for fairness */ + list_add_tail(h, &w->sw.wnode); +} + +static VALUE +iom_sleep_schedule(VALUE p) +{ + rb_execution_context_t *ec = (rb_execution_context_t *)p; + rb_thread_t *th = rb_ec_thread_ptr(ec); + + /* may move to th->runq */ + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + + if (!rb_tl_schedule_ready_p(ec)) + VM_UNREACHABLE(iom_sleep_schedule); + + return Qfalse; +} + +/* + * SLEEP_DEADLOCKABLE is ignored for flag because Thread::Light cannot + * deadlock (new Thread::Light may always be created by native Thread) + * SLEEP_SPURIOUS check is not supported + * Use this ONLY if rb_sched_waiter is already in a wait queue. + */ +void +rb_sched_waiter_sleep(struct rb_sched_waiter *sw, const rb_hrtime_t *rel) +{ + rb_thread_t *th = rb_ec_thread_ptr(sw->ec); + + if (rb_tl_sched_p(sw->ec)) { + iom_timeout_do(th, rel, iom_sleep_schedule, (VALUE)sw->ec); + } + else { + enum rb_thread_status prev_status = th->status; + th->status = THREAD_STOPPED; + native_sleep(th, 0); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + th->status = prev_status; + } +} + +static VALUE +iom_sleep(VALUE ec) +{ + if (!rb_tl_schedule_ready_p((rb_execution_context_t *)ec)) { + VM_UNREACHABLE(iom_sleep_wait); + } + return Qfalse; +} + +/* needed for Thread::Light#run to work */ +void +rb_iom_sleep(rb_thread_t *th, const rb_hrtime_t *rel) +{ + iom_timeout_do(th, rel, iom_sleep, (VALUE)th->ec); +} + +static VALUE +schedule_ensure(VALUE p) +{ + struct rb_sched_waiter *sw = (struct rb_sched_waiter *)p; + + list_del(&sw->wnode); + return Qfalse; +} + +/* cont.c */ +VALUE rb_tl_switch(VALUE); + +static void +iom_tl_pass(rb_execution_context_t *ec) +{ + rb_thread_t *th = rb_ec_thread_ptr(ec); + rb_fiber_t *next; + struct rb_sched_waiter *nxt, *cur; + VALUE tmp; + + rb_tl_merge_runq(th); + + if (!rb_tl_switchable(ec)) { + return; + } + + nxt = rb_sched_waiter_dequeue(th); + + 
/* fall back to root fiber if nobody else to switch to */ + next = nxt ? nxt->ec->fiber_ptr : th->root_fiber; + + cur = RB_FIBER_ALLOCA(tmp, struct rb_sched_waiter); + cur->ec = ec; + rb_sched_waiter_ready(&th->runq, cur); + rb_ensure(rb_tl_switch, (VALUE)next, schedule_ensure, (VALUE)cur); + RB_FIBER_ALLOCA_END(tmp); +} + +static VALUE +iom_run_ec(VALUE p) +{ + rb_execution_context_t *target = (rb_execution_context_t *)p; + rb_thread_t *target_th = rb_ec_thread_ptr(target); + + rb_iom_schedule(target_th, 0); + return Qfalse; +} + +static VALUE +iom_run_ensure(VALUE p) +{ + struct rb_sched_waiter *sw = (struct rb_sched_waiter *)p; + + list_del(&sw->wnode); /* from iom->c */ + + return Qfalse; +} + +void +rb_iom_run(rb_execution_context_t *target) +{ + rb_thread_t *th = rb_ec_thread_ptr(target); + VALUE tmp; + struct rb_sched_waiter *sw = RB_FIBER_ALLOCA(tmp, struct rb_sched_waiter); + + rb_tl_merge_runq(th); + sw->ec = target; + rb_sched_waiter_ready(&th->runq, sw); + rb_ensure(iom_run_ec, (VALUE)target, iom_run_ensure, (VALUE)sw); + RB_FIBER_ALLOCA_END(tmp); +} + +/* do we have unretrieved events? */ +static int +iom_events_pending(rb_vm_t *vm) +{ + if (vm->iom) { + if (!list_empty(&vm->iom->c.fdws)) return TRUE; + if (!list_empty(&vm->iom->c.fdsets)) return TRUE; + if (!list_empty(&vm->iom->c.timers)) return TRUE; + } + return rb_vm_waitpid_pending(vm); +} diff --git a/iom_epoll.h b/iom_epoll.h new file mode 100644 index 0000000000..d896e552d7 --- /dev/null +++ b/iom_epoll.h @@ -0,0 +1,652 @@ +/* + * Linux-only epoll-based implementation of I/O Manager for RubyVM + * + * Notes: + * + * TODO: epoll_wait only has millisecond resolution; if we need higher + * resolution we can use timerfd or ppoll on the epoll_fd itself. + * + * Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the + * same notification callback (struct file_operations)->poll. 
+ * Unlike with kqueue across different *BSDs; we do not need to worry + * about inconsistencies between these syscalls. + * + * See also notes in iom_kqueue.h + */ +#include "iom_internal.h" +#include +#define FMODE_IOM_ADDED FMODE_IOM_PRIVATE1 +/* TODO: support EPOLLEXCLUSIVE */ +#define IOM_EVENT_SIZE sizeof(struct epoll_event) + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + struct iom_core c; + struct iom_fdmap fdmap; /* maps each FD to multiple epw */ + struct iom_multiplexer mp; +}; + +struct epw_set; + +union epfdn_as { + struct list_node fdnode; + struct { + struct iom_fd *fdh; + } pre_ctl; +}; + +struct epfdn { + union epfdn_as as; + union { + int *flags; /* &fptr->mode */ + struct epw_set *set; + } owner; +}; + +/* + * Not using iom_fd_waiter here, since we never need to reread the + * FD on this implementation. + */ +struct epw { + struct epfdn fdn; + struct iom_waiter w; + int fd; /* no need for "int *", here, we never reread */ + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +struct epw_set { + struct iom_fdset_waiter fsw; + struct list_head fdset_node; /* - eset_item.fdset_node */ +}; + +/* + * per-FD in rb_thread_fd_select arg, value for epw_set.fdset_node + * Always allocated on-heap + */ +struct eset_item { + struct epfdn fdn; + struct list_node fdset_node; /* epw_set.fdset_node */ + int fd; + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +static const short idx2events[] = { EPOLLIN, EPOLLOUT, EPOLLPRI }; + +/* we can avoid branches when mapping RB_WAIT_* bits to EPOLL* bits */ +STATIC_ASSERT(epbits_matches_waitfd_bits, + RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT && + RB_WAITFD_PRI == EPOLLPRI); + +/* what goes into epoll_ctl... 
*/ +static int +rb_events2ep(int events) +{ + return EPOLLONESHOT | events; +} + +/* ...what comes out of epoll_wait */ +static short +rb_ep2revents(int revents) +{ + return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI)); +} + +static int +hrtime2eptimeout(const rb_hrtime_t *rel) +{ + /* + * clamp timeout to workaround a Linux <= 2.6.37 bug, + * see epoll_wait(2) manpage + */ + const time_t max_sec = 35 * 60; + const int max_msec = max_sec * 1000; /* floor(35.79 minutes) */ + long msec; + + if (!rel) return -1; /* infinite */ + if (!*rel) return 0; + + /* + * round up to avoid busy waiting, the rest of the code uses + * nanosecond resolution and we risk wasting cycles looping + * on epoll_wait(..., timeout=0) calls w/o rounding up, here: + */ + msec = (*rel + (RB_HRTIME_PER_MSEC - 1)) / (long)RB_HRTIME_PER_MSEC; + + if (msec < 0 || msec > max_msec) + return max_msec; + + return msec > max_msec ? max_msec : (int)msec; +} + +static void +register_sigwait_fd(int epfd) +{ + int sigwait_fd = signal_self_pipe.normal[0]; + + if (sigwait_fd >= 0) { + struct epoll_event ev; + + ev.data.ptr = &sigwait_th; + ev.events = EPOLLIN; /* right, NOT using oneshot here */ + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigwait_fd, &ev) != 0) + rb_sys_fail("epoll_ctl"); + } +} + +/* lazily create epoll FD, since not everybody waits on I/O */ +static int +iom_epfd(rb_iom_t *iom) +{ + if (iom->mp.fd < 0) { +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + iom->mp.fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->mp.fd < 0) { + int err = errno; + + if (rb_gc_for_fd(err)) { + iom->mp.fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->mp.fd < 0) + rb_sys_fail("epoll_create1"); + } + else if (err != ENOSYS) { + rb_syserr_fail(err, "epoll_create1"); + } + else { /* libc >= kernel || build-env > run-env */ +#endif /* HAVE_EPOLL_CREATE1 */ + iom->mp.fd = epoll_create(1); + if (iom->mp.fd < 0) { + if (rb_gc_for_fd(errno)) + iom->mp.fd = epoll_create(1); + } + if (iom->mp.fd < 0) + 
rb_sys_fail("epoll_create"); + + rb_maygvl_fd_fix_cloexec(iom->mp.fd); +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + } + } +#endif /* HAVE_EPOLL_CREATE1 */ + rb_update_max_fd(iom->mp.fd); + register_sigwait_fd(iom->mp.fd); + } + return iom->mp.fd; +} + +static void +iom_init(rb_iom_t *iom) +{ + iom_core_init(&iom->c); + iom_fdmap_init(&iom->fdmap); + iom_multiplexer_init(&iom->mp); +} + +/* + * We need to re-arm watchers in case somebody wants several events + * from the same FD (e.g. both EPOLLIN and EPOLLOUT), but only one + * fires. We re-arm the one that didn't fire. This is an uncommon + * case. + */ +static int +rearm_fd(rb_thread_t *th, struct iom_fd *fdh) +{ + int fd = -1; + struct epoll_event ev; + union epfdn_as *as; + + ev.events = EPOLLONESHOT; + ev.data.ptr = fdh; + list_for_each(&fdh->fdhead, as, fdnode) { + struct epfdn *fdn = container_of(as, struct epfdn, as); + + if (fdn->owner.set) { + struct eset_item *esi = container_of(fdn, struct eset_item, fdn); + fd = esi->fd; + ev.events |= rb_events2ep(esi->events); + } + else { + struct epw *epw = container_of(fdn, struct epw, fdn); + fd = epw->fd; + ev.events |= rb_events2ep(epw->events); + } + } + return epoll_ctl(th->vm->iom->mp.fd, EPOLL_CTL_MOD, fd, &ev) ? 
errno : 0; +} + +/* + * this MUST not raise exceptions because sigwait_fd can be acquired, + * iom_wait_done can release sigwait_fd and raise + */ +static int +check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *eventlist, + int *sigwait_fd) +{ + int i; + int err = 0; + + if (nr < 0) return errno; + + for (i = 0; i < nr; i++) { + struct iom_fd *fdh = eventlist[i].data.ptr; + short revents = rb_ep2revents(eventlist[i].events); + union epfdn_as *as, *next; + short errs = eventlist[i].events & (EPOLLHUP | EPOLLERR); + + if (check_sigwait_fd(th, eventlist[i].data.ptr, &err, sigwait_fd)) + continue; + + /* + * Typical list size is 1; only multiple fibers waiting + * on the same FD increases fdh list size + */ + again_if_fd_closed: + list_for_each_safe(&fdh->fdhead, as, next, fdnode) { + struct epfdn *fdn = container_of(as, struct epfdn, as); + struct epw_set *eset = fdn->owner.set; + + if (eset) { + struct eset_item *esi = container_of(fdn, struct eset_item, fdn); + esi->revents = errs ? esi->events : esi->events & revents; + if (esi->revents) { + int ret = eset->fsw.ret++; + list_del_init(&fdn->as.fdnode); + if (!ret) { + if (revents & POLLNVAL) { + eset->fsw.ret = -1; + eset->fsw.errnum = EBADF; + } + iom_waiter_ready(th, &eset->fsw.w); + } + } + } + else { + struct epw *epw = container_of(fdn, struct epw, fdn); + epw->revents = errs ? epw->events : epw->events & revents; + if (epw->revents) { + list_del_init(&fdn->as.fdnode); + if (revents & POLLNVAL) + epw->revents |= POLLNVAL; + iom_waiter_ready(th, &epw->w); + } + } + } + + if (RB_UNLIKELY(!list_empty(&fdh->fdhead))) { + int rearm_err = rearm_fd(th, fdh); + + switch (rearm_err) { + case 0: break; + case ENOENT: + case EBADF: + /* descriptor got closed, force readiness */ + revents = EPOLLIN | EPOLLOUT | EPOLLPRI | POLLNVAL; + goto again_if_fd_closed; + default: + rb_bug_errno("epoll_ctl failed to rearm", rearm_err); + } + } + } + + return (err || RUBY_VM_INTERRUPTED(th->ec)) ? 
EINTR : 0; +} + +/* perform a non-blocking epoll_wait while holding GVL */ +static void +iom_ping_events(rb_thread_t *th, struct iom_fdset_waiter *ign, unsigned flags) +{ + rb_iom_t *iom = th->vm->iom; + int epfd = iom ? iom->mp.fd : -1; + int sigwait_fd = rb_sigwait_fd_get(th); + int err = 0; + + /* don't steal work from GVL sleepers */ + if (epfd >= 0 && list_empty(&iom->c.blockers)) { + int nr, nevents, retries = 0; + struct epoll_event *eventlist = iom_eventlist(iom, &nevents); + + do { + nr = epoll_wait(epfd, eventlist, nevents, 0); + err = check_epoll_wait(th, nr, eventlist, &sigwait_fd); + } while (!err && nr == nevents && ++retries); + iom_grow(iom, retries); + } + else if (sigwait_fd >= 0) { + err = check_signals_nogvl(th, sigwait_fd) ? EINTR : 0; + } + iom_wait_done(th, sigwait_fd, err, "epoll_wait", 0, flags); +} + +/* for iom_pingable_common.h */ +static void +iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + struct epoll_event eventlist[256 / IOM_EVENT_SIZE]; + int nevents = numberof(eventlist); + int nr = nevents; + rb_hrtime_t hrt; + int msec = hrtime2eptimeout(iom_timeout_next(&hrt, &iom->c.timers)); + + if (msec != 0) { /* slow path */ + int epfd = iom_epfd(iom); /* may raise */ + int err = 0; + int sigwait_fd = rb_sigwait_fd_get(th); + struct iom_blocker cur; + + cur.th = th; + VM_ASSERT(epfd >= 0); + nr = 0; + list_add(&iom->c.blockers, &cur.bnode); + BLOCKING_REGION(th, { + if (!RUBY_VM_INTERRUPTED(th->ec)) { + nr = epoll_wait(epfd, eventlist, nevents, msec); + } + iom_wait_check_nogvl(th, &err, nr, sigwait_fd); + }, sigwait_fd >= 0 ? 
ubf_sigwait : ubf_select, th, TRUE); + list_del(&cur.bnode); + if (!err) + err = check_epoll_wait(th, nr, eventlist, &sigwait_fd); + iom_wait_done(th, sigwait_fd, err, "epoll_wait", 0, IOM_CHECK_INTS); + } + if (nr == nevents && iom_events_pending(th->vm)) /* || *rel == 0 */ + iom_ping_events(th, 0, IOM_CHECK_INTS); +} + +static void +merge_events(struct epoll_event *ev, struct iom_fd *fdh) +{ + union epfdn_as *as; + + list_for_each(&fdh->fdhead, as, fdnode) { + struct epfdn *fdn = container_of(as, struct epfdn, as); + + if (fdn->owner.set) { + struct eset_item *esi = container_of(fdn, struct eset_item, fdn); + ev->events |= rb_events2ep(esi->events); + } + else { + struct epw *epw = container_of(fdn, struct epw, fdn); + ev->events |= rb_events2ep(epw->events); + } + } +} + +static int +epoll_ctl_ready_p(rb_thread_t *th, struct epw *epw) +{ + int e; + int epfd; + struct epoll_event ev; + int *flags = epw->fdn.owner.flags; + + epw->fdn.owner.flags = 0; /* ensure it's not confused with owner.set */ + + /* we cannot raise until list_add: */ + { + struct iom_fd *fdh = epw->fdn.as.pre_ctl.fdh; + + ev.data.ptr = fdh; + ev.events = rb_events2ep(epw->events); + /* + * merge events from other threads/fibers waiting on the same + * [ descriptor (int fd), description (struct file *) ] tuplet + */ + if (!list_empty(&fdh->fdhead)) /* uncommon, I hope... 
*/ + merge_events(&ev, fdh); + + list_add(&fdh->fdhead, &epw->fdn.as.fdnode); + } + + epfd = iom_epfd(th->vm->iom); /* may raise */ + + /* we want to track if an FD is already being watched ourselves */ + if (flags) { + if (*flags & FMODE_IOM_ADDED) { /* ideal situation */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + else { + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + if (e == 0) + *flags |= FMODE_IOM_ADDED; + else if (e < 0 && errno == EEXIST) { + /* + * possible EEXIST if several fptrs point to the same FD: + * f1 = Fiber.start { io1.read(1) } + * io2 = IO.for_fd(io1.fileno) + * f2 = Fiber.start { io2.read(1) } + */ + *flags |= FMODE_IOM_ADDED; + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + } + } + else { /* don't know if added or not, fall back to add on ENOENT */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + if (e < 0 && errno == ENOENT) + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + } + if (e < 0) { + e = errno; + switch (e) { + case EPERM: /* all ready if File or Dir */ + epw->revents = epw->events; + return TRUE; + default: + epw->revents = epw->events | POLLNVAL; + return TRUE; + } + } + return FALSE; +} + +static VALUE +epw_schedule(VALUE ptr) +{ + /* we must have no possibility of raising until list_add: */ + struct epw *epw = (struct epw *)ptr; + rb_execution_context_t *ec = epw->w.sw.ec; + rb_thread_t *th = rb_ec_thread_ptr(ec); + + if (epw->fd < 0) {/* TODO: behave like poll(2) and sleep? 
*/ + list_node_init(&epw->fdn.as.fdnode); /* for list_del in epw_ensure */ + return (VALUE)0; + } + + /* may raise on OOM: */ + epw->fdn.as.pre_ctl.fdh = iom_fd_get(&th->vm->iom->fdmap, epw->fd); + + if (!epoll_ctl_ready_p(th, epw)) { + iom_ping_events(th, 0, IOM_CHECK_INTS); + if (!rb_tl_schedule_ready_p(ec)) { + VM_UNREACHABLE(epw_schedule); + } + } + + return (VALUE)epw->revents; /* may be zero if timed out */ +} + +static VALUE +epw_ensure(VALUE ptr) +{ + struct epw *epw = (struct epw *)ptr; + + list_del(&epw->fdn.as.fdnode); + iom_waiter_ensure(&epw->w); + + return Qfalse; +} + +static inline VALUE +epw_wait(VALUE epw) +{ + return rb_ensure(epw_schedule, epw, epw_ensure, epw); +} + +static int +iom_waitfd(rb_thread_t *th, int fd, int *flags, int events, + const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + VALUE tmp; + struct epw *epw = RB_FIBER_ALLOCA(tmp, struct epw); + + /* unlike kqueue or select, we never need to reread fd */ + epw->fd = fd; + epw->fdn.owner.flags = flags; + /* + * if we did not have GVL, revents may be set immediately + * upon epoll_ctl by another thread running epoll_wait, + * so we must initialize it before epoll_ctl: + */ + epw->revents = 0; + epw->events = (short)events; + iom_waiter_add(th, &iom->c.fdws, &epw->w); + + return iom_wait_fd_result(th, rel, epw_wait, (VALUE)epw, tmp); +} + +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, + const rb_hrtime_t *rel) +{ + return iom_waitfd(th, fptr->fd, &fptr->mode, events, rel); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, const rb_hrtime_t *rel) +{ + return iom_waitfd(th, *fdp, 0, events, rel); +} + +/* used by fdset_prepare */ +static void +fdset_prepare_i(struct iom_fdset_waiter *fsw, int fd, int events) +{ + struct epw_set *eset = container_of(fsw, struct epw_set, fsw); + struct epoll_event ev; + rb_thread_t *th = rb_ec_thread_ptr(fsw->w.sw.ec); + int epfd = iom_epfd(th->vm->iom); /* may raise */ + struct iom_fd *fdh = 
iom_fd_get(&th->vm->iom->fdmap, fd); + struct eset_item *esi = ALLOC(struct eset_item); + + ev.data.ptr = fdh; + ev.events = rb_events2ep(events); + esi->fd = fd; + esi->revents = 0; + esi->events = events; + esi->fdn.owner.set = eset; + if (!list_empty(&fdh->fdhead)) { + /* + * happens when different Fibers call rb_thread_fd_select + * or rb_wait_for_single_fd on the same FD + */ + merge_events(&ev, fdh); + } + list_add(&fdh->fdhead, &esi->fdn.as.fdnode); + list_add_tail(&eset->fdset_node, &esi->fdset_node); + + /* always try EPOLL_CTL_MOD because it is faster to reuse an epitem */ + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) != 0) { + int e = errno; + + if (e == ENOENT) { + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0) + return; + e = errno; + } + if (e == EPERM) { /* regular file or directory, totally ready */ + esi->revents = esi->events; + fsw->ret++; + } + else { /* EBADF or anything else */ + if (fsw->ret == 0) { + fsw->errnum = e; + fsw->ret = -1; + } + } + } +} + +static VALUE +eset_schedule(VALUE ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + + /* EPERM from regular file gives fsw.ret > 0 */ + fdset_prepare(&eset->fsw); + if (eset->fsw.ret) /* error or regular file */ + return (VALUE)eset->fsw.ret; + + return fdset_schedule(&eset->fsw); +} + +static VALUE +eset_ensure(VALUE ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + struct eset_item *esi, *nxt; + + list_for_each_safe(&eset->fdset_node, esi, nxt, fdset_node) { + list_del(&esi->fdset_node); + list_del(&esi->fdn.as.fdnode); + fdset_save_result_i(&eset->fsw, esi->fd, esi->revents); + xfree(esi); + } + return fdset_ensure(&eset->fsw, eset, sizeof(*eset)); +} + +static inline VALUE +eset_wait(VALUE eset) +{ + return rb_ensure(eset_schedule, eset, eset_ensure, eset); +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + VALUE tmp; + struct epw_set *eset = RB_FIBER_ALLOCA(tmp, 
struct epw_set); + int ret; + + list_head_init(&eset->fdset_node); + iom_fdset_waiter_init(th, &eset->fsw, maxfd, r, w, e, rel); + iom_waiter_add(th, &iom->c.fdsets, &eset->fsw.w); + + ret = (int)iom_timeout_do(th, rel, eset_wait, (VALUE)eset); + RB_FIBER_ALLOCA_END(tmp); + + return ret; +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + vm->iom = 0; + if (iom) { + /* + * it's possible; but crazy to share epoll FDs across processes + * (kqueue has a unique close-on-fork behavior) + */ + if (iom->mp.fd >= 0) + close(iom->mp.fd); + + iom_fdmap_destroy(&iom->fdmap); + xfree(iom); + } +} + +/* used by thread.c::rb_thread_atfork */ +#if defined(HAVE_WORKING_FORK) +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} +#endif +#include "iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_internal.h b/iom_internal.h new file mode 100644 index 0000000000..e769a013e0 --- /dev/null +++ b/iom_internal.h @@ -0,0 +1,596 @@ +#ifndef RB_IOM_COMMON_H +#define RB_IOM_COMMON_H + +#include "internal.h" +#include "iom.h" + +/* + * FIBER_USE_NATIVE (see cont.c) allows us to read machine stacks of + * yielded (stopped) fibers. Without native fiber support, cont.c needs + * to copy fiber stacks around, so we must use slower heap allocation + * instead of stack allocation (and increase our chance of hitting + * memory leak bugs) + */ +#include "fiber.h" + +#define FMODE_IOM_PRIVATE1 0x01000000 +#define FMODE_IOM_PRIVATE2 0x02000000 + +#define IOM_CHECK_INTS (0x1) + +static inline short +next_pow2(short x) +{ + x |= x >> 1; + x |= x >> 2; + x |= x >> 4; + x |= x >> 8; + + return x + 1; +} + +#if defined(POLLNVAL) +# if (POLLNVAL != RB_WAITFD_IN && POLLNVAL != RB_WAITFD_OUT && \ + POLLNVAL != RB_WAITFD_PRI) +# define IOM_NVAL POLLNVAL +# endif +#endif + +#ifndef IOM_NVAL +# define IOM_NVAL next_pow2(RB_WAITFD_IN|RB_WAITFD_OUT|RB_WAITFD_PRI) +#endif + +/* + * fdmap is a singleton. 
+ * + * It makes zero sense to have multiple fdmaps per-process; even less so + * than multiple ioms. The file descriptor table in POSIX is per-process; + * and POSIX requires syscalls to allocate the lowest available FD. + * This is also why we use an array instead of a hash table, as there will + * be no holes for big processes. + * + * If CPU cache contention becomes a problem, we can pad (struct iom_fd) + * to 64-bytes for cache alignment. + * + * Currently we use fdmap to deal with FD aliasing with epoll + * and kqueue interfaces. FD aliasing happens when multiple + * Coros wait on the same FD; but epoll/kqueue APIs only allow + * registering a single data pointer per FD. + * + * In the future, we may implement rb_notify_fd_close using fdmap. + */ + +/* linear growth */ +#define RB_IOM_FD_PER_HEAP 64 +/* on-heap and persistent for process lifetime; keep as small as possible. */ +struct iom_fd { + struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */ +}; + +/* singleton (per-rb_iom_t, or per process, if we ever need > 1 iom) */ +struct iom_fdmap { + struct iom_fd **map; + unsigned int heaps; + int max_fd; +}; + +struct iom_timer { + struct rb_sched_waiter sw; + rb_hrtime_t expire_at; +}; + +/* + * use this instead of rb_sched_waiter directly for Thread::Light, + * because we need to register rb_execution_context_t.enode for + * "ensure" + */ +struct iom_waiter { + struct rb_sched_waiter sw; +}; + +struct iom_fd_waiter { + struct iom_waiter w; +#if FIBER_USE_NATIVE + int *fdp; /* (ideally), a pointer to fptr->fd to detect closed FDs */ +#else + int fd; /* need to do full copy with non-native fiber */ +#endif + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +#if RUBYVM_IOM == IOM_SELECT + const rb_hrtime_t *rel; +#endif +}; + +/* + * On-stack if FIBER_USE_NATIVE, for rb_thread_fd_select + * This is used to emulate select() on epoll and kqueue, and + * allows iom_select to support concurrent 
rb_thread_fd_select + * callers. + */ +struct iom_fdset_waiter { + struct iom_waiter w; + int max; + int ret; + int errnum; + rb_fdset_t *in[3]; + /* + * without native fibers, other threads and fibers cannot safely + * read the stacks of stopped fibers + */ +#if FIBER_USE_NATIVE == 0 && RUBYVM_IOM == IOM_SELECT + rb_fdset_t *onstack_dst[3]; +#endif +#if RUBYVM_IOM == IOM_SELECT + const rb_hrtime_t *rel; +#endif +}; + +/* + * threads sleeping w/o GVL while waiting for select/epoll_wait/kevent + * ALWAYS allocated on stack + */ +struct iom_blocker { + rb_thread_t *th; + struct list_node bnode; /* -iom->c.blockers */ +}; + +/* common core of rb_iom_t */ +struct iom_core { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + struct list_head timers; /* -iom_timer.sw.wnode, sort by expire_at */ + + /* rb_wait_for_single_fd callers */ + struct list_head fdws; /* -iom_fd_waiter.w.sw.wnode, FIFO order */ + + /* rb_thread_fd_select callers */ + struct list_head fdsets; /* -iom_fdset_waiter.w.sw.wnode */ + + /* + * threads doing select/epoll_wait/kevent without GVL + * We notify this list because they may be starving each other + */ + struct list_head blockers; /* -iom_blocker.bnode */ +}; + +/* process.c */ +int rb_vm_waitpid_pending(rb_vm_t *vm); + +static void iom_timer_check(const rb_thread_t *); +static void iom_timer_add(rb_thread_t *, struct iom_timer *, + const rb_hrtime_t *); +static void iom_waiter_add(rb_thread_t *, struct list_head *, + struct iom_waiter *); +static rb_iom_t *iom_get(const rb_thread_t *); +static void iom_tl_pass(rb_execution_context_t *); +static int iom_events_pending(rb_vm_t *); + +#define IOM_PINGABLE_P (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) +#if IOM_PINGABLE_P +struct iom_multiplexer { + int fd; + /* array of "struct kevent" or "struct epoll_event" */ + VALUE buf; +}; + +static int iom_nevents(const rb_iom_t *); +static void *iom_eventlist(const rb_iom_t *, int 
*nevents); +static void iom_grow(rb_iom_t *, int retries); +static void iom_multiplexer_init(struct iom_multiplexer *mp); +static void fdset_prepare(struct iom_fdset_waiter *); +static VALUE fdset_schedule(struct iom_fdset_waiter *); +static VALUE fdset_ensure(struct iom_fdset_waiter *, void *, size_t); +static void fdset_save_result_i(struct iom_fdset_waiter *, int, short); +static int check_sigwait_fd(rb_thread_t *, const void *uptr, + int *err, int *sigwait_fd); + +/* TODO: may use this for rb_notify_fd_close */ +static struct iom_fd * +iom_fdhead_aref(struct iom_fdmap *fdmap, int fd) +{ + VM_ASSERT(fd >= 0); + return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP]; +} + +static struct iom_fd * +iom_fd_get(struct iom_fdmap *fdmap, int fd) +{ + while (fd >= fdmap->max_fd) { + struct iom_fd *base, *h; + unsigned n = fdmap->heaps + 1; + unsigned i; + + fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct iom_fd *)); + base = h = ALLOC_N(struct iom_fd, RB_IOM_FD_PER_HEAP); + for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) { + list_head_init(&h->fdhead); + h++; + } + fdmap->map[fdmap->heaps++] = base; + fdmap->max_fd += RB_IOM_FD_PER_HEAP; + } + return iom_fdhead_aref(fdmap, fd); +} + +static void +iom_fdmap_init(struct iom_fdmap *fdmap) +{ + fdmap->max_fd = 0; + fdmap->heaps = 0; + fdmap->map = 0; +} + +static void +iom_fdmap_destroy(struct iom_fdmap *fdmap) +{ + unsigned n; + + for (n = 0; n < fdmap->heaps; n++) { + xfree(fdmap->map[n]); + } + xfree(fdmap->map); + iom_fdmap_init(fdmap); +} +#endif /* IOM_PINGABLE_P */ + +/* + * returns the rb_hrtime_t interval to wait for the next pending event + * returns NULL if we may wait indefinitely + */ +static rb_hrtime_t * +iom_timeout_next(rb_hrtime_t *rel, struct list_head *timers) +{ + struct rb_sched_waiter *sw; + + sw = list_top(timers, struct rb_sched_waiter, wnode); + if (sw) { + struct iom_timer *t = container_of(sw, struct iom_timer, sw); + + if (rb_hrtime_update_expire(rel, t->expire_at)) { + /* force an 
immediate return from epoll_wait/kevent if expired */ + *rel = 0; + } + return rel; + } + return 0; +} + +static VALUE +iom_tl_timeout_ensure(VALUE ptr) +{ + struct iom_timer *t = (struct iom_timer *)ptr; + + list_del(&t->sw.wnode); + + return Qfalse; +} + +static inline VALUE +iom_timeout_do(rb_thread_t *th, const rb_hrtime_t *rel, + VALUE (*fn)(ANYARGS), VALUE arg) +{ + /* + * XXX ugly, but otherwise we sprinkle this assignment all over + * the place in ensure callbacks: + */ + th->ec->in_runq = 0; + + if (rel) { + VALUE tmp, ret; + struct iom_timer *t = RB_FIBER_ALLOCA(tmp, struct iom_timer); + + iom_timer_add(th, t, rel); + ret = rb_ensure(fn, arg, iom_tl_timeout_ensure, (VALUE)t); + RB_FIBER_ALLOCA_END(tmp); + return ret; + } + return (fn)(arg); +} + +/* + * marks an iom waiter as "ready" by enqueueing into the owner + * thread's runq. Must be called with GVL. + * Use ruby_iom_wake_signal if outside of GVL. + */ +static void +iom_waiter_ready(rb_thread_t *th, struct iom_waiter *w) +{ + rb_execution_context_t *ec = w->sw.ec; + rb_thread_t *owner = rb_ec_thread_ptr(ec); + + list_del(&w->sw.wnode); + rb_sched_waiter_ready(&owner->runq, &w->sw); + if (th != owner) { + rb_threadptr_interrupt(owner); + } +} + +/* callable without GVL */ +void +ruby_iom_wake_signal(struct rb_sched_waiter *sw, int enqueue_tl) +{ + rb_thread_t *th = rb_ec_thread_ptr(sw->ec); + + rb_native_mutex_lock(&th->interrupt_lock); + + if (enqueue_tl) { + rb_sched_waiter_ready(&th->nogvl_runq, sw); + } + + /* + * th->ec != sw->ec is possible. We want to interrupt the + * currently running ec on th. 
Hit both since cont.c::ec_switch + * is not protected by th->interrupt_lock + */ + RUBY_VM_SET_INTERRUPT(sw->ec); + RUBY_VM_SET_INTERRUPT(th->ec); + + if (th->unblock.func) + (th->unblock.func)(th->unblock.arg); + + rb_native_mutex_unlock(&th->interrupt_lock); +} + +/* + * called by rb_ensure at the end, so we may use list_del instead of + * idempotent list_del_init + */ +static void +iom_waiter_ensure(struct iom_waiter *w) +{ + list_del(&w->sw.wnode); +} + +static void +iom_fdset_waiter_init(rb_thread_t *th, struct iom_fdset_waiter *fsw, + int maxfd, rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const rb_hrtime_t *rel) +{ + size_t i; + fsw->ret = 0; + fsw->max = maxfd; + fsw->errnum = 0; + VM_ASSERT(fsw->max > 0); + fsw->in[0] = r; + fsw->in[1] = w; + fsw->in[2] = e; + + for (i = 0; i < 3; i++) { + if (fsw->in[i]) + rb_fd_resize(fsw->max - 1, fsw->in[i]); + } +#if FIBER_USE_NATIVE == 0 && RUBYVM_IOM == IOM_SELECT + for (i = 0; i < 3; i++) { + if (fsw->in[i]) { + fsw->onstack_dst[i] = fsw->in[i]; + fsw->in[i] = ALLOC(rb_fdset_t); + rb_fd_init_copy(fsw->in[i], fsw->onstack_dst[i]); + } + else { + fsw->onstack_dst[i] = 0; + } + } +#endif /* !FIBER_USE_NATIVE && IOM_SELECT */ +#if RUBYVM_IOM == IOM_SELECT + fsw->rel = rel; +#endif +} + +static inline int +iom_fdw_get_fd(struct iom_fd_waiter *fdw) +{ +#if FIBER_USE_NATIVE + return *fdw->fdp; /* able to detect closed FD in rb_io_t.fd */ +#else + return fdw->fd; +#endif +} + +static inline void +iom_fdw_init(struct iom_fd_waiter *fdw, int *fdp, int events, + const rb_hrtime_t *rel) +{ +#if FIBER_USE_NATIVE + fdw->fdp = fdp; +#else + fdw->fd = *fdp; +#endif + fdw->events = (short)events; + fdw->revents = 0; +#if RUBYVM_IOM == IOM_SELECT + fdw->rel = rel; +#endif +} + +static void +iom_core_init(struct iom_core *c) +{ + list_head_init(&c->timers); + list_head_init(&c->fdws); + list_head_init(&c->fdsets); + list_head_init(&c->blockers); +} + +static inline struct iom_fd_waiter * +fd_waiter_of(struct rb_sched_waiter *sw) +{ 
+ struct iom_waiter *w = container_of(sw, struct iom_waiter, sw); + + return container_of(w, struct iom_fd_waiter, w); +} + +static inline struct iom_fdset_waiter * +fdset_waiter_of(struct rb_sched_waiter *sw) +{ + struct iom_waiter *w = container_of(sw, struct iom_waiter, sw); + + return container_of(w, struct iom_fdset_waiter, w); +} + +/* Use after calling select/epoll_wait/kevent to wait for events */ +static void +iom_wait_done(rb_thread_t *th, int sigwait_fd, int err, const char *msg, + struct iom_fdset_waiter *fsw, unsigned flags) +{ + if (sigwait_fd >= 0) { + rb_sigwait_fd_put(th, sigwait_fd); + rb_sigwait_fd_migrate(th->vm); + } + iom_timer_check(th); + switch (err) { + case 0: return; + case EINTR: + if (flags & IOM_CHECK_INTS) { + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + } + return; + default: + if (fsw && !fsw->ret) { + fsw->ret = -1; + fsw->errnum = err; + } + if (err != EBADF) { + rb_bug_errno(msg, err); + } + } +} + +/* + * use for checking the result of epoll_wait, kevent, ppoll, or select + * and similar functions when used without GVL (and non-zero timeout). 
+ */ +static void +iom_wait_check_nogvl(rb_thread_t *th, int *err, int nr, int sigwait_fd) +{ + /* kevent, epoll_wait, select all return -1 and set errno on error */ + if (nr < 0) + *err = errno; + + if (sigwait_fd >= 0 && (nr == 0 || *err == EINTR)) { + if (check_signals_nogvl(th, sigwait_fd)) + *err = EINTR; + } +} + +void rb_fiber_mark_self(const rb_fiber_t *fib); /* cont.c */ +void rb_tl_merge_runq(rb_thread_t *); /* cont.c */ +void rb_tl_finish_i(rb_execution_context_t *, rb_fiber_t *); + +static void +iom_mark_sched_waiters(const struct list_head *h) +{ + const struct rb_sched_waiter *sw = 0; + + list_for_each(h, sw, wnode) + rb_fiber_mark_self(sw->ec->fiber_ptr); +} + +void +rb_iom_mark_thread(rb_thread_t *th) +{ + rb_execution_context_t *ec = 0; + + list_for_each(&th->coros, ec, enode) + rb_fiber_mark_self(ec->fiber_ptr); + + rb_tl_merge_runq(th); + iom_mark_sched_waiters(&th->runq); +} + +/* handles "ensure" */ +void +rb_tl_finish(rb_thread_t *th) +{ + rb_execution_context_t *ec; + + while ((ec = list_pop(&th->coros, rb_execution_context_t, enode))) { + list_node_init(&ec->enode); /* for list_del */ + rb_tl_finish_i(th->ec, ec->fiber_ptr); + } +} + +static inline int +iom_wait_fd_result(rb_thread_t *th, const rb_hrtime_t *rel, + VALUE (*fn)(ANYARGS), VALUE arg, VALUE tmp) +{ + int revents = (int)iom_timeout_do(th, rel, fn, arg); + + RB_FIBER_ALLOCA_END(tmp); + if (revents & IOM_NVAL) { + errno = EBADF; + return -1; + } + return revents; +} + +enum fd_pollable { + FD_POLL_OK, /* sockets or pipes */ + FD_POLL_BUSY, FD_POLL_BAD }; + +#ifdef _WIN32 +# define RB_W32_IS_SOCKET(fd) rb_w32_is_socket(fd) +#else +# define RB_W32_IS_SOCKET(fd) (0) +#endif + +static inline enum fd_pollable +iom_fd_pollable(int fd) +{ + struct stat st; + + if (RB_W32_IS_SOCKET(fd)) + return FD_POLL_OK; + + if (fstat(fd, &st) == 0) { + switch (st.st_mode & S_IFMT) { +#ifdef S_IFSOCK + case S_IFSOCK: /* sockets ought to be OK for all platforms */ +#elif defined(_S_IFSOCK) + case 
_S_IFSOCK: +#endif + case S_IFIFO: /* I hope FIFOs and pipes are pollable... */ + case S_IFCHR: /* maybe? */ + return FD_POLL_OK; + default: + /* assume select() is non-sensical with others */ + return FD_POLL_BUSY; + } + } + return FD_POLL_BAD; +} + +static inline int +fdset_error_p(struct iom_fdset_waiter *fsw) +{ + int i, fd; + + for (fd = 0; fd < fsw->max; fd++) { + for (i = 0; i < 3; i++) { + const rb_fdset_t *in = fsw->in[i]; + + if (in && rb_fd_isset(fd, in)) { + switch (iom_fd_pollable(fd)) { + case FD_POLL_BUSY: + fsw->ret++; + break; + case FD_POLL_OK: + break; + case FD_POLL_BAD: + if (!fsw->ret) { + fsw->ret = -1; + fsw->errnum = EBADF; + return TRUE; + } + } + + /* only check each FD once */ + break; + } + } + } + return !!fsw->ret; +} +#endif /* IOM_INTERNAL_H */ diff --git a/iom_kqueue.h b/iom_kqueue.h new file mode 100644 index 0000000000..413a0b31bc --- /dev/null +++ b/iom_kqueue.h @@ -0,0 +1,804 @@ +/* + * kqueue-based implementation of I/O Manager for RubyVM on *BSD + * + * The kevent API has an advantage over epoll_ctl+epoll_wait since + * it can simultaneously add filters and check for events with one + * syscall. It also allows EV_ADD to be used idempotently for + * enabling filters, whereas epoll_ctl requires separate ADD and + * MOD operations. + * + * These are advantages in the common case... + * + * The epoll API has advantages in more esoteric cases: + * + * epoll has the advantage over kqueue when watching for multiple + * events (POLLIN|POLLOUT|POLLPRI) (which is rare). We also have + * to install two kevent filters to watch POLLIN|POLLOUT simultaneously. + * See udata_set/udata_get functions below for more on this. + * + * Also, kevent does not support POLLPRI directly; we need to use + * select() (or perhaps poll() on some platforms) with a zero + * timeout to check for POLLPRI after EVFILT_READ returns. + * + * Finally, several *BSDs implement kqueue; and the quality of each + * implementation may vary. 
Anecdotally, *BSDs are not known to even + * support poll() consistently across different types of files. + * We will need to be selective and careful about injecting them into + * kevent(). + */ +#include "iom_internal.h" + +/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */ +#undef LIST_HEAD +#include +#include +#include + +/* needed for iom_pingable_common.h */ +#define IOM_EVENT_SIZE (sizeof(struct kevent)) + +/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */ +#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI) + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + struct iom_core c; + struct iom_fdmap rfdmap; /* holds fdh for EVFILT_READ */ + struct iom_fdmap wfdmap; /* holds fdh for EVFILT_WRITE */ + struct iom_multiplexer mp; +}; + +struct kqw_set; +struct kqfdn { + /* + * both rfdnode and wfdnode are overloaded for deleting paired + * filters when watching both EVFILT_READ and EVFILT_WRITE on + * a single FD + */ + struct list_node rfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_READ) */ + struct list_node wfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_WRITE) */ + struct kqw_set *owner; +}; + +/* allocated on stack */ +struct kev { + struct kqfdn fdn; + + /* fdw.w.sw.wnode is overloaded for checking RB_WAITFD_PRI + * (see check_pri) */ + struct iom_fd_waiter fdw; +}; + +/* allocated on-stack */ +struct kqw_set { + struct iom_fdset_waiter fsw; + struct list_head fdset_head; /* - kset_item.fdset_node */ + /* we reuse iom->mp.buf (eventlist) for changelist */ + int nchanges; +}; + +/* + * per-FD in rb_thread_fd_select arg, linked into kqw_set.fdset_head, + * allocated on-heap + */ +struct kset_item { + struct kqfdn fdn; + struct list_node fdset_node; /* -kqw_set.fdset_head */ + int fd; + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +/* + * we'll often call kevent with a zero timeout to perform a quick + * scan of events while holding GVL. 
I expect the BSD kernels to be + * optimizing this case (because I know for certain Linux does for epoll_wait). + */ +static const struct timespec zero; +static const short idx2events[] = { RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI }; + +static void +register_sigwait_fd(int kqfd) +{ + int sigwait_fd = signal_self_pipe.normal[0]; + + if (sigwait_fd >= 0) { + struct kevent kev; + + /* NOT using oneshot for signals */ + EV_SET(&kev, sigwait_fd, EVFILT_READ, EV_ADD, 0, 0, &sigwait_th); + + if (kevent(kqfd, &kev, 1, 0, 0, &zero) < 0) + rb_sys_fail("kevent"); + } +} + +static int +iom_kqfd(rb_iom_t *iom) +{ + struct iom_multiplexer *mp = &iom->mp; + + if (LIKELY(mp->fd >= 0)) + return mp->fd; + + mp->fd = kqueue(); + if (mp->fd < 0) { + if (rb_gc_for_fd(errno)) + mp->fd = kqueue(); + if (mp->fd < 0) + rb_sys_fail("kqueue"); + } + rb_fd_fix_cloexec(mp->fd); + register_sigwait_fd(mp->fd); + return mp->fd; +} + +static void +iom_init(rb_iom_t *iom) +{ + iom_core_init(&iom->c); + iom_fdmap_init(&iom->rfdmap); + iom_fdmap_init(&iom->wfdmap); + iom_multiplexer_init(&iom->mp); +} + +static struct kev * +kev_of(struct rb_sched_waiter *sw) +{ + struct iom_waiter *w = container_of(sw, struct iom_waiter, sw); + struct iom_fd_waiter *fdw = container_of(w, struct iom_fd_waiter, w); + + return container_of(fdw, struct kev, fdw); +} + +/* + * kevent does not have an EVFILT_* flag for (rarely-used) urgent data, + * at least not on FreeBSD 11.0, so we call select() separately after + * EVFILT_READ returns to set the RB_WAITFD_PRI bit: + */ +static void +check_pri(rb_thread_t *th, struct list_head *pri) +{ + rb_fdset_t efds; + int max, r, err = 0; + struct rb_sched_waiter *sw, *next; + struct timeval tv; + + if (RB_LIKELY(list_empty(pri))) + return; + + r = 0; + max = -1; + rb_fd_init(&efds); + + list_for_each(pri, sw, wnode) { + struct kev *kev = kev_of(sw); + int fd = iom_fdw_get_fd(&kev->fdw); + + if (fd >= 0) { + rb_fd_set(fd, &efds); + if (fd > max) + max = fd; + } + } + if (max >= 
0) { +again: + tv.tv_sec = 0; + tv.tv_usec = 0; + r = native_fd_select(max + 1, 0, 0, &efds, &tv, th); + if (r < 0) { + err = errno; + if (err == EINTR) /* shouldn't be possible with zero timeout */ + goto again; + } + list_for_each_safe(pri, sw, next, wnode) { + struct kev *kev = kev_of(sw); + + /* + * favor spurious wakeup over empty revents: + * I have observed select(2) failing to detect urgent data + * readiness after EVFILT_READ fires for it on FreeBSD 11.0. + * test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb + * is relaxed on non-Linux for this reason. + * We could use a non-zero timeout for select(2), but + * we also don't want to release GVL and have th->runq + * processed before we set revents, so we favor spurious + * RB_WAITFD_PRI over an empty revents field + */ + if (r >= 0) { + int fd = iom_fdw_get_fd(&kev->fdw); + + if (fd >= 0 && (rb_fd_isset(fd, &efds) || !kev->fdw.revents)) + kev->fdw.revents |= RB_WAITFD_PRI; + } + iom_waiter_ready(th, &kev->fdw.w); + } + } + rb_fd_term(&efds); +} + +/* + * kqueue is more complicated than epoll for corner cases because we + * install separate filters for simultaneously watching read and write + * on the same FD, and now we must clear shared filters if only the + * event from the other side came in. 
+ */ +static int +drop_pairs(rb_thread_t *th, int max, struct kevent *changelist, + struct list_head *rdel, struct list_head *wdel) +{ + int nchanges = 0; + struct kqfdn *fdn, *next; + + list_for_each_safe(rdel, fdn, next, wfdnode) { + struct kev *kev = container_of(fdn, struct kev, fdn); + int fd = iom_fdw_get_fd(&kev->fdw); + + list_del_init(&kev->fdn.wfdnode); /* delete from rdel */ + VM_ASSERT(kev->fdw.revents & RB_WAITFD_OUT); + + if (fd >= 0 && !(kev->fdw.revents & WAITFD_READ)) { + struct iom_fd *fdh = iom_fd_get(&th->vm->iom->rfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + VM_ASSERT(nchanges <= max); + EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0); + } + } + } + list_for_each_safe(wdel, fdn, next, rfdnode) { + struct kev *kev = container_of(fdn, struct kev, fdn); + int fd = iom_fdw_get_fd(&kev->fdw); + + list_del_init(&kev->fdn.rfdnode); /* delete from wdel */ + VM_ASSERT(kev->fdw.revents & WAITFD_READ); + + if (fd >= 0 && !(kev->fdw.revents & RB_WAITFD_OUT)) { + struct iom_fd *fdh = iom_fd_get(&th->vm->iom->wfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + VM_ASSERT(nchanges <= max); + EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0); + } + } + } + if (nchanges) { + int kqfd = th->vm->iom->mp.fd; + + if (kevent(kqfd, changelist, nchanges, 0, 0, &zero) < 0) + return errno; + } + return 0; +} + +/* + * kqueue requires separate filters when simultaneously + * watching read and write events on the same FD. Different filters + * means they can have different udata pointers; at least with + * native kqueue. + * + * When emulating native kqueue behavior using epoll as libkqueue does, + * there is only space for one udata pointer. This is because epoll + * allows each "filter" (epitem in the kernel) to watch read and write + * simultaneously. + * epoll keys epitems using: [ fd, file description (struct file *) ]. 
+ * In contrast, kevent keys its filters using: [ fd (ident), filter ]. + * + * So, with native kqueue, we can at least optimize away iom_fd_get + * function calls; but we cannot do that when libkqueue emulates + * kqueue with epoll and need to retrieve the fdh from the correct + * fdmap. + * + * Finally, the epoll implementation only needs one fdmap. + */ +static void * +udata_set(rb_iom_t *iom, struct iom_fd *fdh) +{ +#ifdef LIBKQUEUE +# warning libkqueue support is incomplete and does not handle corner cases + return iom; +#else /* native kqueue */ + return fdh; +#endif +} + +static struct iom_fd * +udata_get(const struct kevent *ev) +{ +#ifdef LIBKQUEUE + rb_iom_t *iom = ev->udata; + int fd = ev->ident; + + switch (ev->filter) { + case EVFILT_READ: return iom_fd_get(&iom->rfdmap, fd); + case EVFILT_WRITE: return iom_fd_get(&iom->wfdmap, fd); + default: + rb_bug("bad filter in libkqueue compatibility mode: %d", ev->filter); + } +#endif + return ev->udata; +} + +/* + * this MUST not raise exceptions because sigwait_fd can be acquired, + * iom_wait_done can release sigwait_fd and raise + */ +static int +check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist, int *sigwait_fd) +{ + int i, err = 0; + struct list_head pri, wdel, rdel; + struct kqfdn *fdn = 0, *next; + + if (nr < 0) return errno; + if (nr == 0) return err; + + list_head_init(&pri); + list_head_init(&wdel); + list_head_init(&rdel); + + for (i = 0; i < nr; i++) { + struct kevent *ev = &eventlist[i]; + struct iom_fd *fdh; + + if (check_sigwait_fd(th, ev->udata, &err, sigwait_fd)) + continue; + + /* + * fdh is keyed to both FD and filter, so + * it's either in iom->rfdmap or iom->wfdmap + */ + fdh = udata_get(ev); + + if (ev->filter == EVFILT_READ) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, fdn, next, rfdnode) { + struct kqw_set *kset = fdn->owner; + int rbits; + + if (kset) { + struct kset_item *ksi; + + ksi = container_of(fdn, struct kset_item, fdn); + rbits = ksi->events & 
WAITFD_READ; + VM_ASSERT(rbits && "unexpected EVFILT_READ"); + + /* + * The following may spuriously set RB_WAITFD_PRI: + * (see comments in check_pri for reasoning) + */ + ksi->revents |= rbits; + list_del_init(&fdn->rfdnode); + + /* entire set is ready once the first event fires */ + if (!kset->fsw.ret++) + iom_waiter_ready(th, &kset->fsw.w); + } + else { + struct kev *kev = container_of(fdn, struct kev, fdn); + + rbits = kev->fdw.events & WAITFD_READ; + VM_ASSERT(rbits && "unexpected EVFILT_READ"); + + kev->fdw.revents |= (RB_WAITFD_IN & rbits); + list_del_init(&kev->fdn.rfdnode); + + if (iom_fdw_get_fd(&kev->fdw) < 0) { /* FD was closed */ + iom_waiter_ready(th, &kev->fdw.w); + continue; + } + if (rbits & RB_WAITFD_PRI) { + list_del(&kev->fdw.w.sw.wnode); + list_add(&pri, &kev->fdw.w.sw.wnode); + /* check_pri will do iom_waiter_ready */ + } + else { + iom_waiter_ready(th, &kev->fdw.w); + } + if ((kev->fdw.events & RB_WAITFD_OUT) && !pair_queued) { + pair_queued = 1; + list_add(&wdel, &kev->fdn.rfdnode); + } + } + } + } + else if (ev->filter == EVFILT_WRITE) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, fdn, next, wfdnode) { + struct kqw_set *kset = fdn->owner; + int rbits; + + if (kset) { + struct kset_item *ksi; + + ksi = container_of(fdn, struct kset_item, fdn); + rbits = ksi->events & RB_WAITFD_OUT; + VM_ASSERT(rbits && "unexpected EVFILT_WRITE"); + ksi->revents |= rbits; + list_del_init(&fdn->wfdnode); + if (!kset->fsw.ret++) + iom_waiter_ready(th, &kset->fsw.w); + } + else { + struct kev *kev = container_of(fdn, struct kev, fdn); + + if (!kev->fdw.revents) + iom_waiter_ready(th, &kev->fdw.w); + kev->fdw.revents |= RB_WAITFD_OUT; + list_del_init(&kev->fdn.wfdnode); + + if ((kev->fdw.events & WAITFD_READ) && + iom_fdw_get_fd(&kev->fdw) >= 0 && + !pair_queued) { + pair_queued = 1; + list_add(&rdel, &kev->fdn.wfdnode); + } + } + } + } + else { + rb_bug("unexpected filter: %d", ev->filter); + } + } + check_pri(th, &pri); + err = drop_pairs(th, 
nr, eventlist, &rdel, &wdel); + + return (err || RUBY_VM_INTERRUPTED(th->ec)) ? EINTR : 0; +} + +/* perform a non-blocking kevent check while holding GVL */ +static void +iom_ping_events(rb_thread_t *th, struct iom_fdset_waiter *fsw, unsigned flags) +{ + struct kqw_set *kset = fsw ? container_of(fsw, struct kqw_set, fsw) : 0; + rb_iom_t *iom = th->vm->iom; + int kqfd = kset ? iom_kqfd(iom) : (iom ? iom->mp.fd : -1); + int sigwait_fd = rb_sigwait_fd_get(th); + int err = 0; + + if (kqfd >= 0 && list_empty(&iom->c.blockers)) { + int nr, nevents, retries = 0; + struct kevent *eventlist = iom_eventlist(iom, &nevents); + + do { + struct kevent *changelist = 0; + int nchanges = 0; + + /* only register changes once if we loop: */ + if (kset) { + changelist = eventlist; + nchanges = kset->nchanges; + kset = 0; + } + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + err = check_kevent(th, nr, eventlist, &sigwait_fd); + } while (!err && nr == nevents && ++retries); + iom_grow(iom, retries); + } + else if (sigwait_fd >= 0) { + err = check_signals_nogvl(th, sigwait_fd) ? EINTR : 0; + } + iom_wait_done(th, sigwait_fd, err, "kevent", fsw, flags); +} + +/* for iom_pingable_common.h */ +static void +iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + struct kevent eventlist[256 / IOM_EVENT_SIZE]; + int nevents = numberof(eventlist); + int nr = nevents; + rb_hrtime_t hrt, *rel = iom_timeout_next(&hrt, &iom->c.timers); + + if (!rel || *rel != 0) { /* slow path */ + int kqfd = iom_kqfd(iom); /* may raise */ + int err = 0; + int sigwait_fd = rb_sigwait_fd_get(th); + struct iom_blocker cur; + + cur.th = th; + VM_ASSERT(kqfd >= 0); + nr = 0; + list_add(&iom->c.blockers, &cur.bnode); + BLOCKING_REGION(th, { + struct timespec ts; + struct timespec *tsp = rb_hrtime2timespec(&ts, rel); + + if (!RUBY_VM_INTERRUPTED(th->ec)) { + nr = kevent(kqfd, 0, 0, eventlist, nevents, tsp); + } + iom_wait_check_nogvl(th, &err, nr, sigwait_fd); + }, sigwait_fd >= 0 ? 
ubf_sigwait : ubf_select, th, TRUE); + list_del(&cur.bnode); + if (!err) + err = check_kevent(th, nr, eventlist, &sigwait_fd); + iom_wait_done(th, sigwait_fd, err, "kevent", 0, IOM_CHECK_INTS); + } + if (nr == nevents && iom_events_pending(th->vm)) /* || *rel == 0 */ + iom_ping_events(th, 0, IOM_CHECK_INTS); +} + +/* + * kqueue shines here because there's only one syscall in the common case, + * but the corner-cases (e.g. waiting for both read and write) are complex + */ +static int +kevxchg_ping(int fd, struct kev *kev) +{ + rb_thread_t *th = rb_ec_thread_ptr(kev->fdw.w.sw.ec); + rb_iom_t *iom = iom_get(th); + int retries = 0; + int nr, err, nevents, nchanges = 0; + struct kevent *eventlist = iom_eventlist(iom, &nevents); + struct kevent *changelist = eventlist; + int kqfd = iom_kqfd(iom); + int sigwait_fd = rb_sigwait_fd_get(th); + + VM_ASSERT(nevents >= 2 && "iom->mp.buf should never be this small"); + + /* + * we must clear this before reading since we check for owner kset in + * check_kevent to distinguish between single and multi-FD select(2) + * callers + */ + kev->fdn.owner = 0; + + /* EVFILT_READ handles urgent data (POLLPRI)... 
hopefully */ + if (kev->fdw.events & WAITFD_READ) { + struct iom_fd *fdh = iom_fd_get(&iom->rfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->fdn.rfdnode); + } + if (kev->fdw.events & RB_WAITFD_OUT) { + struct iom_fd *fdh = iom_fd_get(&iom->wfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->fdn.wfdnode); + } + do { + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + err = check_kevent(th, nr, eventlist, &sigwait_fd); + } while (!err && nr == nevents && ++retries && (nchanges = 0) == 0); + + iom_grow(iom, retries); + iom_wait_done(th, sigwait_fd, err, "kevent (kevxchg_ping)", 0, + IOM_CHECK_INTS); + return err; +} + +static VALUE +kev_schedule(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + int fd = iom_fdw_get_fd(&kev->fdw); + + if (fd >= 0) { + int err = kevxchg_ping(fd, kev); + + if (err) + kev->fdw.revents = kev->fdw.events | POLLNVAL; + else if (!rb_tl_schedule_ready_p(kev->fdw.w.sw.ec)) + VM_UNREACHABLE(kev_schedule); + } + return (VALUE)kev->fdw.revents; /* may be zero if timed out */ +} + +static VALUE +kev_ensure(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + list_del(&kev->fdn.rfdnode); + list_del(&kev->fdn.wfdnode); + iom_waiter_ensure(&kev->fdw.w); + return Qfalse; +} + +static inline VALUE +kev_wait(VALUE kev) +{ + return rb_ensure(kev_schedule, kev, kev_ensure, kev); +} + +static int +iom_waitfd(rb_thread_t *th, int *fdp, int events, const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + VALUE tmp; + struct kev *kev = RB_FIBER_ALLOCA(tmp, struct kev); + + iom_fdw_init(&kev->fdw, fdp, events, rel); + list_node_init(&kev->fdn.rfdnode); + 
list_node_init(&kev->fdn.wfdnode); + iom_waiter_add(th, &iom->c.fdws, &kev->fdw.w); + + return iom_wait_fd_result(th, rel, kev_wait, (VALUE)kev, tmp); +} + +/* + * XXX we may use fptr->mode to mark FDs which kqueue cannot handle, + * different BSDs may have different bugs which prevent certain + * FD types from being handled by kqueue... + */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, + const rb_hrtime_t *rel) +{ + return iom_waitfd(th, &fptr->fd, events, rel); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, const rb_hrtime_t *rel) +{ + return iom_waitfd(th, fdp, events, rel); +} + +static void +kset_add_event(struct kqw_set *kset, int fd, short filter, void *udata) +{ + static const long kevent_size = (long)IOM_EVENT_SIZE; + VALUE dst = rb_ec_vm_ptr(kset->fsw.w.sw.ec)->iom->mp.buf; + long old_len = RSTRING_LEN(dst); + struct kevent *chg; + size_t off = kset->nchanges * kevent_size; + long new_len = (kset->nchanges += 1) * kevent_size; + + if (old_len < new_len) { + rb_str_resize(dst, new_len); + rb_str_set_len(dst, new_len); + } + + chg = (struct kevent *)(RSTRING_PTR(dst) + off); + EV_SET(chg, fd, filter, EV_ADD|EV_ONESHOT, 0, 0, udata); +} + +/* used by fdset_prepare */ +static int +fdset_prepare_i(struct iom_fdset_waiter *fsw, int fd, int events) +{ + struct kqw_set *kset = container_of(fsw, struct kqw_set, fsw); + rb_iom_t *iom = rb_ec_vm_ptr(fsw->w.sw.ec)->iom; + struct kset_item *ksi = ALLOC(struct kset_item); + + ksi->fd = fd; + ksi->revents = 0; + ksi->events = events; + ksi->fdn.owner = kset; + list_add_tail(&kset->fdset_head, &ksi->fdset_node); + list_node_init(&ksi->fdn.rfdnode); + list_node_init(&ksi->fdn.wfdnode); + + if (events & (RB_WAITFD_IN|RB_WAITFD_PRI)) { + struct iom_fd *fdh = iom_fd_get(&iom->rfdmap, fd); + + if (list_empty(&fdh->fdhead)) + kset_add_event(kset, fd, EVFILT_READ, udata_set(iom, fdh)); + list_add(&fdh->fdhead, &ksi->fdn.rfdnode); + } + if (events & RB_WAITFD_OUT) { + struct iom_fd *fdh = 
iom_fd_get(&iom->wfdmap, fd); + + if (list_empty(&fdh->fdhead)) + kset_add_event(kset, fd, EVFILT_WRITE, udata_set(iom, fdh)); + list_add(&fdh->fdhead, &ksi->fdn.wfdnode); + } + return ST_CONTINUE; +} + +static VALUE +kset_schedule(VALUE ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + + fdset_prepare(&kset->fsw); + /* + * no error check here since no kevent call at this point, + * fdset_schedule calls kevent: + */ + return fdset_schedule(&kset->fsw); +} + +static VALUE +kset_ensure(VALUE ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + struct kset_item *ksi, *nxt; + + list_for_each_safe(&kset->fdset_head, ksi, nxt, fdset_node) { + list_del(&ksi->fdset_node); + list_del_init(&ksi->fdn.rfdnode); + list_del_init(&ksi->fdn.wfdnode); + fdset_save_result_i(&kset->fsw, ksi->fd, ksi->revents); + xfree(ksi); + } + return fdset_ensure(&kset->fsw, kset, sizeof(*kset)); +} + +static VALUE +kset_wait(VALUE kset) +{ + return rb_ensure(kset_schedule, kset, kset_ensure, kset); +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + VALUE tmp; + struct kqw_set *kset = RB_FIBER_ALLOCA(tmp, struct kqw_set); + int ret; + + kset->nchanges = 0; + iom_fdset_waiter_init(th, &kset->fsw, maxfd, r, w, e, rel); + list_head_init(&kset->fdset_head); + iom_waiter_add(th, &iom->c.fdsets, &kset->fsw.w); + + ret = (int)iom_timeout_do(th, rel, kset_wait, (VALUE)kset); + RB_FIBER_ALLOCA_END(tmp); + return ret; +} + +/* + * this exists because native kqueue has a unique close-on-fork + * behavior, so we need to skip close() on fork + */ +static void +iom_free(rb_iom_t *iom) +{ + iom_fdmap_destroy(&iom->rfdmap); + iom_fdmap_destroy(&iom->wfdmap); + xfree(iom); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + + vm->iom = 0; + if (iom) { + if (iom->mp.fd >= 0) + close(iom->mp.fd); + iom_free(iom); + } +} + +#if defined(HAVE_WORKING_FORK) +/* used by 
thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ +#ifdef LIBKQUEUE + /* + * libkqueue uses epoll, /dev/poll FD, or poll/select on a pipe, + * so there is an FD. + * I guess we're going to hit an error in case somebody uses + * libkqueue with a real native kqueue backend, but nobody does + * that in production, hopefully. + */ + rb_iom_destroy(th->vm); +#else /* native kqueue is close-on-fork, so we do not close ourselves */ + rb_iom_t *iom = th->vm->iom; + if (iom) { + iom_free(iom); + th->vm->iom = 0; + } +#endif /* !LIBKQUEUE */ +} +#endif /* defined(HAVE_WORKING_FORK) */ +#include "iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_pingable_common.h b/iom_pingable_common.h new file mode 100644 index 0000000000..07841b44ca --- /dev/null +++ b/iom_pingable_common.h @@ -0,0 +1,224 @@ +/* + * shared between "pingable" implementations (iom_kqueue.h and iom_epoll.h) + * "pingable" means it is relatively cheap to check for events with a + * zero timeout on epoll_wait/kevent w/o releasing GVL. + * select(2) is not "pingable" because it has O(n) setup/teardown costs + * where "n" is number of descriptors watched. 
+ */ + +/* only for iom_kqueue.h and iom_epoll.h */ +static void iom_do_wait(rb_thread_t *, rb_iom_t *); + +static VALUE +iom_do_schedule(VALUE ptr) +{ + rb_thread_t *th = (rb_thread_t *)ptr; + rb_iom_t *iom = iom_get(th); + + /* we should not retrieve more events if we can run some right now */ + VM_ASSERT(list_empty(&th->runq)); + iom_do_wait(th, iom); + + return Qfalse; +} + +/* Runs the scheduler */ +void +rb_iom_schedule(rb_thread_t *th, const rb_hrtime_t *rel) +{ + if (rb_tl_sched_p(th->ec)) { + iom_ping_events(th, 0, IOM_CHECK_INTS); + iom_tl_pass(th->ec); + } + else if (!rb_tl_schedule_ready_p(th->ec)) { + iom_timeout_do(th, rel, iom_do_schedule, (VALUE)th); + } + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); +} + +/* + * iom_kqueue.h::fdset_prepare_i does this atomically by creating a + * userspace buffer of changes for registration in fdset_schedule. + * + * iom_epoll.h::fdset_prepare_i uses epoll_ctl to register events + * here, but GVL prevents existing epoll_wait callers from fishing + * for events while this is running (EPOLL_CTL_MOD is faster + * than EPOLL_CTL_ADD). + */ +static void +fdset_prepare(struct iom_fdset_waiter *fsw) +{ + int i, fd; + + /* we must not retrieve events in these loops */ + for (fd = 0; fd < fsw->max; fd++) { + int events = 0; + + for (i = 0; i < 3; i++) { + rb_fdset_t *in = fsw->in[i]; + + if (in && rb_fd_isset(fd, in)) + events |= idx2events[i]; + } + if (events) { + fdset_prepare_i(fsw, fd, events); + if (fsw->ret < 0) + return; + } + } + + /* fdset_save_result_i will repopulate fsw->in */ + for (i = 0; i < 3; i++) { + rb_fdset_t *in = fsw->in[i]; + + if (in) rb_fd_zero(in); + } +} + +/* + * Retrieves events via epoll_wait or kevent + * kqueue atomically registers and checks events with one syscall, here. 
+ */ +static VALUE +fdset_schedule(struct iom_fdset_waiter *fsw) +{ + rb_execution_context_t *ec = fsw->w.sw.ec; + rb_thread_t *th = rb_ec_thread_ptr(ec); + + iom_ping_events(th, fsw, IOM_CHECK_INTS); + if (!fsw->ret) { /* fsw->ret may be set by iom_ping_events */ + if (!rb_tl_schedule_ready_p(ec)) { + VM_UNREACHABLE(fdset_schedule); + } + } + return (VALUE)fsw->ret; +} + +/* + * Updates the rb_fdset_t of rb_thread_fd_select callers + */ +static void +fdset_save_result_i(struct iom_fdset_waiter *fsw, int fd, short revents) +{ + int i; + + if (fsw->ret <= 0) + return; + for (i = 0; i < 3; i++) { + if (revents & idx2events[i]) { + rb_fdset_t *in = fsw->in[i]; + + VM_ASSERT(in && "revents set without fsw->in"); + rb_fd_set(fd, in); + } + } +} + +static VALUE +fdset_ensure(struct iom_fdset_waiter *fsw, void *ptr, size_t n) +{ + int err = fsw->errnum; + + iom_waiter_ensure(&fsw->w); + + errno = err; + return Qfalse; +} + +/* @udata is epoll_data_t.ptr or kevent.udata */ +static int +check_sigwait_fd(rb_thread_t *th, const void *udata, int *err, int *sigwait_fd) +{ + if (UNLIKELY(udata == &sigwait_th)) { + /* + * if we couldn't acquire sigwait_fd before calling kevent/epoll_wait, + * try to get it now. 
iom_wait_done will call rb_sigwait_fd_put + */ + if (*sigwait_fd < 0) + *sigwait_fd = rb_sigwait_fd_get(th); /* caller will _put */ + + if (*sigwait_fd >= 0 && check_signals_nogvl(th, *sigwait_fd)) + *err = EINTR; + + return TRUE; /* `continue' the loop because sigwait_fd is reserved */ + } + /* normal FD */ + return FALSE; +} + +/* limit growth of eventlist buffer */ +#define IOM_MAX_EVENTS (1 << 17) + +/* returns the number of events we can fetch while holding GVL */ +static int +iom_nevents(const rb_iom_t *iom) +{ + return RSTRING_LENINT(iom->mp.buf) / IOM_EVENT_SIZE; +} + +/* returns the eventlist buffer for kevent or epoll_wait to write to */ +static void * +iom_eventlist(const rb_iom_t *iom, int *nevents) +{ + *nevents = iom_nevents(iom); + + return (void *)RSTRING_PTR(iom->mp.buf); +} + +/* grows the eventlist buffer if needed */ +static void +iom_grow(rb_iom_t *iom, int retries) +{ + if (retries) { + int old_nevents = iom_nevents(iom); + int new_nevents = retries * old_nevents; + + if (new_nevents <= 0 || new_nevents > IOM_MAX_EVENTS) + new_nevents = IOM_MAX_EVENTS; + + rb_str_resize(iom->mp.buf, new_nevents * IOM_EVENT_SIZE); + } +} + +static void +iom_multiplexer_init(struct iom_multiplexer *mp) +{ + mp->fd = -1; + mp->buf = rb_str_tmp_new(IOM_EVENT_SIZE * 8); +} + +void +rb_iom_mark(rb_iom_t *iom) +{ + rb_gc_mark(iom->mp.buf); +} + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + rb_iom_t *iom = GET_VM()->iom; + + return fd >= 0 && iom && fd == iom->mp.fd; +} + +static int +rb_iom_fd(const rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + + return iom && !list_empty(&th->coros) && list_empty(&iom->c.blockers) ? 
+ iom->mp.fd : -1; +} + +static int +rb_iom_schedule_p(const rb_thread_t *th) +{ + return rb_tl_sched_p(th->ec); +} + +static void +rb_iom_ping_events(rb_thread_t *th) +{ + iom_ping_events(th, 0, 0); +} diff --git a/iom_select.h b/iom_select.h new file mode 100644 index 0000000000..841104fb01 --- /dev/null +++ b/iom_select.h @@ -0,0 +1,569 @@ +/* + * select()-based implementation of I/O Manager for RubyVM + * + * This is crippled and relies heavily on GVL compared to the + * epoll and kqueue versions. Every call to select requires + * scanning the entire iom->c.fdws list; so it gets silly expensive + * with hundreds or thousands of FDs. + */ +#include "iom_internal.h" +#include +#include +#include + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + struct iom_core c; + /* + * generation counters for determining if we need to redo select() + * for newly registered FDs. kevent and epoll APIs do not require + * this, as events can be registered and retrieved at any time by + * different threads. + */ + rb_serial_t select_gen; /* RDWR protected by GVL */ +}; + +/* allocated on stack */ +struct select_do { + rb_thread_t *th; + unsigned int do_wait : 1; + unsigned int check_ints : 1; +}; + +static const short idx2events[] = { RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI }; + +static void +iom_init(rb_iom_t *iom) +{ + iom->select_gen = 0; + iom_core_init(&iom->c); +} + +/* max == -1 : wake all */ +static void +iom_blockers_notify(rb_iom_t *iom, int max) +{ + struct iom_blocker *b = 0, *next; + + list_for_each_safe(&iom->c.blockers, b, next, bnode) { + list_del_init(&b->bnode); + rb_threadptr_interrupt(b->th); + + if (--max == 0) + break; + } +} + +/* FIXME inefficient, but maybe not worth optimizing ... 
*/ +static void +fd_merge(int max, rb_fdset_t *dst, const rb_fdset_t *src) +{ + int fd; + + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, src)) { + rb_fd_set(fd, dst); + } + } +} + +static rb_fdset_t * +fdset_init_once(rb_fdset_t **dst, rb_fdset_t *src) +{ + if (!*dst) { + rb_fd_init(src); + *dst = src; + } + return *dst; +} + +static int +fd_intersect(struct iom_fdset_waiter *fsw, unsigned i, const rb_fdset_t *res) +{ + int ret = 0; + rb_fdset_t *in = fsw->in[i]; + + if (in && res) { + int fd; + rb_fdset_t *dst = 0, tmp; + + for (fd = 0; fd < fsw->max; fd++) { + if (rb_fd_isset(fd, in) && rb_fd_isset(fd, res)) { + rb_fd_set(fd, fdset_init_once(&dst, &tmp)); + ret++; + } + } + + /* copy results back to user-supplied fdset */ + if (ret) { + rb_fd_dup(in, dst); + rb_fd_term(dst); + } + } + return ret; +} + +static void +iom_select_wait(struct select_do *sd) +{ + rb_thread_t *th = sd->th; + rb_iom_t *iom = th->vm->iom; + rb_fdset_t fdsets[3]; + rb_fdset_t *fds[3] = { 0 }; + rb_hrtime_t hrt = 0; + rb_hrtime_t *rel = &hrt; + int nr = 0, err = 0, max = 0; + int sigwait_fd = rb_sigwait_fd_get(th); + unsigned int i; + struct rb_sched_waiter *sw = 0, *next; + + /* deal with rb_iom_select callers: */ + list_for_each_safe(&iom->c.fdsets, sw, next, wnode) { + struct iom_fdset_waiter *fsw = fdset_waiter_of(sw); + + if (fsw->max > max) + max = fsw->max; + for (i = 0; i < 3; i++) { + const rb_fdset_t *src = fsw->in[i]; + + if (src) + fd_merge(fsw->max, fdset_init_once(&fds[i], &fdsets[i]), src); + } + } + + /* deal with rb_iom_waitfd callers: */ + list_for_each_safe(&iom->c.fdws, sw, next, wnode) { + struct iom_fd_waiter *fdw = fd_waiter_of(sw); + int fd = iom_fdw_get_fd(fdw); + + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + iom_waiter_ready(th, &fdw->w); + sd->do_wait = 0; + continue; + } + if (fd >= max) + max = fd + 1; + for (i = 0; i < 3; i++) { + if (fdw->events & idx2events[i]) + rb_fd_set(fd, fdset_init_once(&fds[i], &fdsets[i])); + } + } + + if 
(sigwait_fd >= 0) { + if (sigwait_fd >= max) + max = sigwait_fd + 1; + rb_fd_set(sigwait_fd, fdset_init_once(&fds[0], &fdsets[0])); + } + + if (sd->do_wait && (max > 0 || rb_vm_waitpid_pending(th->vm))) { + rel = iom_timeout_next(&hrt, &iom->c.timers); + } + + for (i = 0; i < 3; i++) + if (fds[i]) rb_fd_resize(max - 1, fds[i]); + + if (sd->do_wait) { + struct iom_blocker cur; + cur.th = th; + list_add_tail(&iom->c.blockers, &cur.bnode); + + BLOCKING_REGION(th, { + if (!RUBY_VM_INTERRUPTED(th->ec)) { + struct timeval tv; + struct timeval *tvp = rb_hrtime2timeval(&tv, rel); + + nr = native_fd_select(max, fds[0], fds[1], fds[2], tvp, th); + } + iom_wait_check_nogvl(th, &err, nr, sigwait_fd); + }, sigwait_fd >= 0 ? ubf_sigwait : ubf_select, th, TRUE); + list_del(&cur.bnode); + } + else { + /* + * don't release GVL for "quick" scan, this maintains determinism + * with cooperative threads + */ + struct timeval zero = { 0, 0 }; + + nr = native_fd_select(max, fds[0], fds[1], fds[2], &zero, th); + iom_wait_check_nogvl(th, &err, nr, sigwait_fd); + } + + /* crap, somebody closed an FD in one of the sets, scan all of it */ + if (nr < 0 && err == EBADF) + nr = INT_MAX; + + if (nr > 0) { + /* rb_thread_fd_select callers */ + list_for_each_safe(&iom->c.fdsets, sw, next, wnode) { + struct iom_fdset_waiter *fsw = fdset_waiter_of(sw); + + if (err == EBADF) { + if (fdset_error_p(fsw)) + iom_waiter_ready(th, &fsw->w); + } + else { + int changes[3]; + + for (i = 0; i < 3; i++) { + changes[i] = fd_intersect(fsw, i, fds[i]); + fsw->ret += changes[i]; + } + if (fsw->ret) { + for (i = 0; i < 3; i++) { + /* clobber the sets we didn't change */ + if (!changes[i] && fsw->in[i]) { + rb_fd_zero(fsw->in[i]); + } + } + iom_waiter_ready(th, &fsw->w); + } + } + } + + /* rb_wait_for_single_fd callers */ + list_for_each_safe(&iom->c.fdws, sw, next, wnode) { + struct iom_fd_waiter *fdw = fd_waiter_of(sw); + int fd = iom_fdw_get_fd(fdw); + + if (err == EBADF && fd >= 0) { + if (iom_fd_pollable(fd) != 
FD_POLL_OK) + fd = -1; + } + + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + } + else { + for (i = 0; i < 3; i++) { + if (fds[i] && rb_fd_isset(fd, fds[i])) + fdw->revents |= fdw->events & idx2events[i]; + } + } + + /* got revents? enqueue the waiter to be run! */ + if (fdw->revents) + iom_waiter_ready(th, &fdw->w); + } + + if (sigwait_fd >= 0 && rb_fd_isset(sigwait_fd, fds[0])) + err = check_signals_nogvl(th, sigwait_fd) ? EINTR : 0; + } + + for (i = 0; i < 3; i++) + if (fds[i]) rb_fd_term(fds[i]); + + iom_wait_done(th, sigwait_fd, err, "select", 0, + sd->check_ints ? IOM_CHECK_INTS : 0); +} + +static VALUE +iom_do_select(VALUE ptr) +{ + struct select_do *sd = (struct select_do *)ptr; + rb_thread_t *th = sd->th; + rb_iom_t *iom; + int retry; + rb_serial_t select_start_gen; + + if (rb_tl_schedule_ready_p(th->ec)) + return Qtrue; + + iom = iom_events_pending(th->vm) ? iom_get(th) : th->vm->iom; + if (!iom) + return Qfalse; + + do { + /* + * select() setup cost is high, so minimize calls to it and + * check other things, instead: + */ + if (sd->check_ints) + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + if (rb_tl_schedule_ready_p(th->ec)) + return Qtrue; + + select_start_gen = iom->select_gen; + iom_select_wait(sd); + /* + * if somebody changed iom->c.fdws while we were inside select, + * rerun it to avoid a race condition. + * This is not necessary for epoll or kqueue because the kernel + * is constantly monitoring the watched set. 
+ */ + retry = sd->check_ints && iom->select_gen != select_start_gen; + } while (retry); + return Qfalse; +} + +void +rb_iom_schedule(rb_thread_t *th, const rb_hrtime_t *rel) +{ + if (rb_tl_sched_p(th->ec)) { + rb_iom_ping_events(th); + iom_tl_pass(th->ec); + } + else if (!rb_tl_schedule_ready_p(th->ec)) { + struct select_do sd; + + sd.th = th; + sd.check_ints = 1; + sd.do_wait = 1; + iom_timeout_do(th, rel, iom_do_select, (VALUE)&sd); + } + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); +} + +/* + * waits for @exp == @addr for @timeout duration + * FIXME: this code path sometimes causes timeouts and long tests + * with webrick and test/net in "make test-all" + */ +static void +do_select_wait(rb_execution_context_t *ec, const void *exp, + void *uaddr, size_t uaddrlen, const rb_hrtime_t *timeout) +{ + rb_hrtime_t end; + struct select_do sd; + rb_thread_t *th = rb_ec_thread_ptr(ec); + + sd.th = th; + sd.check_ints = 1; + + if (timeout) { + end = rb_hrtime_add(rb_hrtime_now(), *timeout); + } + + /* fdw->revents or fsw->ret == 0 */ + while (!memcmp(uaddr, exp, uaddrlen)) { + sd.do_wait = 1; + + if (rb_tl_schedule_ready_p(ec)) { + int runq_busy = !list_empty(&th->runq); + + if ((timeout && rb_hrtime_now() > end) || runq_busy) { + sd.do_wait = 0; + } + } + + iom_select_wait(&sd); + if (timeout && rb_hrtime_now() > end) + break; + RUBY_VM_CHECK_INTS_BLOCKING(ec); + } +} + +static void +do_tl_wait(rb_execution_context_t *ec) +{ + rb_iom_t *iom = iom_get(rb_ec_thread_ptr(ec)); + + if (!list_empty(&iom->c.blockers)) + iom_blockers_notify(iom, 1); + + if (!rb_tl_schedule_ready_p(ec)) + VM_UNREACHABLE(do_tl_wait); +} + +static VALUE +fdw_schedule(VALUE ptr) +{ + struct iom_fd_waiter *fdw = (struct iom_fd_waiter *)ptr; + rb_execution_context_t *ec = fdw->w.sw.ec; + + if (rb_tl_sched_p(ec)) { + do_tl_wait(ec); + } + else { + static const short z; + + VM_ASSERT(sizeof(z) == sizeof(fdw->revents)); + do_select_wait(ec, &z, &fdw->revents, sizeof(z), fdw->rel); + } + + return 
(VALUE)fdw->revents; /* may be zero if timed out */ +} + +/* only epoll takes advantage of this (kqueue may for portability bugs) */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, + const rb_hrtime_t *rel) +{ + return rb_iom_waitfd(th, &fptr->fd, events, rel); +} + +static VALUE +fdw_ensure(VALUE ptr) +{ + struct iom_fd_waiter *fdw = (struct iom_fd_waiter *)ptr; + + iom_waiter_ensure(&fdw->w); + + return Qfalse; +} + +static inline VALUE +fdw_wait(VALUE fdw) +{ + return rb_ensure(fdw_schedule, fdw, fdw_ensure, fdw); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + struct iom_fd_waiter *fdw; + struct stat sb; + VALUE tmp; + + if (*fdp < 0) { /* TODO: behave like poll(2) and sleep? */ + return 0; + } + + /* detect closed FDs */ + if (fstat(*fdp, &sb) < 0) + rb_sys_fail("rb_iom_waitfd (fstat)"); + + fdw = RB_FIBER_ALLOCA(tmp, struct iom_fd_waiter); + iom_fdw_init(fdw, fdp, events, rel); + iom_waiter_add(th, &iom->c.fdws, &fdw->w); + iom->select_gen++; + + return iom_wait_fd_result(th, rel, fdw_wait, (VALUE)fdw, tmp); +} + +static VALUE +fsw_ensure(VALUE ptr) +{ + struct iom_fdset_waiter *fsw = (struct iom_fdset_waiter *)ptr; + int errnum = fsw->errnum; + size_t i; + + iom_waiter_ensure(&fsw->w); + for (i = 0; i < 3; i++) { +#if !FIBER_USE_NATIVE + if (fsw->in[i]) { + rb_fd_dup(fsw->onstack_dst[i], fsw->in[i]); + rb_fd_term(fsw->in[i]); + xfree(fsw->in[i]); + } +#endif + } + + if (errnum) errno = errnum; + return Qfalse; +} + +static VALUE +fsw_schedule(VALUE ptr) +{ + struct iom_fdset_waiter *fsw = (struct iom_fdset_waiter *)ptr; + rb_execution_context_t *ec = fsw->w.sw.ec; + + if (rb_tl_sched_p(ec)) { + do_tl_wait(ec); + } + else { + static const int z; + + VM_ASSERT(sizeof(z) == sizeof(fsw->ret)); + do_select_wait(ec, &z, &fsw->ret, sizeof(z), fsw->rel); + } + + return (VALUE)fsw->ret; +} + +static inline VALUE +fsw_wait(VALUE fsw) +{ + return rb_ensure(fsw_schedule, fsw, 
fsw_ensure, fsw); +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + VALUE tmp; + struct iom_fdset_waiter *fsw; + int ret; + + fsw = RB_FIBER_ALLOCA(tmp, struct iom_fdset_waiter); + iom_fdset_waiter_init(th, fsw, maxfd, r, w, e, rel); + iom_waiter_add(th, &iom->c.fdsets, &fsw->w); + iom->select_gen++; + ret = (int)iom_timeout_do(th, rel, fsw_wait, (VALUE)fsw); + RB_FIBER_ALLOCA_END(tmp); + return ret; +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + if (vm->iom) { + xfree(vm->iom); + vm->iom = 0; + } +} + +/* used by thread.c::rb_thread_atfork */ +#if defined(HAVE_WORKING_FORK) +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} +#endif /* defined(HAVE_WORKING_FORK) */ + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + return 0; +} + +void +rb_iom_mark(rb_iom_t *iom) +{ + /* nothing for select() */ +} + +static int +rb_iom_fd(const rb_thread_t *th) +{ + return -1; +} + +static int +rb_iom_schedule_p(const rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + + if (rb_tl_sched_p(th->ec)) return 1; + + if (iom && list_empty(&iom->c.blockers) && iom_events_pending(th->vm)) { + /* + * FIXME: we sometimes miss wakeups if we use select for waiting + * on native threads unfortunately: + */ + return 1; + } + return 0; +} + +static void +rb_iom_ping_events(rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + + if (iom && list_empty(&iom->c.blockers) && iom_events_pending(th->vm)) { + struct select_do sd; + sd.th = th; + sd.check_ints = sd.do_wait = 0; + iom_select_wait(&sd); + } +} + +#include "iom_common.h" diff --git a/process.c b/process.c index 2a8dc5dcda..085d56f80b 100644 --- a/process.c +++ b/process.c @@ -17,6 +17,8 @@ #include "ruby/thread.h" #include "ruby/util.h" #include "vm_core.h" +#include "iom.h" +#include "fiber.h" #include "hrtime.h" #include @@ -931,20 +933,20 @@ void 
rb_native_mutex_unlock(rb_nativethread_lock_t *); void rb_native_cond_signal(rb_nativethread_cond_t *); void rb_native_cond_wait(rb_nativethread_cond_t *, rb_nativethread_lock_t *); int rb_sigwait_fd_get(const rb_thread_t *); -void rb_sigwait_sleep(const rb_thread_t *, int fd, const rb_hrtime_t *); +int rb_sigwait_sleep(const rb_thread_t *, int sigwait_fd, int iom_fd, + const rb_hrtime_t *); void rb_sigwait_fd_put(const rb_thread_t *, int fd); -void rb_thread_sleep_interruptible(void); static int -waitpid_signal(struct rb_sched_waiter *sw) +waitpid_notify(struct rb_sched_waiter *sw, int enqueue_tl) { + struct waitpid_state *w = container_of(sw, struct waitpid_state, sw); + if (sw->ec) { /* rb_waitpid */ - rb_threadptr_interrupt(rb_ec_thread_ptr(sw->ec)); + ruby_iom_wake_signal(sw, enqueue_tl); return TRUE; } else { /* ruby_waitpid_locked */ - struct waitpid_state *w = container_of(sw, struct waitpid_state, sw); - if (w->cond) { rb_native_cond_signal(w->cond); return TRUE; @@ -964,10 +966,10 @@ sigwait_fd_migrate_sleeper(rb_vm_t *vm) struct rb_sched_waiter *sw = 0; list_for_each(&vm->waiting_pids, sw, wnode) { - if (waitpid_signal(sw)) return; + if (waitpid_notify(sw, FALSE)) return; } list_for_each(&vm->waiting_grps, sw, wnode) { - if (waitpid_signal(sw)) return; + if (waitpid_notify(sw, FALSE)) return; } } @@ -979,8 +981,24 @@ rb_sigwait_fd_migrate(rb_vm_t *vm) rb_native_mutex_unlock(&vm->waitpid_lock); } +int +rb_vm_waitpid_pending(rb_vm_t *vm) +{ + int ret; + + rb_native_mutex_lock(&vm->waitpid_lock); + ret = (!list_empty(&vm->waiting_pids) || !list_empty(&vm->waiting_grps)); + rb_native_mutex_unlock(&vm->waitpid_lock); + + return ret; +} + #if RUBY_SIGCHLD extern volatile unsigned int ruby_nocldwait; /* signal.c */ +#else +# define ruby_nocldwait 0 +#endif + /* called by timer thread or thread which acquired sigwait_fd */ static void waitpid_each(struct list_head *head) @@ -991,22 +1009,20 @@ waitpid_each(struct list_head *head) struct waitpid_state *w = 
container_of(sw, struct waitpid_state, sw); rb_pid_t ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG); - if (!ret) continue; - if (ret == -1) w->errnum = errno; + if (ret) { + if (ret == -1) + w->errnum = errno; - w->ret = ret; - list_del_init(&sw->wnode); - waitpid_signal(sw); + w->ret = ret; + list_del_init(&sw->wnode); + waitpid_notify(sw, TRUE); + } } } -#else -# define ruby_nocldwait 0 -#endif void ruby_waitpid_all(rb_vm_t *vm) { -#if RUBY_SIGCHLD rb_native_mutex_lock(&vm->waitpid_lock); waitpid_each(&vm->waiting_pids); if (list_empty(&vm->waiting_pids)) { @@ -1018,7 +1034,6 @@ ruby_waitpid_all(rb_vm_t *vm) ; /* keep looping */ } rb_native_mutex_unlock(&vm->waitpid_lock); -#endif } static void @@ -1080,7 +1095,7 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options, if (sigwait_fd >= 0) { w.cond = 0; rb_native_mutex_unlock(&vm->waitpid_lock); - rb_sigwait_sleep(0, sigwait_fd, sigwait_sleep_time()); + rb_sigwait_sleep(0, sigwait_fd, -1, sigwait_sleep_time()); rb_native_mutex_lock(&vm->waitpid_lock); } else { @@ -1108,8 +1123,9 @@ waitpid_sleep(VALUE x) { struct waitpid_state *w = (struct waitpid_state *)x; + rb_thread_check_ints(); while (!w->ret) { - rb_thread_sleep_interruptible(); + rb_sched_waiter_sleep(&w->sw, 0); } return Qfalse; @@ -1135,9 +1151,10 @@ waitpid_cleanup(VALUE x) return Qfalse; } -static void -waitpid_wait(struct waitpid_state *w) +static VALUE +waitpid_wait(VALUE p) { + struct waitpid_state *w = (struct waitpid_state *)p; rb_vm_t *vm = rb_ec_vm_ptr(w->sw.ec); struct list_head *head; @@ -1149,7 +1166,6 @@ waitpid_wait(struct waitpid_state *w) rb_native_mutex_lock(&vm->waitpid_lock); head = waitpid_sleep_prepare(w, vm); if (head) { - w->cond = 0; /* order matters, favor specified PIDs rather than -1 or 0 */ list_add(head, &w->sw.wnode); } @@ -1158,6 +1174,8 @@ waitpid_wait(struct waitpid_state *w) if (head) { rb_ensure(waitpid_sleep, (VALUE)w, waitpid_cleanup, (VALUE)w); } + + return Qfalse; } static void * @@ 
-1190,32 +1208,35 @@ waitpid_no_SIGCHLD(struct waitpid_state *w) rb_pid_t rb_waitpid(rb_pid_t pid, int *st, int flags) { - struct waitpid_state w; - - waitpid_state_init(&w, pid, flags); - w.sw.ec = GET_EC(); + VALUE tmp; + struct waitpid_state *w = RB_FIBER_ALLOCA(tmp, struct waitpid_state); + rb_pid_t ret; + w->sw.ec = GET_EC(); + waitpid_state_init(w, pid, flags); if (WAITPID_USE_SIGCHLD) { - waitpid_wait(&w); + waitpid_wait((VALUE)w); } else { - waitpid_no_SIGCHLD(&w); + waitpid_no_SIGCHLD(w); } - if (st) *st = w.status; - if (w.ret == -1) { - errno = w.errnum; + if (st) *st = w->status; + if (w->ret == -1) { + errno = w->errnum; } - else if (w.ret > 0) { + else if (w->ret > 0) { if (ruby_nocldwait) { - w.ret = -1; + w->ret = -1; errno = ECHILD; } else { - rb_last_status_set(w.status, w.ret); + rb_last_status_set(w->status, w->ret); } } - return w.ret; + ret = w->ret; + RB_FIBER_ALLOCA_END(tmp); + return ret; } @@ -3886,7 +3907,7 @@ retry_fork_async_signal_safe(int *status, int *ep, prefork(); disable_child_handler_before_fork(&old); if (w && WAITPID_USE_SIGCHLD) { - rb_native_mutex_lock(&rb_ec_vm_ptr(w->ec)->waitpid_lock); + rb_native_mutex_lock(&rb_ec_vm_ptr(w->sw.ec)->waitpid_lock); } #ifdef HAVE_WORKING_VFORK if (!has_privilege()) @@ -3913,11 +3934,13 @@ retry_fork_async_signal_safe(int *status, int *ep, } err = errno; if (w && WAITPID_USE_SIGCHLD) { + rb_vm_t *vm = rb_ec_vm_ptr(w->sw.ec); + if (pid > 0) { w->pid = pid; - list_add(&rb_ec_vm_ptr(w->ec)->waiting_pids, &w->wnode); + list_add(&vm->waiting_pids, &w->sw.wnode); } - rb_native_mutex_unlock(&rb_ec_vm_ptr(w->ec)->waitpid_lock); + rb_native_mutex_unlock(&vm->waitpid_lock); } disable_child_handler_fork_parent(&old); if (0 < pid) /* fork succeed, parent process */ @@ -4392,19 +4415,17 @@ rb_spawn(int argc, const VALUE *argv) static VALUE rb_f_system(int argc, VALUE *argv) { - /* - * n.b. 
using alloca for now to simplify future Thread::Light code - * when we need to use malloc for non-native Fiber - */ - struct waitpid_state *w = alloca(sizeof(struct waitpid_state)); + VALUE execarg_obj, waitpid_obj; + struct waitpid_state *w; rb_pid_t pid; /* may be different from waitpid_state.pid on exec failure */ - VALUE execarg_obj; struct rb_execarg *eargp; int exec_errnum; + VALUE ret = Qtrue; execarg_obj = rb_execarg_new(argc, argv, TRUE, TRUE); TypedData_Get_Struct(execarg_obj, struct rb_execarg, &exec_arg_data_type, eargp); - w->ec = GET_EC(); + w = RB_FIBER_ALLOCA(waitpid_obj, struct waitpid_state); + w->sw.ec = GET_EC(); waitpid_state_init(w, 0, 0); eargp->waitpid_state = w; pid = rb_execarg_spawn(execarg_obj, 0, 0); @@ -4430,21 +4451,22 @@ rb_f_system(int argc, VALUE *argv) rb_syserr_fail_str(err, command); } else { - return Qnil; + ret = Qnil; } } - if (w->status == EXIT_SUCCESS) return Qtrue; - if (eargp->exception) { - VALUE command = eargp->invoke.sh.shell_script; - VALUE str = rb_str_new_cstr("Command failed with"); - rb_str_cat_cstr(pst_message_status(str, w->status), ": "); - rb_str_append(str, command); - RB_GC_GUARD(execarg_obj); - rb_exc_raise(rb_exc_new_str(rb_eRuntimeError, str)); - } - else { - return Qfalse; + else if (w->status != EXIT_SUCCESS) { + if (eargp->exception) { + VALUE command = eargp->invoke.sh.shell_script; + VALUE str = rb_str_new_cstr("Command failed with"); + rb_str_cat_cstr(pst_message_status(str, w->status), ": "); + rb_str_append(str, command); + RB_GC_GUARD(execarg_obj); + rb_exc_raise(rb_exc_new_str(rb_eRuntimeError, str)); + } + ret = Qfalse; } + RB_FIBER_ALLOCA_END(waitpid_obj); + return ret; } /* diff --git a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb index 777e9d14dd..2d841b26d5 100644 --- a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb +++ b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb @@ -1,5 +1,6 @@ # 
frozen_string_literal: false require 'test/unit' +require 'socket' class TestWaitForSingleFD < Test::Unit::TestCase require '-test-/wait_for_single_fd' @@ -46,4 +47,72 @@ def test_wait_for_kqueue skip 'no kqueue' unless IO.respond_to?(:kqueue_test_wait) assert_equal RB_WAITFD_IN, IO.kqueue_test_wait end + + def test_tl_start_rw + %w(LDFLAGS LIBS DLDFLAGS).each do |k| + next if RbConfig::CONFIG[k] !~ /-lkqueue\b/ + skip "FIXME libkqueue is buggy on this test" + end + + host = '127.0.0.1' + TCPServer.open(host, 0) do |srv| + nbc = Socket.new(:INET, :STREAM, 0) + con = TCPSocket.new(host, srv.addr[1]) + acc = srv.accept + begin + # 2 events, but only one triggers: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Thread::Light.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal RB_WAITFD_OUT, f.value + + # trigger readability + con.write('a') + flags = RB_WAITFD_IN + f = Thread::Light.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + # ensure both events trigger: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Thread::Light.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + # ensure the EVFILT_WRITE (kqueue) is disabled after previous test: + flags = RB_WAITFD_IN + f = Thread::Light.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + skip 'missing MSG_OOB' unless Socket.const_defined?(:MSG_OOB) + + # make sure urgent data works with kqueue: + assert_equal 3, con.send('123', Socket::MSG_OOB) + flags = RB_WAITFD_PRI + f = Thread::Light.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + flags = RB_WAITFD_IN|RB_WAITFD_OUT|RB_WAITFD_PRI + f = Thread::Light.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + if RUBY_PLATFORM =~ /linux/ + assert_equal flags, f.value + else + kq_possible = [ RB_WAITFD_IN|RB_WAITFD_OUT, flags ] + assert_include kq_possible, f.value + end + + nbc.connect_nonblock(srv.local_address, 
exception: false) + fd = nbc.fileno + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f1 = Thread::Light.new { IO.wait_for_single_fd(fd, flags, nil) } + f2 = Thread::Light.new { IO.wait_for_single_fd(fd, RB_WAITFD_IN, nil) } + assert_equal RB_WAITFD_OUT, f1.value + abc = srv.accept + abc.write('!') + assert_equal RB_WAITFD_IN, f2.value + ensure + con.close + acc.close + nbc.close + abc&.close + end + end + end end diff --git a/test/lib/leakchecker.rb b/test/lib/leakchecker.rb index af9200bf77..625b07992c 100644 --- a/test/lib/leakchecker.rb +++ b/test/lib/leakchecker.rb @@ -41,6 +41,15 @@ def find_fds if d.respond_to? :fileno a -= [d.fileno] end + + # only place we use epoll is internally, do not count it against us + a.delete_if do |fd| + begin + File.readlink("#{fd_dir}/#{fd}").match?(/\beventpoll\b/) + rescue + false + end + end a } fds.sort diff --git a/test/net/http/test_http.rb b/test/net/http/test_http.rb index 389b6fa42e..f12dd0d185 100644 --- a/test/net/http/test_http.rb +++ b/test/net/http/test_http.rb @@ -1080,7 +1080,7 @@ def test_server_closed_connection_auto_reconnect http.keep_alive_timeout = 5 assert_kind_of Net::HTTPResponse, res assert_kind_of String, res.body - sleep 1.5 + sleep 1.9 assert_nothing_raised { # Net::HTTP should detect the closed connection before attempting the # request, since post requests cannot be retried. diff --git a/test/ruby/test_thread_light.rb b/test/ruby/test_thread_light.rb new file mode 100644 index 0000000000..6f3c9db652 --- /dev/null +++ b/test/ruby/test_thread_light.rb @@ -0,0 +1,828 @@ +# frozen_string_literal: true +require 'test/unit' +require 'fiber' +require 'io/nonblock' +require 'io/wait' +require 'socket' +require 'tempfile' + +class TestThreadLight < Test::Unit::TestCase + def require_nonblock + IO.method_defined?(:nonblock?) 
or skip 'pipe does not support non-blocking' + end + + # libkqueue is good for compile-testing, but can't handle corner cases + # like native kqueue does + def skip_libkqueue + %w(LDFLAGS LIBS DLDFLAGS).each do |k| + next if RbConfig::CONFIG[k] !~ /-lkqueue\b/ + skip "FIXME libkqueue is buggy on this test" + end + end + + def test_value + nr = 0 + f = Thread::Light.new { nr += 1 } + assert_equal 1, f.value, 'value returns as expected' + assert_equal 1, f.value, 'value is idempotent' + end + + def test_join + nr = 0 + f = Thread::Light.new { nr += 1 } + assert_equal f, f.join + assert_equal 1, f.value + assert_equal f, f.join, 'join is idempotent' + assert_equal f, f.join(10), 'join is idempotent w/ timeout' + assert_equal 1, f.value, 'value is unchanged after join' + end + + def test_runs_first + ary = [] + f = Thread::Light.new { ary << 1 } + ary << 2 + assert_equal [1, 2], f.value + end + + def test_parameter_passing + ary = [] + var = :a + c = Thread::Light.new(var) do |x| + ary << x + Thread.pass + ary << x + end + ary << :b + var = :c # should not affect `x' in Thread::Light + Thread.pass + c.join + assert_equal %i(a b a), ary + end + + def test_select_no_io + tick = 0.2 * rand + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + c = Thread::Light.new { IO.select(nil, nil, nil, tick) } + assert_nil c.value + t1 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + diff = t1 - t0 + assert_operator diff, :>=, tick + end + + def test_io_select_pipe + IO.pipe do |r, w| + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + f1 = Thread::Light.new { IO.select([r], nil, nil, 0.02) } + assert_nil f1.value + assert_nil f1.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.02 + assert_operator diff, :<, 0.2 + + skip_libkqueue # FIXME + f2 = Thread::Light.new { IO.select([r], [w]) } + assert_equal [ [], [w], [] ], f2.value + + w.write '.' 
+ f3 = Thread::Light.new { IO.select([r]) } + assert_equal [ [r], [], [] ], f3.value + end + + # native Thread also relies on iom scheduling if vm->iom is initialized + IO.pipe do |r, w| + tick = 0.01 + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + t = Thread.new { IO.select([r], nil, nil, tick) } + assert_nil t.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + e = nil + t = Thread.new { IO.select([r]) } + sleep(tick) + c = Thread::Light.new { IO.select(nil, [w]) } # cause spurious wakeup + w.write('.') + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick + tv = t.value + assert_equal([[r], [], []], tv) + assert_equal([[],[w],[]], c.value) + end + IO.pipe do |r, w| + t = Thread::Light.new { IO.select([r]) } + Thread.pass until t.stop? + w.close + assert_same t, t.join(0.1) + assert_equal([[r], [], []], t.value) + end + end + + def test_io_wait_pipe + IO.pipe do |r, w| + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + f1 = Thread::Light.new { r.wait_readable(0.02) } + f2 = Thread::Light.new { w.wait_writable } + assert_nil f1.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.02 + assert_operator diff, :<, 0.2 + assert_same w, f2.value + skip_libkqueue + w.write '.' 
+ f3 = Thread::Light.new { r.wait_readable } + assert_same r, f3.value + + t1 = Thread.new { r.wait_readable } + assert_same r, t1.value + end + + # native Thread also relies on iom scheduling if vm->iom is initialized + IO.pipe do |r, w| + tick = 0.01 + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + t = Thread.new { r.wait_readable(tick) } + assert_nil t.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + e = nil + t = Thread.new { r.wait_readable } + sleep(tick) + c = Thread::Light.new { w.wait_writable } # cause spurious wakeup + w.write('.') + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick + tv = t.value + assert_same r, tv + assert_same w, c.value + end + end + + def test_io_wait_hup + skip_libkqueue if RUBY_PLATFORM =~ /linux/ # EPOLLHUP not handled, I guess + IO.pipe do |r, w| + c = Thread::Light.new { r.wait_readable } + assert_nil w.close + assert_same r, c.value + end + end + + def test_copy_stream_wait_read_from_light + require_nonblock + IO.pipe do |r,w| + r.nonblock = true + c = Thread::Light.new { IO.copy_stream(r, IO::NULL) } + assert_predicate c, :stop? + assert_nil c.join(0.001) + w.write 'hi' # wakes up `c' + w.close + assert_equal 2, c.value + end + end + + def test_copy_stream_wait_read_from_native + require_nonblock + skip 'need IO#nread' unless IO.method_defined?(:nread) + IO.pipe do |r,w| + r.nonblock = true + c = Thread::Light.new do + Thread.pass + w.write('hi') + w.close + end + assert_not_predicate c, :stop? 
+ assert_equal 0, r.nread, 'nothing written, yet' + # this resumes for w.write('hi'): + assert_equal 2, IO.copy_stream(r, IO::NULL) + end + end + + def test_copy_stream_wait_write_from_native + require_nonblock + skip 'need /dev/zero' unless File.readable?('/dev/zero') + IO.pipe do |r,w| + r.nonblock = w.nonblock = true + copying = false + c = Thread::Light.new do + Thread.pass + copying = true + IO.copy_stream(r, IO::NULL) + end + assert_not_predicate c, :stop? + refute copying + n = 1024 * 1024 + IO.copy_stream('/dev/zero', w, n) + w.close + assert_equal n, c.value + end + end + + def test_copy_stream_wait_write_from_light + require_nonblock + skip 'need /dev/zero' unless File.readable?('/dev/zero') + IO.pipe do |r,w| + r.nonblock = w.nonblock = true + n = 1024 * 1024 + c = Thread::Light.new do + nwrite = IO.copy_stream('/dev/zero', w, n) + w.close + nwrite + end + assert_predicate c, :stop? + assert_nil c.join(0.001) + assert_equal n, IO.copy_stream(r, IO::NULL) + assert_equal n, c.value + end + end + + def test_io_select_hup + skip_libkqueue if RUBY_PLATFORM =~ /linux/ # EPOLLHUP not handled, I guess + IO.pipe do |r, w| + c = Thread::Light.new { IO.select([r]) } + assert_nil w.close + assert_equal [[r],[],[]], c.value + end + end + + def test_io_wait_read + require_nonblock + skip_libkqueue + tick = 0.1 # 100ms, TIME_QUANTUM_USEC in thread_pthread.c + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + rdr = Thread::Light.new { r.read(1) } + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + assert_nil rdr.join(tick), 'join returns nil on timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick, 'Thread::Light still running' + w.write('a') + assert_equal 'a', rdr.value, 'finished' + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + rwait = Thread::Light.new { r.wait_readable(tick) } + assert_nil rwait.value, 'IO returned no events after timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) 
- t0 + assert_operator diff, :>=, tick, 'Thread::Light waited for timeout' + end + end + + # make sure we do not create extra FDs (from epoll/kqueue) + # for operations which do not need it + def xtest_fd_creation_and_fork + require_nonblock + assert_separately(%w(-r io/nonblock), <<~'end;') # do + fdrange = (3..64) + reserved = -'The given fd is not accessible because RubyVM reserves it' + fdstats = lambda do + stats = Hash.new(0) + fdrange.map do |fd| + begin + IO.for_fd(fd, autoclose: false) + stats[:good] += 1 + rescue Errno::EBADF # ignore + rescue ArgumentError => e + raise if e.message != reserved + stats[:reserved] += 1 + end + end + stats.freeze + end + + before = fdstats.call + Thread::Light.new { :fib }.join + assert_equal before, fdstats.call, + 'Thread::Light.new + Thread::Light#join does not create new FD' + + # now, epoll/kqueue implementations create FDs, ensure they're reserved + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + + before_io = fdstats.call + assert_equal before[:reserved], before_io[:reserved], + 'no reserved FDs created before I/O attempted' + + f1 = Thread::Light.new { r.read(1) } + after_io = fdstats.call + assert_equal before_io[:good], after_io[:good], + 'no change in unreserved FDs during I/O wait' + + w.write('!') + assert_equal '!', f1.value, 'auto-Thread::Light IO#read works' + assert_operator before_io[:reserved], :<=, after_io[:reserved], + 'a reserved FD may be created on some implementations' + + # ensure we do not share epoll/kqueue FDs across fork + if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) + pid = fork do + after_fork = fdstats.call + w.write(after_fork.inspect == before_io.inspect ? '1' : '0') + + # ensure auto-Thread::Light works properly after forking + IO.pipe do |a, b| + a.nonblock = b.nonblock = true + f1 = Thread::Light.new { a.read(1) } + f2 = Thread::Light.new { b.write('!') } + assert_equal 1, f2.value + w.write(f1.value == '!' ? '!' 
: '?') + end + + exit!(0) + end + assert_equal '1', r.read(1), 'reserved FD cleared after fork' + assert_equal '!', r.read(1), 'auto-Thread::Light works after forking' + _, status = Process.waitpid2(pid) + assert_predicate status, :success?, 'forked child exited properly' + end + end + end; + end + + def test_waitpid + skip 'fork required' unless Process.respond_to?(:fork) + skip 'SIGCHLD required' unless Signal.list['CHLD'] + + assert_separately(%w(-r io/nonblock), <<~'end;') # do + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + pid = fork do + sleep 0.1 + exit!(0) + end + f = Thread::Light.new { Process.waitpid2(pid) } + wpid, st = f.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.1, 'child did not return immediately' + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'process exited successfully' + + IO.pipe do |r, w| + # "blocking" Process.waitpid2 takes precedence over trap(:CHLD) + waited = [] + chld_called = false + trap(:CHLD) { chld_called = true; waited.concat(Process.waitall) } + pid = fork do + r.read(1) + exit!(0) + end + assert_nil Process.waitpid2(pid, Process::WNOHANG), 'child is waiting' + assert_nil(Thread::Light.new do + Process.waitpid2(pid, Process::WNOHANG) + end.value, 'WNOHANG works normally in Thread::Light') + f = Thread::Light.new { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), 'woke child' + wpid, st = f.value + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'child exited successfully' + assert_empty waited, 'waitpid had precedence' + assert chld_called, 'CHLD handler got called anyway' + + chld_called = false + [ 'DEFAULT', 'SIG_DFL', 'SYSTEM_DEFAULT' ].each do |handler| + trap(:CHLD, handler) + pid = fork do + r.read(1) + exit!(0) + end + t = "trap(:CHLD, #{handler.inspect})" + f = Thread::Light.new { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), "woke child for #{t}" + wpid, st = f.value + 
assert_predicate st, :success?, "child exited successfully for #{t}" + assert_equal wpid, pid, "return value correct for #{t}" + assert_equal false, chld_called, + "old CHLD handler did not fire for #{t}" + end + end + end; + end + + def test_cpu_usage_io + disabled = GC.disable + skip if defined?(RubyVM::MJIT) && RubyVM::MJIT.enabled? + require_nonblock + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = fv = wr = nil + + # select() requires higher CPU% :< + assert_cpu_usage_low(pct: 0.50) do + t = Thread.new do + Thread::Light.new do + r.read(1) + end.value + end + wr = Thread.new do + sleep 0.5 + w.write('..') + end + fv = Thread::Light.new do + r.read(1) + end.value + end + assert_equal '.', fv + assert_equal '.', t.value + wr.join + end + ensure + GC.enable unless disabled + end + + def test_cpu_usage_waitpid + disabled = GC.disable + skip if defined?(RubyVM::MJIT) && RubyVM::MJIT.enabled? + thrs = [] + pid = spawn(*%W(#{EnvUtil.rubybin} --disable=gems -e #{'sleep 0.5'})) + 2.times do + thrs << Thread.new { Thread::Light.new { Process.waitall }.value } + end + assert_cpu_usage_low(pct: 0.44) do + thrs.map!(&:value) + end + ensure + GC.enable unless disabled + end + + # As usual, cannot resume/run Thread::Lights across threads, but the + # scheduler works across threads + def test_cross_thread_schedule + require_nonblock + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = Thread.new { Thread::Light.new { r.read(1) }.value } + assert_nil t.join(0.1) + f = Thread::Light.new { w.write('a') } + assert_equal 'a', t.value + assert_equal 1, f.value + end + end + + # tricky for kqueue and epoll implementations + def test_multi_readers + skip_libkqueue if RUBY_PLATFORM =~ /linux/ + require_nonblock + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + nr = 30 + fibs = nr.times.map { Thread::Light.new { r.read(1) } } + fibs.each { |f| assert_nil f.join(0.001) } + + exp = nr.times.map { -'a' }.freeze + w.write(exp.join) + assert_equal exp, 
fibs.map(&:value); + end + end + + def test_unix_rw + unless defined?(UNIXSocket) && UNIXSocket.respond_to?(:pair) + skip 'no UNIXSocket.pair' + end + + # at least epoll emulation of kqueue cannot deal with simultaneous + # watching the same FD on both EVFILT_READ and EVFILT_WRITE + skip_libkqueue + + # ensure simultaneous waits on the same FD works + UNIXSocket.pair do |a, b| + assert_predicate a, :wait_writable + f0 = Thread::Light.new { a.wait_readable } # cold + f9 = Thread::Light.new { a.wait_writable } # already armed + assert_equal a, f9.value + + f4 = Thread::Light.new { a.wait_readable } + b.write('.') + f5 = Thread::Light.new { a.wait_writable } + f6 = Thread::Light.new { a.wait_readable } + assert_equal a, f4.value + assert_equal a, f5.value + assert_equal a, f6.value + assert_equal a, f0.value + end + end + + def test_reg_file_wait_readable + skip_libkqueue + File.open(__FILE__) do |fp| + t = Thread::Light.new { fp.wait_readable } + assert_same fp, t.value + end + end + + def test_reg_file_select_readable + skip_libkqueue + File.open(__FILE__) do |fp| + t = Thread::Light.new { IO.select([fp]) } + assert_equal [[fp],[],[]], t.value + end + end + + def test_reg_file_wait_writable + skip_libkqueue + Dir.mktmpdir('wait_writable') do |dir| + File.open("#{dir}/reg", "w") do |fp| + t = Thread::Light.new { fp.wait_writable } + assert_same fp, t.value + end + end + end + + def test_reg_file_select_writable + skip_libkqueue + Dir.mktmpdir('select_writable') do |dir| + File.open("#{dir}/reg", "w") do |fp| + t = Thread::Light.new { IO.select(nil, [fp]) } + assert_equal [[],[fp],[]], t.value + end + end + end + + def test_tl_local_storage + t = Thread::Light.new do + assert_equal(:a, Thread.current[:k] = :a) + x = Thread.current[:k] + Thread.pass # allow root to set t[:k] = :b + Thread.current[:k] + end + assert_equal :a, t[:k] + assert_equal :a, t.fetch(:k) + t[:k] = :b + assert_equal :b, t.value + + assert_equal(false, t.key?(:foo)) + t["foo"] = "foo" + 
assert_equal %i(k foo), t.keys + assert_same t['foo'], t[:foo] + + x = nil + assert_equal("foo", t.fetch(:foo, 0)) + assert_equal("foo", t.fetch(:foo) {x = true}) + assert_nil(x) + assert_equal("foo", t.fetch("foo", 0)) + assert_equal("foo", t.fetch("foo") {x = true}) + assert_nil(x) + + x = nil + assert_equal(0, t.fetch(:qux, 0)) + assert_equal(1, t.fetch(:qux) {x = 1}) + assert_equal(1, x) + assert_equal(2, t.fetch("qux", 2)) + assert_equal(3, t.fetch("qux") {x = 3}) + assert_equal(3, x) + + e = assert_raise(KeyError) {t.fetch(:qux)} + assert_equal(:qux, e.key) + assert_equal(t, e.receiver) + + # freeze other thread + t = Thread::Light.new { Thread.pass } + t.freeze + assert_raise(FrozenError) { t[:foo] = :baz } + assert_same t, t.join + + # XXX not sure how Thread.current should behave, here; can Thread.current + # return a Thread::Light object? + t = Thread.new do + Thread.current.freeze + c = Thread::Light.new do + begin + Thread.current[:foo] = :baz + rescue => e + e + end + end + c.value + end + assert_kind_of FrozenError, t.value + end + + def test_system + skip 'SIGCHLD required' unless Signal.list['CHLD'] + st = [] + th = Thread::Light.new do + ret = system(*%W(#{EnvUtil.rubybin} --disable=gems -e exit!(false))) + st << $? + ret + end + assert_equal false, th.value + th = Thread::Light.new do + ret = system(*%W(#{EnvUtil.rubybin} --disable=gems -e exit!(true))) + st << $? + ret + end + assert_equal true, th.value + assert_not_predicate st[0], :success? + assert_predicate st[1], :success? 
+ end + + def test_ensure_at_exit + assert_in_out_err([], <<-EOS, %w(ok)) + IO.pipe do |r, _| + Thread::Light.new { + begin + IO.select([r]) + ensure + puts "ok" + end + } + end + EOS + end + + def test_ensure_by_gc_on_file_limit + assert_ruby_status(nil, <<-'EOS') + begin + n = Process.getrlimit(Process::RLIMIT_NOFILE)[0] + sane_max = 64 + Process.setrlimit(Process::RLIMIT_NOFILE, sane_max) if n > sane_max + rescue + end if Process.const_defined?(:RLIMIT_NOFILE) + r, w = IO.pipe + a = b = '0' + begin + Thread::Light.new do + begin + a.succ! + IO.pipe { IO.select([r]) } + ensure + b.succ! + end + end + rescue Errno::EMFILE, Errno::ENFILE, Errno::ENOMEM + warn "a=#{a} b=#{b}" if a != b + exit!(a == b && a.to_i > 4) + end while true + EOS + end + + def test_puts_in_sub_thread + assert_in_out_err([], <<-EOS, %w(ok)) + th = Thread.new do + Thread::Light.new do + begin + IO.pipe { |r,_| IO.select([r]) } + ensure + puts "ok" + end + end + end + Thread.pass until th.stop? + EOS + end + + def test_raise_in_main_thread + assert_normal_exit %q{ + IO.pipe { |r,_| + Thread::Light.new do + begin + IO.select([r]) + ensure + raise + end + end + } + } + end + + def test_raise_in_sub_thread + assert_normal_exit %q{ + IO.pipe { |r,_| + th = Thread.new do + Thread::Light.new do + begin + IO.select([r]) + ensure + raise + end + end + sleep + end + Thread.pass until th.stop? + exit!(0) + } + } + end + + def test_ensure_thread_kill + IO.pipe { |r,_| + q = Queue.new + th = Thread.new do + Thread::Light.new { + begin + IO.select([r]) + ensure + q.push 'ok' + end + } + end + Thread.pass until th.stop? 
+ th.kill + th.join + assert_equal 'ok', q.pop(true) + } + end + + def test_thread_pass_to_root + ary = [] + c = Thread::Light.new do + ary << :a + Thread.pass + ary << :c + end + ary << :b + assert_same c, c.join + assert_equal %i(a b c), ary + end + + def test_thread_pass_between_light + ary = [] + c1 = Thread::Light.new do + ary << :a + Thread.pass + ary << :c + end + c2 = Thread::Light.new do + ary << :b + Thread.pass + ary << :d + end + assert_same c1, c1.join + assert_same c2, c2.join + assert_equal %i(a b c d), ary + end + + def test_tl_alive_stop_status + c = Thread::Light.new { Thread.pass } + assert_equal 'run', c.status + assert_not_predicate c, :stop? + assert_predicate c, :alive? + assert_same c, c.join + assert_same false, c.status + assert_predicate c, :stop? + assert_not_predicate c, :alive? + + IO.pipe do |r, w| + c = Thread::Light.new { r.wait } + assert_equal 'sleep', c.status + assert_predicate c, :stop? + assert_predicate c, :alive? + w.close + assert_same r, Thread::Light.new { r.wait }.value # just to kick `c' + if false # true for Linux epoll or *BSD kevent; not select(2) + assert_not_predicate c, :stop? + assert_predicate c, :alive? + end + assert_same r, c.value + assert_predicate c, :stop? + assert_not_predicate c, :alive? + end + end + + def test_sleep + tick = 0.2 * rand + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + c = Thread::Light.new { sleep(tick) } + assert_kind_of Integer, c.value + t1 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + diff = t1 - t0 + assert_operator diff, :>=, tick + end + + def test_sleep_and_run_local_thread + done = false + c = Thread::Light.new { sleep; done = true } + assert_predicate c, :stop? + assert_same c, c.run + assert done + + done = false + timeout = 60 + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + c = Thread::Light.new { sleep(timeout); done = true } + assert_predicate c, :stop? 
+ assert_same c, c.run + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :<, timeout + end + + def test_run_self + c = nil + c = Thread::Light.new do + Thread.pass + c.run + end + assert_same c, c.value + end + + # XXX not sure about semantics + def test_sleep_and_run_remote_thread + a = Queue.new + th = Thread.new do + c = Thread::Light.new { sleep } + a << c + end + c = a.pop + assert_raise(ThreadError) { c.run } + ensure + th&.kill&.join + end +end diff --git a/test/ruby/test_thread_queue.rb b/test/ruby/test_thread_queue.rb index 8cebbbecb4..e3866d47b4 100644 --- a/test/ruby/test_thread_queue.rb +++ b/test/ruby/test_thread_queue.rb @@ -7,6 +7,11 @@ class TestThreadQueue < Test::Unit::TestCase Queue = Thread::Queue SizedQueue = Thread::SizedQueue + def setup + super + @cls = Thread + end + def test_queue_initialized assert_raise(TypeError) { Queue.allocate.push(nil) @@ -32,14 +37,14 @@ def grind(num_threads, num_objects, num_iterations, klass, *args) to_workers = klass.new(*args) workers = (1..num_threads).map { - Thread.new { + @cls.new { while object = to_workers.pop from_workers.push object end } } - Thread.new { + @cls.new { num_iterations.times { num_objects.times { to_workers.push 99 } num_objects.times { from_workers.pop } @@ -73,7 +78,7 @@ def test_sized_queue_assign_max before = q.max q.max.times { q << 1 } - t1 = Thread.new { q << 1 } + t1 = @cls.new { q << 1 } sleep 0.01 until t1.stop? q.max = q.max + 1 assert_equal before + 1, q.max @@ -82,8 +87,9 @@ def test_sized_queue_assign_max end def test_queue_pop_interrupt + skip 'TODO: Thread::Light#kill' if @cls == Thread::Light q = Queue.new - t1 = Thread.new { q.pop } + t1 = @cls.new { q.pop } sleep 0.01 until t1.stop? 
t1.kill.join assert_equal(0, q.num_waiting) @@ -97,8 +103,9 @@ def test_queue_pop_non_block end def test_sized_queue_pop_interrupt + skip 'TODO: Thread::Light#kill' if @cls == Thread::Light q = SizedQueue.new(1) - t1 = Thread.new { q.pop } + t1 = @cls.new { q.pop } sleep 0.01 until t1.stop? t1.kill.join assert_equal(0, q.num_waiting) @@ -120,15 +127,17 @@ def test_sized_queue_push_interrupt end def test_sized_queue_push_non_block + skip 'TODO: Thread::Light#kill' if @cls == Thread::Light q = SizedQueue.new(1) q.push(1) - t1 = Thread.new { q.push(2) } + t1 = @cls.new { q.push(2) } sleep 0.01 until t1.stop? t1.kill.join assert_equal(0, q.num_waiting) end def test_thr_kill + skip 'TODO Thread::Light#kill' if @cls == Thread::Light bug5343 = '[ruby-core:39634]' Dir.mktmpdir {|d| timeout = 60 @@ -139,7 +148,7 @@ def test_thr_kill open("test_thr_kill_count", "w") {|f| f.puts i } queue = Queue.new r, w = IO.pipe - th = Thread.start { + th = #@cls.start { queue.push(nil) r.read 1 } @@ -172,15 +181,15 @@ def test_sized_queue_clear sq = SizedQueue.new(2) 2.times { sq << 1 } - t1 = Thread.new do + t1 = @cls.new do sq << 1 end - t2 = Thread.new do + t2 = @cls.new do sq << 1 end - t3 = Thread.new do + t3 = @cls.new do Thread.pass sq.clear end @@ -204,7 +213,7 @@ def test_sized_queue_clear_return_value def test_sized_queue_throttle q = SizedQueue.new(1) i = 0 - consumer = Thread.new do + consumer = @cls.new do while q.pop i += 1 Thread.pass @@ -214,7 +223,7 @@ def test_sized_queue_throttle npush = 100 producer = nprod.times.map do - Thread.new do + @cls.new do npush.times { q.push(true) } end end @@ -225,15 +234,16 @@ def test_sized_queue_throttle end def test_queue_thread_raise + skip 'TODO: Thread::Light#raise' if @cls == Thread::Light q = Queue.new - th1 = Thread.new do + th1 = @cls.new do begin q.pop rescue RuntimeError sleep end end - th2 = Thread.new do + th2 = @cls.new do sleep 0.1 q.pop end @@ -303,7 +313,7 @@ def close_wakeup( num_items, num_threads, &qcreate ) # create 
the Queue q = yield - threads = num_threads.times.map{Thread.new{q.pop}} + threads = num_threads.times.map{@cls.new{q.pop}} num_items.times{|i| q << i} # wait until queue empty @@ -329,9 +339,10 @@ def test_size_queue_close_wakeup end def test_sized_queue_one_closed_interrupt + skip 'TODO: Thread::Light#kill' if @cls == Thread::Light q = SizedQueue.new 1 q << :one - t1 = Thread.new { q << :two } + t1 = @cls.new { q << :two } sleep 0.01 until t1.stop? q.close @@ -348,7 +359,7 @@ def test_empty_non_blocking 3.times{|i| q << i} # these all block cos the queue is full - prod_threads = 4.times.map{|i| Thread.new{q << 3+i}} + prod_threads = 4.times.map{|i| @cls.new{q << 3+i}} sleep 0.01 until prod_threads.all?{|thr| thr.status == 'sleep'} q.close @@ -371,9 +382,10 @@ def test_sized_queue_closed_push_non_blocking end def test_blocked_pushers + skip 'TODO: Thread::Light exception handling semantics' if @cls == Thread::Light q = SizedQueue.new 3 prod_threads = 6.times.map do |i| - thr = Thread.new{ + thr = @cls.new{ Thread.current.report_on_exception = false q << i } @@ -389,7 +401,7 @@ def test_blocked_pushers # more than prod_threads cons_threads = 10.times.map do |i| - thr = Thread.new{q.pop}; thr[:pc] = i; thr + thr = @cls.new{q.pop}; thr[:pc] = i; thr end # values that came from the queue @@ -421,7 +433,7 @@ def test_deny_pushers q = qcreate[] synq = Queue.new prod_threads = 20.times.map do |i| - Thread.new { + @cls.new { synq.pop assert_raise(ClosedQueueError) { q << i @@ -439,8 +451,8 @@ def test_deny_pushers def sized_queue_size_close q = SizedQueue.new 4 4.times{|i| q << i} - Thread.new{ q << 5 } - Thread.new{ q << 6 } + @cls.new{ q << 5 } + @cls.new{ q << 6 } assert_equal 4, q.size assert_equal 4, q.items q.close @@ -449,9 +461,10 @@ def sized_queue_size_close end def test_blocked_pushers_empty + skip 'TODO: Thread::Light exception handling semantics' if @cls == Thread::Light q = SizedQueue.new 3 prod_threads = 6.times.map do |i| - Thread.new{ + @cls.new{ 
Thread.current.report_on_exception = false q << i } @@ -482,7 +495,7 @@ def test_blocked_pushers_empty # test thread wakeup on one-element SizedQueue with close def test_one_element_sized_queue q = SizedQueue.new 1 - t = Thread.new{ q.pop } + t = @cls.new{ q.pop } q.close assert_nil t.value end @@ -502,14 +515,14 @@ def test_queue_close_multi_multi count_producers = rand(10..20) producers = count_producers.times.map do - Thread.new do + @cls.new do sleep(rand / 100) count_items.times{|i| q << [i,"#{i} for #{Thread.current.inspect}"]} end end consumers = rand(7..12).times.map do - Thread.new do + @cls.new do count = 0 while e = q.pop i, st = e @@ -526,7 +539,7 @@ def test_queue_close_multi_multi assert (consumers + producers).all?{|thr| thr.status =~ /\A(?:run|sleep)\z/}, 'no threads running' # just exercising the concurrency of the support methods. - counter = Thread.new do + counter = @cls.new do until q.closed? && q.empty? raise if q.size > q.max # otherwise this exercise causes too much contention on the lock @@ -553,12 +566,13 @@ def test_queue_with_trap if ENV['APPVEYOR'] == 'True' && RUBY_PLATFORM.match?(/mswin/) skip 'This test fails too often on AppVeyor vs140' end + skip 'TODO reentrancy may be impossible' if @cls == Thread::Light assert_in_out_err([], <<-INPUT, %w(INT INT exit), []) q = Queue.new trap(:INT){ q.push 'INT' } - Thread.new{ + #@cls.new{ loop{ Process.kill :INT, $$ } @@ -572,8 +586,8 @@ def test_queue_with_trap def test_fork_while_queue_waiting q = Queue.new sq = SizedQueue.new(1) - thq = Thread.new { q.pop } - thsq = Thread.new { sq.pop } + thq = @cls.new { q.pop } + thsq = @cls.new { sq.pop } Thread.pass until thq.stop? && thsq.stop? pid = fork do @@ -596,8 +610,8 @@ def test_fork_while_queue_waiting assert_equal :thsq, thsq.value sq.push(1) - th = Thread.new { q.pop; sq.pop } - thsq = Thread.new { sq.push(2) } + th = @cls.new { q.pop; sq.pop } + thsq = @cls.new { sq.push(2) } Thread.pass until th.stop? && thsq.stop? 
pid = fork do exit!(1) if q.num_waiting != 0 @@ -617,3 +631,10 @@ def test_fork_while_queue_waiting assert_equal 2, th.value end if Process.respond_to?(:fork) end + +class TestThreadLightQueue < TestThreadQueue + def setup + super + @cls = Thread::Light + end +end diff --git a/thread.c b/thread.c index c96eb97fc3..6bd8525e77 100644 --- a/thread.c +++ b/thread.c @@ -76,6 +76,7 @@ #include "vm_core.h" #include "mjit.h" #include "hrtime.h" +#include "iom.h" #ifndef USE_NATIVE_THREAD_PRIORITY #define USE_NATIVE_THREAD_PRIORITY 0 @@ -106,12 +107,16 @@ static int rb_threadptr_dead(rb_thread_t *th); static void rb_check_deadlock(rb_vm_t *vm); static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th); static const char *thread_status_name(rb_thread_t *th, int detail); -static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t); NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); static int consume_communication_pipe(int fd); static int check_signals_nogvl(rb_thread_t *, int sigwait_fd); void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */ +/* iom_{pingable_common,select}.h */ +static void rb_iom_ping_events(rb_thread_t *); +static int rb_iom_fd(const rb_thread_t *); +static int rb_iom_schedule_p(const rb_thread_t *); + #define eKillSignal INT2FIX(0) #define eTerminateSignal INT2FIX(1) static volatile int system_working = 1; @@ -227,7 +232,8 @@ vm_living_thread_num(const rb_vm_t *vm) #endif static void -timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end, +timeout_prepare(rb_execution_context_t *ec, int *runq_busy, + rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end, const struct timeval *timeout) { if (timeout) { @@ -238,6 +244,26 @@ timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end, else { *to = 0; } + /* see if we can schedule any auto-fibers while we're at it: */ + VM_ASSERT(!rb_tl_sched_p(ec)); + if (rb_tl_schedule_ready_p(ec)) { + *runq_busy = 
!list_empty(&rb_ec_thread_ptr(ec)->runq); + /* if we switched fibers, update our own timeout: */ + if (*to && rb_hrtime_update_expire(*to, *end)) { + /* clamp so we at least ppoll or select once if timed-out */ + *rel = 0; + } + } +} + +static rb_hrtime_t * +timeout2hrtime(rb_hrtime_t *val, const struct timeval *timeout) +{ + if (timeout) { + *val = rb_timeval2hrtime(timeout); + return val; + } + return 0; } #if THREAD_DEBUG @@ -769,6 +795,8 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s (void *)th, th->locking_mutex); } + rb_tl_finish(th); /* run "ensure" statements */ + /* delete self other than main thread from living_threads */ rb_vm_living_threads_remove(th->vm, th); if (main_th->status == THREAD_KILLED && rb_thread_alone()) { @@ -986,7 +1014,7 @@ thread_join_sleep(VALUE arg) th->vm->sleeper--; } else { - if (hrtime_update_expire(p->limit, end)) { + if (rb_hrtime_update_expire(p->limit, end)) { thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", thread_id_str(target_th)); return Qfalse; @@ -1064,6 +1092,27 @@ thread_join(rb_thread_t *target_th, rb_hrtime_t *rel) static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); +rb_hrtime_t * +rb_join_interval(rb_hrtime_t *rel, int argc, VALUE *argv) +{ + VALUE limit; + + rb_scan_args(argc, argv, "01", &limit); + + /* + * This supports INFINITY and negative values, so we can't use + * rb_time_interval right now... 
+ */ + switch (TYPE(limit)) { + case T_NIL: return 0; + case T_FIXNUM: + *rel = rb_sec2hrtime(NUM2TIMET(limit)); + return rel; + default: + return double2hrtime(rel, rb_num2dbl(limit)); + } +} + /* * call-seq: * thr.join -> thr @@ -1106,26 +1155,9 @@ static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); static VALUE thread_join_m(int argc, VALUE *argv, VALUE self) { - VALUE limit; - rb_hrtime_t rel, *to = 0; + rb_hrtime_t rel; - rb_scan_args(argc, argv, "01", &limit); - - /* - * This supports INFINITY and negative values, so we can't use - * rb_time_interval right now... - */ - switch (TYPE(limit)) { - case T_NIL: break; - case T_FIXNUM: - rel = rb_sec2hrtime(NUM2TIMET(limit)); - to = &rel; - break; - default: - to = double2hrtime(&rel, rb_num2dbl(limit)); - } - - return thread_join(rb_thread_ptr(self), to); + return thread_join(rb_thread_ptr(self), rb_join_interval(&rel, argc, argv)); } /* @@ -1196,7 +1228,6 @@ getclockofday(struct timespec *ts) * Don't inline this, since library call is already time consuming * and we don't want "struct timespec" on stack too long for GC */ -NOINLINE(rb_hrtime_t rb_hrtime_now(void)); rb_hrtime_t rb_hrtime_now(void) { @@ -1248,8 +1279,8 @@ COMPILER_WARNING_IGNORED(-Wmaybe-uninitialized) * Returns true if @end has past * Updates @ts and returns false otherwise */ -static int -hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end) +int +rb_hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end) { rb_hrtime_t now = rb_hrtime_now(); @@ -1276,7 +1307,7 @@ sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl) woke = vm_check_ints_blocking(th->ec); if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) break; - if (hrtime_update_expire(&rel, end)) + if (rb_hrtime_update_expire(&rel, end)) break; } th->status = prev_status; @@ -1285,8 +1316,15 @@ sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl) void rb_thread_sleep_forever(void) { + rb_thread_t *th = GET_THREAD(); + 
thread_debug("rb_thread_sleep_forever\n"); - sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK); + if (rb_tl_sched_p(th->ec)) { + rb_iom_sleep(th, 0); + } + else { + sleep_forever(th, SLEEP_SPURIOUS_CHECK); + } } void @@ -1296,18 +1334,6 @@ rb_thread_sleep_deadly(void) sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK); } -void -rb_thread_sleep_interruptible(void) -{ - rb_thread_t *th = GET_THREAD(); - enum rb_thread_status prev_status = th->status; - - th->status = THREAD_STOPPED; - native_sleep(th, 0); - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - th->status = prev_status; -} - static void rb_thread_sleep_deadly_allow_spurious_wakeup(void) { @@ -1319,8 +1345,14 @@ void rb_thread_wait_for(struct timeval time) { rb_thread_t *th = GET_THREAD(); + rb_hrtime_t rel = rb_timeval2hrtime(&time); - sleep_hrtime(th, rb_timeval2hrtime(&time), SLEEP_SPURIOUS_CHECK); + if (rb_tl_sched_p(th->ec)) { + rb_iom_sleep(th, &rel); + } + else { + sleep_hrtime(th, rel, SLEEP_SPURIOUS_CHECK); + } } /* @@ -1359,19 +1391,32 @@ rb_thread_sleep(int sec) rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); } +static void +rb_thread_yield_native(rb_thread_t *th) +{ + RB_GC_SAVE_MACHINE_CONTEXT(th); + gvl_yield(th->vm, th); + rb_thread_set_current(th); +} + static void rb_thread_schedule_limits(uint32_t limits_us) { - thread_debug("rb_thread_schedule\n"); - if (!rb_thread_alone()) { - rb_thread_t *th = GET_THREAD(); + rb_thread_t *th = GET_THREAD(); + rb_hrtime_t rel = 0; + thread_debug("rb_thread_schedule\n"); + if (rb_thread_alone()) { + rb_iom_schedule(th, &rel); + } + else { if (th->running_time_us >= limits_us) { thread_debug("rb_thread_schedule/switch start\n"); - RB_GC_SAVE_MACHINE_CONTEXT(th); - gvl_yield(th->vm, th); - rb_thread_set_current(th); + rb_thread_yield_native(th); thread_debug("rb_thread_schedule/switch done\n"); + if (!list_empty(&th->coros)) { + rb_iom_schedule(th, &rel); + } } } } @@ -1682,7 +1727,14 @@ ruby_thread_has_gvl_p(void) static VALUE 
thread_s_pass(VALUE klass) { - rb_thread_schedule(); + rb_execution_context_t *ec = GET_EC(); + + if (rb_tl_sched_p(ec)) { + rb_iom_schedule(rb_ec_thread_ptr(ec), 0); + } + else { + rb_thread_schedule(); + } return Qnil; } @@ -3142,29 +3194,29 @@ rb_thread_to_s(VALUE thread) /* variables for recursive traversals */ static ID recursive_key; -static VALUE -threadptr_local_aref(rb_thread_t *th, ID id) +VALUE +rb_ec_local_aref(rb_execution_context_t *ec, ID id, VALUE default_value) { if (id == recursive_key) { - return th->ec->local_storage_recursive_hash; + return ec->local_storage_recursive_hash; } else { - st_data_t val; - st_table *local_storage = th->ec->local_storage; + st_data_t val; + st_table *local_storage = ec->local_storage; - if (local_storage != NULL && st_lookup(local_storage, id, &val)) { - return (VALUE)val; - } - else { - return Qnil; - } + if (local_storage != NULL && st_lookup(local_storage, id, &val)) { + return (VALUE)val; + } + else { + return default_value; + } } } VALUE rb_thread_local_aref(VALUE thread, ID id) { - return threadptr_local_aref(rb_thread_ptr(thread), id); + return rb_ec_local_aref(rb_thread_ptr(thread)->ec, id, Qnil); } /* @@ -3235,25 +3287,11 @@ rb_thread_aref(VALUE thread, VALUE key) return rb_thread_local_aref(thread, id); } -/* - * call-seq: - * thr.fetch(sym) -> obj - * thr.fetch(sym) { } -> obj - * thr.fetch(sym, default) -> obj - * - * Returns a fiber-local for the given key. If the key can't be - * found, there are several options: With no other arguments, it will - * raise a KeyError exception; if default is - * given, then that will be returned; if the optional code block is - * specified, then that will be run and its result returned. - * See Thread#[] and Hash#fetch. 
- */ -static VALUE -rb_thread_fetch(int argc, VALUE *argv, VALUE self) +VALUE +rb_ec_local_fetch(rb_execution_context_t *ec, int argc, VALUE *argv, VALUE self) { VALUE key, val; ID id; - rb_thread_t *target_th = rb_thread_ptr(self); int block_given; rb_check_arity(argc, 1, 2); @@ -3261,51 +3299,66 @@ rb_thread_fetch(int argc, VALUE *argv, VALUE self) block_given = rb_block_given_p(); if (block_given && argc == 2) { - rb_warn("block supersedes default value argument"); + rb_warn("block supersedes default value argument"); } id = rb_check_id(&key); - - if (id == recursive_key) { - return target_th->ec->local_storage_recursive_hash; - } - else if (id && target_th->ec->local_storage && - st_lookup(target_th->ec->local_storage, id, &val)) { - return val; + val = rb_ec_local_aref(ec, id, Qundef); + if (val != Qundef) { + return val; } else if (block_given) { - return rb_yield(key); + return rb_yield(key); } else if (argc == 1) { - rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key); + rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key); } else { - return argv[1]; + return argv[1]; } } +/* + * call-seq: + * thr.fetch(sym) -> obj + * thr.fetch(sym) { } -> obj + * thr.fetch(sym, default) -> obj + * + * Returns a fiber-local for the given key. If the key can't be + * found, there are several options: With no other arguments, it will + * raise a KeyError exception; if default is + * given, then that will be returned; if the optional code block is + * specified, then that will be run and its result returned. + * See Thread#[] and Hash#fetch. 
+ */ static VALUE -threadptr_local_aset(rb_thread_t *th, ID id, VALUE val) +rb_thread_fetch(int argc, VALUE *argv, VALUE self) +{ + return rb_ec_local_fetch(rb_thread_ptr(self)->ec, argc, argv, self); +} + +VALUE +rb_ec_local_aset(rb_execution_context_t *ec, ID id, VALUE val) { if (id == recursive_key) { - th->ec->local_storage_recursive_hash = val; - return val; + ec->local_storage_recursive_hash = val; + return val; } else { - st_table *local_storage = th->ec->local_storage; + st_table *local_storage = ec->local_storage; - if (NIL_P(val)) { - if (!local_storage) return Qnil; - st_delete_wrap(local_storage, id); - return Qnil; - } - else { - if (local_storage == NULL) { - th->ec->local_storage = local_storage = st_init_numtable(); - } - st_insert(local_storage, id, val); - return val; - } + if (NIL_P(val)) { + if (!local_storage) return Qnil; + st_delete_wrap(local_storage, id); + return Qnil; + } + else { + if (local_storage == NULL) { + ec->local_storage = local_storage = st_init_numtable(); + } + st_insert(local_storage, id, val); + return val; + } } } @@ -3316,7 +3369,7 @@ rb_thread_local_aset(VALUE thread, ID id, VALUE val) rb_error_frozen("thread locals"); } - return threadptr_local_aset(rb_thread_ptr(thread), id, val); + return rb_ec_local_aset(rb_thread_ptr(thread)->ec, id, val); } /* @@ -3397,6 +3450,23 @@ rb_thread_variable_set(VALUE thread, VALUE id, VALUE val) return rb_hash_aset(locals, rb_to_symbol(id), val); } +VALUE +rb_ec_local_key_p(rb_execution_context_t *ec, VALUE key) +{ + ID id = rb_check_id(&key); + st_table *local_storage = ec->local_storage; + + if (!id || local_storage == NULL) { + return Qfalse; + } + else if (st_lookup(local_storage, id, 0)) { + return Qtrue; + } + else { + return Qfalse; + } +} + /* * call-seq: * thr.key?(sym) -> true or false @@ -3413,31 +3483,32 @@ rb_thread_variable_set(VALUE thread, VALUE id, VALUE val) static VALUE rb_thread_key_p(VALUE self, VALUE key) { - ID id = rb_check_id(&key); - st_table *local_storage = 
rb_thread_ptr(self)->ec->local_storage; + return rb_ec_local_key_p(rb_thread_ptr(self)->ec, key); +} - if (!id || local_storage == NULL) { - return Qfalse; - } - else if (st_lookup(local_storage, id, 0)) { - return Qtrue; - } - else { - return Qfalse; - } +int +rb_thread_alone(void) +{ + return vm_living_thread_num(GET_VM()) == 1; } static int -thread_keys_i(ID key, VALUE value, VALUE ary) +ec_local_keys_i(ID key, VALUE value, VALUE ary) { rb_ary_push(ary, ID2SYM(key)); return ST_CONTINUE; } -int -rb_thread_alone(void) +VALUE +rb_ec_local_keys(rb_execution_context_t *ec) { - return vm_living_thread_num(GET_VM()) == 1; + st_table *local_storage = ec->local_storage; + VALUE ary = rb_ary_new(); + + if (local_storage) { + st_foreach(local_storage, ec_local_keys_i, ary); + } + return ary; } /* @@ -3457,13 +3528,7 @@ rb_thread_alone(void) static VALUE rb_thread_keys(VALUE self) { - st_table *local_storage = rb_thread_ptr(self)->ec->local_storage; - VALUE ary = rb_ary_new(); - - if (local_storage) { - st_foreach(local_storage, thread_keys_i, ary); - } - return ary; + return rb_ec_local_keys(rb_thread_ptr(self)->ec); } static int @@ -3832,8 +3897,11 @@ rb_fd_set(int fd, rb_fdset_t *set) #endif static int -wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end) +wait_retryable(rb_execution_context_t *ec, int *result, int errnum, + int *runq_busy, rb_hrtime_t *rel, rb_hrtime_t end) { + rb_thread_t *th = rb_ec_thread_ptr(ec); + if (*result < 0) { switch (errnum) { case EINTR: @@ -3841,17 +3909,23 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end) case ERESTART: #endif *result = 0; - if (rel && hrtime_update_expire(rel, end)) { + if (rel && rb_hrtime_update_expire(rel, end)) { *rel = 0; } + if (rb_tl_schedule_ready_p(ec) && !list_empty(&th->runq)) { + *runq_busy = TRUE; + } return TRUE; } return FALSE; } else if (*result == 0) { - /* check for spurious wakeup */ + /* check for spurious wakeup */ + if (rb_tl_schedule_ready_p(ec) && 
!list_empty(&th->runq)) { + *runq_busy = TRUE; + } if (rel) { - return !hrtime_update_expire(rel, end); + return !rb_hrtime_update_expire(rel, end); } return TRUE; } @@ -3861,6 +3935,7 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end) struct select_set { int max; int sigwait_fd; + int extra_fd; rb_thread_t *th; rb_fdset_t *rset; rb_fdset_t *wset; @@ -3890,9 +3965,12 @@ select_set_free(VALUE p) static const rb_hrtime_t * sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig, - int *drained_p) + int *drained_p, int runq_busy) { static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000; + static const rb_hrtime_t zero; + + if (runq_busy) return &zero; if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) { *drained_p = check_signals_nogvl(th, sigwait_fd); @@ -3910,8 +3988,10 @@ do_select(VALUE p) int result = 0; int lerrno; rb_hrtime_t *to, rel, end = 0; + int runq_busy = FALSE; + rb_thread_t *th = set->th; - timeout_prepare(&to, &rel, &end, set->timeout); + timeout_prepare(th->ec, &runq_busy, &to, &rel, &end, set->timeout); #define restore_fdset(dst, src) \ ((dst) ? rb_fd_dup(dst, src) : (void)0) #define do_select_update() \ @@ -3924,27 +4004,34 @@ do_select(VALUE p) int drained; lerrno = 0; - BLOCKING_REGION(set->th, { + BLOCKING_REGION(th, { const rb_hrtime_t *sto; struct timeval tv; - sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained); - if (!RUBY_VM_INTERRUPTED(set->th->ec)) { + sto = sigwait_timeout(th, set->sigwait_fd, to, &drained, runq_busy); + if (!RUBY_VM_INTERRUPTED(th->ec)) { result = native_fd_select(set->max, set->rset, set->wset, set->eset, - rb_hrtime2timeval(&tv, sto), set->th); + rb_hrtime2timeval(&tv, sto), th); if (result < 0) lerrno = errno; } - }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE); + }, set->sigwait_fd >= 0 ? 
ubf_sigwait : ubf_select, th, TRUE); - if (set->sigwait_fd >= 0) { - if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset)) + if (set->extra_fd >= 0) { + if (result > 0 && rb_fd_isset(set->extra_fd, set->rset)) { result--; - (void)check_signals_nogvl(set->th, set->sigwait_fd); + if (set->sigwait_fd >= 0) { + (void)check_signals_nogvl(th, set->sigwait_fd); + } + else { + rb_iom_ping_events(th); + } + } } - RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */ - } while (wait_retryable(&result, lerrno, to, end) && do_select_update()); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may raise */ + } while (wait_retryable(th->ec, &result, lerrno, &runq_busy, to, end) + && do_select_update()); if (result < 0) { errno = lerrno; @@ -4006,31 +4093,45 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * set.th = GET_THREAD(); RUBY_VM_CHECK_INTS_BLOCKING(set.th->ec); + + if (rb_iom_schedule_p(set.th)) { + rb_hrtime_t hrt; + const rb_hrtime_t *rel = timeout2hrtime(&hrt, timeout); + + return rb_iom_select(set.th, max, read, write, except, rel); + } + set.max = max; set.rset = read; set.wset = write; set.eset = except; - set.timeout = timeout; - if (!set.rset && !set.wset && !set.eset) { - if (!timeout) { - rb_thread_sleep_forever(); - return 0; - } - rb_thread_wait_for(*timeout); - return 0; + if (!timeout) { + rb_thread_sleep_forever(); + return 0; + } + rb_thread_wait_for(*timeout); + return 0; + } + + set.timeout = timeout; + set.extra_fd = rb_iom_fd(set.th); + if (set.extra_fd >= 0) { + set.sigwait_fd = -1; + } + else { + set.sigwait_fd = set.extra_fd = rb_sigwait_fd_get(set.th); } - set.sigwait_fd = rb_sigwait_fd_get(set.th); - if (set.sigwait_fd >= 0) { + if (set.extra_fd >= 0) { if (set.rset) - rb_fd_set(set.sigwait_fd, set.rset); + rb_fd_set(set.extra_fd, set.rset); else - set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset); - if (set.sigwait_fd >= set.max) { - set.max = set.sigwait_fd + 1; - } + set.rset = init_set_fd(set.extra_fd, 
&set.orig_rset); + if (set.extra_fd >= set.max) + set.max = set.extra_fd + 1; } + #define fd_init_copy(f) do { \ if (set.f) { \ rb_fd_resize(set.max - 1, set.f); \ @@ -4074,20 +4175,28 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) rb_thread_t *th = GET_THREAD(); nfds_t nfds; rb_unblock_function_t *ubf; + int iom_fd; + int runq_busy = FALSE; + rb_execution_context_t *ec = th->ec; - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - timeout_prepare(&to, &rel, &end, timeout); + RUBY_VM_CHECK_INTS_BLOCKING(ec); + if (rb_iom_schedule_p(th)) + return rb_iom_waitfd(th, &fd, events, timeout2hrtime(&rel, timeout)); + + timeout_prepare(ec, &runq_busy, &to, &rel, &end, timeout); fds[0].fd = fd; fds[0].events = (short)events; + do { + iom_fd = rb_iom_fd(th); fds[0].revents = 0; - fds[1].fd = rb_sigwait_fd_get(th); + fds[1].fd = iom_fd >= 0 ? iom_fd : rb_sigwait_fd_get(th); if (fds[1].fd >= 0) { + ubf = iom_fd >= 0 ? ubf_select : ubf_sigwait; fds[1].events = POLLIN; fds[1].revents = 0; nfds = 2; - ubf = ubf_sigwait; } else { nfds = 1; @@ -4098,8 +4207,9 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) BLOCKING_REGION(th, { const rb_hrtime_t *sto; struct timespec ts; + int sigwait_fd = iom_fd < 0 ? 
fds[1].fd : -1; - sto = sigwait_timeout(th, fds[1].fd, to, &drained); + sto = sigwait_timeout(th, sigwait_fd, to, &drained, runq_busy); if (!RUBY_VM_INTERRUPTED(th->ec)) { result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), NULL); if (result < 0) lerrno = errno; @@ -4111,12 +4221,17 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) result--; fds[1].revents = 0; } - (void)check_signals_nogvl(th, fds[1].fd); - rb_sigwait_fd_put(th, fds[1].fd); - rb_sigwait_fd_migrate(th->vm); + if (iom_fd >= 0) { + rb_iom_ping_events(th); /* may also handle signals */ + } + else { + (void)check_signals_nogvl(th, fds[1].fd); + rb_sigwait_fd_put(th, fds[1].fd); + rb_sigwait_fd_migrate(th->vm); + } } RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - } while (wait_retryable(&result, lerrno, to, end)); + } while (wait_retryable(th->ec, &result, lerrno, &runq_busy, to, end)); if (result < 0) { errno = lerrno; @@ -4199,7 +4314,13 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) struct select_args args; int r; VALUE ptr = (VALUE)&args; + rb_thread_t *th = GET_THREAD(); + + if (rb_iom_schedule_p(th)) { + rb_hrtime_t hrt; + return rb_iom_waitfd(th, &fd, events, timeout2hrtime(&hrt, tv)); + } args.as.fd = fd; args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? 
init_set_fd(fd, &wfds) : NULL; @@ -4430,10 +4551,13 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th) } void rb_fiber_atfork(rb_thread_t *); +static void rb_iom_atfork_child(rb_thread_t *); + void rb_thread_atfork(void) { rb_thread_t *th = GET_THREAD(); + rb_iom_atfork_child(th); rb_thread_atfork_internal(th, terminate_atfork_i); th->join_list = NULL; rb_mutex_cleanup_keeping_mutexes(th); @@ -5475,3 +5599,21 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data) return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack); } + +#ifndef RUBYVM_IOM +# if defined(HAVE_SYS_EVENT_H) && defined(HAVE_KQUEUE) && defined(HAVE_KEVENT) +# define RUBYVM_IOM IOM_KQUEUE +# elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \ + defined(HAVE_EPOLL_CTL) && defined(HAVE_EPOLL_WAIT) +# define RUBYVM_IOM IOM_EPOLL +# else +# define RUBYVM_IOM IOM_SELECT +# endif +#endif +#if RUBYVM_IOM == IOM_KQUEUE +# include "iom_kqueue.h" +#elif RUBYVM_IOM == IOM_EPOLL +# include "iom_epoll.h" +#else +# include "iom_select.h" +#endif diff --git a/thread_pthread.c b/thread_pthread.c index c87e952750..cba238b8b2 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -126,7 +126,8 @@ static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *, const rb_hrtime_t *abs); static const rb_hrtime_t *sigwait_timeout(rb_thread_t *, int sigwait_fd, const rb_hrtime_t *, - int *drained_p); + int *drained_p, + int runq_busy); static void ubf_timer_disarm(void); static void threadptr_trap_interrupt(rb_thread_t *); @@ -532,7 +533,7 @@ native_cond_timeout(rb_nativethread_cond_t *cond, const rb_hrtime_t rel) else { struct timespec ts; - rb_timespec_now(&ts); + rb_timespec_now(&ts); /* CLOCK_REALTIME */ return rb_hrtime_add(rb_timespec2hrtime(&ts), rel); } } @@ -1395,7 +1396,13 @@ static int ubf_threads_empty(void) { return 1; } static struct { /* pipes are closed in forked children when owner_process does not match */ int normal[2]; /* [0] == 
sigwait_fd */ - int ub_main[2]; /* unblock main thread from native_ppoll_sleep */ + + /* + * used to unblock main thread from native_ppoll_sleep + * this NEVER enters epoll or kqueue multiplexer. Only the + * main thread will ppoll and read from it. + */ + int ub_main[2]; /* volatile for signal handler use: */ volatile rb_pid_t owner_process; @@ -1839,6 +1846,8 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) } #endif +static int rb_iom_reserved_fd(int); /* check kqueue or epoll FD */ + int rb_reserved_fd_p(int fd) { @@ -1854,6 +1863,9 @@ rb_reserved_fd_p(int fd) goto check_pid; if (fd == signal_self_pipe.ub_main[0] || fd == signal_self_pipe.ub_main[1]) goto check_pid; + if (rb_iom_reserved_fd(fd)) + goto check_pid; + return 0; check_pid: if (signal_self_pipe.owner_process == getpid()) /* async-signal-safe */ @@ -1960,17 +1972,26 @@ ruby_ppoll(struct pollfd *fds, nfds_t nfds, # define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask)) #endif -void -rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *rel) +int +rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, int iom_fd, + const rb_hrtime_t *rel) { - struct pollfd pfd; + struct pollfd pfd[2]; struct timespec ts; + nfds_t nfds = 1; - pfd.fd = sigwait_fd; - pfd.events = POLLIN; + pfd[0].fd = sigwait_fd; + pfd[0].events = POLLIN; + pfd[1].revents = 0; + + if (iom_fd >= 0) { + pfd[1].fd = iom_fd; + pfd[1].events = POLLIN; + nfds = 2; + } if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) { - (void)ppoll(&pfd, 1, rb_hrtime2timespec(&ts, rel), 0); + (void)ppoll(pfd, nfds, rb_hrtime2timespec(&ts, rel), 0); check_signals_nogvl(th, sigwait_fd); } else { @@ -1990,18 +2011,20 @@ rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *rel) * [ruby-core:88102] */ for (;;) { - const rb_hrtime_t *sto = sigwait_timeout(th, sigwait_fd, &to, &n); + const rb_hrtime_t *sto = sigwait_timeout(th, sigwait_fd, &to, &n, + FALSE); - if (n) return; - n = ppoll(&pfd, 1, 
rb_hrtime2timespec(&ts, sto), 0); + if (n) break; + n = ppoll(pfd, nfds, rb_hrtime2timespec(&ts, sto), 0); if (check_signals_nogvl(th, sigwait_fd)) - return; + break; if (n || (th && RUBY_VM_INTERRUPTED(th->ec))) - return; - if (rel && hrtime_update_expire(&to, end)) - return; + break; + if (rel && rb_hrtime_update_expire(&to, end)) + break; } } + return pfd[1].revents; /* iom_fd readiness */ } /* @@ -2028,18 +2051,24 @@ ubf_ppoll_sleep(void *ignore) static void native_ppoll_sleep(rb_thread_t *th, rb_hrtime_t *rel) { + struct pollfd pfd[2]; + + pfd[0].fd = rb_iom_fd(th); /* kqueue or epoll FD */ + if (pfd[0].fd < 0) { + pfd[0].fd = signal_self_pipe.normal[0]; /* sigwait_fd */ + } + pfd[1].fd = signal_self_pipe.ub_main[0]; + pfd[0].events = pfd[1].events = POLLIN; + pfd[0].revents = 0; + rb_native_mutex_lock(&th->interrupt_lock); th->unblock.func = ubf_ppoll_sleep; rb_native_mutex_unlock(&th->interrupt_lock); GVL_UNLOCK_BEGIN(th); if (!RUBY_VM_INTERRUPTED(th->ec)) { - struct pollfd pfd[2]; struct timespec ts; - pfd[0].fd = signal_self_pipe.normal[0]; /* sigwait_fd */ - pfd[1].fd = signal_self_pipe.ub_main[0]; - pfd[0].events = pfd[1].events = POLLIN; if (ppoll(pfd, 2, rb_hrtime2timespec(&ts, rel), 0) > 0) { if (pfd[1].revents & POLLIN) { (void)consume_communication_pipe(pfd[1].fd); @@ -2054,6 +2083,9 @@ native_ppoll_sleep(rb_thread_t *th, rb_hrtime_t *rel) unblock_function_clear(th); unregister_ubf_list(th); GVL_UNLOCK_END(th); + if (pfd[0].revents && pfd[0].fd != signal_self_pipe.normal[0]) { + rb_iom_ping_events(th); + } } static void @@ -2062,6 +2094,9 @@ native_sleep(rb_thread_t *th, rb_hrtime_t *rel) int sigwait_fd = rb_sigwait_fd_get(th); if (sigwait_fd >= 0) { + int iom_fd = rb_iom_fd(th); + int do_ping = FALSE; + rb_native_mutex_lock(&th->interrupt_lock); th->unblock.func = ubf_sigwait; rb_native_mutex_unlock(&th->interrupt_lock); @@ -2069,7 +2104,7 @@ native_sleep(rb_thread_t *th, rb_hrtime_t *rel) GVL_UNLOCK_BEGIN(th); if (!RUBY_VM_INTERRUPTED(th->ec)) { - 
rb_sigwait_sleep(th, sigwait_fd, rel); + do_ping = rb_sigwait_sleep(th, sigwait_fd, iom_fd, rel); } else { check_signals_nogvl(th, sigwait_fd); @@ -2078,6 +2113,9 @@ native_sleep(rb_thread_t *th, rb_hrtime_t *rel) GVL_UNLOCK_END(th); rb_sigwait_fd_put(th, sigwait_fd); rb_sigwait_fd_migrate(th->vm); + if (do_ping) { + rb_iom_ping_events(th); + } } else if (th == th->vm->main_thread) { /* always able to handle signals */ native_ppoll_sleep(th, rel); diff --git a/thread_sync.c b/thread_sync.c index 000812556d..91c775ccd6 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -1,5 +1,6 @@ /* included by thread.c */ #include "ccan/list/list.h" +#include "fiber.h" static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; @@ -9,18 +10,30 @@ static VALUE rb_eClosedQueueError; static void sync_wakeup(struct list_head *head, long max) { - struct rb_sched_waiter *cur = 0, *next; + struct rb_sched_waiter *sw = 0, *next; + rb_thread_t *cur_th = GET_THREAD(); + int need_schedule = 0; - list_for_each_safe(head, cur, next, wnode) { - rb_thread_t *th = rb_ec_thread_ptr(cur->ec); + list_for_each_safe(head, sw, next, wnode) { + rb_thread_t *th = rb_ec_thread_ptr(sw->ec); - list_del_init(&cur->wnode); - if (th->status != THREAD_KILLED) { - rb_threadptr_interrupt(th); - th->status = THREAD_RUNNABLE; - if (--max == 0) return; + if (rb_tl_sched_p(sw->ec)) { + list_del(&sw->wnode); + rb_sched_waiter_ready(&th->runq, sw); + need_schedule |= !!(th == cur_th); + } + else { + list_del_init(&sw->wnode); + /* fprintf(stderr, "waking th=%p\n", th); */ + if (th->status != THREAD_KILLED) { + rb_threadptr_interrupt(th); + th->status = THREAD_RUNNABLE; + } } + if (--max == 0) break; } + if (need_schedule) + rb_iom_schedule(cur_th, 0); } static void @@ -780,7 +793,9 @@ queue_do_push(VALUE self, struct rb_queue *q, VALUE obj) raise_closed_queue_error(self); } rb_ary_push(check_array(self, q->que), obj); + /* fprintf(stderr, "waking...\n"); */ 
wakeup_one(queue_waitq(q)); + /* fprintf(stderr, "done waking...\n"); */ return self; } @@ -861,9 +876,16 @@ rb_queue_push(VALUE self, VALUE obj) } static VALUE -queue_sleep(VALUE arg) +sched_sleep(VALUE arg) { - rb_thread_sleep_deadly_allow_spurious_wakeup(); + struct rb_sched_waiter *sw = (struct rb_sched_waiter *)arg; + + if (rb_tl_sched_p(sw->ec)) { + rb_sched_waiter_sleep(sw, 0); + } + else if (!rb_tl_schedule_ready_p(sw->ec)) { + rb_thread_sleep_deadly_allow_spurious_wakeup(); + } return Qnil; } @@ -910,17 +932,19 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block) return queue_closed_result(self, q); } else { - struct queue_waiter qw; + VALUE tmp; + struct queue_waiter *qw = RB_FIBER_ALLOCA(tmp, struct queue_waiter); - assert(RARRAY_LEN(q->que) == 0); - assert(queue_closed_p(self) == 0); + assert(RARRAY_LEN(q->que) == 0); + assert(queue_closed_p(self) == 0); - qw.w.ec = GET_EC(); - qw.as.q = q; - list_add_tail(&qw.as.q->waitq, &qw.w.wnode); - qw.as.q->num_waiting++; + qw->w.ec = GET_EC(); + qw->as.q = q; + list_add_tail(&qw->as.q->waitq, &qw->w.wnode); + qw->as.q->num_waiting++; - rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw); + rb_ensure(sched_sleep, (VALUE)&qw->w, queue_sleep_done, (VALUE)qw); + RB_FIBER_ALLOCA_END(tmp); } } @@ -1152,15 +1176,18 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) goto closed; } else { - struct queue_waiter qw; - struct list_head *pushq = szqueue_pushq(sq); - - qw.w.ec = GET_EC(); - qw.as.sq = sq; - list_add_tail(pushq, &qw.w.wnode); - sq->num_waiting_push++; - - rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw); + VALUE tmp; + struct queue_waiter *qw = RB_FIBER_ALLOCA(tmp, struct queue_waiter); + struct list_head *pushq = szqueue_pushq(sq); + + qw->w.ec = GET_EC(); + qw->as.sq = sq; + list_add_tail(pushq, &qw->w.wnode); + sq->num_waiting_push++; + + rb_ensure(sched_sleep, (VALUE)&qw->w, + szqueue_sleep_done, (VALUE)qw); + RB_FIBER_ALLOCA_END(tmp); } } diff --git a/thread_win32.c 
b/thread_win32.c index d4db9e3824..9df9cfabb7 100644 --- a/thread_win32.c +++ b/thread_win32.c @@ -686,6 +686,8 @@ static struct { } timer_thread; #define TIMER_THREAD_CREATED_P() (timer_thread.id != 0) +void ruby_waitpid_all(rb_vm_t *); /* process.c */ + static unsigned long __stdcall timer_thread_func(void *dummy) { @@ -695,7 +697,7 @@ timer_thread_func(void *dummy) while (WaitForSingleObject(timer_thread.lock, TIME_QUANTUM_USEC/1000) == WAIT_TIMEOUT) { timer_thread_function(); - ruby_sigchld_handler(vm); /* probably no-op */ + ruby_waitpid_all(vm); rb_threadptr_check_signal(vm->main_thread); } thread_debug("timer killed\n"); @@ -774,10 +776,12 @@ ruby_alloca_chkstk(size_t len, void *sp) } } #endif + +static int rb_iom_reserved_fd(int); /* iom_select.h */ int rb_reserved_fd_p(int fd) { - return 0; + return rb_iom_reserved_fd(fd); } int @@ -793,11 +797,13 @@ rb_sigwait_fd_put(rb_thread_t *th, int fd) rb_bug("not implemented, should not be called"); } -NORETURN(void rb_sigwait_sleep(const rb_thread_t *, int, const rb_hrtime_t *)); -void -rb_sigwait_sleep(const rb_thread_t *th, int fd, const rb_hrtime_t *rel) +NORETURN(int rb_sigwait_sleep(const rb_thread_t *, int, int, const rb_hrtime_t *)); +int +rb_sigwait_sleep(const rb_thread_t *th, int sigwait_fd, int iom_fd, + const rb_hrtime_t *rel) { rb_bug("not implemented, should not be called"); + return 0; } rb_nativethread_id_t diff --git a/vm.c b/vm.c index a10fff640b..c1cfd55943 100644 --- a/vm.c +++ b/vm.c @@ -8,6 +8,7 @@ **********************************************************************/ +#include "iom.h" #include "internal.h" #include "ruby/vm.h" #include "ruby/st.h" @@ -2169,6 +2170,9 @@ rb_vm_mark(void *ptr) rb_gc_mark_values(RUBY_NSIG, vm->trap_list.cmd); + if (vm->iom) + rb_iom_mark(vm->iom); + mjit_mark(); } @@ -2217,6 +2221,7 @@ ruby_vm_destruct(rb_vm_t *vm) rb_fiber_reset_root_local_storage(th); thread_free(th); } + rb_iom_destroy(vm); rb_vm_living_threads_init(vm); ruby_vm_run_at_exit_hooks(vm); if 
(vm->loading_table) { @@ -2447,6 +2452,8 @@ thread_mark(void *ptr) RUBY_MARK_ENTER("thread"); rb_fiber_mark_self(th->ec->fiber_ptr); + rb_iom_mark_thread(th); + /* mark ruby objects */ switch (th->invoke_type) { case thread_invoke_type_proc: @@ -2552,6 +2559,9 @@ th_init(rb_thread_t *th, VALUE self) { th->self = self; rb_threadptr_root_fiber_setup(th); + list_head_init(&th->runq); + list_head_init(&th->coros); + list_head_init(&th->nogvl_runq); { /* vm_stack_size is word number. diff --git a/vm_core.h b/vm_core.h index 858f676869..9e10b321da 100644 --- a/vm_core.h +++ b/vm_core.h @@ -16,6 +16,7 @@ * Enable check mode. * 1: enable local assertions. */ +#define VM_CHECK_MODE 2 #ifndef VM_CHECK_MODE #define VM_CHECK_MODE 0 #endif @@ -580,6 +581,8 @@ typedef struct rb_hook_list_struct { int need_clean; } rb_hook_list_t; +struct rb_iom_struct; + typedef struct rb_vm_struct { VALUE self; @@ -593,7 +596,7 @@ typedef struct rb_vm_struct { #ifdef USE_SIGALTSTACK void *main_altstack; #endif - + struct rb_iom_struct *iom; rb_serial_t fork_gen; rb_nativethread_lock_t waitpid_lock; struct list_head waiting_pids; /* PID > 0: <=> struct waitpid_state */ @@ -852,6 +855,9 @@ typedef struct rb_execution_context_struct { /* ensure & callcc */ rb_ensure_list_t *ensure_list; + /* ensure for Thread::Light, also Thread::Light#list in future */ + struct list_node enode; + /* trace information */ struct rb_trace_arg_struct *trace_arg; @@ -864,6 +870,9 @@ typedef struct rb_execution_context_struct { /* n.b. only 7 bits needed, really: */ BITFIELD(enum method_missing_reason, method_missing_reason, 8); + uint8_t is_tl : 1; + uint8_t in_runq : 1; /* only relevant if ec->is_tl is true */ + VALUE private_const_reference; /* for GC */ @@ -964,6 +973,18 @@ typedef struct rb_thread_struct { rb_fiber_t *root_fiber; rb_jmpbuf_t root_jmpbuf; + /* + * Coros waiting for something, needed to support "ensure" in + * Thread::Light. 
Links with rb_execution_context_t.enode + */ + struct list_head coros; + + /* ready-to-run coros, <=> rb_sched_waiter.wnode, protected by GVL */ + struct list_head runq; + + /* same as runq, but protected by interrupt_lock instead of GVL */ + struct list_head nogvl_runq; + /* misc */ VALUE name; @@ -1731,6 +1752,13 @@ rb_current_execution_context(void) return ruby_current_execution_context_ptr; } +/* returns whether the given ec is a Thread::Light */ +static inline int +rb_tl_sched_p(const rb_execution_context_t *ec) +{ + return ec->is_tl; +} + static inline rb_thread_t * rb_current_thread(void) { @@ -1797,6 +1825,7 @@ void rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v); void rb_ec_error_print(rb_execution_context_t * volatile ec, volatile VALUE errinfo); void rb_execution_context_mark(const rb_execution_context_t *ec); void rb_fiber_close(rb_fiber_t *fib); +void rb_tl_finish(rb_thread_t *); void Init_native_thread(rb_thread_t *th); #define RUBY_VM_CHECK_INTS(ec) rb_vm_check_ints(ec) @@ -1809,6 +1838,30 @@ rb_vm_check_ints(rb_execution_context_t *ec) } } +static inline int +rb_tl_switchable(const rb_execution_context_t *ec) +{ + const rb_thread_t *th = rb_ec_thread_ptr(ec); + + /* dangerous, don't allow switching inside trap handler: */ + if (ec->interrupt_mask & TRAP_INTERRUPT_MASK) return 0; + + /* auto-fibers can switch away to root fiber */ + if (rb_tl_sched_p(ec)) return 1; + + /* no auto-fibers, yet, but we can create and switch to them */ + if (!th->root_fiber) return 1; + + /* root fiber can switch to auto-fibers, because ensure works */ + if (th->root_fiber == ec->fiber_ptr) return 1; + + /* + * no auto-switching away from regular Fibers because they lack + * ensure support: https://bugs.ruby-lang.org/issues/595 + */ + return 0; +} + /* tracer */ struct rb_trace_arg_struct { rb_event_flag_t event; -- EW
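Note on the reworked rb_sigwait_sleep() above: it now polls a second
descriptor (the kqueue or epoll FD) alongside sigwait_fd, and returns that
descriptor's revents so the caller knows whether to call rb_iom_ping_events().
A minimal standalone sketch of that pattern follows. It assumes plain poll(2)
instead of ppoll, takes a millisecond timeout instead of rb_hrtime_t, and
leaves out signal handling. The name sleep_on_two_fds is hypothetical and is
not part of this patch.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/*
 * Hypothetical stand-in for the two-descriptor wait in rb_sigwait_sleep():
 * always wait on primary_fd and, when extra_fd >= 0, on extra_fd as well.
 * Returns the revents of extra_fd (0 if it was not polled or not ready),
 * so the caller can decide whether to service it after waking up.
 */
short
sleep_on_two_fds(int primary_fd, int extra_fd, int timeout_ms)
{
    struct pollfd pfd[2];
    nfds_t nfds = 1;

    assert(primary_fd >= 0);
    pfd[0].fd = primary_fd;
    pfd[0].events = POLLIN;
    pfd[0].revents = 0;
    pfd[1].fd = -1;      /* poll(2) ignores negative descriptors */
    pfd[1].events = 0;
    pfd[1].revents = 0;  /* stays 0 when extra_fd is absent */

    if (extra_fd >= 0) {
        pfd[1].fd = extra_fd;
        pfd[1].events = POLLIN;
        nfds = 2;
    }

    /* treat errors such as EINTR as "nothing ready" in this sketch */
    if (poll(pfd, nfds, timeout_ms) < 0)
        return 0;

    return pfd[1].revents;
}
```

A caller would use the return value the way native_sleep() uses do_ping in
the patch: run the event dispatch step only when the value is non-zero, and
only after the wait itself has finished.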