From mboxrd@z Thu Jan 1 00:00:00 1970
From: Eric Wong
To: spew@80x24.org
Subject: [PATCH] auto fiber scheduling and friends (VERY LIGHTLY TESTED)
Date: Fri, 19 May 2017 04:27:38 +0000
Message-Id: <20170519042738.7174-1-e@80x24.org>

Currently, IO scheduling seems to work; waitpid/sleep/other scheduling
is not done yet, but we do not need to support everything at once
during dev.

main API:

    Fiber#start -> enable auto-scheduling and run Fiber until
                   it automatically yields (due to EAGAIN/EWOULDBLOCK)

The following behave like their Thread counterparts (a short sketch
follows below):

    Fiber#join  - run internal scheduler until Fiber is terminated
    Fiber#value - ditto
    Fiber#run   (in prelude.rb)
    Fiber.start (ditto)

I think we can iron out the internal APIs and behavior first, and
gradually add support for auto-Fiber.yield points.

Right now, it takes over the rb_wait_for_single_fd() function if the
running Fiber is auto-enabled (cont.c::rb_fiber_auto_sched_p).
Changes to existing functions are minimal.

New files (important structs and relations should be documented):

    iom.h        - internal API for the rest of RubyVM (incomplete?)
    iom_common.h - common stuff internal to iom_*.h
    iom_select.h - select()-specific pieces

Changes to existing data structures:

    rb_thread_t.afhead - list of fibers to auto-resume
    rb_fiber_t.afnode  - link to th->afhead
    rb_vm_t.iom        - Ruby I/O Manager (rb_iom_t) :)

Besides rb_iom_t, all the new structs are stack-only and rely
extensively on ccan/list for O(1) insert/delete.

Right now, I reuse some static functions in thread.c, so thread.c
includes iom_select.h.

TODO:

    Hijack other blocking functions (waitpid, IO.select, ...)
    iom_epoll.h + iom_kqueue.h (easy once iom.h definitions are done)

I am using "double" for timeouts since it is more convenient for
arithmetic, like parts of thread.c.  Most platforms have good FP, I
think.  Also, all "blocking" functions (rb_iom_wait*) will have
timeout support.
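A minimal sketch of the intended Fiber#join/Fiber#value semantics,
based on the description above (untested, names subject to change):

    require 'fiber'

    f = Fiber.start do # Fiber.new {} + Fiber#run: starts running immediately
      :done            # runs until termination or auto-yield on EAGAIN
    end

    f.join(1.0)  # drive the internal scheduler for up to 1 second;
                 # returns f if terminated, nil on timeout (like Thread#join)
    p f.value    # => :done, the block's final value (like Thread#value)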
Test script I used to download a file from my server:
----8<---
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'

url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx'
uri = URI(url)
use_ssl = "https" == uri.scheme
fibs = 10.times.map do
  Fiber.start do
    cur = Fiber.current.object_id
    # XXX getaddrinfo() and connect() are blocking
    # XXX resolv/replace + connect_nonblock
    Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http|
      req = Net::HTTP::Get.new(uri)
      http.request(req) do |res|
        dig = Digest::SHA1.new
        res.read_body do |buf|
          dig.update(buf)
          #warn "#{cur} #{buf.bytesize}\n"
        end
        warn "#{cur} #{dig.hexdigest}\n"
      end
    end
    warn "done\n"
    :done
  end
end

warn "joining #{Time.now}\n"
fibs[-1].join(4)
warn "joined #{Time.now}\n"
all = fibs.dup

warn "1 joined, wait for the rest\n"
until fibs.empty?
  fibs.each(&:join)
  fibs.keep_if(&:alive?)
  warn fibs.inspect
end
p all.map(&:value)

Fiber.new do
  puts 'HI'
end.run.join
END
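For a smaller check without the network, something like this pipe-based
sketch ought to exercise the auto-yield path; it assumes io/wait's
IO#wait_readable reaches rb_wait_for_single_fd (hypothetical, untested):

    require 'fiber'
    require 'io/wait'

    r, w = IO.pipe
    f = Fiber.start do
      r.wait_readable   # rb_wait_for_single_fd => auto-yields this fiber
      r.read_nonblock(5)
    end
    # Fiber#start returned because the fiber auto-yielded; now feed it:
    w.write('hello')
    p f.value # => "hello"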
---
 common.mk         |   3 +
 configure.in      |   2 +-
 cont.c            | 156 +++++++++++++++++++-
 include/ruby/io.h |   2 +
 iom.h             |  82 +++++++++++
 iom_common.h      |  93 ++++++++++++
 iom_select.h      | 419 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 prelude.rb        |  12 ++
 thread.c          |  22 +++
 vm.c              |   7 +-
 vm_core.h         |   5 +-
 11 files changed, 795 insertions(+), 8 deletions(-)
 create mode 100644 iom.h
 create mode 100644 iom_common.h
 create mode 100644 iom_select.h

diff --git a/common.mk b/common.mk
index befe98fddc..e52b1872b4 100644
--- a/common.mk
+++ b/common.mk
@@ -2596,6 +2596,9 @@ thread.$(OBJEXT): {$(VPATH)}id.h
 thread.$(OBJEXT): {$(VPATH)}intern.h
 thread.$(OBJEXT): {$(VPATH)}internal.h
 thread.$(OBJEXT): {$(VPATH)}io.h
+thread.$(OBJEXT): {$(VPATH)}iom.h
+thread.$(OBJEXT): {$(VPATH)}iom_common.h
+thread.$(OBJEXT): {$(VPATH)}iom_select.h
 thread.$(OBJEXT): {$(VPATH)}method.h
 thread.$(OBJEXT): {$(VPATH)}missing.h
 thread.$(OBJEXT): {$(VPATH)}node.h
diff --git a/configure.in b/configure.in
index cc008dbebf..9c5def88ed 100644
--- a/configure.in
+++ b/configure.in
@@ -523,7 +523,7 @@ AS_CASE(["$target_os"],
 AC_SUBST(LD)
 if test "$GCC" = yes; then
     linker_flag=-Wl,
-    : ${optflags=-O3}
+    : ${optflags=-O0}
     gcc_major=`echo =__GNUC__ | $CC -E -xc - | sed '/^=/!d;s///'`
     gcc_minor=`echo =__GNUC_MINOR__ | $CC -E -xc - | sed '/^=/!d;s///'`
     test -n "$gcc_major" || gcc_major=0
diff --git a/cont.c b/cont.c
index 4d6176f00c..ced496f61b 100644
--- a/cont.c
+++ b/cont.c
@@ -13,6 +13,7 @@
 #include "vm_core.h"
 #include "gc.h"
 #include "eval_intern.h"
+#include "iom.h"
 
 /* FIBER_USE_NATIVE enables Fiber performance improvement using system
  * dependent method such as make/setcontext on POSIX system or
@@ -126,6 +127,14 @@ static machine_stack_cache_t terminated_machine_stack;
 struct rb_fiber_struct {
     rb_context_t cont;
     struct rb_fiber_struct *prev;
+
+    /*
+     * afnode.next == 0        auto fiber disabled
+     * afnode.next == &afnode  auto fiber enabled, but not enqueued (running)
+     * afnode.next != &afnode  auto fiber runnable (enqueued in th->afhead)
+     */
+    struct list_node afnode;
+
     enum fiber_status status;
     /* If a fiber invokes "transfer",
      * then this fiber can't "resume" any more after that.
@@ -1496,19 +1505,24 @@ rb_fiber_terminate(rb_fiber_t *fib)
     fiber_switch(return_fiber(), 1, &value, 0);
 }
 
-VALUE
-rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+static void
+fiber_check_resume(const rb_fiber_t *fib)
 {
-    rb_fiber_t *fib;
-    GetFiberPtr(fibval, fib);
-
     if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
        rb_raise(rb_eFiberError, "double resume");
     }
     if (fib->transferred != 0) {
        rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
     }
+}
+
+VALUE
+rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+{
+    rb_fiber_t *fib;
+    GetFiberPtr(fibval, fib);
+
+    fiber_check_resume(fib);
     return fiber_switch(fib, argc, argv, 1);
 }
 
@@ -1651,7 +1665,136 @@ rb_fiber_s_current(VALUE klass)
     return rb_fiber_current();
 }
 
+/* enqueue given Fiber so it may be auto-resumed */
+void
+rb_fiber_auto_enqueue(VALUE fibval)
+{
+    rb_fiber_t *fib;
+    rb_thread_t *th;
+
+    GetFiberPtr(fibval, fib);
+    GetThreadPtr(fib->cont.saved_thread.self, th);
+    list_add_tail(&th->afhead, &fib->afnode);
+}
+
+/* for vm.c::rb_thread_mark */
+void
+rb_fiber_auto_schedule_mark(const rb_thread_t *th)
+{
+    rb_fiber_t *fib = 0;
+
+    list_for_each(&th->afhead, fib, afnode) {
+        rb_gc_mark(fib->cont.self);
+    }
+}
+
+/* Returns true if auto-fiber is enabled for current fiber */
+int
+rb_fiber_auto_sched_p(const rb_thread_t *th)
+{
+    const rb_fiber_t *cur = th->fiber;
+
+    return (cur && cur->afnode.next && th->root_fiber != cur);
+}
+
+/*
+ * Resumes any ready fibers in the auto-Fiber run queue;
+ * returns true if yielding the current Fiber is needed, false if not
+ */
+int
+rb_fiber_auto_do_yield_p(rb_thread_t *th)
+{
+    rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0;
+    rb_fiber_t *fib = 0, *next = 0;
+    LIST_HEAD(tmp);
+
+    /*
+     * do not loop infinitely as new fibers get added to
+     * th->afhead; only work off a temporary list:
+     */
+    list_append_list(&tmp, &th->afhead);
+    list_for_each_safe(&tmp, fib, next, afnode) {
+        if (fib == current_auto || (fib->prev && fib != th->root_fiber)) {
+            /* tell the caller to yield */
+            list_prepend_list(&th->afhead, &tmp);
+            return 1;
+        }
+        list_del_init(&fib->afnode);
+        fiber_check_resume(fib);
+        fiber_switch(fib, 0, 0, 1);
+    }
+    return 0;
+}
+
+/*
+ * Enable auto-scheduling for the Fiber and resume it
+ */
+static VALUE
+rb_fiber_auto_start(int argc, VALUE *argv, VALUE self)
+{
+    rb_fiber_t *fib;
+    GetFiberPtr(self, fib);
+
+    if (fib->afnode.next) {
+        rb_raise(rb_eFiberError, "Fiber already started");
+    }
+    list_node_init(&fib->afnode);
+    fiber_check_resume(fib);
+    return fiber_switch(fib, argc, argv, 1);
+}
+
+static void
+fiber_auto_join(rb_fiber_t *fib, double *timeout)
+{
+    rb_thread_t *th = GET_THREAD();
+    rb_fiber_t *cur = fiber_current();
+
+    if (cur == fib) {
+        rb_raise(rb_eFiberError, "Target fiber must not be current fiber");
+    }
+    if (th->root_fiber == fib) {
+        rb_raise(rb_eFiberError, "Target fiber must not be root fiber");
+    }
+    if (fib->cont.saved_thread.self != th->self) {
+        rb_raise(rb_eFiberError, "Target fiber not owned by current thread");
+    }
+    if (!fib->afnode.next) {
+        rb_raise(rb_eFiberError, "Target fiber is not an auto-fiber");
+    }
+
+    while (fib->status != TERMINATED && (timeout == 0 || *timeout >= 0.0)) {
+        rb_iom_schedule(th, timeout);
+    }
+}
+
+static VALUE
+rb_fiber_auto_join(int argc, VALUE *argv, VALUE self)
+{
+    rb_fiber_t *fib;
+    double timeout, *t;
+    VALUE limit;
+
+    GetFiberPtr(self, fib);
+    rb_scan_args(argc, argv, "01", &limit);
+
+    if (NIL_P(limit)) {
+        t = 0;
+    }
+    else {
+        timeout = rb_num2dbl(limit);
+        t = &timeout;
+    }
+
+    fiber_auto_join(fib, t);
+    return fib->status == TERMINATED ? fib->cont.self : Qnil;
+}
+
+static VALUE
+rb_fiber_auto_value(VALUE self)
+{
+    rb_fiber_t *fib;
+    GetFiberPtr(self, fib);
+
+    fiber_auto_join(fib, 0);
+    return fib->cont.value;
+}
+
 /*
  *  Document-class: FiberError
@@ -1688,6 +1831,9 @@ Init_Cont(void)
     rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
     rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0);
     rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
+    rb_define_method(rb_cFiber, "start", rb_fiber_auto_start, -1);
+    rb_define_method(rb_cFiber, "join", rb_fiber_auto_join, -1);
+    rb_define_method(rb_cFiber, "value", rb_fiber_auto_value, 0);
 }
 
 RUBY_SYMBOL_EXPORT_BEGIN
diff --git a/include/ruby/io.h b/include/ruby/io.h
index 60d6f6d32e..3bd6f06cf3 100644
--- a/include/ruby/io.h
+++ b/include/ruby/io.h
@@ -116,6 +116,8 @@ typedef struct rb_io_t {
 /* #define FMODE_UNIX              0x00200000 */
 /* #define FMODE_INET              0x00400000 */
 /* #define FMODE_INET6             0x00800000 */
+/* #define FMODE_IOM_PRIVATE1      0x01000000 */ /* OS-dependent */
+/* #define FMODE_IOM_PRIVATE2      0x02000000 */ /* OS-dependent */
 
 #define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
diff --git a/iom.h b/iom.h
new file mode 100644
index 0000000000..2323d39ca6
--- /dev/null
+++ b/iom.h
@@ -0,0 +1,82 @@
+/*
+ * iom -> I/O Manager for RubyVM (auto-Fiber-aware)
+ *
+ * On platforms with epoll or kqueue, this should be ready for multicore,
+ * even if the rest of the RubyVM is not.
+ *
+ * Some inspiration taken from Mio in GHC:
+ * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf
+ */
+#ifndef RUBY_IOM_H
+#define RUBY_IOM_H
+#include "ruby.h"
+#include "ruby/intern.h"
+#include "vm_core.h"
+
+typedef struct rb_iom_struct rb_iom_t;
+
+/* WARNING: unstable API, only for Ruby internal use */
+
+/*
+ * Note: the first "rb_thread_t *" is a placeholder and may be replaced
+ * with "rb_execution_context_t *" in the future.
+ */
+
+/*
+ * All functions with "wait" in their name take an optional double *
+ * +timeout+ argument specifying the timeout in seconds.  If NULL, they
+ * may wait forever until the event happens (or the fiber is explicitly
+ * resumed).
+ *
+ * (maybe) TODO: If non-NULL, the timeout will be updated to the
+ * remaining time upon returning.  Not sure if useful, could just be
+ * a waste of cycles; so not implemented, yet.
+ */
+
+/*
+ * Relinquish calling fiber while waiting for +events+ on file descriptor
+ * pointed to by +fdp+.
+ * Multiple native threads can enter this function at the same time.
+ *
+ * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI
+ *
+ * Returns a mask of events.
+ */
+int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout);
+
+/*
+ * Relinquish calling fiber to wait for the given PID to change status.
+ * Multiple native threads can enter this function at the same time.
+ * If timeout is negative, wait forever.
+ */
+rb_pid_t rb_iom_waitpid(rb_thread_t *,
+                        rb_pid_t, int *status, int options, double *timeout);
+
+/*
+ * Relinquish calling fiber for at least the duration of given timeout
+ * in seconds.  If timeout is negative, wait forever (until explicitly
+ * resumed).
+ * Multiple native threads can enter this function at the same time.
+ */
+void rb_iom_sleep(rb_thread_t *, double *timeout);
+
+/* callback for SIGCHLD, needed to implement rb_iom_waitpid */
+void rb_iom_sigchld(rb_iom_t *);
+
+/* callbacks for fork */
+void rb_iom_atfork_prepare(rb_thread_t *, rb_iom_t *);
+void rb_iom_atfork_parent(rb_thread_t *, rb_iom_t *);
+void rb_iom_atfork_child(rb_thread_t *, rb_iom_t *);
+
+/*
+ * There is no public create function; creation is lazy to avoid
+ * incurring overhead for small scripts which do not need fibers.
+ * We only need this (destroy) at VM destruction.
+ */
+int rb_iom_destroy(rb_iom_t *);
+
+/*
+ * schedule
+ */
+void rb_iom_schedule(rb_thread_t *th, double *timeout);
+
+#endif /* RUBY_IOM_H */
diff --git a/iom_common.h b/iom_common.h
new file mode 100644
index 0000000000..2cab87a88e
--- /dev/null
+++ b/iom_common.h
@@ -0,0 +1,93 @@
+#ifndef RB_IOM_COMMON_H
+#define RB_IOM_COMMON_H
+
+#include "iom.h"
+
+/* cont.c */
+void rb_fiber_auto_enqueue(VALUE fibval);
+int rb_fiber_auto_do_yield_p(rb_thread_t *);
+
+/* allocated on stack */
+struct rb_iom_timer {
+    struct list_node tnode; /* <=> rb_iom_struct.timers */
+    double expires_at; /* absolute monotonic time */
+};
+
+struct rb_iom_waiter {
+    struct rb_iom_timer timer;
+    rb_thread_t *th;
+    VALUE fibval; /* Qfalse: enqueued in afhead */
+    struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
+};
+
+struct rb_iom_fd_waiter {
+    struct rb_iom_waiter w;
+    int *fdp;
+    short events;
+    short revents;
+};
+
+struct rb_iom_pid_waiter {
+    struct rb_iom_waiter w;
+    rb_pid_t pid;
+    int status;
+    int options;
+};
+
+/* dequeue expired timers; stop at the first timer which has not expired */
+static void
+rb_iom_timer_check(struct list_head *timers)
+{
+    struct rb_iom_timer *i = 0, *next = 0;
+    double now = timeofday();
+
+    list_for_each_safe(timers, i, next, tnode) {
+        struct rb_iom_waiter *w;
+        VALUE fibval;
+
+        if (i->expires_at > now) {
+            return; /* done, timers is a sorted list */
+        }
+        w = container_of(i, struct rb_iom_waiter, timer);
+        list_del_init(&i->tnode);
+        list_del_init(&w->wnode);
+        fibval = w->fibval;
+        w->fibval = Qfalse;
+
+        /* non-auto-fibers may set timer in rb_iom_schedule */
+        if (fibval != Qfalse) {
+            rb_fiber_auto_enqueue(fibval);
+        }
+    }
+}
+
+/* insert a new +timer+ into +timers+, maintaining sort order by expires_at */
+static void
+rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
+                 const double *timeout)
+{
+    rb_iom_timer_check(timers);
+    if (timeout) {
+        struct rb_iom_timer *i = 0;
+        add->expires_at = timeofday() + *timeout;
+
+        /*
+         * search backwards: assume typical projects have multiple objects
+         * sharing the same timeout values, so new timers will expire later
+         * than existing timers
+         */
+        list_for_each_rev(timers, i, tnode) {
+            if (add->expires_at > i->expires_at) {
+                list_add_after(timers, &i->tnode, &add->tnode);
+                return;
+            }
+        }
+        list_add(timers, &add->tnode);
+    }
+    else {
+        /* not active, just allow list_del to function in cleanup */
+        list_node_init(&add->tnode);
+    }
+}
+
+#endif /* RB_IOM_COMMON_H */
diff --git a/iom_select.h b/iom_select.h
new file mode 100644
index 0000000000..66e6b9dd86
--- /dev/null
+++ b/iom_select.h
@@ -0,0 +1,419 @@
+/*
+ * select()-based implementation of I/O Manager for RubyVM
+ *
+ * This is crippled and relies heavily on GVL compared to the
+ * epoll and kqueue versions
+ */
+#include "iom_common.h"
+#include "internal.h"
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+    /*
+     * Everything here is protected by GVL at this time,
+     * URCU lists (LGPL-2.1+) may be used in the future
+     */
+    struct list_head timers; /* -rb_iom_timer.tnode, sorted by expires_at */
+    struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
+    struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+    /* threads waiting for selector thread to finish */
+    struct list_head swaitq; /* -select_wait.swnode */
+
+    rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */
+    rb_serial_t select_gen; /* RDWR protected by GVL */
+    rb_thread_t *selector; /* rb_thread_fd_select owner (acts as lock w/ GVL) */
+
+    /* these could be on stack, but they're huge on some platforms: */
+    rb_fdset_t rfds;
+    rb_fdset_t wfds;
+    rb_fdset_t efds;
+};
+
+/* allocated on stack */
+struct select_wait {
+    struct list_node swnode;
+    union {
+        VALUE thval;
+        VALUE run_select; /* or yield */
+    } as;
+};
+
+/* we lazily create this, small scripts may never need iom */
+static rb_iom_t *
+rb_iom_create(rb_thread_t *th)
+{
+    rb_iom_t *iom = ALLOC(rb_iom_t);
+
+    iom->select_start = 0;
+    iom->select_gen = 0;
+    iom->selector = 0;
+    list_head_init(&iom->timers);
+    list_head_init(&iom->fds);
+    list_head_init(&iom->pids);
+    list_head_init(&iom->swaitq);
+
+    return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+    VM_ASSERT(th && th->vm);
+    if (!th->vm->iom) {
+        th->vm->iom = rb_iom_create(th);
+    }
+    return th->vm->iom;
+}
+
+static void
+waiter_cleanup(struct rb_iom_waiter *w)
+{
+    list_del(&w->timer.tnode);
+    list_del(&w->wnode);
+}
+
+static VALUE
+fd_waiter_cleanup(VALUE ptr)
+{
+    struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
+
+    waiter_cleanup(&fdw->w);
+    return Qfalse;
+}
+
+static struct timeval *
+next_timeout(rb_iom_t *iom, struct timeval *tv)
+{
+    struct rb_iom_timer *t;
+
+    t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+    if (t) {
+        double diff = t->expires_at - timeofday();
+        if (diff > 0) {
+            *tv = double2timeval(diff);
+        }
+        else {
+            tv->tv_sec = 0;
+            tv->tv_usec = 0;
+        }
+        return tv;
+    }
+    return 0;
+}
+
+static VALUE
+iom_select_do(VALUE ptr)
+{
+    rb_thread_t *th = (rb_thread_t *)ptr;
+    rb_iom_t *iom = th->vm->iom;
+    int max = -1;
+    struct timeval tv, *tvp;
+    struct rb_iom_fd_waiter *fdw = 0;
+    rb_fdset_t *rfds, *wfds, *efds;
+
+    if (list_empty(&iom->fds) && list_empty(&iom->timers)) {
+        rb_thread_schedule();
+        return Qfalse;
+    }
+
+    rfds = wfds = efds = 0;
+    rb_fd_init(&iom->rfds);
+    rb_fd_init(&iom->wfds);
+    rb_fd_init(&iom->efds);
+
+    list_for_each(&iom->fds, fdw, w.wnode) {
+        int fd = *fdw->fdp;
+        if (fd < 0) {
+            continue; /* closed */
+        }
+        if (fd > max) {
+            max = fd;
+        }
+        if (fdw->events & RB_WAITFD_IN) {
+            rb_fd_set(fd, rfds = &iom->rfds);
+        }
+        if (fdw->events & RB_WAITFD_OUT) {
+            rb_fd_set(fd, wfds = &iom->wfds);
+        }
+        if (fdw->events & RB_WAITFD_PRI) {
+            rb_fd_set(fd, efds = &iom->efds);
+        }
+    }
+
+    tvp = next_timeout(iom, &tv);
+
+    /* release GVL.. */
+    rb_thread_fd_select(max + 1, rfds, wfds, efds, tvp);
+    /* .. we have GVL again */
+
+    return Qfalse;
+}
+
+static VALUE
+iom_select_done(VALUE ptr)
+{
+    rb_thread_t *th = (rb_thread_t *)ptr;
+    rb_iom_t *iom = th->vm->iom;
+    struct rb_iom_fd_waiter *fdw = 0, *next = 0;
+    struct select_wait *sw;
+    VALUE thselect;
+
+    iom->selector = 0;
+    list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
+        int fd = *fdw->fdp;
+        if (fd < 0) {
+            continue; /* closed */
+        }
+        if (fdw->events & RB_WAITFD_IN && rb_fd_isset(fd, &iom->rfds)) {
+            fdw->revents |= RB_WAITFD_IN;
+        }
+        if (fdw->events & RB_WAITFD_OUT && rb_fd_isset(fd, &iom->wfds)) {
+            fdw->revents |= RB_WAITFD_OUT;
+        }
+        if (fdw->events & RB_WAITFD_PRI && rb_fd_isset(fd, &iom->efds)) {
+            fdw->revents |= RB_WAITFD_PRI;
+        }
+
+        /* got revents? enqueue ourselves to be run! */
+        if (fdw->revents) {
+            VALUE fibval = fdw->w.fibval;
+
+            fdw->w.fibval = Qfalse;
+            list_del_init(&fdw->w.timer.tnode);
+            list_del_init(&fdw->w.wnode);
+            VM_ASSERT(fibval != Qfalse && "fibval invalid in fds list");
+            rb_fiber_auto_enqueue(fibval);
+        }
+    }
+
+    rb_fd_term(&iom->rfds);
+    rb_fd_term(&iom->wfds);
+    rb_fd_term(&iom->efds);
+
+    /*
+     * did anybody write to `fds` while we were inside select?
+     * if so, designate the last `fds` writer to call select() again:
+     */
+    do {
+        sw = list_tail(&iom->swaitq, struct select_wait, swnode);
+        if (!sw) {
+            return Qfalse;
+        }
+        /* designate the next select() thread: */
+        thselect = sw->as.thval;
+        list_del_init(&sw->swnode);
+        sw->as.run_select = Qtrue;
+        /* it's gotta be alive to run select(), otherwise find another */
+    } while (NIL_P(rb_thread_wakeup_alive(thselect)));
+
+    if (sw) {
+        struct select_wait *swnext = 0;
+
+        /* everybody else will do rb_fiber_yield: */
+        list_for_each_safe(&iom->swaitq, sw, swnext, swnode) {
+            VALUE thyield = sw->as.thval;
+            sw->as.run_select = Qfalse;
+            rb_thread_wakeup_alive(thyield);
+        }
+    }
+
+    return Qfalse;
+}
+
+static VALUE
+select_wait_sleep(VALUE ptr)
+{
+    struct select_wait *sw = (struct select_wait *)ptr;
+    VALUE th = sw->as.thval;
+
+    do {
+        rb_thread_sleep_deadly();
+    } while (sw->as.thval == th);
+    return Qfalse;
+}
+
+static VALUE
+select_wait_done(VALUE ptr)
+{
+    struct select_wait *sw = (struct select_wait *)ptr;
+    list_del(&sw->swnode);
+    return Qfalse;
+}
+
+static void
+iom_schedule_any(rb_thread_t *th)
+{
+    rb_iom_t *iom = th->vm->iom;
+    rb_thread_t *s;
+
+    if (rb_fiber_auto_do_yield_p(th)) {
+        rb_fiber_yield(0, 0);
+    }
+try_select:
+    s = iom->selector;
+    if (s) { /* somebody else is running select() */
+        /*
+         * if our watch set changed after select() started,
+         * we need to kick it and restart with the new set
+         */
+        int need_kick = iom->select_start != iom->select_gen;
+        struct select_wait sw;
+
+        sw.as.thval = th->self;
+        list_add_tail(&iom->swaitq, &sw.swnode);
+        if (need_kick) {
+            ubf_select(s);
+        }
+        rb_ensure(select_wait_sleep, (VALUE)&sw, select_wait_done, (VALUE)&sw);
+        if (need_kick && sw.as.run_select != Qfalse) {
+            goto try_select;
+        }
+        if (rb_fiber_auto_sched_p(th)) {
+            rb_fiber_yield(0, 0);
+        }
+    }
+    else {
+        iom->selector = th;
+        iom->select_start = iom->select_gen;
+        rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
+        rb_iom_timer_check(&iom->timers);
+        if (rb_fiber_auto_do_yield_p(th)) {
+            rb_fiber_yield(0, 0);
+        }
+    }
+}
+
+static VALUE
+schedule_timed(VALUE ptr)
+{
+    rb_thread_t *th = (rb_thread_t *)ptr;
+    iom_schedule_any(th);
+    return Qfalse;
+}
+
+static VALUE
+schedule_timed_end(VALUE ptr)
+{
+    struct rb_iom_waiter *w = (struct rb_iom_waiter *)ptr;
+
+    list_del_init(&w->timer.tnode);
+    return Qfalse;
+}
+
+void
+rb_iom_schedule(rb_thread_t *th, double *timeout)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+
+    if (timeout) {
+        struct rb_iom_waiter w;
+        double t0 = timeofday();
+
+        w.fibval = rb_fiber_auto_sched_p(th) ? rb_fiber_current() : Qfalse;
+        w.th = th;
+        list_node_init(&w.wnode); /* unused, only to make list_del_init work */
+        rb_iom_timer_add(&iom->timers, &w.timer, timeout);
+        rb_ensure(schedule_timed, (VALUE)th, schedule_timed_end, (VALUE)&w);
+
+        *timeout -= timeofday() - t0;
+    }
+    else {
+        rb_iom_timer_check(&iom->timers);
+        iom_schedule_any(th);
+    }
+}
+
+static VALUE
+iom_schedule_fd(VALUE ptr)
+{
+    struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
+    rb_thread_t *th = fdw->w.th;
+    rb_iom_t *iom = rb_iom_get(th);
+    rb_thread_t *s;
+
+    if (rb_fiber_auto_do_yield_p(th)) {
+        return rb_fiber_yield(0, 0);
+    }
+
+try_select:
+    s = iom->selector;
+    if (s) { /* somebody else is running select() */
+        /*
+         * our watch set definitely changed after select() started
+         * (because of GVL), kick it and restart with the new set
+         */
+        struct select_wait sw;
+
+        sw.as.thval = th->self;
+        list_add_tail(&iom->swaitq, &sw.swnode);
+        ubf_select(s);
+        rb_ensure(select_wait_sleep, (VALUE)&sw, select_wait_done, (VALUE)&sw);
+        if (sw.as.run_select == Qfalse) {
+            return rb_fiber_yield(0, 0);
+        }
+        goto try_select;
+    }
+    else {
+        iom->selector = th;
+        iom->select_start = iom->select_gen;
+        rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
+        rb_iom_timer_check(&iom->timers);
+        (void)rb_fiber_auto_do_yield_p(th);
+    }
+    return rb_fiber_yield(0, 0);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    struct rb_iom_fd_waiter fdw;
+
+    if (*fdp < 0) return -1;
+    fdw.fdp = fdp;
+    fdw.events = (short)events;
+    fdw.revents = 0;
+    fdw.w.th = th;
+    fdw.w.fibval = rb_fiber_current();
+
+    /* use FIFO order for fairness in iom_select_done */
+    list_add_tail(&iom->fds, &fdw.w.wnode);
+
+    rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout);
+    iom->select_gen++;
+    rb_ensure(iom_schedule_fd, (VALUE)&fdw, fd_waiter_cleanup, (VALUE)&fdw);
+
+    if (*fdp < 0) return -1;
+
+    return (int)fdw.revents; /* may be zero if timed out */
+}
+
+rb_pid_t
+rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
+               double *timeout)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    struct rb_iom_pid_waiter pw;
+
+    VM_ASSERT((options & WNOHANG) == 0 &&
+              "WNOHANG should be handled in rb_waitpid");
+
+    pw.pid = pid;
+    pw.options = options;
+    pw.w.th = th;
+    pw.w.fibval = rb_fiber_current();
+    rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout);
+
+    /* LIFO, to match Linux wait4() blocking behavior */
+    list_add(&iom->pids, &pw.w.wnode);
+
+    /* do not touch select_gen, this has nothing to do with select() */
+
+    /* TODO:
+     * rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw);
+     */
+
+    return -1;
+}
diff --git a/prelude.rb b/prelude.rb
index 7b98e28285..1e71a3f168 100644
--- a/prelude.rb
+++ b/prelude.rb
@@ -16,6 +16,18 @@ def self.exclusive
   end
 end
 
+class Fiber
+  def self.start(*args, &block)
+    # loses Fiber#resume return value, but maybe people do not care
+    new(&block).run(*args)
+  end
+
+  def run(*args)
+    start(*args)
+    self
+  end
+end
+
 class IO
 
   # call-seq:
diff --git a/thread.c b/thread.c
index fd3db3648f..4ff8510100 100644
--- a/thread.c
+++ b/thread.c
@@ -70,6 +70,10 @@
 #include "ruby/thread.h"
 #include "ruby/thread_native.h"
 #include "internal.h"
+#include "iom.h"
+
+/* cont.c */
+int rb_fiber_auto_sched_p(const rb_thread_t *th);
 
 #ifndef USE_NATIVE_THREAD_PRIORITY
 #define USE_NATIVE_THREAD_PRIORITY 0
@@ -3909,6 +3913,16 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
     struct timespec *timeout = NULL;
     rb_thread_t *th = GET_THREAD();
 
+    if (rb_fiber_auto_sched_p(th)) {
+        double *to = NULL;
+        double tout;
+
+        if (tv) {
+            tout = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
+            to = &tout;
+        }
+        return rb_iom_waitfd(th, &fd, events, to);
+    }
+
 #define poll_update() \
     (update_timespec(timeout, limit), \
      TRUE)
@@ -5060,6 +5074,12 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
 }
 
 void
+rb_thread_ubf_select(rb_thread_t *th)
+{
+    ubf_select(th);
+}
+
+void
 ruby_kill(rb_pid_t pid, int sig)
 {
     int err;
@@ -5081,3 +5101,5 @@ ruby_kill(rb_pid_t pid, int sig)
         rb_sys_fail(0);
     }
 }
+
+#include "iom_select.h"
diff --git a/vm.c b/vm.c
index 52d505ab7c..90a24b4a9d 100644
--- a/vm.c
+++ b/vm.c
@@ -2345,6 +2345,7 @@ rb_thread_recycle_stack_release(VALUE *stack)
 }
 
 void rb_fiber_mark_self(rb_fiber_t *fib);
+void rb_fiber_auto_schedule_mark(const rb_thread_t *th);
 
 void
 rb_thread_mark(void *ptr)
@@ -2387,6 +2388,9 @@ rb_thread_mark(void *ptr)
         RUBY_MARK_UNLESS_NULL(th->top_wrapper);
         rb_fiber_mark_self(th->fiber);
         rb_fiber_mark_self(th->root_fiber);
+        if (th->afhead.n.next && !list_empty(&th->afhead)) {
+            rb_fiber_auto_schedule_mark(th);
+        }
 
         RUBY_MARK_UNLESS_NULL(th->stat_insn_usage);
         RUBY_MARK_UNLESS_NULL(th->last_status);
@@ -2513,6 +2517,8 @@ th_init(rb_thread_t *th, VALUE self)
 
     th->ec.cfp = (void *)(th->ec.stack + th->ec.stack_size);
 
+    list_head_init(&th->afhead);
+
     vm_push_frame(th, 0 /* dummy iseq */, VM_FRAME_MAGIC_DUMMY | VM_ENV_FLAG_LOCAL | VM_FRAME_FLAG_FINISH | VM_FRAME_FLAG_CFRAME /* dummy frame */,
                   Qnil /* dummy self */, VM_BLOCK_HANDLER_NONE /* dummy block ptr */,
                   0 /* dummy cref/me */,
@@ -3103,7 +3109,6 @@ Init_BareVM(void)
     }
     MEMZERO(th, rb_thread_t, 1);
     rb_thread_set_current_raw(th);
-
    vm_init2(vm);
     vm->objspace = rb_objspace_alloc();
     ruby_current_vm = vm;
diff --git a/vm_core.h b/vm_core.h
index 35b1748218..c74314a743 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -481,6 +481,8 @@ typedef struct rb_hook_list_struct {
     int need_clean;
 } rb_hook_list_t;
 
+struct rb_iom_struct;
+
 typedef struct rb_vm_struct {
     VALUE self;
 
@@ -489,7 +491,7 @@ typedef struct rb_vm_struct {
     struct rb_thread_struct *main_thread;
     struct rb_thread_struct *running_thread;
-
+    struct rb_iom_struct *iom;
     struct list_head living_threads;
     size_t living_thread_num;
     VALUE thgroup_default;
@@ -809,6 +811,7 @@ typedef struct rb_thread_struct {
     rb_fiber_t *fiber;
    rb_fiber_t *root_fiber;
    rb_jmpbuf_t root_jmpbuf;
+    struct list_head afhead; /* -rb_fiber_t.afnode */
 
     /* ensure & callcc */
     rb_ensure_list_t *ensure_list;
-- 
EW