From mboxrd@z Thu Jan 1 00:00:00 1970
From: Eric Wong
To: spew@80x24.org
Subject: [PATCH 2/2] Thread::Coro: green threads implemented using fibers
Date: Sat, 1 Sep 2018 13:10:12 +0000
Message-Id: <20180901131012.22138-2-e@80x24.org>
In-Reply-To: <20180901131012.22138-1-e@80x24.org>
References: <20180901131012.22138-1-e@80x24.org>
List-Id:

Thread::Coro is a type of Fiber which gives Thread-like behavior with
implementation characteristics similar to the Thread class from
Ruby 1.8 (but allowing kqueue and epoll on modern OSes).

This adds the following scheduling points in the C-API:

* rb_wait_for_single_fd
* rb_thread_fd_select
* rb_waitpid

(more to come)

Newest updates in this version:

- new name: Thread::Coro (reject Thriber/Threadlet)

- Fiber#transfer used for switching.
  Thanks to funny.falcon for the suggestion [ruby-core:87776],
  it made the code better, even :)

- improve IO.select mapping in kqueue/epoll (st_table => ccan/list),
  since there is no need for lookup, only scan

- sync to use MJIT-friendly rb_waitpid (added th->nogvl_runq since
  waitpid(2) can run without GVL)

- "ensure" support.  Only for Thread::Coro; Fiber has no ensure
  (see [Bug #595]).  "ensure" is REQUIRED for auto-scheduling safety.
  I am not sure if regular Fiber can support "ensure" in a way which
  is both performant and backwards-compatible.  With Thread::Coro
  being a new feature, there is no backwards compatibility to worry
  about, so the "ensure" support adds practically no overhead.

- more code sharing between iom_{select,kqueue,epoll}.h

- switch to rb_hrtime_t for timeouts

- extract timeout API, so non-timeout-arg users can benefit from
  reduced.  This will make Timeout-in-VM support easier and more
  orthogonal to this one.

The main Ruby API is similar to Thread, except "Thread::Coro.new" is
used instead of "Thread.new".  The following behave like their Thread
counterparts (a short usage sketch follows below):

  Thread::Coro.new    - Thread.new (TODO: parameter passing)
  Thread::Coro#join   - run internal scheduler until target coro is done
  Thread::Coro#value  - ditto, with return value

cont.c - rb_coro_sched_p checks if the current execution context is
         a Thread::Coro.  Changes to existing functions are minimal.

New files (all new structs and relations should be documented):

  iom.h          - internal API for the rest of RubyVM (more features to follow)
  iom_internal.h - internal header for iom_(select|epoll|kqueue).h
  iom_epoll.h    - epoll-specific pieces
  iom_kqueue.h   - kqueue-specific pieces
  iom_select.h   - select-specific pieces
  iom_pingable_common.h - common code for iom_(epoll|kqueue).h
  iom_common.h   - common footer for iom_(select|epoll|kqueue).h

[Feature #13618]

Changes to existing data structures:

* rb_thread_t
  .runq         - list of Thread::Coros to auto-resume
  .nogvl_runq   - same as above, for out-of-GVL use
  .waiting_coro - list of blocked Coros, for "ensure" support.
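For illustration only (this example is not part of the patch): a
minimal usage sketch based on the API described above.  It assumes
Thread#join-style timeout arguments for Thread::Coro#join, as
suggested by rb_join_interval in the diff below, and that blocking IO
reads reach the rb_wait_for_single_fd scheduling point:

    coro = Thread::Coro.new do
      r, w = IO.pipe
      begin
        w.write("hi")
        r.read(2)    # may suspend via rb_wait_for_single_fd if no data is ready
      ensure
        r.close      # "ensure" runs even if the coro is finished early
        w.close
      end
    end
    coro.join(1.0)   # run the internal scheduler until done (or timeout)
    p coro.value     # => "hi"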
For auto-scheduling, we must have ensure support because Fiber does not yet support ensure @@ -139,6 +94,7 @@ enum fiber_status { #define FIBER_SUSPENDED_P(fib) ((fib)->status == FIBER_SUSPENDED) #define FIBER_TERMINATED_P(fib) ((fib)->status == FIBER_TERMINATED) #define FIBER_RUNNABLE_P(fib) (FIBER_CREATED_P(fib) || FIBER_SUSPENDED_P(fib)) +#define FIBER_ENSURE -2 #if FIBER_USE_NATIVE && !defined(_WIN32) static inline int @@ -180,6 +136,7 @@ struct rb_fiber_struct { * You shouldn't mix "transfer" and "resume". */ unsigned int transferred : 1; + unsigned int is_coro : 1; #if FIBER_USE_NATIVE #ifdef _WIN32 @@ -301,6 +258,8 @@ fiber_ptr(VALUE obj) return fib; } +#define CORO_ERR(msg) rb_raise(rb_eThreadError,msg) + NOINLINE(static VALUE cont_capture(volatile int *volatile stat)); #define THREAD_MUST_BE_RUNNING(th) do { \ @@ -1161,6 +1120,7 @@ static VALUE make_passing_arg(int argc, const VALUE *argv) { switch (argc) { + case FIBER_ENSURE: case 0: return Qnil; case 1: @@ -1460,12 +1420,35 @@ rb_fiber_new(VALUE (*func)(ANYARGS), VALUE obj) static void rb_fiber_terminate(rb_fiber_t *fib, int need_interrupt); +static VALUE +fiber_start_i(VALUE tag, VALUE p) +{ + rb_context_t *cont = (rb_context_t *)p; + int argc = cont->argc; + const VALUE *argv, args = cont->value; + rb_execution_context_t *ec = &cont->saved_ec; + rb_fiber_t *fib = ec->fiber_ptr; + rb_thread_t *th = rb_ec_thread_ptr(ec); + rb_proc_t *proc; + + GetProcPtr(fib->first_proc, proc); + argv = argc > 1 ? RARRAY_CONST_PTR(args) : &args; + cont->value = Qnil; + ec->errinfo = Qnil; + ec->root_lep = rb_vm_proc_local_ep(fib->first_proc); + ec->root_svar = Qfalse; + + EXEC_EVENT_HOOK(ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil); + cont->value = rb_vm_invoke_proc(ec, proc, argc, argv, VM_BLOCK_HANDLER_NONE); + + return Qfalse; +} + void rb_fiber_start(void) { rb_thread_t * volatile th = GET_THREAD(); rb_fiber_t *fib = th->ec->fiber_ptr; - rb_proc_t *proc; enum ruby_tag_type state; int need_interrupt = TRUE; @@ -1475,17 +1458,7 @@ rb_fiber_start(void) EC_PUSH_TAG(th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { rb_context_t *cont = &VAR_FROM_MEMORY(fib)->cont; - int argc; - const VALUE *argv, args = cont->value; - GetProcPtr(fib->first_proc, proc); - argv = (argc = cont->argc) > 1 ? 
RARRAY_CONST_PTR(args) : &args; - cont->value = Qnil; - th->ec->errinfo = Qnil; - th->ec->root_lep = rb_vm_proc_local_ep(fib->first_proc); - th->ec->root_svar = Qfalse; - - EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil); - cont->value = rb_vm_invoke_proc(th->ec, proc, argc, argv, VM_BLOCK_HANDLER_NONE); + rb_catch_obj(FIBER_ENSURE, fiber_start_i, (VALUE)cont); } EC_POP_TAG(); @@ -1611,6 +1584,15 @@ rb_fiber_current(void) return fiber_current()->cont.self; } +static void +fiber_check_jump(const rb_fiber_t *fib) +{ + switch (fib->cont.argc) { + case -1: rb_exc_raise(fib->cont.value); + case FIBER_ENSURE: rb_throw_obj(FIBER_ENSURE, Qnil); + } +} + static inline VALUE fiber_store(rb_fiber_t *next_fib, rb_thread_t *th) { @@ -1663,14 +1645,14 @@ fiber_store(rb_fiber_t *next_fib, rb_thread_t *th) } #endif /* not _WIN32 */ fib = th->ec->fiber_ptr; - if (fib->cont.argc == -1) rb_exc_raise(fib->cont.value); + fiber_check_jump(fib); return fib->cont.value; #else /* FIBER_USE_NATIVE */ if (ruby_setjmp(fib->cont.jmpbuf)) { /* restored */ fib = th->ec->fiber_ptr; - if (fib->cont.argc == -1) rb_exc_raise(fib->cont.value); + fiber_check_jump(fib); if (next_fib->cont.value == Qundef) { cont_restore_0(&next_fib->cont, &next_fib->cont.value); VM_UNREACHABLE(fiber_store); @@ -1800,6 +1782,63 @@ rb_fiber_terminate(rb_fiber_t *fib, int need_interrupt) fiber_switch(ret_fib, 1, &value, 0); } +/* thread_{pthread,win32}.c */ +void rb_native_mutex_lock(rb_nativethread_lock_t *); +void rb_native_mutex_unlock(rb_nativethread_lock_t *); + +/* + * SIGCHLD-based rb_waitpid may enqueue events for this thread + * outside of GVL. Merge them into the GVL-protected th->runq here: + */ +void +rb_coro_merge_runq(rb_thread_t *th) +{ + /* + * no need to allocate, this function is called by rb_iom_mark_runq + * when GC marking runs. + */ + rb_fiber_t *fib = th->ec ? th->ec->fiber_ptr : 0; + + if (fib && !FIBER_TERMINATED_P(fib)) { + rb_native_mutex_lock(&th->interrupt_lock); + list_append_list(&th->runq, &th->nogvl_runq); + rb_native_mutex_unlock(&th->interrupt_lock); + } +} + +/* + * Runs any ready Thread::Coro in th->runq + * returns TRUE if @ec is a Thread::Coro and we are done scheduling + * ourselves. + */ +int +rb_coro_schedule_ready_p(rb_execution_context_t *ec) +{ + rb_thread_t *th = rb_ec_thread_ptr(ec); + struct rb_sched_waiter *sw; + + rb_coro_merge_runq(th); + + sw = list_pop(&th->runq, struct rb_sched_waiter, wnode); + if (sw) { + list_node_init(&sw->wnode); + + /* we are done if we switch; or see ourselves in the runq: */ + + if (ec != sw->ec) + fiber_switch(sw->ec->fiber_ptr, 0, 0, 0); + + return TRUE; + } + + if (th->root_fiber == ec->fiber_ptr) + return FALSE; /* we are the root fiber and did nothing */ + + /* Nothing to run, let root_fiber do other stuff... 
*/ + fiber_switch(th->root_fiber, 0, 0, 0); + return TRUE; +} + VALUE rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv) { @@ -1991,6 +2030,112 @@ rb_fiber_atfork(rb_thread_t *th) } #endif +void +rb_coro_finish_i(rb_execution_context_t *ec, rb_fiber_t *fib) +{ + VM_ASSERT(fib->is_coro); + VM_ASSERT(FIBER_SUSPENDED_P(fib)); + + fib->prev = ec->fiber_ptr; + EC_PUSH_TAG(ec); + if (EC_EXEC_TAG() == TAG_NONE) + fiber_switch(fib, FIBER_ENSURE, 0, 0); + EC_POP_TAG(); +} + +/* Returns true if Thread::Coro is enabled for current EC */ +int +rb_coro_sched_p(const rb_execution_context_t *ec) +{ + const rb_fiber_t *cur = ec->fiber_ptr; + + return (cur && cur->is_coro); +} + +static VALUE +coro_start(VALUE self, int argc, const VALUE *argv) +{ + rb_fiber_t *fib = fiber_ptr(self); + + fib->is_coro = 1; + fib->transferred = 1; + fiber_switch(fib, argc, argv, 0); + + return self; +} + +/* + * start a Thread::Coro (similar to Thread.new) + */ +static VALUE +coro_s_new(int argc, VALUE *argv, VALUE klass) +{ + VALUE self = fiber_alloc(klass); + + rb_obj_call_init(self, argc, argv); + + return coro_start(self, argc, argv); +} + +/* + * start a Thread::Coro, but do not call subclass initializer + * (similar to Thread.start) + */ +static VALUE +coro_s_start(int argc, VALUE *argv, VALUE klass) +{ + return coro_start(rb_fiber_init(fiber_alloc(klass)), argc, argv); +} + +static void +do_coro_join(rb_fiber_t *fib, rb_hrtime_t *rel) +{ + rb_thread_t *th = GET_THREAD(); + rb_fiber_t *cur = fiber_current(); + + if (cur == fib) + CORO_ERR("Target Thread::Coro must not be current fiber"); + if (th->root_fiber == fib) + CORO_ERR("Target Thread::Coro must not be root fiber"); + if (cont_thread_value(&fib->cont) != th->self) + CORO_ERR("Target Thread::Coro not owned by current thread"); + if (!fib->is_coro) + CORO_ERR("Target is not a Thread::Coro"); + + if (rel) { + const rb_hrtime_t end = rb_hrtime_add(rb_hrtime_now(), *rel); + + while (fib->status != FIBER_TERMINATED) { + rb_iom_schedule(th, rel); + if (rb_hrtime_update_expire(rel, end)) + return; + } + } + else { + while (fib->status != FIBER_TERMINATED) + rb_iom_schedule(th, 0); + } +} + +static VALUE +coro_join(int argc, VALUE *argv, VALUE self) +{ + rb_fiber_t *fib = fiber_ptr(self); + rb_hrtime_t rel; + + do_coro_join(fib, rb_join_interval(&rel, argc, argv)); + return fib->status == FIBER_TERMINATED ? 
fib->cont.self : Qnil; +} + +static VALUE +coro_value(VALUE self) +{ + rb_fiber_t *fib = fiber_ptr(self); + + do_coro_join(fib, 0); + return fib->cont.value; +} + /* * Document-class: FiberError * @@ -2007,6 +2152,8 @@ rb_fiber_atfork(rb_thread_t *th) void Init_Cont(void) { + VALUE rb_cThreadCoro; + #if FIBER_USE_NATIVE rb_thread_t *th = GET_THREAD(); @@ -2028,6 +2175,13 @@ Init_Cont(void) rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); rb_define_alias(rb_cFiber, "inspect", "to_s"); + + rb_cThreadCoro = rb_define_class_under(rb_cThread, "Coro", rb_cFiber); + rb_define_singleton_method(rb_cThreadCoro, "new", coro_s_new, -1); + rb_define_singleton_method(rb_cThreadCoro, "start", coro_s_start, -1); + rb_define_singleton_method(rb_cThreadCoro, "fork", coro_s_start, -1); + rb_define_method(rb_cThreadCoro, "join", coro_join, -1); + rb_define_method(rb_cThreadCoro, "value", coro_value, 0); } RUBY_SYMBOL_EXPORT_BEGIN diff --git a/eval.c b/eval.c index 35255e0975..00b190823e 100644 --- a/eval.c +++ b/eval.c @@ -199,6 +199,7 @@ ruby_cleanup(volatile int ex) errs[0] = th->ec->errinfo; SAVE_ROOT_JMPBUF(th, rb_thread_terminate_all()); + rb_coro_finish(th); } else { switch (step) { diff --git a/fiber.h b/fiber.h new file mode 100644 index 0000000000..3f0f5d5500 --- /dev/null +++ b/fiber.h @@ -0,0 +1,54 @@ +#ifndef RUBY_FIBER_H +#define RUBY_FIBER_H + +#include "internal.h" +#include "vm_core.h" + +/* FIBER_USE_NATIVE enables Fiber performance improvement using system + * dependent method such as make/setcontext on POSIX system or + * CreateFiber() API on Windows. + * This hack make Fiber context switch faster (x2 or more). + * However, it decrease maximum number of Fiber. For example, on the + * 32bit POSIX OS, ten or twenty thousands Fiber can be created. + * + * Details is reported in the paper "A Fast Fiber Implementation for Ruby 1.9" + * in Proc. of 51th Programming Symposium, pp.21--28 (2010) (in Japanese). + */ + +#if !defined(FIBER_USE_NATIVE) +# if defined(HAVE_GETCONTEXT) && defined(HAVE_SETCONTEXT) +# if 0 +# elif defined(__NetBSD__) +/* On our experience, NetBSD doesn't support using setcontext() and pthread + * simultaneously. This is because pthread_self(), TLS and other information + * are represented by stack pointer (higher bits of stack pointer). + * TODO: check such constraint on configure. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__sun) +/* On Solaris because resuming any Fiber caused SEGV, for some reason. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__ia64) +/* At least, Linux/ia64's getcontext(3) doesn't save register window. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__GNU__) +/* GNU/Hurd doesn't fully support getcontext, setcontext, makecontext + * and swapcontext functions. Disabling their usage till support is + * implemented. 
More info at + * http://darnassus.sceen.net/~hurd-web/open_issues/glibc/#getcontext + */ +# define FIBER_USE_NATIVE 0 +# else +# define FIBER_USE_NATIVE 1 +# endif +# elif defined(_WIN32) +# define FIBER_USE_NATIVE 1 +# endif +#endif +#if !defined(FIBER_USE_NATIVE) +#define FIBER_USE_NATIVE 0 +#endif + +#endif /* RUBY_FIBER_H */ diff --git a/hrtime.h b/hrtime.h index f133bdb1ac..66604a6288 100644 --- a/hrtime.h +++ b/hrtime.h @@ -50,6 +50,7 @@ typedef uint64_t rb_hrtime_t; /* thread.c */ /* returns the value of the monotonic clock (if available) */ rb_hrtime_t rb_hrtime_now(void); +rb_hrtime_t *rb_join_interval(rb_hrtime_t *, int argc, VALUE *argv); /* * multiply @a and @b with overflow check and return the @@ -165,4 +166,11 @@ rb_hrtime2timeval(struct timeval *tv, const rb_hrtime_t *hrt) } return 0; } + +/* + * @end is the absolute time when @timeout is set to expire + * Returns true if @end has past + * Updates @timeout and returns false otherwise + */ +int rb_hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end); #endif /* RB_HRTIME_H */ diff --git a/include/ruby/io.h b/include/ruby/io.h index 6cacd8a710..6ee7f7e966 100644 --- a/include/ruby/io.h +++ b/include/ruby/io.h @@ -121,6 +121,8 @@ typedef struct rb_io_t { /* #define FMODE_UNIX 0x00200000 */ /* #define FMODE_INET 0x00400000 */ /* #define FMODE_INET6 0x00800000 */ +/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */ +/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */ #define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr) diff --git a/iom.h b/iom.h new file mode 100644 index 0000000000..4d5bc13471 --- /dev/null +++ b/iom.h @@ -0,0 +1,83 @@ +/* + * iom -> I/O Manager for RubyVM (Thread::Coro-aware) + * + * On platforms with epoll or kqueue, this should be ready for multicore; + * even if the rest of the RubyVM is not. + * + * Some inspiration taken from Mio in GHC: + * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf + */ +#ifndef RUBY_IOM_H +#define RUBY_IOM_H +#include "ruby.h" +#include "ruby/io.h" +#include "ruby/intern.h" +#include "vm_core.h" +#include "hrtime.h" + +typedef struct rb_iom_struct rb_iom_t; + +/* WARNING: unstable API, only for Ruby internal use */ + +/* + * Relinquish calling fiber while waiting for +events+ on the given + * +rb_io_t+ + * + * Multiple native threads can enter this function at the same time. + * + * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI + * + * Returns a mask of events. + */ +int rb_iom_waitio(rb_thread_t *, rb_io_t *, int, const rb_hrtime_t *); + +/* + * Identical to rb_iom_waitio, but takes a pointer to an integer file + * descriptor, instead of rb_io_t. Use rb_iom_waitio when possible, + * since it allows us to optimize epoll (and perhaps avoid kqueue + * portability bugs across different *BSDs). + */ +int rb_iom_waitfd(rb_thread_t *, int *fdp, int, const rb_hrtime_t *); + +int rb_iom_select(rb_thread_t *, int maxfd, rb_fdset_t *r, + rb_fdset_t *w, rb_fdset_t *e, const rb_hrtime_t *); + +/* + * Relinquish calling fiber for at least the duration of given timeout + * in seconds. If timeout is negative, wait forever (until explicitly + * resumed). + * Multiple native threads can enter this function at the same time. 
+ */ +void rb_iom_sleep(rb_thread_t *, const rb_hrtime_t *); + +/* + * there is no public create function, creation is lazy to avoid incurring + * overhead for small scripts which do not need fibers, we only need this + * at VM destruction + */ +void rb_iom_destroy(rb_vm_t *); + +/* runs the scheduler */ +void rb_iom_schedule(rb_thread_t *, const rb_hrtime_t *); + +/* + * Wake up a rb_sched_waiter; may be called outside of GVL. + * @enqueue_coro enqueues the @sw to be run by its owner thread + * into rb_thread_t.nogvl_runq + * @enqueue_coro TRUE may only be used once per rb_sched_waiter + */ +void ruby_iom_wake_signal(struct rb_sched_waiter *, int enqueue_coro); + +/* cont.c */ +/* + * Resume all "ready" fibers belonging to a given thread. + * Returns TRUE when we are done, FALSE otherwise. + */ +int rb_coro_schedule_ready_p(rb_execution_context_t *); + +/* returns whether the given ec is a Thread::Coro */ +int rb_coro_sched_p(const rb_execution_context_t *); + +void rb_iom_mark_thread(rb_thread_t *); +void rb_iom_mark(rb_iom_t *); +#endif /* RUBY_IOM_H */ diff --git a/iom_common.h b/iom_common.h new file mode 100644 index 0000000000..fa34d25c17 --- /dev/null +++ b/iom_common.h @@ -0,0 +1,129 @@ +/* included by iom_(epoll|select|kqueue).h */ + +/* we lazily create this, small scripts may never need iom */ +static rb_iom_t * +iom_new(void) +{ + rb_iom_t *iom = ALLOC(rb_iom_t); + iom_init(iom); + + return iom; +} + +static rb_iom_t * +iom_get(rb_thread_t *th) +{ + rb_vm_t *vm = th->vm; + + if (!vm->iom) + vm->iom = iom_new(); + + return vm->iom; +} + +/* + * check for expired timers, must be called with GVL. Returns the + * number of expired entries. + */ +static void +iom_timer_check(const rb_thread_t *th) +{ + rb_iom_t *iom = th->vm->iom; + rb_hrtime_t now; + struct rb_sched_waiter *sw = 0, *next; + + if (!iom || list_empty(&iom->c.timers)) return; + + now = rb_hrtime_now(); + list_for_each_safe(&iom->c.timers, sw, next, wnode) { + struct iom_timer *t = container_of(sw, struct iom_timer, sw); + + if (now >= t->expire_at) { + rb_execution_context_t *ec = sw->ec; + rb_thread_t *owner = rb_ec_thread_ptr(ec); + + if (rb_coro_sched_p(ec)) { + list_del(&sw->wnode); + list_add_tail(&owner->runq, &sw->wnode); + } + else { + list_del_init(&sw->wnode); + rb_threadptr_interrupt(owner); + } + } + else { /* sorted by expire_at, so we can break out early */ + return; + } + } +} + +/* + * insert a new +timer+ into +timers+, maintain sort order by expire_at + */ +static void +iom_timer_add(rb_thread_t *th, struct iom_timer *add, const rb_hrtime_t *rel) +{ + struct rb_sched_waiter *sw = 0; + rb_iom_t *iom = iom_get(th); + + add->sw.ec = th->ec; + add->expire_at = rb_hrtime_add(rb_hrtime_now(), *rel); + /* + * search backwards: assume typical projects have multiple objects + * sharing the same timeout values, so new timers will expire later + * than existing timers + */ + list_for_each_rev(&iom->c.timers, sw, wnode) { + struct iom_timer *i = container_of(sw, struct iom_timer, sw); + + if (add->expire_at > i->expire_at) { + list_add_after(&iom->c.timers, &i->sw.wnode, &add->sw.wnode); + return; + } + } + list_add(&iom->c.timers, &add->sw.wnode); +} + +/* + * Register an iom_waiter, allows "ensure" to run. Each call must + * be paired with iom_waiter_ensure, and may not nest. + * + * This is NOT allowed: + * + * iom_waiter_add + * iom_waiter_add + * iom_waiter_ensure + * iom_waiter_ensure + * + * This is OK: + * + * iom_waiter_add + * iom_waiter_ensure + * iom_waiter_add + * iom_waiter_ensure + * ... 
+ * + */ +static void +iom_waiter_add(rb_thread_t *th, struct list_head *h, struct iom_waiter *w) +{ + w->sw.ec = th->ec; + /* use FIFO order for fairness */ + list_add_tail(h, &w->sw.wnode); + list_add_tail(&th->waiting_coro, &w->sw.ec->enode); +} + +/* max == -1 : wake all */ +static void +iom_blockers_notify(rb_iom_t *iom, int max) +{ + struct iom_blocker *b = 0, *next; + + list_for_each_safe(&iom->c.blockers, b, next, bnode) { + list_del_init(&b->bnode); + rb_threadptr_interrupt(b->th); + + if (--max == 0) + break; + } +} diff --git a/iom_epoll.h b/iom_epoll.h new file mode 100644 index 0000000000..33d446a121 --- /dev/null +++ b/iom_epoll.h @@ -0,0 +1,655 @@ +/* + * Linux-only epoll-based implementation of I/O Manager for RubyVM + * + * Notes: + * + * TODO: epoll_wait only has millisecond resolution; if we need higher + * resolution we can use timerfd or ppoll on the epoll_fd itself. + * + * Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the + * same notification callback (struct file_operations)->poll. + * Unlike with kqueue across different *BSDs; we do not need to worry + * about inconsistencies between these syscalls. + * + * See also notes in iom_kqueue.h + */ +#include "iom_internal.h" +#include +#define FMODE_IOM_ADDED FMODE_IOM_PRIVATE1 +/* TODO: support EPOLLEXCLUSIVE */ +#define IOM_EVENT_SIZE sizeof(struct epoll_event) + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + struct iom_core c; + struct iom_fdmap fdmap; /* maps each FD to multiple epw */ + struct iom_multiplexer mp; +}; + +struct epw_set; + +union epfdn_as { + struct list_node fdnode; + struct { + struct iom_fd *fdh; + } pre_ctl; +}; + +struct epfdn { + union epfdn_as as; + union { + int *flags; /* &fptr->mode */ + struct epw_set *set; + } owner; +}; + +/* + * Not using iom_fd_waiter here, since we never need to reread the + * FD on this implementation. + */ +struct epw { + struct epfdn fdn; + struct iom_waiter w; + int fd; /* no need for "int *", here, we never reread */ + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +struct epw_set { + struct iom_fdset_waiter fsw; + struct list_head fdset_node; /* - eset_item.fdset_node */ +}; + +/* + * per-FD in rb_thread_fd_select arg, value for epw_set.fdset_node + * Always allocated on-heap + */ +struct eset_item { + struct epfdn fdn; + struct list_node fdset_node; /* epw_set.fdset_node */ + int fd; + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +static const short idx2events[] = { EPOLLIN, EPOLLOUT, EPOLLPRI }; + +/* we can avoid branches when mapping RB_WAIT_* bits to EPOLL* bits */ +STATIC_ASSERT(epbits_matches_waitfd_bits, + RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT && + RB_WAITFD_PRI == EPOLLPRI); + +/* what goes into epoll_ctl... 
*/ +static int +rb_events2ep(int events) +{ + return EPOLLONESHOT | events; +} + +/* ...what comes out of epoll_wait */ +static short +rb_ep2revents(int revents) +{ + return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI)); +} + +static int +hrtime2eptimeout(const rb_hrtime_t *rel) +{ + /* + * clamp timeout to workaround a Linux <= 2.6.37 bug, + * see epoll_wait(2) manpage + */ + const time_t max_sec = 35 * 60; + const int max_msec = max_sec * 1000; /* floor(35.79 minutes) */ + long msec; + + if (!rel) return -1; /* infinite */ + if (!*rel) return 0; + + /* + * round up to avoid busy waiting, the rest of the code uses + * nanosecond resolution and we risk wasting cycles looping + * on epoll_wait(..., timeout=0) calls w/o rounding up, here: + */ + msec = (*rel + RB_HRTIME_PER_MSEC/2) / (long)RB_HRTIME_PER_MSEC; + + if (msec < 0 || msec > max_msec) + return max_msec; + + return msec > max_msec ? max_msec : (int)msec; +} + +static void +register_sigwait_fd(int epfd) +{ + int sigwait_fd = signal_self_pipe.normal[0]; + + if (sigwait_fd >= 0) { + struct epoll_event ev; + + ev.data.ptr = &sigwait_th; + ev.events = EPOLLIN; /* right, NOT using oneshot here */ + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigwait_fd, &ev) != 0) + rb_sys_fail("epoll_ctl"); + } +} + +/* lazily create epoll FD, since not everybody waits on I/O */ +static int +iom_epfd(rb_iom_t *iom) +{ + if (iom->mp.fd < 0) { +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + iom->mp.fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->mp.fd < 0) { + int err = errno; + + if (rb_gc_for_fd(err)) { + iom->mp.fd = epoll_create1(EPOLL_CLOEXEC); + if (iom->mp.fd < 0) + rb_sys_fail("epoll_create1"); + } + else if (err != ENOSYS) { + rb_syserr_fail(err, "epoll_create1"); + } + else { /* libc >= kernel || build-env > run-env */ +#endif /* HAVE_EPOLL_CREATE1 */ + iom->mp.fd = epoll_create(1); + if (iom->mp.fd < 0) { + if (rb_gc_for_fd(errno)) + iom->mp.fd = epoll_create(1); + } + if (iom->mp.fd < 0) + rb_sys_fail("epoll_create"); + + rb_maygvl_fd_fix_cloexec(iom->mp.fd); +#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1) + } + } +#endif /* HAVE_EPOLL_CREATE1 */ + rb_update_max_fd(iom->mp.fd); + register_sigwait_fd(iom->mp.fd); + } + return iom->mp.fd; +} + +static void +iom_init(rb_iom_t *iom) +{ + iom_core_init(&iom->c); + iom_fdmap_init(&iom->fdmap); + iom_multiplexer_init(&iom->mp); +} + +/* + * We need to re-arm watchers in case somebody wants several events + * from the same FD (e.g. both EPOLLIN and EPOLLOUT), but only one + * fires. We re-arm the one that disn't fire. This is an uncommon + * case. + */ +static int +rearm_fd(rb_thread_t *th, struct iom_fd *fdh) +{ + int fd = -1; + struct epoll_event ev; + union epfdn_as *as; + + ev.events = EPOLLONESHOT; + ev.data.ptr = fdh; + list_for_each(&fdh->fdhead, as, fdnode) { + struct epfdn *fdn = container_of(as, struct epfdn, as); + + if (fdn->owner.set) { + struct eset_item *esi = container_of(fdn, struct eset_item, fdn); + fd = esi->fd; + ev.events |= rb_events2ep(esi->events); + } + else { + struct epw *epw = container_of(fdn, struct epw, fdn); + fd = epw->fd; + ev.events |= rb_events2ep(epw->events); + } + } + return epoll_ctl(th->vm->iom->mp.fd, EPOLL_CTL_MOD, fd, &ev) ? 
errno : 0; +} + +/* + * this MUST not raise exceptions because sigwait_fd can be acquired, + * iom_wait_done can release sigwait_fd and raise + */ +static int +check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *eventlist, + int *sigwait_fd) +{ + int i; + int err = 0; + + if (nr < 0) return errno; + + for (i = 0; i < nr; i++) { + struct iom_fd *fdh = eventlist[i].data.ptr; + short revents = rb_ep2revents(eventlist[i].events); + union epfdn_as *as, *next; + + if (check_sigwait_fd(th, eventlist[i].data.ptr, &err, sigwait_fd)) + continue; + + /* + * Typical list size is 1; only multiple fibers waiting + * on the same FD increases fdh list size + */ + again_if_fd_closed: + list_for_each_safe(&fdh->fdhead, as, next, fdnode) { + struct epfdn *fdn = container_of(as, struct epfdn, as); + struct epw_set *eset = fdn->owner.set; + + if (eset) { + struct eset_item *esi = container_of(fdn, struct eset_item, fdn); + esi->revents = esi->events & revents; + if (esi->revents) { + int ret = eset->fsw.ret++; + list_del_init(&fdn->as.fdnode); + if (!ret) { + if (revents & POLLNVAL) { + eset->fsw.ret = -1; + eset->fsw.errnum = EBADF; + } + iom_waiter_ready(&eset->fsw.w); + } + } + } + else { + struct epw *epw = container_of(fdn, struct epw, fdn); + epw->revents = epw->events & revents; + if (epw->revents) { + list_del_init(&fdn->as.fdnode); + if (revents & POLLNVAL) + epw->revents |= POLLNVAL; + iom_waiter_ready(&epw->w); + } + } + } + + if (RB_UNLIKELY(!list_empty(&fdh->fdhead))) { + int rearm_err = rearm_fd(th, fdh); + + switch (rearm_err) { + case 0: break; + case ENOENT: + case EBADF: + /* descriptor got closed, force readiness */ + revents = EPOLLIN | EPOLLOUT | EPOLLPRI | POLLNVAL; + goto again_if_fd_closed; + default: + rb_bug_errno("epoll_ctl failed to rearm", rearm_err); + } + } + } + + /* notify the waiter thread in case we enqueued fibers for them */ + if (nr > 0) + iom_blockers_notify(th->vm->iom, -1); + + return (err || RUBY_VM_INTERRUPTED_ANY(th->ec)) ? EINTR : 0; +} + +/* perform a non-blocking epoll_wait while holding GVL */ +static void +ping_events(rb_thread_t *th, struct iom_fdset_waiter *unused) +{ + rb_iom_t *iom = th->vm->iom; + int epfd = iom ? iom->mp.fd : -1; + int sigwait_fd = rb_sigwait_fd_get(th); + int err = 0; + + if (epfd >= 0) { + int nr, nevents, retries = 0; + struct epoll_event *eventlist = iom_eventlist(iom, &nevents); + + do { + nr = epoll_wait(epfd, eventlist, nevents, 0); + err = check_epoll_wait(th, nr, eventlist, &sigwait_fd); + } while (!err && nr == nevents && ++retries); + iom_grow(iom, retries); + } + else if (sigwait_fd >= 0) { + err = check_signals_nogvl(th, sigwait_fd) ? EINTR : 0; + } + iom_wait_done(th, sigwait_fd, err, "epoll_wait", 0); +} + +/* for iom_pingable_common.h */ +static void +iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + struct epoll_event eventlist[256 / IOM_EVENT_SIZE]; + int nevents = numberof(eventlist); + int nr = nevents; + rb_hrtime_t hrt; + int msec = hrtime2eptimeout(iom_timeout_next(&hrt, &iom->c.timers)); + + if (msec != 0) { + int epfd = iom_epfd(iom); /* may raise */ + struct iom_blocker cur; + int err; + int sigwait_fd = rb_sigwait_fd_get(th); + + VM_ASSERT(epfd >= 0); + cur.th = th; + list_add_tail(&iom->c.blockers, &cur.bnode); + nr = err = 0; + BLOCKING_REGION(th, { + if (!RUBY_VM_INTERRUPTED(th->ec)) { + nr = epoll_wait(epfd, eventlist, nevents, msec); + iom_wait_check_nogvl(th, &err, nr, sigwait_fd); + } + }, sigwait_fd >= 0 ? 
ubf_sigwait : ubf_select, th, TRUE); + list_del(&cur.bnode); + if (!err) + err = check_epoll_wait(th, nr, eventlist, &sigwait_fd); + iom_wait_done(th, sigwait_fd, err, "epoll_wait", 0); + } + if (nr == nevents && iom_events_pending(th->vm)) /* || msec == 0 */ + ping_events(th, 0); +} + +static void +merge_events(struct epoll_event *ev, struct iom_fd *fdh) +{ + union epfdn_as *as; + + list_for_each(&fdh->fdhead, as, fdnode) { + struct epfdn *fdn = container_of(as, struct epfdn, as); + + if (fdn->owner.set) { + struct eset_item *esi = container_of(fdn, struct eset_item, fdn); + ev->events |= rb_events2ep(esi->events); + } + else { + struct epw *epw = container_of(fdn, struct epw, fdn); + ev->events |= rb_events2ep(epw->events); + } + } +} + +static int +epoll_ctl_ready_p(rb_thread_t *th, struct epw *epw) +{ + int e; + int epfd; + struct epoll_event ev; + int *flags = epw->fdn.owner.flags; + + epw->fdn.owner.flags = 0; /* ensure it's not confused with owner.set */ + + /* we cannot raise until list_add: */ + { + struct iom_fd *fdh = epw->fdn.as.pre_ctl.fdh; + + ev.data.ptr = fdh; + ev.events = rb_events2ep(epw->events); + /* + * merge events from other threads/fibers waiting on the same + * [ descriptor (int fd), description (struct file *) ] tuplet + */ + if (!list_empty(&fdh->fdhead)) /* uncommon, I hope... */ + merge_events(&ev, fdh); + + list_add(&fdh->fdhead, &epw->fdn.as.fdnode); + } + + epfd = iom_epfd(th->vm->iom); /* may raise */ + + /* we want to track if an FD is already being watched ourselves */ + if (flags) { + if (*flags & FMODE_IOM_ADDED) { /* ideal situation */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + else { + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + if (e == 0) + *flags |= FMODE_IOM_ADDED; + else if (e < 0 && errno == EEXIST) { + /* + * possible EEXIST if several fptrs point to the same FD: + * f1 = Fiber.start { io1.read(1) } + * io2 = IO.for_fd(io1.fileno) + * f2 = Fiber.start { io2.read(1) } + */ + *flags |= FMODE_IOM_ADDED; + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + } + } + } + else { /* don't know if added or not, fall back to add on ENOENT */ + e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev); + if (e < 0 && errno == ENOENT) + e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev); + } + if (e < 0) { + e = errno; + switch (e) { + case EPERM: /* all ready if File or Dir */ + epw->revents = epw->events; + return TRUE; + default: + epw->revents = epw->events | POLLNVAL; + return TRUE; + } + } + return FALSE; +} + +static VALUE +epw_schedule(VALUE ptr) +{ + /* we must have no posibility of raising until list_add: */ + struct epw *epw = (struct epw *)ptr; + rb_thread_t *th = rb_ec_thread_ptr(epw->w.sw.ec); + + if (epw->fd < 0) /* TODO: behave like poll(2) and sleep? 
*/ + return (VALUE)0; + + /* may raise on OOM: */ + epw->fdn.as.pre_ctl.fdh = iom_fd_get(&th->vm->iom->fdmap, epw->fd); + + if (!epoll_ctl_ready_p(th, epw)) { + ping_events(th, 0); + if (!rb_coro_schedule_ready_p(th->ec)) + VM_UNREACHABLE(epw_schedule); + } + + return (VALUE)epw->revents; /* may be zero if timed out */ +} + +static VALUE +epw_ensure(VALUE ptr) +{ + struct epw *epw = (struct epw *)ptr; + + list_del(&epw->fdn.as.fdnode); + iom_waiter_ensure(&epw->w); + IOM_ALLOCA_END(struct epw, epw); + + return Qfalse; +} + +static inline VALUE +epw_wait(VALUE epw) +{ + return rb_ensure(epw_schedule, epw, epw_ensure, epw); +} + +static int +iom_waitfd(rb_thread_t *th, int fd, int *flags, int events, + const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + struct epw *epw = IOM_ALLOCA(struct epw); + + /* unlike kqueue or select, we never need to reread fd */ + epw->fd = fd; + epw->fdn.owner.flags = flags; + /* + * if we did not have GVL, revents may be set immediately + * upon epoll_ctl by another thread running epoll_wait, + * so we must initialize it before epoll_ctl: + */ + epw->revents = 0; + epw->events = (short)events; + iom_waiter_add(th, &iom->c.fdws, &epw->w); + + return iom_wait_fd_result(th, rel, epw_wait, (VALUE)epw); +} + +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, + const rb_hrtime_t *rel) +{ + return iom_waitfd(th, fptr->fd, &fptr->mode, events, rel); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, const rb_hrtime_t *rel) +{ + return iom_waitfd(th, *fdp, 0, events, rel); +} + +/* used by fdset_prepare */ +static void +fdset_prepare_i(struct iom_fdset_waiter *fsw, int fd, int events) +{ + struct epw_set *eset = container_of(fsw, struct epw_set, fsw); + struct epoll_event ev; + rb_thread_t *th = rb_ec_thread_ptr(fsw->w.sw.ec); + int epfd = iom_epfd(th->vm->iom); /* may raise */ + struct iom_fd *fdh = iom_fd_get(&th->vm->iom->fdmap, fd); + struct eset_item *esi = ALLOC(struct eset_item); + + ev.data.ptr = fdh; + ev.events = rb_events2ep(events); + esi->fd = fd; + esi->revents = 0; + esi->events = events; + esi->fdn.owner.set = eset; + if (!list_empty(&fdh->fdhead)) { + /* + * happens when different Fibers call rb_thread_fd_select + * or rb_wait_for_single_fd on the same FD + */ + merge_events(&ev, fdh); + } + list_add(&fdh->fdhead, &esi->fdn.as.fdnode); + list_add_tail(&eset->fdset_node, &esi->fdset_node); + + /* always try EPOLL_CTL_MOD because it is faster to reuse an epitem */ + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) != 0) { + int e = errno; + + if (e == ENOENT) { + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == 0) + return; + e = errno; + } + if (e == EPERM) { /* regular file or directory, totally ready */ + esi->revents = esi->events; + fsw->ret++; + } + else { /* EBADF or anything else */ + if (fsw->ret == 0) { + fsw->errnum = e; + fsw->ret = -1; + } + } + } +} + +static VALUE +eset_schedule(VALUE ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + + /* EPERM from regular file gives fsw.ret > 0 */ + fdset_prepare(&eset->fsw); + if (eset->fsw.ret) /* error or regular file */ + return (VALUE)eset->fsw.ret; + + return fdset_schedule(&eset->fsw); +} + +static VALUE +eset_ensure(VALUE ptr) +{ + struct epw_set *eset = (struct epw_set *)ptr; + struct eset_item *esi, *nxt; + + list_for_each_safe(&eset->fdset_node, esi, nxt, fdset_node) { + list_del(&esi->fdset_node); + list_del(&esi->fdn.as.fdnode); + fdset_save_result_i(&eset->fsw, esi->fd, esi->revents); + xfree(esi); + } + return fdset_ensure(&eset->fsw, eset, sizeof(*eset)); 
+} + +static inline VALUE +eset_wait(VALUE eset) +{ + return rb_ensure(eset_schedule, eset, eset_ensure, eset); +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + struct epw_set *eset = IOM_ALLOCA(struct epw_set); + + list_head_init(&eset->fdset_node); + iom_fdset_waiter_init(th, &eset->fsw, maxfd, r, w, e); + iom_waiter_add(th, &iom->c.fdsets, &eset->fsw.w); + + return (int)(rel ? iom_timeout_do(th, rel, eset_wait, (VALUE)eset) + : eset_wait((VALUE)eset)); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + vm->iom = 0; + if (iom) { + /* + * it's possible; but crazy to share epoll FDs across processes + * (kqueue has a unique close-on-fork behavior) + */ + if (iom->mp.fd >= 0) + close(iom->mp.fd); + + iom_fdmap_destroy(&iom->fdmap); + xfree(iom); + } +} + +/* used by thread.c::rb_thread_atfork */ +#if defined(HAVE_WORKING_FORK) +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} +#endif + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + rb_iom_t *iom = GET_VM()->iom; + + return iom && fd == iom->mp.fd; +} +#include "iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_internal.h b/iom_internal.h new file mode 100644 index 0000000000..e52b7c1e23 --- /dev/null +++ b/iom_internal.h @@ -0,0 +1,608 @@ +#ifndef RB_IOM_COMMON_H +#define RB_IOM_COMMON_H + +#include "internal.h" +#include "iom.h" + +/* + * FIBER_USE_NATIVE (see cont.c) allows us to read machine stacks of + * yielded (stopped) fibers. Without native fiber support, cont.c needs + * to copy fiber stacks around, so we must use slower heap allocation + * instead of stack allocation (and increase our chance of hitting + * memory leak bugs) + */ +#include "fiber.h" + +#define FMODE_IOM_PRIVATE1 0x01000000 +#define FMODE_IOM_PRIVATE2 0x02000000 + +#if defined(RUBY_ALLOCV_LIMIT) +# define IOM_ALLOCA_LIMIT RUBY_ALLOCV_LIMIT +#else +# define IOM_ALLOCA_LIMIT 1024 +#endif + +/* + * conditional alloca for FIBER_USE_NATIVE or on-heap fallback for + * FIBER_USE_NATIVE==0 + */ +#define IOM_ONSTACK_SIZE_P(S) (FIBER_USE_NATIVE && S < IOM_ALLOCA_LIMIT) +#define IOM_ONSTACK_P(T) IOM_ONSTACK_SIZE_P(sizeof(T)) +#define IOM_ALLOCA(T) IOM_ONSTACK_P(T) ? ALLOCA_N(T, 1) : ALLOC(T) +#define IOM_ALLOCA_END(T,p) \ + do { \ + if (!IOM_ONSTACK_P(T)) ruby_sized_xfree(p, sizeof(T)); \ + } while (0) + +static inline short +next_pow2(short x) +{ + x |= x >> 1; + x |= x >> 2; + x |= x >> 4; + x |= x >> 8; + + return x + 1; +} + +#if defined(POLLNVAL) +# if (POLLNVAL != RB_WAITFD_IN && POLLNVAL != RB_WAITFD_OUT && \ + POLLNVAL != RB_WAITFD_PRI) +# define IOM_NVAL POLLNVAL +# endif +#endif + +#ifndef IOM_NVAL +# define IOM_NVAL next_pow2(RB_WAITFD_IN|RB_WAITFD_OUT|RB_WAITFD_PRI) +#endif + +/* + * fdmap is a singleton. + * + * It makes zero sense to have multiple fdmaps per-process; even less so + * than multiple ioms. The file descriptor table in POSIX is per-process; + * and POSIX requires syscalls to allocate the lowest available FD. + * This is also why we use an array instead of a hash table, as there will + * be no holes for big processes. + * + * If CPU cache contention becomes a problem, we can pad (struct iom_fd) + * to 64-bytes for cache alignment. + * + * Currently we use fdmap to deal with FD aliasing with epoll + * and kqueue interfaces.. 
FD aliasing happens when multiple + * Coros wait on the same FD; but epoll/kqueue APIs only allow + * registering a single data pointer per FD. + * + * In the future, we may implement rb_notify_fd_close using fdmap. + */ + +/* linear growth */ +#define RB_IOM_FD_PER_HEAP 64 +/* on-heap and persistent for process lifetime keep as small as possible. */ +struct iom_fd { + struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */ +}; + +/* singleton (per-rb_iom_t, or per process, if we ever need > 1 iom) */ +struct iom_fdmap { + struct iom_fd **map; + unsigned int heaps; + int max_fd; +}; + +struct iom_timer { + struct rb_sched_waiter sw; + rb_hrtime_t expire_at; +}; + +/* + * use this instead of rb_sched_waiter directly for Thread::Coro, + * because we need to register rb_execution_context_t.enode for + * "ensure" + */ +struct iom_waiter { + struct rb_sched_waiter sw; +}; + +struct iom_fd_waiter { + struct iom_waiter w; +#if FIBER_USE_NATIVE + int *fdp; /* (ideally), a pointer fptr->fd to detect closed FDs */ +#else + int fd; /* need to do full copy with non-native fiber */ +#endif + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +/* + * On-stack if FIBER_USE_NATIVE, for rb_thread_fd_select + * This is used to emulate select() on epoll and kqueue, and + * allows iom_select to support concurrent rb_thread_fd_select + * callers. + */ +struct iom_fdset_waiter { + struct iom_waiter w; + int max; + int ret; + int errnum; + rb_fdset_t *in[3]; + /* + * without native fibers, other threads and fibers cannot safely + * read the stacks of stopped fibers + */ +#if FIBER_USE_NATIVE == 0 && RUBYVM_IOM == IOM_SELECT + rb_fdset_t *onstack_dst[3]; +#endif +}; + +/* threads sleeping w/o GVL in select, epoll_wait or kevent; ALWAYS on stack */ +struct iom_blocker { + rb_thread_t *th; + struct list_node bnode; /* -iom->blockers */ +}; + +/* common core of rb_iom_t */ +struct iom_core { + /* + * Everything here is protected by GVL at this time, + * URCU lists (LGPL-2.1+) may be used in the future + */ + struct list_head timers; /* -iom_timer.sw.wnode, sort by expire_at */ + + /* rb_wait_for_single_fd callers */ + struct list_head fdws; /* -iom_fd_waiter.w.sw.wnode, FIFO order */ + + /* rb_thread_fd_select callers */ + struct list_head fdsets; /* -iom_fdset_waiter.w.sw.wnode */ + + /* + * threads doing select/epoll_wait/kevent without GVL + * We notify this list because they may be starving each other + */ + struct list_head blockers; /* -iom_blocker.bnode */ +}; + +/* process.c */ +int rb_vm_waitpid_pending(rb_vm_t *vm); + +static void iom_timer_check(const rb_thread_t *); +static void iom_timer_add(rb_thread_t *, struct iom_timer *, + const rb_hrtime_t *); +static void iom_waiter_add(rb_thread_t *, struct list_head *, + struct iom_waiter *); +static rb_iom_t *iom_get(rb_thread_t *); +static void iom_blockers_notify(rb_iom_t *, int max); + +#define IOM_PINGABLE_P (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) +#if IOM_PINGABLE_P +struct iom_multiplexer { + int fd; + /* array of "struct kevent" or "struct epoll_event" */ + VALUE buf; +}; + +static int iom_nevents(const rb_iom_t *); +static void *iom_eventlist(const rb_iom_t *, int *nevents); +static void iom_grow(rb_iom_t *, int retries); +static void iom_multiplexer_init(struct iom_multiplexer *mp); +static void fdset_prepare(struct iom_fdset_waiter *); +static VALUE fdset_schedule(struct iom_fdset_waiter *); +static VALUE fdset_ensure(struct iom_fdset_waiter *, void *, size_t); +static 
void fdset_save_result_i(struct iom_fdset_waiter *, int, short); +static int check_sigwait_fd(rb_thread_t *, const void *uptr, + int *err, int *sigwait_fd); +static int iom_events_pending(rb_vm_t *); + +/* TODO: may use this for rb_notify_fd_close */ +static struct iom_fd * +iom_fdhead_aref(struct iom_fdmap *fdmap, int fd) +{ + VM_ASSERT(fd >= 0); + return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP]; +} + +static struct iom_fd * +iom_fd_get(struct iom_fdmap *fdmap, int fd) +{ + while (fd >= fdmap->max_fd) { + struct iom_fd *base, *h; + unsigned n = fdmap->heaps + 1; + unsigned i; + + fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct iom_fd *)); + base = h = ALLOC_N(struct iom_fd, RB_IOM_FD_PER_HEAP); + for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) { + list_head_init(&h->fdhead); + h++; + } + fdmap->map[fdmap->heaps++] = base; + fdmap->max_fd += RB_IOM_FD_PER_HEAP; + } + return iom_fdhead_aref(fdmap, fd); +} + +static void +iom_fdmap_init(struct iom_fdmap *fdmap) +{ + fdmap->max_fd = 0; + fdmap->heaps = 0; + fdmap->map = 0; +} + +static void +iom_fdmap_destroy(struct iom_fdmap *fdmap) +{ + unsigned n; + + for (n = 0; n < fdmap->heaps; n++) { + xfree(fdmap->map[n]); + } + xfree(fdmap->map); + iom_fdmap_init(fdmap); +} +#endif /* IOM_PINGABLE_P */ + +/* + * returns the rb_hrtime_t interval to wait for the next pending event + * returns NULL if we may wait indefinitely + */ +static rb_hrtime_t * +iom_timeout_next(rb_hrtime_t *rel, struct list_head *timers) +{ + struct rb_sched_waiter *sw; + + sw = list_top(timers, struct rb_sched_waiter, wnode); + if (sw) { + struct iom_timer *t = container_of(sw, struct iom_timer, sw); + + if (rb_hrtime_update_expire(rel, t->expire_at)) { + /* force an immediate return from epoll_wait/kevent if expired */ + *rel = 0; + } + return rel; + } + else { + return 0; + } +} + +static void +iom_timer_ensure(struct iom_timer *t) +{ + list_del(&t->sw.wnode); +} + +static VALUE +iom_timeout_ensure(VALUE ptr) +{ + struct iom_timer *t = (struct iom_timer *)ptr; + + iom_timer_ensure(t); + IOM_ALLOCA_END(struct iom_timer, t); + + return Qfalse; +} + +static VALUE +iom_timeout_do(rb_thread_t *th, const rb_hrtime_t *rel, + VALUE (*fn)(ANYARGS), VALUE arg) +{ + struct iom_timer *t = IOM_ALLOCA(struct iom_timer); + + iom_timer_add(th, t, rel); + return rb_ensure(fn, arg, iom_timeout_ensure, (VALUE)t); +} + +/* + * marks an an iom waiter as "ready" by enqueueing into the owner + * thread's runq. Must be called with GVL. + * Use ruby_iom_wake_signal if outside of GVL. + */ +static void +iom_waiter_ready(struct iom_waiter *w) +{ + rb_execution_context_t *ec = w->sw.ec; + rb_thread_t *owner = rb_ec_thread_ptr(ec); + + VM_ASSERT(rb_coro_sched_p(ec)); + list_del(&w->sw.wnode); + list_add_tail(&owner->runq, &w->sw.wnode); +} + +/* callable without GVL */ +void +ruby_iom_wake_signal(struct rb_sched_waiter *sw, int enqueue_coro) +{ + rb_thread_t *th = rb_ec_thread_ptr(sw->ec); + struct list_head *runq = 0; + + if (enqueue_coro && rb_coro_sched_p(sw->ec)) + runq = ruby_thread_has_gvl_p() ? &th->runq : &th->nogvl_runq; + + rb_native_mutex_lock(&th->interrupt_lock); + + if (runq) + list_add_tail(runq, &sw->wnode); + + /* + * th->ec != sw->ec is possible. We want to interrupt the + * currently running ec on th. 
Hit both since cont.c::ec_switch + * is not protected by th->interrupt_lock + */ + RUBY_VM_SET_INTERRUPT(sw->ec); + RUBY_VM_SET_INTERRUPT(th->ec); + + if (th->unblock.func) + (th->unblock.func)(th->unblock.arg); + + rb_native_mutex_unlock(&th->interrupt_lock); +} + +/* + * called by rb_ensure at the end, so we may use list_del instead of + * idempotent list_del_init + */ +static void +iom_waiter_ensure(struct iom_waiter *w) +{ + list_del(&w->sw.wnode); + list_del(&w->sw.ec->enode); +} + +static void +iom_fdset_waiter_init(rb_thread_t *th, struct iom_fdset_waiter *fsw, + int maxfd, rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e) +{ + size_t i; + fsw->ret = 0; + fsw->max = maxfd; + fsw->errnum = 0; + VM_ASSERT(fsw->max > 0); + fsw->in[0] = r; + fsw->in[1] = w; + fsw->in[2] = e; + + for (i = 0; i < 3; i++) { + if (fsw->in[i]) + rb_fd_resize(fsw->max - 1, fsw->in[i]); + } +#if FIBER_USE_NATIVE == 0 && RUBYVM_IOM == IOM_SELECT + for (i = 0; i < 3; i++) { + if (fsw->in[i]) { + fsw->onstack_dst[i] = fsw->in[i]; + fsw->in[i] = ALLOC(rb_fdset_t); + rb_fd_init_copy(fsw->in[i], fsw->onstack_dst[i]); + } + else { + fsw->onstack_dst[i] = 0; + } + } +#endif /* !FIBER_USE_NATIVE && IOM_SELECT */ +} + +static inline int +iom_fdw_get_fd(struct iom_fd_waiter *fdw) +{ +#if FIBER_USE_NATIVE + return *fdw->fdp; /* able to detect closed FD in rb_io_t.fd */ +#else + return fdw->fd; +#endif +} + +static inline void +iom_fdw_init(struct iom_fd_waiter *fdw, int *fdp, int events) +{ +#if FIBER_USE_NATIVE + fdw->fdp = fdp; +#else + fdw->fd = *fdp; +#endif + fdw->events = (short)events; + fdw->revents = 0; +} + +static void +iom_core_init(struct iom_core *c) +{ + list_head_init(&c->timers); + list_head_init(&c->fdws); + list_head_init(&c->fdsets); + list_head_init(&c->blockers); +} + +static inline struct iom_fd_waiter * +fd_waiter_of(struct rb_sched_waiter *sw) +{ + struct iom_waiter *w = container_of(sw, struct iom_waiter, sw); + + return container_of(w, struct iom_fd_waiter, w); +} + +static inline struct iom_fdset_waiter * +fdset_waiter_of(struct rb_sched_waiter *sw) +{ + struct iom_waiter *w = container_of(sw, struct iom_waiter, sw); + + return container_of(w, struct iom_fdset_waiter, w); +} + +/* Use after calling select/epoll_wait/kevent to wait for events */ +static void +iom_wait_done(rb_thread_t *th, int sigwait_fd, int err, const char *msg, + struct iom_fdset_waiter *fsw) +{ + if (sigwait_fd >= 0) { + rb_sigwait_fd_put(th, sigwait_fd); + rb_sigwait_fd_migrate(th->vm); + } + iom_timer_check(th); + switch (err) { + case 0: return; + case EINTR: RUBY_VM_CHECK_INTS_BLOCKING(th->ec); return; + default: + if (fsw && !fsw->ret) { + fsw->ret = -1; + fsw->errnum = err; + } + if (err != EBADF) + rb_syserr_fail(err, msg); + } +} + +/* + * use for checking the result of epoll_wait, kevent, ppoll, or select + * and similar functions when used without GVL (and non-zero timeout). 
+ */ +static void +iom_wait_check_nogvl(rb_thread_t *th, int *err, int nr, int sigwait_fd) +{ + /* kevent, epoll_wait, select all return -1 and set errno on error */ + if (nr < 0) + *err = errno; + + if (sigwait_fd >= 0 && (nr == 0 || *err == EINTR)) { + if (check_signals_nogvl(th, sigwait_fd)) + *err = EINTR; + } +} + +void rb_fiber_mark_self(const rb_fiber_t *fib); /* cont.c */ +void rb_coro_merge_runq(rb_thread_t *); /* cont.c */ +void rb_coro_finish_i(rb_execution_context_t *, rb_fiber_t *); + +static void +iom_mark_sched_waiters(const struct list_head *h) +{ + const struct rb_sched_waiter *sw = 0; + + list_for_each(h, sw, wnode) + rb_fiber_mark_self(sw->ec->fiber_ptr); +} + +void +rb_iom_mark_thread(rb_thread_t *th) +{ + rb_execution_context_t *ec = 0; + + list_for_each(&th->waiting_coro, ec, enode) + rb_fiber_mark_self(ec->fiber_ptr); + + rb_coro_merge_runq(th); + iom_mark_sched_waiters(&th->runq); +} + +void +rb_coro_finish(rb_thread_t *th) +{ + rb_execution_context_t *ec; + struct rb_sched_waiter *sw; + size_t nr; + + /* thread never used fibers, so it doesn't have coros */ + if (!th->ec->fiber_ptr) + return; + + /* deal with anything in the run queues: */ + do { + nr = 0; + if (0) { + rb_coro_merge_runq(th); + while ((sw = list_pop(&th->runq, struct rb_sched_waiter, wnode))) { + list_node_init(&sw->wnode); + nr++; + rb_coro_finish_i(th->ec, sw->ec->fiber_ptr); + } + } + + while ((ec = list_pop(&th->waiting_coro, rb_execution_context_t, enode))) { + list_node_init(&ec->enode); /* for list_del in iom_waiter_ensure */ + + nr++; + rb_coro_finish_i(th->ec, ec->fiber_ptr); + } + } while (nr); +} + +static inline int +iom_wait_fd_result(rb_thread_t *th, const rb_hrtime_t *rel, + VALUE (*fn)(ANYARGS), VALUE arg) +{ + int revents = (int)(rel ? iom_timeout_do(th, rel, fn, arg) : fn(arg)); + + if (revents & IOM_NVAL) { + errno = EBADF; + return -1; + } + return revents; +} + +enum fd_pollable { + FD_POLL_OK, /* sockets or pipes */ + FD_POLL_BUSY, FD_POLL_BAD }; + +#ifdef _WIN32 +# define RB_W32_IS_SOCKET(fd) rb_w32_is_socket(fd) +#else +# define RB_W32_IS_SOCKET(fd) (0) +#endif + +static inline enum fd_pollable +iom_fd_pollable(int fd) +{ + struct stat st; + + if (RB_W32_IS_SOCKET(fd)) + return FD_POLL_OK; + + if (fstat(fd, &st) == 0) { + switch (st.st_mode & S_IFMT) { +#ifdef S_IFSOCK + case S_IFSOCK: /* sockets ought to be OK for all platforms */ +#elif defined(_S_IFSOCK) + case _S_IFSOCK: +#endif + case S_IFIFO: /* I hope FIFOs and pipes are pollable... */ + case S_IFCHR: /* maybe? 
*/ + return FD_POLL_OK; + default: + /* assume select() is non-sensical with others */ + return FD_POLL_BUSY; + } + } + return FD_POLL_BAD; +} + +static inline int +fdset_error_p(struct iom_fdset_waiter *fsw) +{ + int i, fd; + + for (fd = 0; fd < fsw->max; fd++) { + for (i = 0; i < 3; i++) { + const rb_fdset_t *in = fsw->in[i]; + + if (in && rb_fd_isset(fd, in)) { + switch (iom_fd_pollable(fd)) { + case FD_POLL_BUSY: + fsw->ret++; + break; + case FD_POLL_OK: + break; + case FD_POLL_BAD: + if (!fsw->ret) { + fsw->ret = -1; + fsw->errnum = EBADF; + return TRUE; + } + } + + /* only check each FD once */ + break; + } + } + } + return !!fsw->ret; +} +#endif /* IOM_INTERNAL_H */ diff --git a/iom_kqueue.h b/iom_kqueue.h new file mode 100644 index 0000000000..7b4c691d8a --- /dev/null +++ b/iom_kqueue.h @@ -0,0 +1,801 @@ +/* + * kqueue-based implementation of I/O Manager for RubyVM on *BSD + * + * The kevent API has an advantage over epoll_ctl+epoll_wait since + * it can simultaneously add filters and check for events with one + * syscall. It also allows EV_ADD to be used idempotently for + * enabling filters, where as epoll_ctl requires separate ADD and + * MOD operations. + * + * These are advantages in the common case... + * + * The epoll API has advantages in more esoteric cases: + * + * epoll has the advantage over kqueue when watching for multiple + * events (POLLIN|POLLOUT|POLLPRI) (which is rare). We also have + * to install two kevent filters to watch POLLIN|POLLOUT simutaneously. + * See udata_set/udata_get functions below for more on this. + * + * Finally, kevent does not support POLLPRI directly, we need to use + * select() (or perhaps poll() on some platforms) with a zero + * timeout to check for POLLPRI after EVFILT_READ returns. + * + * Finally, several *BSDs implement kqueue; and the quality of each + * implementation may vary. Anecdotally, *BSDs are not known to even + * support poll() consistently across different types of files. + * We will need to selective and careful about injecting them into + * kevent(). 
+ */ +#include "iom_internal.h" + +/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */ +#undef LIST_HEAD +#include +#include +#include + +/* needed for iom_pingable_common.h */ +#define IOM_EVENT_SIZE (sizeof(struct kevent)) + +/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */ +#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI) + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + struct iom_core c; + struct iom_fdmap rfdmap; /* holds fdh for EVFILT_READ */ + struct iom_fdmap wfdmap; /* holds fdh for EVFILT_WRITE */ + struct iom_multiplexer mp; +}; + +struct kqw_set; +struct kqfdn { + /* + * both rfdnode and wfdnode are overloaded for deleting paired + * filters when watching both EVFILT_READ and EVFILT_WRITE on + * a single FD + */ + struct list_node rfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_READ) */ + struct list_node wfdnode; /* -(ev.udata==fdh)->fdhead (EVFILT_WRITE) */ + struct kqw_set *owner; +}; + +/* allocated on stack */ +struct kev { + struct kqfdn fdn; + + /* fdw.w.sw.wnode is overloaded for checking RB_WAITFD_PRI + * (see check_pri) */ + struct iom_fd_waiter fdw; +}; + +/* allocated on-stack */ +struct kqw_set { + struct iom_fdset_waiter fsw; + struct list_head fdset_head; /* - kset_item.fdset_node */ + /* we reuse iom->mp.buf (eventlist) for changelist */ + int nchanges; +}; + +/* + * per-FD in rb_thread_fd_select arg, value for kqw_set.fdn.as.tbl, + * allocated on-heap + */ +struct kset_item { + struct kqfdn fdn; + struct list_node fdset_node; /* kqw_set.kqw_head */ + int fd; + short events; /* requested events, like poll(2) */ + short revents; /* returned events, like poll(2) */ +}; + +/* + * we'll often call kevent with a zero timeout to perform a quick + * scan of events while holding GVL. I expect the BSD kernels to be + * optimizing this case (because I know for certain Linux does for epoll_wait). 
+ */ +static const struct timespec zero; +static const short idx2events[] = { RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI }; + +static void +register_sigwait_fd(int kqfd) +{ + int sigwait_fd = signal_self_pipe.normal[0]; + + if (sigwait_fd >= 0) { + struct kevent kev; + + EV_SET(&kev, sigwait_fd, EVFILT_READ, EV_ADD, 0, 0, &sigwait_th); + + if (kevent(kqfd, &kev, 1, 0, 0, &zero) < 0) + rb_sys_fail("kevent"); + } +} + +static int +iom_kqfd(struct iom_multiplexer *mp) +{ + if (LIKELY(mp->fd >= 0)) + return mp->fd; + + mp->fd = kqueue(); + if (mp->fd < 0) { + if (rb_gc_for_fd(errno)) + mp->fd = kqueue(); + if (mp->fd < 0) + rb_sys_fail("kqueue"); + } + rb_fd_fix_cloexec(mp->fd); + register_sigwait_fd(mp->fd); + return mp->fd; +} + +static void +iom_init(rb_iom_t *iom) +{ + iom_core_init(&iom->c); + iom_fdmap_init(&iom->rfdmap); + iom_fdmap_init(&iom->wfdmap); + iom_multiplexer_init(&iom->mp); +} + +static struct kev * +kev_of(struct rb_sched_waiter *sw) +{ + struct iom_waiter *w = container_of(sw, struct iom_waiter, sw); + struct iom_fd_waiter *fdw = container_of(w, struct iom_fd_waiter, w); + + return container_of(fdw, struct kev, fdw); +} + +/* + * kevent does not have an EVFILT_* flag for (rarely-used) urgent data, + * at least not on FreeBSD 11.0, so we call select() separately after + * EVFILT_READ returns to set the RB_WAITFD_PRI bit: + */ +static void +check_pri(rb_thread_t *th, struct list_head *pri) +{ + rb_fdset_t efds; + int max, r, err = 0; + struct rb_sched_waiter *sw, *next; + struct timeval tv; + + if (RB_LIKELY(list_empty(pri))) + return; + + r = 0; + max = -1; + rb_fd_init(&efds); + + list_for_each(pri, sw, wnode) { + struct kev *kev = kev_of(sw); + int fd = iom_fdw_get_fd(&kev->fdw); + + if (fd >= 0) { + rb_fd_set(fd, &efds); + if (fd > max) + max = fd; + } + } + if (max >= 0) { +again: + tv.tv_sec = 0; + tv.tv_usec = 0; + r = native_fd_select(max + 1, 0, 0, &efds, &tv, th); + if (r < 0) { + err = errno; + if (err == EINTR) /* shouldn't be possible with zero timeout */ + goto again; + } + if (r >= 0) { + list_for_each_safe(pri, sw, next, wnode) { + struct kev *kev = kev_of(sw); + int fd = iom_fdw_get_fd(&kev->fdw); + + list_del_init(&sw->wnode); + /* + * favor spurious wakeup over empty revents: + * I have observed select(2) failing to detect urgent data + * readiness after EVFILT_READ fires for it on FreeBSD 11.0. + * test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb + * is relaxed on non-Linux for this reason. + * We could use a non-zero timeout for select(2), but + * we also don't want to release GVL and have th->runq + * processed before we set revents, so we favor spurious + * RB_WAITFD_PRI over an empty revents field + */ + if (fd >= 0 && (rb_fd_isset(fd, &efds) || !kev->fdw.revents)) + kev->fdw.revents |= RB_WAITFD_PRI; + } + } + } + rb_fd_term(&efds); +} + +/* + * kqueue is a more complicated than epoll for corner cases because we + * install separate filters for simultaneously watching read and write + * on the same FD, and now we must clear shared filters if only the + * event from the other side came in. 
+ */ +static int +drop_pairs(rb_thread_t *th, int max, struct kevent *changelist, + struct list_head *rdel, struct list_head *wdel) +{ + int nchanges = 0; + struct kqfdn *fdn, *next; + + list_for_each_safe(rdel, fdn, next, wfdnode) { + struct kev *kev = container_of(fdn, struct kev, fdn); + int fd = iom_fdw_get_fd(&kev->fdw); + + list_del_init(&kev->fdn.wfdnode); /* delete from rdel */ + VM_ASSERT(kev->fdw.revents & RB_WAITFD_OUT); + + if (fd >= 0 && !(kev->fdw.revents & WAITFD_READ)) { + struct iom_fd *fdh = iom_fd_get(&th->vm->iom->rfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + VM_ASSERT(nchanges <= max); + EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0); + } + } + } + list_for_each_safe(wdel, fdn, next, rfdnode) { + struct kev *kev = container_of(fdn, struct kev, fdn); + int fd = iom_fdw_get_fd(&kev->fdw); + + list_del_init(&kev->fdn.rfdnode); /* delete from wdel */ + VM_ASSERT(kev->fdw.revents & WAITFD_READ); + + if (fd >= 0 && !(kev->fdw.revents & RB_WAITFD_OUT)) { + struct iom_fd *fdh = iom_fd_get(&th->vm->iom->wfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + VM_ASSERT(nchanges <= max); + EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0); + } + } + } + if (nchanges) { + int kqfd = th->vm->iom->mp.fd; + + if (kevent(kqfd, changelist, nchanges, 0, 0, &zero) < 0) + return errno; + } + return 0; +} + +/* + * kqueue requires separate filters when watching for simultaneously + * watching read and write events on the same FD. Different filters + * means they can have different udata pointers; at least with + * native kqueue. + * + * When emulating native kqueue behavior using epoll as libkqueue does, + * there is only space for one udata pointer. This is because epoll + * allows each "filter" (epitem in the kernel) to watch read and write + * simultaneously. + * epoll keys epitems using: [ fd, file description (struct file *) ]. + * In contrast, kevent keys its filters using: [ fd (ident), filter ]. + * + * So, with native kqueue, we can at least optimize away iom_fd_get + * function calls; but we cannot do that when libkqueue emulates + * kqueue with epoll and need to retrieve the fdh from the correct + * fdmap. + * + * Finally, the epoll implementation only needs one fdmap. 
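+ * udata_set()/udata_get() below hide that difference: with native
+ * kqueue, udata points directly at the struct iom_fd; under libkqueue
+ * we store the rb_iom_t and look the iom_fd up again by [fd, filter].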
+ */ +static void * +udata_set(rb_iom_t *iom, struct iom_fd *fdh) +{ +#ifdef LIBKQUEUE +# warning libkqueue support is incomplete and does not handle corner cases + return iom; +#else /* native kqueue */ + return fdh; +#endif +} + +static struct iom_fd * +udata_get(const struct kevent *ev) +{ +#ifdef LIBKQUEUE + rb_iom_t *iom = ev->udata; + int fd = ev->ident; + + switch (ev->filter) { + case EVFILT_READ: return iom_fd_get(&iom->rfdmap, fd); + case EVFILT_WRITE: return iom_fd_get(&iom->wfdmap, fd); + default: + rb_bug("bad filter in libkqueue compatibility mode: %d", ev->filter); + } +#endif + return ev->udata; +} + +/* + * this MUST not raise exceptions because sigwait_fd can be acquired, + * iom_wait_done can release sigwait_fd and raise + */ +static int +check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist, int *sigwait_fd) +{ + int i, err = 0; + struct list_head pri, wdel, rdel; + struct kqfdn *fdn = 0, *next; + + if (nr < 0) return errno; + if (nr == 0) return err; + + list_head_init(&pri); + list_head_init(&wdel); + list_head_init(&rdel); + + for (i = 0; i < nr; i++) { + struct kevent *ev = &eventlist[i]; + struct iom_fd *fdh; + + if (check_sigwait_fd(th, ev->udata, &err, sigwait_fd)) + continue; + + /* + * fdh is keyed to both FD and filter, so + * it's either in iom->rfdmap or iom->wfdmap + */ + fdh = udata_get(ev); + + if (ev->filter == EVFILT_READ) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, fdn, next, rfdnode) { + struct kqw_set *kset = fdn->owner; + int rbits; + + if (kset) { + struct kset_item *ksi; + + ksi = container_of(fdn, struct kset_item, fdn); + rbits = ksi->events & WAITFD_READ; + VM_ASSERT(rbits && "unexpected EVFILT_READ"); + + /* + * The following may spuriously set RB_WAITFD_PRI: + * (see comments in check_pri for reasoning) + */ + ksi->revents |= rbits; + list_del_init(&fdn->rfdnode); + if (!kset->fsw.ret++) + iom_waiter_ready(&kset->fsw.w); + } + else { + struct kev *kev = container_of(fdn, struct kev, fdn); + + rbits = kev->fdw.events & WAITFD_READ; + VM_ASSERT(rbits && "unexpected EVFILT_READ"); + + if (!kev->fdw.revents) + iom_waiter_ready(&kev->fdw.w); + kev->fdw.revents |= (RB_WAITFD_IN & rbits); + list_del_init(&kev->fdn.rfdnode); + + if (iom_fdw_get_fd(&kev->fdw) < 0) + continue; + if (rbits & RB_WAITFD_PRI) + list_add(&pri, &kev->fdw.w.sw.wnode); + if ((kev->fdw.events & RB_WAITFD_OUT) && !pair_queued) { + pair_queued = 1; + list_add(&wdel, &kev->fdn.rfdnode); + } + } + } + } + else if (ev->filter == EVFILT_WRITE) { + int pair_queued = 0; + + list_for_each_safe(&fdh->fdhead, fdn, next, wfdnode) { + struct kqw_set *kset = fdn->owner; + int rbits; + + if (kset) { + struct kset_item *ksi; + + ksi = container_of(fdn, struct kset_item, fdn); + rbits = ksi->events & RB_WAITFD_OUT; + VM_ASSERT(rbits && "unexpected EVFILT_WRITE"); + ksi->revents |= rbits; + list_del_init(&fdn->wfdnode); + if (!kset->fsw.ret++) + iom_waiter_ready(&kset->fsw.w); + } + else { + struct kev *kev = container_of(fdn, struct kev, fdn); + + if (!kev->fdw.revents) + iom_waiter_ready(&kev->fdw.w); + kev->fdw.revents |= RB_WAITFD_OUT; + list_del_init(&kev->fdn.wfdnode); + + if ((kev->fdw.events & WAITFD_READ) && + iom_fdw_get_fd(&kev->fdw) >= 0 && + !pair_queued) { + pair_queued = 1; + list_add(&rdel, &kev->fdn.wfdnode); + } + } + } + } + else { + rb_bug("unexpected filter: %d", ev->filter); + } + } + check_pri(th, &pri); + err = drop_pairs(th, nr, eventlist, &rdel, &wdel); + + /* notify the waiter thread in case we enqueued fibers for them */ + if (nr > 0) + 
iom_blockers_notify(th->vm->iom, -1); + + return (err || RUBY_VM_INTERRUPTED_ANY(th->ec)) ? EINTR : 0; +} + +/* perform a non-blocking kevent check while holding GVL */ +static void +ping_events(rb_thread_t *th, struct iom_fdset_waiter *fsw) +{ + struct kqw_set *kset = fsw ? container_of(fsw, struct kqw_set, fsw) : 0; + rb_iom_t *iom = th->vm->iom; + int kqfd = kset ? iom_kqfd(&iom->mp) : (iom ? iom->mp.fd : -1); + int sigwait_fd = rb_sigwait_fd_get(th); + int err = 0; + + if (kqfd >= 0) { + int nr, nevents, retries = 0; + struct kevent *eventlist = iom_eventlist(iom, &nevents); + + do { + struct kevent *changelist = 0; + int nchanges = 0; + + /* only register changes once if we loop: */ + if (kset) { + changelist = eventlist; + nchanges = kset->nchanges; + kset = 0; + } + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + err = check_kevent(th, nr, eventlist, &sigwait_fd); + } while (!err && nr == nevents && ++retries); + iom_grow(iom, retries); + } + else if (sigwait_fd >= 0) { + err = check_signals_nogvl(th, sigwait_fd) ? EINTR : 0; + } + iom_wait_done(th, sigwait_fd, err, "kevent", fsw); +} + +/* for iom_pingable_common.h */ +static void +iom_do_wait(rb_thread_t *th, rb_iom_t *iom) +{ + struct kevent eventlist[256 / IOM_EVENT_SIZE]; + int nevents = numberof(eventlist); + int nr = nevents; + rb_hrtime_t hrt, *rel = iom_timeout_next(&hrt, &iom->c.timers); + + if (!rel || *rel != 0) { + int kqfd = iom_kqfd(&iom->mp); /* may raise */ + struct iom_blocker cur; + int err; + int sigwait_fd = rb_sigwait_fd_get(th); + + VM_ASSERT(kqfd >= 0); + cur.th = th; + list_add_tail(&iom->c.blockers, &cur.bnode); + nr = err = 0; + BLOCKING_REGION(th, { + struct timespec ts; + struct timespec *tsp = rb_hrtime2timespec(&ts, rel); + + if (!RUBY_VM_INTERRUPTED(th->ec)) { + nr = kevent(kqfd, 0, 0, eventlist, nevents, tsp); + iom_wait_check_nogvl(th, &err, nr, sigwait_fd); + } + }, sigwait_fd >= 0 ? ubf_sigwait : ubf_select, th, TRUE); + list_del(&cur.bnode); + if (!err) + err = check_kevent(th, nr, eventlist, &sigwait_fd); + iom_wait_done(th, sigwait_fd, err, "kevent", 0); + } + if (nr == nevents && iom_events_pending(th->vm)) + ping_events(th, 0); +} + +/* + * kqueue shines here because there's only one syscall in the common case, + * but the corner-cases (e.g. waiting for both read and write) are complex + */ +static int +kevxchg_ping(int fd, struct kev *kev) +{ + rb_thread_t *th = rb_ec_thread_ptr(kev->fdw.w.sw.ec); + rb_iom_t *iom = iom_get(th); + int retries = 0; + int nr, err, nevents, nchanges = 0; + struct kevent *eventlist = iom_eventlist(iom, &nevents); + struct kevent *changelist = eventlist; + int kqfd = iom_kqfd(&iom->mp); + int sigwait_fd = rb_sigwait_fd_get(th); + + VM_ASSERT(nevents >= 2 && "iom->mp.buf should never be this small"); + + /* + * we must clear this before reading since we check for owner kset in + * check_kevent to distinguish between single and multi-FD select(2) + * callers + */ + kev->fdn.owner = 0; + + /* EVFILT_READ handles urgent data (POLLPRI)... 
hopefully */ + if (kev->fdw.events & WAITFD_READ) { + struct iom_fd *fdh = iom_fd_get(&iom->rfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->fdn.rfdnode); + } + if (kev->fdw.events & RB_WAITFD_OUT) { + struct iom_fd *fdh = iom_fd_get(&iom->wfdmap, fd); + + if (list_empty(&fdh->fdhead)) { + struct kevent *chg = &changelist[nchanges++]; + void *udata = udata_set(iom, fdh); + EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, udata); + } + list_add(&fdh->fdhead, &kev->fdn.wfdnode); + } + do { + nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero); + err = check_kevent(th, nr, eventlist, &sigwait_fd); + } while (!err && nr == nevents && ++retries && (nchanges = 0) == 0); + + iom_grow(iom, retries); + iom_wait_done(th, sigwait_fd, err, "kevent from kevxchg_ping", 0); + return err; +} + +static VALUE +kev_schedule(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + int fd = iom_fdw_get_fd(&kev->fdw); + + if (fd >= 0) { + int err = kevxchg_ping(fd, kev); + + if (err) + kev->fdw.revents = kev->fdw.events | POLLNVAL; + else if (!rb_coro_schedule_ready_p(kev->fdw.w.sw.ec)) + VM_UNREACHABLE(kev_schedule); + } + return (VALUE)kev->fdw.revents; /* may be zero if timed out */ +} + +static VALUE +kev_ensure(VALUE ptr) +{ + struct kev *kev = (struct kev *)ptr; + list_del(&kev->fdn.rfdnode); + list_del(&kev->fdn.wfdnode); + iom_waiter_ensure(&kev->fdw.w); + IOM_ALLOCA_END(struct kev, kev); + return Qfalse; +} + +static inline VALUE +kev_wait(VALUE kev) +{ + return rb_ensure(kev_schedule, kev, kev_ensure, kev); +} + +static int +iom_waitfd(rb_thread_t *th, int *fdp, int events, const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + struct kev *kev = IOM_ALLOCA(struct kev); + + iom_fdw_init(&kev->fdw, fdp, events); + list_node_init(&kev->fdn.rfdnode); + list_node_init(&kev->fdn.wfdnode); + iom_waiter_add(th, &iom->c.fdws, &kev->fdw.w); + + return iom_wait_fd_result(th, rel, kev_wait, (VALUE)kev); +} + +/* + * XXX we may use fptr->mode to mark FDs which kqueue cannot handle, + * different BSDs may have different bugs which prevent certain + * FD types from being handled by kqueue... 
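+ * (e.g. some kqueue implementations have historically had trouble with
+ * ttys, FIFOs or other device FDs; such FDs would have to fall back to
+ * a non-kqueue path)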
+ */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, + const rb_hrtime_t *rel) +{ + return iom_waitfd(th, &fptr->fd, events, rel); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, const rb_hrtime_t *rel) +{ + return iom_waitfd(th, fdp, events, rel); +} + +static void +kset_add_event(struct kqw_set *kset, int fd, short filter, void *udata) +{ + static const long kevent_size = (long)IOM_EVENT_SIZE; + VALUE dst = rb_ec_vm_ptr(kset->fsw.w.sw.ec)->iom->mp.buf; + long old_len = RSTRING_LEN(dst); + struct kevent *chg; + size_t off = kset->nchanges * kevent_size; + long new_len = (kset->nchanges += 1) * kevent_size; + + if (old_len < new_len) { + rb_str_resize(dst, new_len); + rb_str_set_len(dst, new_len); + } + + chg = (struct kevent *)(RSTRING_PTR(dst) + off); + EV_SET(chg, fd, filter, EV_ADD|EV_ONESHOT, 0, 0, udata); +} + +/* used by fdset_prepare */ +static int +fdset_prepare_i(struct iom_fdset_waiter *fsw, int fd, int events) +{ + struct kqw_set *kset = container_of(fsw, struct kqw_set, fsw); + rb_iom_t *iom = rb_ec_vm_ptr(fsw->w.sw.ec)->iom; + struct kset_item *ksi = ALLOC(struct kset_item); + + ksi->fd = fd; + ksi->revents = 0; + ksi->events = events; + ksi->fdn.owner = kset; + list_add_tail(&kset->fdset_head, &ksi->fdset_node); + list_node_init(&ksi->fdn.rfdnode); + list_node_init(&ksi->fdn.wfdnode); + + if (events & (RB_WAITFD_IN|RB_WAITFD_PRI)) { + struct iom_fd *fdh = iom_fd_get(&iom->rfdmap, fd); + + if (list_empty(&fdh->fdhead)) + kset_add_event(kset, fd, EVFILT_READ, udata_set(iom, fdh)); + list_add(&fdh->fdhead, &ksi->fdn.rfdnode); + } + if (events & RB_WAITFD_OUT) { + struct iom_fd *fdh = iom_fd_get(&iom->wfdmap, fd); + + if (list_empty(&fdh->fdhead)) + kset_add_event(kset, fd, EVFILT_WRITE, udata_set(iom, fdh)); + list_add(&fdh->fdhead, &ksi->fdn.wfdnode); + } + return ST_CONTINUE; +} + +static VALUE +kset_schedule(VALUE ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + + fdset_prepare(&kset->fsw); + /* + * no error check here since no kevent call at this point, + * fdset_schedule calls kevent: + */ + return fdset_schedule(&kset->fsw); +} + +static VALUE +kset_ensure(VALUE ptr) +{ + struct kqw_set *kset = (struct kqw_set *)ptr; + struct kset_item *ksi, *nxt; + + list_for_each_safe(&kset->fdset_head, ksi, nxt, fdset_node) { + list_del(&ksi->fdset_node); + list_del_init(&ksi->fdn.rfdnode); + list_del_init(&ksi->fdn.wfdnode); + fdset_save_result_i(&kset->fsw, ksi->fd, ksi->revents); + xfree(ksi); + } + return fdset_ensure(&kset->fsw, kset, sizeof(*kset)); +} + +static VALUE +kset_wait(VALUE kset) +{ + return rb_ensure(kset_schedule, kset, kset_ensure, kset); +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + struct kqw_set *kset = IOM_ALLOCA(struct kqw_set); + + kset->nchanges = 0; + iom_fdset_waiter_init(th, &kset->fsw, maxfd, r, w, e); + list_head_init(&kset->fdset_head); + iom_waiter_add(th, &iom->c.fdsets, &kset->fsw.w); + + return (int)(rel ? 
iom_timeout_do(th, rel, kset_wait, (VALUE)kset) + : kset_wait((VALUE)kset)); +} + +/* + * this exists because native kqueue has a unique close-on-fork + * behavior, so we need to skip close() on fork + */ +static void +iom_free(rb_iom_t *iom) +{ + iom_fdmap_destroy(&iom->rfdmap); + iom_fdmap_destroy(&iom->wfdmap); + xfree(iom); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + rb_iom_t *iom = vm->iom; + + vm->iom = 0; + if (iom) { + if (iom->mp.fd >= 0) + close(iom->mp.fd); + iom_free(iom); + } +} + +#if defined(HAVE_WORKING_FORK) +/* used by thread.c::rb_thread_atfork */ +static void +rb_iom_atfork_child(rb_thread_t *th) +{ +#ifdef LIBKQUEUE + /* + * libkqueue uses epoll, /dev/poll FD, or poll/select on a pipe, + * so there is an FD. + * I guess we're going to hit an error in case somebody uses + * libkqueue with a real native kqueue backend, but nobody does + * that in production, hopefully. + */ + rb_iom_destroy(th->vm); +#else /* native kqueue is close-on-fork, so we do not close ourselves */ + rb_iom_t *iom = th->vm->iom; + if (iom) { + iom_free(iom); + th->vm->iom = 0; + } +#endif /* !LIBKQUEUE */ +} +#endif /* defined(HAVE_WORKING_FORK) */ + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + rb_iom_t *iom = GET_VM()->iom; + + return iom && fd == iom->mp.fd; +} +#include "iom_pingable_common.h" +#include "iom_common.h" diff --git a/iom_pingable_common.h b/iom_pingable_common.h new file mode 100644 index 0000000000..2f6140ef18 --- /dev/null +++ b/iom_pingable_common.h @@ -0,0 +1,214 @@ +/* + * shared between "pingable" implementations (iom_kqueue.h and iom_epoll.h) + * "pingable" means it is relatively cheap to check for events with a + * zero timeout on epoll_wait/kevent w/o releasing GVL. + * select(2) is not "pingable" because it has O(n) setup/teardown costs + * where "n" is number of descriptors watched. + */ + +/* only for iom_kqueue.h and iom_epoll.h */ +static void iom_do_wait(rb_thread_t *, rb_iom_t *); + +static VALUE +iom_do_schedule(VALUE ptr) +{ + rb_thread_t *th = (rb_thread_t *)ptr; + rb_iom_t *iom; + + /* we should not retrieve more events if we can run right some */ + VM_ASSERT(list_empty(&th->runq)); + iom = rb_vm_waitpid_pending(th->vm) ? iom_get(th) : th->vm->iom; + if (iom) + iom_do_wait(th, iom); + + return Qfalse; +} + +/* Runs the scheduler */ +void +rb_iom_schedule(rb_thread_t *th, const rb_hrtime_t *rel) +{ + /* first, we try to run anything in th->runq */ + if (rb_coro_schedule_ready_p(th->ec)) + return; + + /* root_fiber can sleep */ + if (rb_coro_sched_p(th->ec)) { + /* + * Thread::Coro never sleeps on epoll_wait or kevent, + * ping and try scheduling ourselves away, again + */ + ping_events(th, 0); + (void)rb_coro_schedule_ready_p(th->ec); + } + else if (rel) { + iom_timeout_do(th, rel, iom_do_schedule, (VALUE)th); + } + else { + iom_do_schedule((VALUE)th); + } +} + +/* + * iom_kqueue.h::fdset_prepare_i does this atomically by creating a + * userspace buffer of changes for registration in fdset_schedule. + * + * iom_epoll.h::fdset_prepare_i uses epoll_ctl to register events + * here, but GVL prevents existing epoll_wait callers from fishing + * for events while this is running (EPOLL_CTL_MOD is faster + * than EPOLL_CTL_ADD). 
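+ * Either way, fdset_prepare() below only walks fds 0 through
+ * fsw->max - 1, folds the three rb_fdset_t inputs into poll(2)-style
+ * event bits via idx2events, hands each fd to fdset_prepare_i, and
+ * then zeroes the input sets so fdset_save_result_i can repopulate
+ * them with the results.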
+ */ +static void +fdset_prepare(struct iom_fdset_waiter *fsw) +{ + int i, fd; + + /* we must not retrieve events in these loops */ + for (fd = 0; fd < fsw->max; fd++) { + int events = 0; + + for (i = 0; i < 3; i++) { + rb_fdset_t *in = fsw->in[i]; + + if (in && rb_fd_isset(fd, in)) + events |= idx2events[i]; + } + if (events) { + fdset_prepare_i(fsw, fd, events); + if (fsw->ret < 0) + return; + } + } + + /* fdset_save_result_i will repopulate fsw->in */ + for (i = 0; i < 3; i++) { + rb_fdset_t *in = fsw->in[i]; + + if (in) rb_fd_zero(in); + } +} + +/* + * Retrieves events via epoll_wait or kevent + * kqueue atomically registers and check events with one syscall, here. + */ +static VALUE +fdset_schedule(struct iom_fdset_waiter *fsw) +{ + ping_events(rb_ec_thread_ptr(fsw->w.sw.ec), fsw); + if (!fsw->ret) { /* fsw->ret may be set by ping_events */ + if (!rb_coro_schedule_ready_p(fsw->w.sw.ec)) + VM_UNREACHABLE(fdset_schedule); + } + return (VALUE)fsw->ret; +} + +/* + * Updates the rb_fdset_t of rb_thread_fd_select callers + */ +static void +fdset_save_result_i(struct iom_fdset_waiter *fsw, int fd, short revents) +{ + int i; + + if (fsw->ret <= 0) + return; + for (i = 0; i < 3; i++) { + if (revents & idx2events[i]) { + rb_fdset_t *in = fsw->in[i]; + + VM_ASSERT(in && "revents set without fsw->in"); + rb_fd_set(fd, in); + } + } +} + +static VALUE +fdset_ensure(struct iom_fdset_waiter *fsw, void *ptr, size_t n) +{ + int err = fsw->errnum; + + iom_waiter_ensure(&fsw->w); + if (!IOM_ONSTACK_SIZE_P(n)) + ruby_sized_xfree(ptr, n); + + errno = err; + return Qfalse; +} + +/* @udata is epoll_data_t.ptr or kevent.udata */ +static int +check_sigwait_fd(rb_thread_t *th, const void *udata, int *err, int *sigwait_fd) +{ + if (UNLIKELY(udata == &sigwait_th)) { + /* + * if we couldn't acquire sigwait_fd before calling kevent/epoll_wait, + * try to get it now. iom_wait_done will call rb_sigwait_fd_put + */ + if (*sigwait_fd < 0) + *sigwait_fd = rb_sigwait_fd_get(th); /* caller will _put */ + + if (*sigwait_fd >= 0 && check_signals_nogvl(th, *sigwait_fd)) + *err = EINTR; + + return TRUE; /* `continue' the loop because sigwait_fd is reserved */ + } + /* normal FD */ + return FALSE; +} + +/* limit growth of eventlist buffer */ +#define IOM_MAX_EVENTS (1 << 17) + +/* returns the number of events we can fetch while holding GVL */ +static int +iom_nevents(const rb_iom_t *iom) +{ + return RSTRING_LENINT(iom->mp.buf) / IOM_EVENT_SIZE; +} + +/* returns the eventlist buffer for kevent or epoll_wait to write to */ +static void * +iom_eventlist(const rb_iom_t *iom, int *nevents) +{ + *nevents = iom_nevents(iom); + + return (void *)RSTRING_PTR(iom->mp.buf); +} + +/* grows the eventlist buffer if needed */ +static void +iom_grow(rb_iom_t *iom, int retries) +{ + if (retries) { + int old_nevents = iom_nevents(iom); + int new_nevents = retries * old_nevents; + + if (new_nevents <= 0 || new_nevents > IOM_MAX_EVENTS) + new_nevents = IOM_MAX_EVENTS; + + rb_str_resize(iom->mp.buf, new_nevents * IOM_EVENT_SIZE); + } +} + +static void +iom_multiplexer_init(struct iom_multiplexer *mp) +{ + mp->fd = -1; + mp->buf = rb_str_tmp_new(IOM_EVENT_SIZE * 8); +} + +/* do we have unretrieved events? 
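 * (iom_do_wait re-pings via ping_events when the eventlist comes back full)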
*/ +static int +iom_events_pending(rb_vm_t *vm) +{ + if (!list_empty(&vm->iom->c.fdws)) return TRUE; + if (!list_empty(&vm->iom->c.fdsets)) return TRUE; + return rb_vm_waitpid_pending(vm); +} + +void +rb_iom_mark(rb_iom_t *iom) +{ + rb_gc_mark(iom->mp.buf); +} diff --git a/iom_select.h b/iom_select.h new file mode 100644 index 0000000000..3b1c440d5f --- /dev/null +++ b/iom_select.h @@ -0,0 +1,460 @@ +/* + * select()-based implementation of I/O Manager for RubyVM + * + * This is crippled and relies heavily on GVL compared to the + * epoll and kqueue versions. Every call to select requires + * scanning the entire iom->c.fdws list; so it gets silly expensive + * with hundreds or thousands of FDs. + */ +#include "iom_internal.h" + +/* allocated on heap (rb_vm_t.iom) */ +struct rb_iom_struct { + struct iom_core c; + /* + * generation counters for determining if we need to redo select() + * for newly registered FDs. kevent and epoll APIs do not require + * this, as events can be registered and retrieved at any time by + * different threads. + */ + rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */ + rb_serial_t select_gen; /* RDWR protected by GVL */ +}; + +/* allocated on stack */ +struct select_do { + rb_thread_t *th; + unsigned int do_wait : 1; +}; + +static const short idx2events[] = { RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI }; + +static void +iom_init(rb_iom_t *iom) +{ + iom->select_start = 0; + iom->select_gen = 0; + iom_core_init(&iom->c); +} + +/* FIXME inefficient, but maybe not worth optimizing ... */ +static void +fd_merge(int max, rb_fdset_t *dst, const rb_fdset_t *src) +{ + int fd; + + for (fd = 0; fd < max; fd++) { + if (rb_fd_isset(fd, src)) + rb_fd_set(fd, dst); + } +} + +static rb_fdset_t * +fdset_init_once(rb_fdset_t **dst, rb_fdset_t *src) +{ + if (!*dst) { + rb_fd_init(src); + *dst = src; + } + return *dst; +} + +static int +fd_intersect(struct iom_fdset_waiter *fsw, unsigned i, const rb_fdset_t *res) +{ + int ret = 0; + rb_fdset_t *in = fsw->in[i]; + + if (in) { + int fd; + rb_fdset_t *dst = 0, tmp; + + for (fd = 0; fd < fsw->max; fd++) { + if (rb_fd_isset(fd, in) && rb_fd_isset(fd, res)) { + rb_fd_set(fd, fdset_init_once(&dst, &tmp)); + ret++; + } + } + + /* copy results back to user-supplied fdset */ + if (ret) { + rb_fd_dup(in, dst); + rb_fd_term(dst); + } + else { + rb_fd_zero(in); + } + } + return ret; +} + +static void +iom_select_wait(struct select_do *sd) +{ + rb_thread_t *th = sd->th; + rb_iom_t *iom = th->vm->iom; + rb_fdset_t fdsets[3]; + rb_fdset_t *fds[3] = { 0 }; + rb_hrtime_t hrt = 0; + rb_hrtime_t *rel = &hrt; + struct iom_blocker cur; + int nr = 0, err = 0, max = 0; + int sigwait_fd = rb_sigwait_fd_get(th); + unsigned int i; + struct rb_sched_waiter *sw = 0, *next; + + iom->select_start = iom->select_gen; + + /* deal with rb_iom_select callers: */ + list_for_each_safe(&iom->c.fdsets, sw, next, wnode) { + struct iom_fdset_waiter *fsw = fdset_waiter_of(sw); + + if (fsw->max > max) + max = fsw->max; + for (i = 0; i < 3; i++) { + const rb_fdset_t *src = fsw->in[i]; + + if (src) + fd_merge(fsw->max, fdset_init_once(&fds[i], &fdsets[i]), src); + } + } + + /* deal with rb_iom_waitfd callers: */ + list_for_each_safe(&iom->c.fdws, sw, next, wnode) { + struct iom_fd_waiter *fdw = fd_waiter_of(sw); + int fd = iom_fdw_get_fd(fdw); + + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + iom_waiter_ready(&fdw->w); + sd->do_wait = 0; + continue; + } + if (fd >= max) + max = fd + 1; + for (i = 0; i < 3; i++) { + if (fdw->events & 
idx2events[i]) + rb_fd_set(fd, fdset_init_once(&fds[i], &fdsets[i])); + } + } + + if (sigwait_fd >= 0) { + if (sigwait_fd >= max) + max = sigwait_fd + 1; + rb_fd_set(sigwait_fd, fdset_init_once(&fds[0], &fdsets[0])); + } + + if (sd->do_wait && (max > 0 || rb_vm_waitpid_pending(th->vm))) + rel = iom_timeout_next(&hrt, &iom->c.timers); + + for (i = 0; i < 3; i++) + if (fds[i]) rb_fd_resize(max - 1, fds[i]); + + cur.th = th; + VM_ASSERT(list_empty(&iom->c.blockers)); + list_add(&iom->c.blockers, &cur.bnode); + BLOCKING_REGION(th, { + if (!RUBY_VM_INTERRUPTED(th->ec)) { + struct timeval tv; + struct timeval *tvp = rb_hrtime2timeval(&tv, rel); + + nr = native_fd_select(max, fds[0], fds[1], fds[2], tvp, th); + iom_wait_check_nogvl(th, &err, nr, sigwait_fd); + } + }, sigwait_fd >= 0 ? ubf_sigwait : ubf_select, th, TRUE); + list_del(&cur.bnode); + + /* crap, somebody closed an FD in one of the sets, scan all of it */ + if (nr < 0 && err == EBADF) + nr = INT_MAX; + + if (nr > 0) { + /* rb_thread_fd_select callers */ + list_for_each_safe(&iom->c.fdsets, sw, next, wnode) { + struct iom_fdset_waiter *fsw = fdset_waiter_of(sw); + + if (err == EBADF) { + if (fdset_error_p(fsw)) + iom_waiter_ready(&fsw->w); + } + else { + for (i = 0; i < 3; i++) + fsw->ret += fd_intersect(fsw, i, fds[i]); + if (fsw->ret) + iom_waiter_ready(&fsw->w); + } + } + + /* rb_wait_for_single_fd callers */ + list_for_each_safe(&iom->c.fdws, sw, next, wnode) { + struct iom_fd_waiter *fdw = fd_waiter_of(sw); + int fd = iom_fdw_get_fd(fdw); + + if (err == EBADF && fd >= 0) { + if (iom_fd_pollable(fd) != FD_POLL_OK) + fd = -1; + } + + if (fd < 0) { /* closed */ + fdw->revents = fdw->events; + } + else { + for (i = 0; i < 3; i++) { + if (fds[i] && rb_fd_isset(fd, fds[i])) + fdw->revents |= fdw->events & idx2events[i]; + } + } + + /* got revents? enqueue ourselves to be run! */ + if (fdw->revents) + iom_waiter_ready(&fdw->w); + } + + if (sigwait_fd >= 0 && rb_fd_isset(sigwait_fd, fds[0])) + err = check_signals_nogvl(th, sigwait_fd) ? EINTR : 0; + } + + for (i = 0; i < 3; i++) + if (fds[i]) rb_fd_term(fds[i]); + + /* notify other threads in case we enqueued fibers for them */ + if (nr > 0) + iom_blockers_notify(iom, -1); + iom_wait_done(th, sigwait_fd, err, "select", 0); +} + +static int +retry_select_p(const rb_iom_t *iom) +{ + /* + * if somebody changed iom->c.fdws while we were inside select, + * rerun it with zero timeout to avoid a race condition. + * This is not necessary for epoll or kqueue because the kernel + * is constantly monitoring the watched set. + */ + return (iom->select_start != iom->select_gen); +} + +static VALUE +iom_do_select(VALUE ptr) +{ + struct select_do *sd = (struct select_do *)ptr; + rb_thread_t *th = sd->th; + rb_iom_t *iom; + int retry; + + if (rb_coro_schedule_ready_p(th->ec)) + return Qtrue; + + iom = rb_vm_waitpid_pending(th->vm) ? iom_get(th) : th->vm->iom; + if (!iom) + return Qfalse; + + do { + /* + * select() setup cost is high, so minimize calls to it and + * check other things, instead: + */ + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + if (rb_coro_schedule_ready_p(th->ec)) + return Qtrue; + + retry = 0; + /* only one thread may call select at a time to avoid deadlock */ + if (list_empty(&iom->c.blockers)) { + /* ...and we're the thread to call select! 
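 * iom_select_wait() marks ready waiters runnable and notifies the
 * other blocked threads if it enqueued anything for them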
*/ + iom_select_wait(sd); + retry = retry_select_p(iom); + } + else if (list_empty(&th->runq)) { + /* some other thread is select-ing, let them notify us */ + struct iom_blocker cur; + + cur.th = th; + list_add_tail(&iom->c.blockers, &cur.bnode); + native_sleep(th, 0); + list_del(&cur.bnode); + } + } while (retry); + return Qfalse; +} + +void +rb_iom_schedule(rb_thread_t *th, const rb_hrtime_t *rel) +{ + struct select_do sd; + + sd.th = th; + sd.do_wait = !rb_coro_sched_p(th->ec); + + if (rel && sd.do_wait) + iom_timeout_do(th, rel, iom_do_select, (VALUE)&sd); + else { + iom_do_select((VALUE)&sd); + } +} + +static void +schedule_waiter(struct iom_waiter *w) +{ + while (!rb_coro_schedule_ready_p(w->sw.ec)) { + rb_thread_t *th = rb_ec_thread_ptr(w->sw.ec); + rb_iom_t *iom = iom_get(th); + + if (list_empty(&iom->c.blockers)) { /* do select() ourselves */ + struct select_do sd; + + sd.th = th; + sd.do_wait = 0; + iom_select_wait(&sd); + } + else { + /* + * kick the first select thread, retry_select_p will return true + * since caller bumped iom->select_gen + */ + iom_blockers_notify(iom, 1); + } + } +} + +static VALUE +fdw_schedule(VALUE ptr) +{ + struct iom_fd_waiter *fdw = (struct iom_fd_waiter *)ptr; + + schedule_waiter(&fdw->w); + + return (VALUE)fdw->revents; /* may be zero if timed out */ +} + +/* only epoll takes advantage of this (kqueue may for portability bugs) */ +int +rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, + const rb_hrtime_t *rel) +{ + return rb_iom_waitfd(th, &fptr->fd, events, rel); +} + +static VALUE +fdw_ensure(VALUE ptr) +{ + struct iom_fd_waiter *fdw = (struct iom_fd_waiter *)ptr; + + iom_waiter_ensure(&fdw->w); + IOM_ALLOCA_END(struct iom_fd_waiter, fdw); + + return Qfalse; +} + +static inline VALUE +fdw_wait(VALUE fdw) +{ + return rb_ensure(fdw_schedule, fdw, fdw_ensure, fdw); +} + +int +rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + struct iom_fd_waiter *fdw; + + if (*fdp < 0) { + errno = EBADF; + return -1; + } + + fdw = IOM_ALLOCA(struct iom_fd_waiter); + iom_fdw_init(fdw, fdp, events); + iom_waiter_add(th, &iom->c.fdws, &fdw->w); + iom->select_gen++; + + return iom_wait_fd_result(th, rel, fdw_wait, (VALUE)fdw); +} + +static VALUE +fsw_ensure(VALUE ptr) +{ + struct iom_fdset_waiter *fsw = (struct iom_fdset_waiter *)ptr; + int errnum = fsw->errnum; + size_t i; + + iom_waiter_ensure(&fsw->w); + for (i = 0; i < 3; i++) { +#if !FIBER_USE_NATIVE + if (fsw->in[i]) { + rb_fd_dup(fsw->onstack_dst[i], fsw->in[i]); + rb_fd_term(fsw->in[i]); + xfree(fsw->in[i]); + } +#endif + } + + IOM_ALLOCA_END(struct iom_fdset_waiter, fsw); + + if (errnum) errno = errnum; + return Qfalse; +} + +static VALUE +fsw_schedule(VALUE ptr) +{ + struct iom_fdset_waiter *fsw = (struct iom_fdset_waiter *)ptr; + + schedule_waiter(&fsw->w); + + return (VALUE)fsw->ret; +} + +static inline VALUE +fsw_wait(VALUE fsw) +{ + return rb_ensure(fsw_schedule, fsw, fsw_ensure, fsw); +} + +int +rb_iom_select(rb_thread_t *th, int maxfd, + rb_fdset_t *r, rb_fdset_t *w, rb_fdset_t *e, + const rb_hrtime_t *rel) +{ + rb_iom_t *iom = iom_get(th); + struct iom_fdset_waiter *fsw = IOM_ALLOCA(struct iom_fdset_waiter); + + iom_fdset_waiter_init(th, fsw, maxfd, r, w, e); + iom_waiter_add(th, &iom->c.fdsets, &fsw->w); + iom->select_gen++; + return (int)(rel ? 
iom_timeout_do(th, rel, fsw_wait, (VALUE)fsw) + : fsw_wait((VALUE)fsw)); +} + +void +rb_iom_destroy(rb_vm_t *vm) +{ + if (vm->iom) { + xfree(vm->iom); + vm->iom = 0; + } +} + +/* used by thread.c::rb_thread_atfork */ +#if defined(HAVE_WORKING_FORK) +static void +rb_iom_atfork_child(rb_thread_t *th) +{ + rb_iom_destroy(th->vm); +} +#endif /* defined(HAVE_WORKING_FORK) */ + +/* used by thread_pthread.c */ +static int +rb_iom_reserved_fd(int fd) +{ + return 0; +} + +void +rb_iom_mark(rb_iom_t *iom) +{ + /* nothing for select() */ +} +#include "iom_common.h" diff --git a/process.c b/process.c index 70934e0841..2e8414d375 100644 --- a/process.c +++ b/process.c @@ -17,6 +17,8 @@ #include "ruby/thread.h" #include "ruby/util.h" #include "vm_core.h" +#include "iom.h" +#include "fiber.h" #include #include @@ -935,15 +937,19 @@ void rb_sigwait_fd_put(const rb_thread_t *, int fd); void rb_thread_sleep_interruptible(void); static int -waitpid_signal(struct rb_sched_waiter *sw) +waitpid_notify(struct rb_sched_waiter *sw, int ret) { + struct waitpid_state *w = container_of(sw, struct waitpid_state, sw); + + if (ret) { + w->ret = ret; + list_del_init(&sw->wnode); + } if (sw->ec) { /* rb_waitpid */ - rb_threadptr_interrupt(rb_ec_thread_ptr(sw->ec)); + ruby_iom_wake_signal(sw, ret); return TRUE; } else { /* ruby_waitpid_locked */ - struct waitpid_state *w = container_of(sw, struct waitpid_state, sw); - if (w->cond) { rb_native_cond_signal(w->cond); return TRUE; @@ -963,10 +969,10 @@ sigwait_fd_migrate_sleeper(rb_vm_t *vm) struct rb_sched_waiter *sw = 0; list_for_each(&vm->waiting_pids, sw, wnode) { - if (waitpid_signal(sw)) return; + if (waitpid_notify(sw, 0)) return; } list_for_each(&vm->waiting_grps, sw, wnode) { - if (waitpid_signal(sw)) return; + if (waitpid_notify(sw, 0)) return; } } @@ -978,8 +984,24 @@ rb_sigwait_fd_migrate(rb_vm_t *vm) rb_native_mutex_unlock(&vm->waitpid_lock); } +int +rb_vm_waitpid_pending(rb_vm_t *vm) +{ + int ret; + + rb_native_mutex_lock(&vm->waitpid_lock); + ret = (!list_empty(&vm->waiting_pids) || !list_empty(&vm->waiting_grps)); + rb_native_mutex_unlock(&vm->waitpid_lock); + + return ret; +} + #if RUBY_SIGCHLD extern volatile unsigned int ruby_nocldwait; /* signal.c */ +#else +# define ruby_nocldwait 0 +#endif + /* called by timer thread or thread which acquired sigwait_fd */ static void waitpid_each(struct list_head *head) @@ -992,20 +1014,13 @@ waitpid_each(struct list_head *head) if (!ret) continue; if (ret == -1) w->errnum = errno; - - w->ret = ret; - list_del_init(&sw->wnode); - waitpid_signal(sw); + waitpid_notify(sw, ret); } } -#else -# define ruby_nocldwait 0 -#endif void ruby_waitpid_all(rb_vm_t *vm) { -#if RUBY_SIGCHLD rb_native_mutex_lock(&vm->waitpid_lock); waitpid_each(&vm->waiting_pids); if (list_empty(&vm->waiting_pids)) { @@ -1017,7 +1032,6 @@ ruby_waitpid_all(rb_vm_t *vm) ; /* keep looping */ } rb_native_mutex_unlock(&vm->waitpid_lock); -#endif } static void @@ -1107,6 +1121,7 @@ waitpid_sleep(VALUE x) { struct waitpid_state *w = (struct waitpid_state *)x; + rb_thread_check_ints(); while (!w->ret) { rb_thread_sleep_interruptible(); } @@ -1114,6 +1129,18 @@ waitpid_sleep(VALUE x) return Qfalse; } +static int +waitpid_state_on_heap(const struct waitpid_state *w) +{ + if (w->options & WNOHANG) + return FALSE; + + if (!FIBER_USE_NATIVE && rb_coro_sched_p(w->sw.ec)) + return TRUE; + + return FALSE; +} + static VALUE waitpid_cleanup(VALUE x) { @@ -1130,9 +1157,10 @@ waitpid_cleanup(VALUE x) return Qfalse; } -static void -waitpid_wait(struct waitpid_state *w) 
+static VALUE +waitpid_wait(VALUE p) { + struct waitpid_state *w = (struct waitpid_state *)p; rb_vm_t *vm = rb_ec_vm_ptr(w->sw.ec); struct list_head *head; @@ -1144,7 +1172,6 @@ waitpid_wait(struct waitpid_state *w) rb_native_mutex_lock(&vm->waitpid_lock); head = waitpid_sleep_prepare(w, vm); if (head) { - w->cond = 0; /* order matters, favor specified PIDs rather than -1 or 0 */ list_add(head, &w->sw.wnode); } @@ -1153,6 +1180,8 @@ waitpid_wait(struct waitpid_state *w) if (head) { rb_ensure(waitpid_sleep, (VALUE)w, waitpid_cleanup, (VALUE)w); } + + return Qfalse; } static void * @@ -1182,35 +1211,57 @@ waitpid_no_SIGCHLD(struct waitpid_state *w) w->errnum = errno; } +static VALUE +waitpid_copy_free(VALUE p) +{ + struct waitpid_state **w2 = (struct waitpid_state **)p; + + *w2[0] = *w2[1]; /* w2[0] points to on-stack value */ + xfree(w2[1]); + + return Qfalse; +} + rb_pid_t rb_waitpid(rb_pid_t pid, int *st, int flags) { - struct waitpid_state w; + struct waitpid_state ws; + struct waitpid_state *w = &ws; - waitpid_state_init(&w, pid, flags); - w.sw.ec = GET_EC(); + w->sw.ec = GET_EC(); + waitpid_state_init(w, pid, flags); + if (waitpid_state_on_heap(w)) { + struct waitpid_state *w2[2]; - if (WAITPID_USE_SIGCHLD) { - waitpid_wait(&w); + w = ALLOC(struct waitpid_state); + *w = ws; + w2[0] = &ws; + w2[1] = w; + + rb_ensure(waitpid_wait, (VALUE)w, waitpid_copy_free, (VALUE)w2); + w = &ws; + } + else if (WAITPID_USE_SIGCHLD || rb_coro_sched_p(w->sw.ec)) { + waitpid_wait((VALUE)w); } else { - waitpid_no_SIGCHLD(&w); + waitpid_no_SIGCHLD(w); } - if (st) *st = w.status; - if (w.ret == -1) { - errno = w.errnum; + if (st) *st = w->status; + if (w->ret == -1) { + errno = w->errnum; } - else if (w.ret > 0) { + else if (w->ret > 0) { if (ruby_nocldwait) { - w.ret = -1; + w->ret = -1; errno = ECHILD; } else { - rb_last_status_set(w.status, w.ret); + rb_last_status_set(w->status, w->ret); } } - return w.ret; + return w->ret; } @@ -1454,6 +1505,7 @@ proc_detach(VALUE obj, VALUE pid) static void before_exec_async_signal_safe(void) { + rb_iom_destroy(GET_VM()); } static void diff --git a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb index 777e9d14dd..54c57a04df 100644 --- a/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb +++ b/test/-ext-/wait_for_single_fd/test_wait_for_single_fd.rb @@ -1,5 +1,6 @@ # frozen_string_literal: false require 'test/unit' +require 'socket' class TestWaitForSingleFD < Test::Unit::TestCase require '-test-/wait_for_single_fd' @@ -46,4 +47,72 @@ def test_wait_for_kqueue skip 'no kqueue' unless IO.respond_to?(:kqueue_test_wait) assert_equal RB_WAITFD_IN, IO.kqueue_test_wait end + + def test_coro_start_rw + %w(LDFLAGS LIBS DLDFLAGS).each do |k| + next if RbConfig::CONFIG[k] !~ /-lkqueue\b/ + skip "FIXME libkqueue is buggy on this test" + end + + host = '127.0.0.1' + TCPServer.open(host, 0) do |srv| + nbc = Socket.new(:INET, :STREAM, 0) + con = TCPSocket.new(host, srv.addr[1]) + acc = srv.accept + begin + # 2 events, but only one triggers: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Thread::Coro.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal RB_WAITFD_OUT, f.value + + # trigger readability + con.write('a') + flags = RB_WAITFD_IN + f = Thread::Coro.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + # ensure both events trigger: + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f = Thread::Coro.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal 
flags, f.value + + # ensure the EVFILT_WRITE (kqueue) is disabled after previous test: + flags = RB_WAITFD_IN + f = Thread::Coro.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + skip 'missing MSG_OOB' unless Socket.const_defined?(:MSG_OOB) + + # make sure urgent data works with kqueue: + assert_equal 3, con.send('123', Socket::MSG_OOB) + flags = RB_WAITFD_PRI + f = Thread::Coro.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + assert_equal flags, f.value + + flags = RB_WAITFD_IN|RB_WAITFD_OUT|RB_WAITFD_PRI + f = Thread::Coro.new { IO.wait_for_single_fd(acc.fileno, flags, nil) } + if RUBY_PLATFORM =~ /linux/ + assert_equal flags, f.value + else + kq_possible = [ RB_WAITFD_IN|RB_WAITFD_OUT, flags ] + assert_include kq_possible, f.value + end + + nbc.connect_nonblock(srv.local_address, exception: false) + fd = nbc.fileno + flags = RB_WAITFD_IN|RB_WAITFD_OUT + f1 = Thread::Coro.new { IO.wait_for_single_fd(fd, flags, nil) } + f2 = Thread::Coro.new { IO.wait_for_single_fd(fd, RB_WAITFD_IN, nil) } + assert_equal RB_WAITFD_OUT, f1.value + abc = srv.accept + abc.write('!') + assert_equal RB_WAITFD_IN, f2.value + ensure + con.close + acc.close + nbc.close + abc&.close + end + end + end end diff --git a/test/lib/leakchecker.rb b/test/lib/leakchecker.rb index af9200bf77..625b07992c 100644 --- a/test/lib/leakchecker.rb +++ b/test/lib/leakchecker.rb @@ -41,6 +41,15 @@ def find_fds if d.respond_to? :fileno a -= [d.fileno] end + + # only place we use epoll is internally, do not count it against us + a.delete_if do |fd| + begin + File.readlink("#{fd_dir}/#{fd}").match?(/\beventpoll\b/) + rescue + false + end + end a } fds.sort diff --git a/test/ruby/test_thread_coro.rb b/test/ruby/test_thread_coro.rb new file mode 100644 index 0000000000..6ad34cf0d4 --- /dev/null +++ b/test/ruby/test_thread_coro.rb @@ -0,0 +1,494 @@ +# frozen_string_literal: true +require 'test/unit' +require 'fiber' +require 'io/nonblock' +require 'io/wait' +require 'socket' +require 'tempfile' + +class TestThreadCoro < Test::Unit::TestCase + def require_nonblock + IO.method_defined?(:nonblock?) or skip 'pipe does not support non-blocking' + end + + # libkqueue is good for compile-testing, but can't handle corner cases + # like native kqueue does + def skip_libkqueue + %w(LDFLAGS LIBS DLDFLAGS).each do |k| + next if RbConfig::CONFIG[k] !~ /-lkqueue\b/ + skip "FIXME libkqueue is buggy on this test" + end + end + + def test_value + nr = 0 + f = Thread::Coro.new { nr += 1 } + assert_equal 1, f.value, 'value returns as expected' + assert_equal 1, f.value, 'value is idempotent' + end + + def test_join + nr = 0 + f = Thread::Coro.new { nr += 1 } + assert_equal f, f.join + assert_equal 1, f.value + assert_equal f, f.join, 'join is idempotent' + assert_equal f, f.join(10), 'join is idempotent w/ timeout' + assert_equal 1, f.value, 'value is unchanged after join' + end + + def test_runs_first + ary = [] + f = Thread::Coro.new { ary << 1 } + ary << 2 + assert_equal [1, 2], f.value + end + + def test_io_select_pipe + IO.pipe do |r, w| + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + f1 = Thread::Coro.new { IO.select([r], nil, nil, 0.02) } + assert_nil f1.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.02 + assert_operator diff, :<, 0.2 + + skip_libkqueue # FIXME + f2 = Thread::Coro.new { IO.select([r], [w]) } + assert_equal [ [], [w], [] ], f2.value + + w.write '.' 
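+      # the pipe now has data buffered, so a read-only select returns at once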
+ f3 = Thread::Coro.new { IO.select([r]) } + assert_equal [ [r], [], [] ], f3.value + end + end + + def test_io_wait_pipe + IO.pipe do |r, w| + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + f1 = Thread::Coro.new { r.wait_readable(0.02) } + f2 = Thread::Coro.new { w.wait_writable } + assert_nil f1.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.02 + assert_operator diff, :<, 0.2 + assert_same w, f2.value + skip_libkqueue + w.write '.' + f3 = Thread::Coro.new { r.wait_readable } + assert_same r, f3.value + end + end + + def test_io_wait_read + require_nonblock + skip_libkqueue + tick = 0.1 # 100ms, TIME_QUANTUM_USEC in thread_pthread.c + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + rdr = Thread::Coro.new { r.read(1) } + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + assert_nil rdr.join(tick), 'join returns nil on timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick, 'Thread::Coro still running' + w.write('a') + assert_equal 'a', rdr.value, 'finished' + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + rwait = Thread::Coro.new { r.wait_readable(tick) } + assert_nil rwait.value, 'IO returned no events after timeout' + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, tick, 'Thread::Coro waited for timeout' + end + end + + # make sure we do not create extra FDs (from epoll/kqueue) + # for operations which do not need it + def test_fd_creation_and_fork + require_nonblock + assert_separately(%w(-r io/nonblock), <<~'end;') # do + fdrange = (3..64) + reserved = -'The given fd is not accessible because RubyVM reserves it' + fdstats = lambda do + stats = Hash.new(0) + fdrange.map do |fd| + begin + IO.for_fd(fd, autoclose: false) + stats[:good] += 1 + rescue Errno::EBADF # ignore + rescue ArgumentError => e + raise if e.message != reserved + stats[:reserved] += 1 + end + end + stats.freeze + end + + before = fdstats.call + Thread::Coro.new { :fib }.join + assert_equal before, fdstats.call, + 'Thread::Coro.new + Thread::Coro#join does not create new FD' + + # now, epoll/kqueue implementations create FDs, ensure they're reserved + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + + before_io = fdstats.call + assert_equal before[:reserved], before_io[:reserved], + 'no reserved FDs created before I/O attempted' + + f1 = Thread::Coro.new { r.read(1) } + after_io = fdstats.call + assert_equal before_io[:good], after_io[:good], + 'no change in unreserved FDs during I/O wait' + + w.write('!') + assert_equal '!', f1.value, 'auto-Thread::Coro IO#read works' + assert_operator before_io[:reserved], :<=, after_io[:reserved], + 'a reserved FD may be created on some implementations' + + # ensure we do not share epoll/kqueue FDs across fork + if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) + pid = fork do + after_fork = fdstats.call + w.write(after_fork.inspect == before_io.inspect ? '1' : '0') + + # ensure auto-Thread::Coro works properly after forking + IO.pipe do |a, b| + a.nonblock = b.nonblock = true + f1 = Thread::Coro.new { a.read(1) } + f2 = Thread::Coro.new { b.write('!') } + assert_equal 1, f2.value + w.write(f1.value == '!' ? '!' 
: '?') + end + + exit!(0) + end + assert_equal '1', r.read(1), 'reserved FD cleared after fork' + assert_equal '!', r.read(1), 'auto-Thread::Coro works after forking' + _, status = Process.waitpid2(pid) + assert_predicate status, :success?, 'forked child exited properly' + end + end + end; + end + + def test_waitpid + skip 'fork required' unless Process.respond_to?(:fork) + skip 'SIGCHLD required' unless Signal.list['CHLD'] + + assert_separately(%w(-r io/nonblock), <<~'end;') # do + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + pid = fork do + sleep 0.1 + exit!(0) + end + f = Thread::Coro.new { Process.waitpid2(pid) } + wpid, st = f.value + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + assert_operator diff, :>=, 0.1, 'child did not return immediately' + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'process exited successfully' + + IO.pipe do |r, w| + # "blocking" Process.waitpid2 takes precedence over trap(:CHLD) + waited = [] + chld_called = false + trap(:CHLD) { chld_called = true; waited.concat(Process.waitall) } + pid = fork do + r.read(1) + exit!(0) + end + assert_nil Process.waitpid2(pid, Process::WNOHANG), 'child is waiting' + assert_nil(Thread::Coro.new do + Process.waitpid2(pid, Process::WNOHANG) + end.value, 'WNOHANG works normally in Thread::Coro') + f = Thread::Coro.new { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), 'woke child' + wpid, st = f.value + assert_equal pid, wpid, 'waited pid matches' + assert_predicate st, :success?, 'child exited successfully' + assert_empty waited, 'waitpid had predence' + assert chld_called, 'CHLD handler got called anyways' + + chld_called = false + [ 'DEFAULT', 'SIG_DFL', 'SYSTEM_DEFAULT' ].each do |handler| + trap(:CHLD, handler) + pid = fork do + r.read(1) + exit!(0) + end + t = "trap(:CHLD, #{handler.inspect})" + f = Thread::Coro.new { Process.waitpid2(pid) } + assert_equal 1, w.write('.'), "woke child for #{t}" + wpid, st = f.value + assert_predicate st, :success?, "child exited successfully for #{t}" + assert_equal wpid, pid, "return value correct for #{t}" + assert_equal false, chld_called, + "old CHLD handler did not fire for #{t}" + end + end + end; + end + + def test_cpu_usage_io + disabled = GC.disable + require_nonblock + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = fv = wr = nil + + # select() requires higher CPU% :< + assert_cpu_usage_low(pct: 0.50) do + t = Thread.new do + Thread::Coro.new do + r.read(1) + end.value + end + wr = Thread.new do + sleep 0.5 + w.write('..') + end + fv = Thread::Coro.new do + r.read(1) + end.value + end + assert_equal '.', fv + assert_equal '.', t.value + wr.join + end + ensure + GC.enable unless disabled + end + + def test_cpu_usage_waitpid + disabled = GC.disable + thrs = [] + pid = spawn(*%W(#{EnvUtil.rubybin} --disable=gems -e #{'sleep 0.5'})) + 2.times do + thrs << Thread.new { Thread::Coro.new { Process.waitall }.value } + end + assert_cpu_usage_low(pct: 0.44) do + thrs.map!(&:value) + end + ensure + GC.enable unless disabled + end + + # As usual, cannot resume/run Thread::Coros across threads, but the + # scheduler works across threads + def test_cross_thread_schedule + require_nonblock + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + t = Thread.new { Thread::Coro.new { r.read(1) }.value } + assert_nil t.join(0.1) + f = Thread::Coro.new { w.write('a') } + assert_equal 'a', t.value + assert_equal 1, f.value + end + end + + # tricky for kqueue and epoll implementations + def test_multi_readers + skip_libkqueue 
if RUBY_PLATFORM =~ /linux/ + require_nonblock + IO.pipe do |r, w| + r.nonblock = w.nonblock = true + nr = 30 + fibs = nr.times.map { Thread::Coro.new { r.read(1) } } + fibs.each { |f| assert_nil f.join(0.001) } + + exp = nr.times.map { -'a' }.freeze + w.write(exp.join) + assert_equal exp, fibs.map(&:value); + end + end + + def test_unix_rw + unless defined?(UNIXSocket) && UNIXSocket.respond_to?(:pair) + skip 'no UNIXSocket.pair' + end + + # at least epoll emulation of kqueue cannot deal with simultaneous + # watching the same FD on both EVFILT_READ and EVFILT_WRITE + skip_libkqueue + + # ensure simultaneous waits on the same FD works + UNIXSocket.pair do |a, b| + assert_predicate a, :wait_writable + f0 = Thread::Coro.new { a.wait_readable } # cold + f9 = Thread::Coro.new { a.wait_writable } # already armed + assert_equal a, f9.value + + f4 = Thread::Coro.new { a.wait_readable } + b.write('.') + f5 = Thread::Coro.new { a.wait_writable } + f6 = Thread::Coro.new { a.wait_readable } + assert_equal a, f4.value + assert_equal a, f5.value + assert_equal a, f6.value + assert_equal a, f0.value + end + end + + def test_reg_file_wait_readable + skip_libkqueue + File.open(__FILE__) do |fp| + t = Thread::Coro.new { fp.wait_readable } + assert_same fp, t.value + end + end + + def test_reg_file_select_readable + skip_libkqueue + File.open(__FILE__) do |fp| + t = Thread::Coro.new { IO.select([fp]) } + assert_equal [[fp],[],[]], t.value + end + end + + def test_reg_file_wait_writable + skip_libkqueue + Dir.mktmpdir('wait_writable') do |dir| + File.open("#{dir}/reg", "w") do |fp| + t = Thread::Coro.new { fp.wait_writable } + assert_same fp, t.value + end + end + end + + def test_reg_file_select_writable + skip_libkqueue + Dir.mktmpdir('select_writable') do |dir| + File.open("#{dir}/reg", "w") do |fp| + t = Thread::Coro.new { IO.select(nil, [fp]) } + assert_equal [[],[fp],[]], t.value + end + end + end + + def test_system + st = [] + th = Thread::Coro.new do + ret = system(*%W(#{EnvUtil.rubybin} --disable=gems -e exit!(false))) + st << $? + ret + end + assert_equal false, th.value + th = Thread::Coro.new do + ret = system(*%W(#{EnvUtil.rubybin} --disable=gems -e exit!(true))) + st << $? + ret + end + assert_equal true, th.value + assert_not_predicate st[0], :success? + assert_predicate st[1], :success? + end + + def test_ensure_at_exit + assert_in_out_err([], <<-EOS, %w(ok)) + IO.pipe do |r, _| + Thread::Coro.new { + begin + IO.select([r]) + ensure + puts "ok" + end + } + end + EOS + end + + def test_ensure_by_gc_on_file_limit + assert_ruby_status(nil, <<-'EOS') + begin + n = Process.getrlimit(Process::RLIMIT_NOFILE)[0] + sane_max = 64 + Process.setrlimit(Process::RLIMIT_NOFILE, sane_max) if n > sane_max + rescue + end if Process.const_defined?(:RLIMIT_NOFILE) + r, w = IO.pipe + a = b = '0' + begin + Thread::Coro.new do + begin + a.succ! + IO.pipe { IO.select([r]) } + ensure + b.succ! 
+        end
+      end
+    rescue Errno::EMFILE, Errno::ENFILE, Errno::ENOMEM
+      warn "a=#{a} b=#{b}" if a != b
+      exit!(a == b && a.to_i > 4)
+    end while true
+  EOS
+  end
+
+  def test_puts_in_sub_thread
+    assert_in_out_err([], <<-EOS, %w(ok))
+      Thread.new do
+        Thread::Coro.new do
+          begin
+            IO.pipe { |r,_| IO.select([r]) }
+          ensure
+            puts "ok"
+          end
+        end
+      end.join(0.001)
+    EOS
+  end
+
+  def test_raise_in_main_thread
+    assert_normal_exit %q{
+      IO.pipe { |r,_|
+        Thread::Coro.new do
+          begin
+            IO.select([r])
+          ensure
+            raise
+          end
+        end
+      }
+    }
+  end
+
+  def test_raise_in_sub_thread
+    assert_normal_exit %q{
+      IO.pipe { |r,_|
+        th = Thread.new do
+          Thread::Coro.new do
+            begin
+              IO.select([r])
+            ensure
+              raise
+            end
+          end
+          sleep
+        end
+        Thread.pass until th.stop?
+        exit!(0)
+      }
+    }
+  end
+
+  def test_ensure_thread_kill
+    IO.pipe { |r,_|
+      q = Queue.new
+      th = Thread.new do
+        Thread::Coro.new {
+          begin
+            IO.select([r])
+          ensure
+            q.push 'ok'
+          end
+        }
+      end
+      Thread.pass until th.stop?
+      th.kill
+      th.join
+      assert_equal 'ok', q.pop(true)
+    }
+  end
+end
diff --git a/thread.c b/thread.c
index 30c37835ca..2014b8b589 100644
--- a/thread.c
+++ b/thread.c
@@ -75,6 +75,7 @@
 #include "iseq.h"
 #include "vm_core.h"
 #include "hrtime.h"
+#include "iom.h"

 #ifndef USE_NATIVE_THREAD_PRIORITY
 #define USE_NATIVE_THREAD_PRIORITY 0
@@ -105,7 +106,6 @@ static int rb_threadptr_dead(rb_thread_t *th);
 static void rb_check_deadlock(rb_vm_t *vm);
 static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
 static const char *thread_status_name(rb_thread_t *th, int detail);
-static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
 NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
 static int consume_communication_pipe(int fd);
 static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
@@ -239,6 +239,16 @@ timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
     }
 }

+static rb_hrtime_t *
+timeout2hrtime(rb_hrtime_t *val, const struct timeval *timeout)
+{
+    if (timeout) {
+        *val = rb_timeval2hrtime(timeout);
+        return val;
+    }
+    return 0;
+}
+
 #if THREAD_DEBUG
 #ifdef HAVE_VA_ARGS_MACRO
 void rb_thread_debug(const char *file, int line, const char *fmt, ...);
@@ -750,6 +760,8 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s
                      (void *)th, th->locking_mutex);
     }

+    rb_coro_finish(th); /* run "ensure" statements */
+
     /* delete self other than main thread from living_threads */
     rb_vm_living_threads_remove(th->vm, th);
     if (main_th->status == THREAD_KILLED && rb_thread_alone()) {
@@ -948,7 +960,7 @@ thread_join_sleep(VALUE arg)
             th->vm->sleeper--;
         }
         else {
-            if (hrtime_update_expire(p->limit, end)) {
+            if (rb_hrtime_update_expire(p->limit, end)) {
                 thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
                              thread_id_str(target_th));
                 return Qfalse;
@@ -1026,6 +1038,27 @@ thread_join(rb_thread_t *target_th, rb_hrtime_t *rel)

 static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);

+rb_hrtime_t *
+rb_join_interval(rb_hrtime_t *rel, int argc, VALUE *argv)
+{
+    VALUE limit;
+
+    rb_scan_args(argc, argv, "01", &limit);
+
+    /*
+     * This supports INFINITY and negative values, so we can't use
+     * rb_time_interval right now...
+     */
+    switch (TYPE(limit)) {
+      case T_NIL: return 0;
+      case T_FIXNUM:
+        *rel = rb_sec2hrtime(NUM2TIMET(limit));
+        return rel;
+      default:
+        return double2hrtime(rel, rb_num2dbl(limit));
+    }
+}
+
 /*
  * call-seq:
  *    thr.join -> thr
@@ -1068,26 +1101,9 @@ static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
 static VALUE
 thread_join_m(int argc, VALUE *argv, VALUE self)
 {
-    VALUE limit;
-    rb_hrtime_t rel, *to = 0;
-
-    rb_scan_args(argc, argv, "01", &limit);
+    rb_hrtime_t rel;

-    /*
-     * This supports INFINITY and negative values, so we can't use
-     * rb_time_interval right now...
-     */
-    switch (TYPE(limit)) {
-      case T_NIL: break;
-      case T_FIXNUM:
-        rel = rb_sec2hrtime(NUM2TIMET(limit));
-        to = &rel;
-        break;
-      default:
-        to = double2hrtime(&rel, rb_num2dbl(limit));
-    }
-
-    return thread_join(rb_thread_ptr(self), to);
+    return thread_join(rb_thread_ptr(self), rb_join_interval(&rel, argc, argv));
 }

 /*
@@ -1158,7 +1174,6 @@ getclockofday(struct timespec *ts)
  * Don't inline this, since library call is already time consuming
  * and we don't want "struct timespec" on stack too long for GC
  */
-NOINLINE(rb_hrtime_t rb_hrtime_now(void));
 rb_hrtime_t
 rb_hrtime_now(void)
 {
@@ -1207,8 +1222,8 @@ COMPILER_WARNING_IGNORED(-Wmaybe-uninitialized)
  * Returns true if @end has past
  * Updates @ts and returns false otherwise
  */
-static int
-hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end)
+int
+rb_hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end)
 {
     rb_hrtime_t now = rb_hrtime_now();

@@ -1235,7 +1250,7 @@ sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl)
         woke = vm_check_ints_blocking(th->ec);
         if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
             break;
-        if (hrtime_update_expire(&rel, end))
+        if (rb_hrtime_update_expire(&rel, end))
             break;
     }
     th->status = prev_status;
@@ -1259,12 +1274,19 @@ void
 rb_thread_sleep_interruptible(void)
 {
     rb_thread_t *th = GET_THREAD();
-    enum rb_thread_status prev_status = th->status;

-    th->status = THREAD_STOPPED;
-    native_sleep(th, 0);
-    RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
-    th->status = prev_status;
+    if (rb_coro_sched_p(th->ec)) {
+        RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+        if (!rb_coro_schedule_ready_p(th->ec))
+            VM_UNREACHABLE(rb_thread_sleep_interruptible);
+    }
+    else {
+        enum rb_thread_status prev_status = th->status;
+        th->status = THREAD_STOPPED;
+        native_sleep(th, 0);
+        RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+        th->status = prev_status;
+    }
 }

 static void
@@ -3803,7 +3825,7 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
       case ERESTART:
 #endif
         *result = 0;
-        if (rel && hrtime_update_expire(rel, end)) {
+        if (rel && rb_hrtime_update_expire(rel, end)) {
            *rel = 0;
        }
        return TRUE;
@@ -3813,7 +3835,7 @@
     else if (*result == 0) {
        /* check for spurious wakeup */
        if (rel) {
-           return !hrtime_update_expire(rel, end);
+           return !rb_hrtime_update_expire(rel, end);
        }
        return TRUE;
     }
@@ -3968,6 +3990,13 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
     set.th = GET_THREAD();
     RUBY_VM_CHECK_INTS_BLOCKING(set.th->ec);

+    if (rb_coro_sched_p(set.th->ec)) {
+        rb_hrtime_t hrt;
+        const rb_hrtime_t *rel = timeout2hrtime(&hrt, timeout);
+
+        return rb_iom_select(set.th, max, read, write, except, rel);
+    }
+
     set.max = max;
     set.sigwait_fd = rb_sigwait_fd_get(set.th);
     set.rset = read;
@@ -4038,6 +4067,9 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     rb_unblock_function_t *ubf;

     RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+    if (rb_coro_sched_p(th->ec))
+        return rb_iom_waitfd(th, &fd, events, timeout2hrtime(&rel, timeout));
+
     timeout_prepare(&to, &rel, &end, timeout);
     fds[0].fd = fd;
     fds[0].events = (short)events;
@@ -4161,7 +4193,13 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
     struct select_args args;
     int r;
     VALUE ptr = (VALUE)&args;
+    rb_thread_t *th = GET_THREAD();
+
+    if (rb_coro_sched_p(th->ec)) {
+        rb_hrtime_t hrt;
+        return rb_iom_waitfd(th, &fd, events, timeout2hrtime(&hrt, tv));
+    }

     args.as.fd = fd;
     args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
     args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
@@ -4385,10 +4423,13 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
 /* mjit.c */
 void mjit_child_after_fork(void);
 void rb_fiber_atfork(rb_thread_t *);
+static void rb_iom_atfork_child(rb_thread_t *);
+
 void
 rb_thread_atfork(void)
 {
     rb_thread_t *th = GET_THREAD();
+    rb_iom_atfork_child(th);
     rb_thread_atfork_internal(th, terminate_atfork_i);
     th->join_list = NULL;
     rb_mutex_cleanup_keeping_mutexes(th);
@@ -5425,3 +5466,21 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
     return rb_ensure(b_proc, data, rb_ary_pop,
                      cur_th->pending_interrupt_mask_stack);
 }
+
+#ifndef RUBYVM_IOM
+# if defined(HAVE_SYS_EVENT_H) && defined(HAVE_KQUEUE) && defined(HAVE_KEVENT)
+#  define RUBYVM_IOM IOM_KQUEUE
+# elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \
+      defined(HAVE_EPOLL_CTL) && defined(HAVE_EPOLL_WAIT)
+#  define RUBYVM_IOM IOM_EPOLL
+# else
+#  define RUBYVM_IOM IOM_SELECT
+# endif
+#endif
+#if RUBYVM_IOM == IOM_KQUEUE
+# include "iom_kqueue.h"
+#elif RUBYVM_IOM == IOM_EPOLL
+# include "iom_epoll.h"
+#else
+# include "iom_select.h"
+#endif
diff --git a/thread_pthread.c b/thread_pthread.c
index f5107bd7af..0159b65291 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -532,7 +532,7 @@ native_cond_timeout(rb_nativethread_cond_t *cond, const rb_hrtime_t rel)
     else {
         struct timespec ts;

-        rb_timespec_now(&ts);
+        rb_timespec_now(&ts); /* CLOCK_REALTIME */
         return rb_hrtime_add(rb_timespec2hrtime(&ts), rel);
     }
 }
@@ -1260,7 +1260,6 @@ native_cond_sleep(rb_thread_t *th, rb_hrtime_t *rel)
            }
        }
        th->unblock.func = 0;

-       rb_native_mutex_unlock(lock);
     }
     GVL_UNLOCK_END(th);
@@ -1829,6 +1828,8 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
 }
 #endif

+static int rb_iom_reserved_fd(int); /* check kqueue or epoll FD */
+
 int
 rb_reserved_fd_p(int fd)
 {
@@ -1838,6 +1839,8 @@ rb_reserved_fd_p(int fd)
 #endif
     if (fd == signal_self_pipe.normal[0] ||
         fd == signal_self_pipe.normal[1])
         goto check_pid;
+    if (rb_iom_reserved_fd(fd))
+        goto check_pid;
     return 0;
   check_pid:
@@ -1978,7 +1981,7 @@ rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts)
                 return;
             if (n || (th && RUBY_VM_INTERRUPTED(th->ec)))
                 return;
-            if (ts && hrtime_update_expire(&rel, end))
+            if (ts && rb_hrtime_update_expire(&rel, end))
                 return;
         }
     }
diff --git a/thread_win32.c b/thread_win32.c
index 83896d81ff..a760d601cd 100644
--- a/thread_win32.c
+++ b/thread_win32.c
@@ -686,6 +686,8 @@ static struct {
 } timer_thread;
 #define TIMER_THREAD_CREATED_P() (timer_thread.id != 0)

+void ruby_waitpid_all(rb_vm_t *); /* process.c */
+
 static unsigned long __stdcall
 timer_thread_func(void *dummy)
 {
@@ -695,7 +697,7 @@ timer_thread_func(void *dummy)
     while (WaitForSingleObject(timer_thread.lock,
                                TIME_QUANTUM_USEC/1000) == WAIT_TIMEOUT) {
        timer_thread_function();
-       ruby_sigchld_handler(vm); /* probably no-op */
+       ruby_waitpid_all(vm);
        rb_threadptr_check_signal(vm->main_thread);
     }
     thread_debug("timer killed\n");
@@ -774,10 +776,12 @@ ruby_alloca_chkstk(size_t len, void *sp)
     }
 }
 #endif
+
+static int rb_iom_reserved_fd(int); /* iom_select.h */
 int
 rb_reserved_fd_p(int fd)
 {
-    return 0;
+    return rb_iom_reserved_fd(fd);
 }

 int
diff --git a/vm.c b/vm.c
index 5c33cfb300..260c9fc7f3 100644
--- a/vm.c
+++ b/vm.c
@@ -8,6 +8,7 @@

 **********************************************************************/

+#include "iom.h"
 #include "internal.h"
 #include "ruby/vm.h"
 #include "ruby/st.h"
@@ -2154,6 +2155,9 @@ rb_vm_mark(void *ptr)

        rb_gc_mark_values(RUBY_NSIG, vm->trap_list.cmd);

+       if (vm->iom)
+           rb_iom_mark(vm->iom);
+
        mjit_mark();
     }

@@ -2202,6 +2206,7 @@ ruby_vm_destruct(rb_vm_t *vm)
            rb_fiber_reset_root_local_storage(th->self);
            thread_free(th);
        }
+       rb_iom_destroy(vm);
        rb_vm_living_threads_init(vm);
        ruby_vm_run_at_exit_hooks(vm);
        if (vm->loading_table) {
@@ -2435,6 +2440,8 @@ thread_mark(void *ptr)
     RUBY_MARK_ENTER("thread");
     rb_fiber_mark_self(th->ec->fiber_ptr);

+    rb_iom_mark_thread(th);
+
     /* mark ruby objects */
     RUBY_MARK_UNLESS_NULL(th->first_proc);
     if (th->first_proc) RUBY_MARK_UNLESS_NULL(th->first_args);
@@ -2531,6 +2538,9 @@ th_init(rb_thread_t *th, VALUE self)
 {
     th->self = self;
     rb_threadptr_root_fiber_setup(th);
+    list_head_init(&th->runq);
+    list_head_init(&th->waiting_coro);
+    list_head_init(&th->nogvl_runq);

     {
        /* vm_stack_size is word number.
diff --git a/vm_core.h b/vm_core.h
index 59108dafb8..04405ae0f0 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -556,6 +556,8 @@ typedef struct rb_hook_list_struct {
     int need_clean;
 } rb_hook_list_t;

+struct rb_iom_struct;
+
 typedef struct rb_vm_struct {
     VALUE self;

@@ -569,7 +571,7 @@ typedef struct rb_vm_struct {
 #ifdef USE_SIGALTSTACK
     void *main_altstack;
 #endif
-
+    struct rb_iom_struct *iom;
     rb_serial_t fork_gen;
     rb_nativethread_lock_t waitpid_lock;
     struct list_head waiting_pids; /* PID > 0: <=> struct waitpid_state */
@@ -827,6 +829,9 @@ typedef struct rb_execution_context_struct {
     /* ensure & callcc */
     rb_ensure_list_t *ensure_list;

+    /* ensure for Thread::Coro */
+    struct list_node enode;
+
     /* trace information */
     struct rb_trace_arg_struct *trace_arg;

@@ -923,6 +928,18 @@ typedef struct rb_thread_struct {
     rb_fiber_t *root_fiber;
     rb_jmpbuf_t root_jmpbuf;

+    /*
+     * Coros waiting for something, needed to support "ensure" in
+     * Thread::Coro. Links with rb_execution_context_t.enode
+     */
+    struct list_head waiting_coro;
+
+    /* ready-to-run coros, <=> rb_sched_waiter.wnode, protected by GVL */
+    struct list_head runq;
+
+    /* same as runq, but protected by interrupt_lock instead of GVL */
+    struct list_head nogvl_runq;
+
     /* misc */
     VALUE name;
@@ -1749,6 +1766,7 @@ void rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v);
 void rb_ec_error_print(rb_execution_context_t * volatile ec, volatile VALUE errinfo);
 void rb_execution_context_mark(const rb_execution_context_t *ec);
 void rb_fiber_close(rb_fiber_t *fib);
+void rb_coro_finish(rb_thread_t *);
 void Init_native_thread(rb_thread_t *th);

 #define RUBY_VM_CHECK_INTS(ec) rb_vm_check_ints(ec)
-- 
EW