From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS57858 46.29.248.0/23 X-Spam-Status: No, score=-1.7 required=3.0 tests=AWL,BAYES_00,RCVD_IN_SBL, RCVD_IN_XBL,RDNS_NONE,SPF_FAIL,SPF_HELO_FAIL shortcircuit=no autolearn=no autolearn_force=no version=3.4.0 Received: from 80x24.org (unknown [46.29.248.238]) by dcvr.yhbt.net (Postfix) with ESMTP id E737D1FAE3 for ; Sun, 28 Jan 2018 10:39:28 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [PATCH 2/2] threadlet: non-native fiber support Date: Sun, 28 Jan 2018 10:39:07 +0000 Message-Id: <20180128103907.12069-3-e@80x24.org> In-Reply-To: <20180128103907.12069-1-e@80x24.org> References: <20180128103907.12069-1-e@80x24.org> List-Id: Non-native fibers requires us to use heap allocation instead of stack allocation, so expose FIBER_USE_NATIVE via fiber.h to allow iom_*.h to fall back to heap allocation on certain platforms. --- common.mk | 2 + cont.c | 48 +-------------------- fiber.h | 54 ++++++++++++++++++++++++ iom_common.h | 82 +++++++++++++++++++++++------------- iom_epoll.h | 73 +++++++++++++++++++------------- iom_internal.h | 114 +++++++++++++++++++++++++++++++++++++++++++++----- iom_kqueue.h | 82 ++++++++++++++++++++---------------- iom_pingable_common.h | 7 +--- iom_select.h | 83 +++++++++++++++++++++++------------- 9 files changed, 356 insertions(+), 189 deletions(-) create mode 100644 fiber.h diff --git a/common.mk b/common.mk index 850e3e3c77..79c09298e9 100644 --- a/common.mk +++ b/common.mk @@ -1503,6 +1503,7 @@ cont.$(OBJEXT): {$(VPATH)}cont.c cont.$(OBJEXT): {$(VPATH)}defines.h cont.$(OBJEXT): {$(VPATH)}encoding.h cont.$(OBJEXT): {$(VPATH)}eval_intern.h +cont.$(OBJEXT): {$(VPATH)}fiber.h cont.$(OBJEXT): {$(VPATH)}gc.h cont.$(OBJEXT): {$(VPATH)}id.h cont.$(OBJEXT): {$(VPATH)}intern.h @@ -2676,6 +2677,7 @@ thread.$(OBJEXT): {$(VPATH)}defines.h thread.$(OBJEXT): {$(VPATH)}encoding.h thread.$(OBJEXT): {$(VPATH)}eval_intern.h thread.$(OBJEXT): {$(VPATH)}gc.h +thread.$(OBJEXT): {$(VPATH)}fiber.h thread.$(OBJEXT): {$(VPATH)}id.h thread.$(OBJEXT): {$(VPATH)}intern.h thread.$(OBJEXT): {$(VPATH)}internal.h diff --git a/cont.c b/cont.c index 7772fcb7d0..39b3e1b9ad 100644 --- a/cont.c +++ b/cont.c @@ -12,56 +12,10 @@ #include "iom.h" #include "internal.h" #include "vm_core.h" +#include "fiber.h" #include "gc.h" #include "eval_intern.h" -/* FIBER_USE_NATIVE enables Fiber performance improvement using system - * dependent method such as make/setcontext on POSIX system or - * CreateFiber() API on Windows. - * This hack make Fiber context switch faster (x2 or more). - * However, it decrease maximum number of Fiber. For example, on the - * 32bit POSIX OS, ten or twenty thousands Fiber can be created. - * - * Details is reported in the paper "A Fast Fiber Implementation for Ruby 1.9" - * in Proc. of 51th Programming Symposium, pp.21--28 (2010) (in Japanese). - */ - -#if !defined(FIBER_USE_NATIVE) -# if defined(HAVE_GETCONTEXT) && defined(HAVE_SETCONTEXT) -# if 0 -# elif defined(__NetBSD__) -/* On our experience, NetBSD doesn't support using setcontext() and pthread - * simultaneously. This is because pthread_self(), TLS and other information - * are represented by stack pointer (higher bits of stack pointer). - * TODO: check such constraint on configure. - */ -# define FIBER_USE_NATIVE 0 -# elif defined(__sun) -/* On Solaris because resuming any Fiber caused SEGV, for some reason. 
- */ -# define FIBER_USE_NATIVE 0 -# elif defined(__ia64) -/* At least, Linux/ia64's getcontext(3) doesn't save register window. - */ -# define FIBER_USE_NATIVE 0 -# elif defined(__GNU__) -/* GNU/Hurd doesn't fully support getcontext, setcontext, makecontext - * and swapcontext functions. Disabling their usage till support is - * implemented. More info at - * http://darnassus.sceen.net/~hurd-web/open_issues/glibc/#getcontext - */ -# define FIBER_USE_NATIVE 0 -# else -# define FIBER_USE_NATIVE 1 -# endif -# elif defined(_WIN32) -# define FIBER_USE_NATIVE 1 -# endif -#endif -#if !defined(FIBER_USE_NATIVE) -#define FIBER_USE_NATIVE 0 -#endif - #if FIBER_USE_NATIVE #ifndef _WIN32 #include diff --git a/fiber.h b/fiber.h new file mode 100644 index 0000000000..3f0f5d5500 --- /dev/null +++ b/fiber.h @@ -0,0 +1,54 @@ +#ifndef RUBY_FIBER_H +#define RUBY_FIBER_H + +#include "internal.h" +#include "vm_core.h" + +/* FIBER_USE_NATIVE enables Fiber performance improvement using system + * dependent method such as make/setcontext on POSIX system or + * CreateFiber() API on Windows. + * This hack make Fiber context switch faster (x2 or more). + * However, it decrease maximum number of Fiber. For example, on the + * 32bit POSIX OS, ten or twenty thousands Fiber can be created. + * + * Details is reported in the paper "A Fast Fiber Implementation for Ruby 1.9" + * in Proc. of 51th Programming Symposium, pp.21--28 (2010) (in Japanese). + */ + +#if !defined(FIBER_USE_NATIVE) +# if defined(HAVE_GETCONTEXT) && defined(HAVE_SETCONTEXT) +# if 0 +# elif defined(__NetBSD__) +/* On our experience, NetBSD doesn't support using setcontext() and pthread + * simultaneously. This is because pthread_self(), TLS and other information + * are represented by stack pointer (higher bits of stack pointer). + * TODO: check such constraint on configure. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__sun) +/* On Solaris because resuming any Fiber caused SEGV, for some reason. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__ia64) +/* At least, Linux/ia64's getcontext(3) doesn't save register window. + */ +# define FIBER_USE_NATIVE 0 +# elif defined(__GNU__) +/* GNU/Hurd doesn't fully support getcontext, setcontext, makecontext + * and swapcontext functions. Disabling their usage till support is + * implemented. More info at + * http://darnassus.sceen.net/~hurd-web/open_issues/glibc/#getcontext + */ +# define FIBER_USE_NATIVE 0 +# else +# define FIBER_USE_NATIVE 1 +# endif +# elif defined(_WIN32) +# define FIBER_USE_NATIVE 1 +# endif +#endif +#if !defined(FIBER_USE_NATIVE) +#define FIBER_USE_NATIVE 0 +#endif + +#endif /* RUBY_FIBER_H */ diff --git a/iom_common.h b/iom_common.h index 5334604117..e68092d00a 100644 --- a/iom_common.h +++ b/iom_common.h @@ -124,49 +124,73 @@ iom_schedule_pid(VALUE ptr) /* check ints above could've unlinked us and marked us ready */ if (list_empty((struct list_head *)&pw->w.wnode)) { list_del_init(&pw->w.timer.n.rnode); - return Qfalse; } - return rb_fiber_yield(0, 0); + else { /* pw fields will be set in another execution context */ + rb_fiber_yield(0, 0); + + if (pw->statusp) { + *pw->statusp = pw->status; + } + if (pw->pid > 0) { + rb_last_status_set(pw->status, pw->pid); + } + } + return (VALUE)pw->pid; +} + +static VALUE +iom_waitpid_done(VALUE ptr) +{ + struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr; + int errnum = pw->pid == -1 ? 
pw->errnum : 0; + + rb_iom_waiter_done(&pw->w); + + if (!FIBER_USE_NATIVE) { + xfree(pw); + } + if (errnum) { + errno = errnum; + } + return Qfalse; } rb_pid_t rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options, double *timeout) { - struct rb_iom_pid_waiter pw; + rb_iom_t *iom; + struct rb_iom_pid_waiter *pw; + rb_pid_t rpid; - pw.options = options; VM_ASSERT((options & WNOHANG) == 0 && "WNOHANG should be handled in rb_waitpid"); - /* - * unlike rb_iom_waitfd, we typically call *waitpid before - * trying with a non-blocking operation + * unlike rb_iom_waitfd, typically call *waitpid before + * trying with a non-blocking operation (because users + * typically do not) */ - pw.pid = rb_waitpid(pid, &pw.status, pw.options | WNOHANG); - - if (pw.pid == 0) { - rb_iom_t *iom = rb_iom_get(th); - - pw.th = th; - pw.pid = pid; - rb_iom_timer_add(th, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT); - - /* LIFO, to match Linux wait4() blocking behavior */ - list_add(&iom->pids, &pw.w.wnode); - rb_ensure(iom_schedule_pid, (VALUE)&pw, - rb_iom_waiter_done, (VALUE)&pw.w); - if (pw.pid == -1) { - errno = pw.errnum; + rpid = rb_waitpid(pid, status, options | WNOHANG); + if (rpid != 0) { + if (rpid > 0 && status) { + rb_last_status_set(*status, rpid); } + return rpid; } - if (status) { - *status = pw.status; - } - if (pw.pid > 0) { - rb_last_status_set(pw.status, pw.pid); - } - return pw.pid; + + iom = rb_iom_get(th); + pw = FIBER_USE_NATIVE ? ALLOCA_N(struct rb_iom_pid_waiter, 1) + : ALLOC(struct rb_iom_pid_waiter); + pw->options = options; + pw->statusp = status; + pw->th = th; + pw->pid = pid; + rb_iom_timer_add(th, &pw->w.timer, timeout, IOM_FIB|IOM_WAIT); + + /* LIFO, to match Linux wait4() blocking behavior */ + list_add(&iom->pids, &pw->w.wnode); + return (pid_t)rb_ensure(iom_schedule_pid, (VALUE)pw, + iom_waitpid_done, (VALUE)pw); } void diff --git a/iom_epoll.h b/iom_epoll.h index c38b418c2c..bcdd3a24f0 100644 --- a/iom_epoll.h +++ b/iom_epoll.h @@ -437,53 +437,62 @@ epmod_yield(VALUE ptr) rb_thread_t *th = epw->fdn.as.pre_ctl.th; size_t nresume; + if (epw->fd < 0) { /* TODO: behave like poll(2) and sleep? */ + return (VALUE)0; + } + + /* may raise on OOM: */ + epw->fdn.as.pre_ctl.fdh = rb_iom_fd_get(&th->vm->iom->fdmap, epw->fd); + epoll_ctl_or_raise(th, epw); ping_events(th, 0); (void)rb_threadlet_do_yield_p(th, &nresume); if (epw->revents) { list_del_init(&epw->w.timer.n.rnode); - return Qfalse; } - return rb_fiber_yield(0, 0); + else { /* revents will be set in another execution context */ + rb_fiber_yield(0, 0); + } + return (VALUE)epw->revents; /* may be zero if timed out */ } static VALUE epw_done(VALUE ptr) { struct epw *epw = (struct epw *)ptr; + list_del(&epw->fdn.as.fdnode); - return rb_iom_waiter_done((VALUE)&epw->w); + rb_iom_waiter_done(&epw->w); + + if (!FIBER_USE_NATIVE) { + xfree(epw); + } + return Qfalse; } static int iom_waitfd(rb_thread_t *th, int fd, int *flags, int events, double *timeout) { rb_iom_t *iom = rb_iom_get(th); - struct epw epw; + struct epw *epw = FIBER_USE_NATIVE ? ALLOCA_N(struct epw, 1) + : ALLOC(struct epw); /* unlike kqueue or select, we never need to reread fd */ - epw.fd = fd; - if (epw.fd < 0) { /* TODO: behave like poll(2) and sleep? 
*/ - return 0; - } - - /* may raise on OOM: */ - epw.fdn.as.pre_ctl.fdh = rb_iom_fd_get(&iom->fdmap, epw.fd); - epw.fdn.as.pre_ctl.th = th; - epw.fdn.owner.flags = flags; + epw->fd = fd; + epw->fdn.as.pre_ctl.th = th; + epw->fdn.owner.flags = flags; /* * if we did not have GVL, revents may be set immediately * upon epoll_ctl by another thread running epoll_wait, * so we must initialize it before epoll_ctl: */ - epw.revents = 0; - epw.events = (short)events; + epw->revents = 0; + epw->events = (short)events; - list_add(&iom->epws, &epw.w.wnode); - rb_iom_timer_add(th, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT); - rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw); + list_add(&iom->epws, &epw->w.wnode); + rb_iom_timer_add(th, &epw->w.timer, timeout, IOM_FIB|IOM_WAIT); - return (int)epw.revents; /* may be zero if timed out */ + return (int)rb_ensure(epmod_yield, (VALUE)epw, epw_done, (VALUE)epw); } int @@ -592,9 +601,11 @@ epfdset_yield(VALUE ptr) (void)rb_threadlet_do_yield_p(eset->fsw.th, &nresume); if (eset->fsw.ret) { list_del_init(&eset->fsw.w.timer.n.rnode); - return Qfalse; } - return rb_fiber_yield(0, 0); + else { /* fsw.ret will be set in another execution context */ + rb_fiber_yield(0, 0); + } + return (VALUE)eset->fsw.ret; } static rb_fdset_t * @@ -651,6 +662,9 @@ epfdset_done(VALUE ptr) rb_fd_term(res); } } + if (!FIBER_USE_NATIVE) { + xfree(eset); + } return Qfalse; } @@ -660,15 +674,14 @@ rb_iom_select(rb_thread_t *th, int maxfd, double *timeout) { rb_iom_t *iom = rb_iom_get(th); - struct epw_set eset; - - eset.tbl = 0; - rb_iom_fdset_waiter_init(th, &eset.fsw, maxfd, r, w, e); - list_add(&iom->esets, &eset.fsw.w.wnode); - rb_iom_timer_add(th, &eset.fsw.w.timer, timeout, IOM_FIB|IOM_WAIT); - rb_ensure(epfdset_yield, (VALUE)&eset, epfdset_done, (VALUE)&eset); - - return eset.fsw.ret; + struct epw_set *eset = FIBER_USE_NATIVE ? ALLOCA_N(struct epw_set, 1) + : ALLOC(struct epw_set); + eset->tbl = 0; + rb_iom_fdset_waiter_init(th, &eset->fsw, maxfd, r, w, e); + list_add(&iom->esets, &eset->fsw.w.wnode); + rb_iom_timer_add(th, &eset->fsw.w.timer, timeout, IOM_FIB|IOM_WAIT); + + return (int)rb_ensure(epfdset_yield, (VALUE)eset, epfdset_done, (VALUE)eset); } void diff --git a/iom_internal.h b/iom_internal.h index bd41c7841e..c77ba917d9 100644 --- a/iom_internal.h +++ b/iom_internal.h @@ -4,6 +4,15 @@ #include "internal.h" #include "iom.h" +/* + * FIBER_USE_NATIVE (see cont.c) allows us to read machine stacks of + * yielded (stopped) fibers. 
Without native fiber support, cont.c needs + * to copy fiber stacks around, so we must use slower heap allocation + * instead of stack allocation (and increase our chance of hitting + * memory leak bugs) + */ +#include "fiber.h" + /* cont.c */ void rb_threadlet_enqueue(VALUE fibval); @@ -68,7 +77,11 @@ struct rb_iom_waiter { /* for rb_wait_for_single_fd: */ struct rb_iom_fd_waiter { struct rb_iom_waiter w; /* w.wnode - iom->fds */ +#if FIBER_USE_NATIVE int *fdp; /* (ideally), a pointer fptr->fd to detect closed FDs */ +#else + int fd; /* need to do full copy with non-native fiber */ +#endif short events; /* requested events, like poll(2) */ short revents; /* returned events, like poll(2) */ }; @@ -80,6 +93,14 @@ struct rb_iom_fdset_waiter { int max; int ret; rb_fdset_t *in[3]; + + /* + * without native fibers, other threads and fibers cannot safely + * read the stacks of stopped fibers + */ +#if FIBER_USE_NATIVE == 0 && RUBYVM_IOM == IOM_SELECT + rb_fdset_t *onstack_dst[3]; +#endif rb_fdset_t *out[3]; rb_fdset_t sets[3]; }; @@ -87,6 +108,7 @@ struct rb_iom_fdset_waiter { struct rb_iom_pid_waiter { struct rb_iom_waiter w; /* w.wnode - iom->pids */ rb_thread_t *th; + int *statusp; /* same pid, status, options same as waitpid(2) */ rb_pid_t pid; int status; @@ -187,10 +209,30 @@ static VALUE rb_iom_timer_done(VALUE ptr) { struct rb_iom_timer *t = (struct rb_iom_timer *)ptr; + list_del(&t->n.tnode); + if (!FIBER_USE_NATIVE) { + xfree(t); + } return Qfalse; } +static void +rb_iom_timer_do(rb_thread_t *th, double *timeout, + VALUE (*fn)(ANYARGS), void *arg) +{ + double t0; + struct rb_iom_timer *t; + + t = FIBER_USE_NATIVE ? ALLOCA_N(struct rb_iom_timer, 1) + : ALLOC(struct rb_iom_timer); + + t0 = timeofday(); + rb_iom_timer_add(th, t, timeout, 0); + rb_ensure(fn, (VALUE)arg, rb_iom_timer_done, (VALUE)t); + *timeout -= timeofday() - t0; +} + static void rb_iom_waiter_ready(struct rb_iom_waiter *w) { @@ -204,13 +246,11 @@ rb_iom_waiter_ready(struct rb_iom_waiter *w) } } -static VALUE -rb_iom_waiter_done(VALUE ptr) +static void +rb_iom_waiter_done(struct rb_iom_waiter *w) { - struct rb_iom_waiter *w = (struct rb_iom_waiter *)ptr; list_del(&w->timer.n.tnode); list_del(&w->wnode); - return Qfalse; } /* cont.c */ @@ -225,28 +265,45 @@ rb_threadlet_do_yield_p(rb_thread_t *th, size_t *nresume) { rb_fiber_t *cur_threadlet = rb_threadlet_sched_p(th) ? th->ec->fiber_ptr : 0; struct rb_iom_timer *t = 0, *next = 0; - LIST_HEAD(tmp); + VALUE v; + size_t n = 0; + int ret = 0; + struct list_head *tmp; + + tmp = FIBER_USE_NATIVE ? 
ALLOCA_N(struct list_head, 1) : 0; + if (!tmp) { /* should be embed */ + v = rb_str_tmp_new((long)sizeof(struct list_head)); + tmp = (struct list_head *)RSTRING_PTR(v); + } + list_head_init(tmp); *nresume = 0; /* * do not infinite loop as new fibers get added to * th->afrunq, only work off a temporary list: */ - list_append_list(&tmp, &th->afrunq); - list_for_each_safe(&tmp, t, next, n.rnode) { + list_append_list(tmp, &th->afrunq); + list_for_each_safe(tmp, t, next, n.rnode) { VALUE fibval = rb_iom_timer_fibval(t); rb_fiber_t *fib = RTYPEDDATA_DATA(fibval); if (fib == cur_threadlet || !rb_fiber_resumable_p(th, fib)) { /* tell the caller to yield */ - list_prepend_list(&th->afrunq, &tmp); - return 1; + list_prepend_list(&th->afrunq, tmp); + ret = 1; + goto out; } - (*nresume)++; + n++; rb_fiber_resume(fibval, 0, 0); } - return 0; +out: + if (!FIBER_USE_NATIVE) { + rb_str_resize(v, 0); + rb_gc_force_recycle(v); + } + *nresume = n; + return ret; } static void @@ -261,6 +318,19 @@ rb_iom_fdset_waiter_init(rb_thread_t *th, struct rb_iom_fdset_waiter *fsw, fsw->in[1] = w; fsw->in[2] = e; memset(fsw->out, 0, sizeof(fsw->out)); + +#if FIBER_USE_NATIVE == 0 && RUBYVM_IOM == IOM_SELECT + { + int i; + for (i = 0; i < 3; i++) { + if (fsw->in[i]) { + fsw->onstack_dst[i] = fsw->in[i]; + fsw->in[i] = ALLOC(rb_fdset_t); + rb_fd_init_copy(fsw->in[i], fsw->onstack_dst[i]); + } + } + } +#endif } /* XXX: is this necessary? */ @@ -274,6 +344,28 @@ rb_iom_mark_runq(const rb_thread_t *th) } } +static inline int +rb_iom_fdw_get_fd(struct rb_iom_fd_waiter *fdw) +{ +#if FIBER_USE_NATIVE + return *fdw->fdp; +#else + return fdw->fd; +#endif +} + +static inline void +rb_iom_fdw_init(struct rb_iom_fd_waiter *fdw, int *fdp, int events) +{ +#if FIBER_USE_NATIVE + fdw->fdp = fdp; +#else + fdw->fd = *fdp; +#endif + fdw->events = (short)events; + fdw->revents = 0; +} + static rb_iom_t *rb_iom_get(rb_thread_t *); static void rb_iom_blockers_notify(rb_iom_t *, int max); diff --git a/iom_kqueue.h b/iom_kqueue.h index 42c542c202..5f3da8f978 100644 --- a/iom_kqueue.h +++ b/iom_kqueue.h @@ -178,7 +178,7 @@ check_pri(rb_thread_t *th, struct list_head *pri) rb_fd_init(&efds); list_for_each(pri, kev, fdw.w.wnode) { - int fd = *kev->fdw.fdp; + int fd = rb_iom_fdw_get_fd(&kev->fdw); if (fd >= 0) { rb_fd_set(fd, &efds); if (fd > max) { @@ -199,7 +199,7 @@ again: } if (r >= 0) { list_for_each_safe(pri, kev, next, fdw.w.wnode) { - int fd = *kev->fdw.fdp; + int fd = rb_iom_fdw_get_fd(&kev->fdw); list_del_init(&kev->fdw.w.wnode); /* @@ -236,7 +236,7 @@ drop_pairs(rb_thread_t *th, int max, struct kevent *changelist, struct kev *kev = 0, *next = 0; list_for_each_safe(rdel, kev, next, fdn.wfdnode) { - int fd = *kev->fdw.fdp; + int fd = rb_iom_fdw_get_fd(&kev->fdw); list_del_init(&kev->fdn.wfdnode); /* delete from rdel */ assert(kev->fdw.revents & RB_WAITFD_OUT); @@ -251,7 +251,7 @@ drop_pairs(rb_thread_t *th, int max, struct kevent *changelist, } } list_for_each_safe(wdel, kev, next, fdn.rfdnode) { - int fd = *kev->fdw.fdp; + int fd = rb_iom_fdw_get_fd(&kev->fdw); list_del_init(&kev->fdn.rfdnode); /* delete from wdel */ assert(kev->fdw.revents & WAITFD_READ); @@ -378,7 +378,7 @@ check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist) kev->fdw.revents |= (RB_WAITFD_IN & rbits); list_del_init(&kev->fdn.rfdnode); - if (*kev->fdw.fdp < 0) { + if (rb_iom_fdw_get_fd(&kev->fdw) < 0) { continue; } if (rbits & RB_WAITFD_PRI) { @@ -420,7 +420,7 @@ check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist) list_del_init(&kev->fdn.wfdnode); 
if ((kev->fdw.events & WAITFD_READ) && - *kev->fdw.fdp >= 0 && + rb_iom_fdw_get_fd(&kev->fdw) >= 0 && !pair_queued) { pair_queued = 1; list_add(&rdel, &kev->fdn.wfdnode); @@ -596,7 +596,7 @@ static VALUE kevxchg_yield(VALUE ptr) { struct kev *kev = (struct kev *)ptr; - int fd = *kev->fdw.fdp; + int fd = rb_iom_fdw_get_fd(&kev->fdw); rb_thread_t *th = kev->fdn.owner.th; if (fd >= 0) { @@ -606,11 +606,12 @@ kevxchg_yield(VALUE ptr) (void)rb_threadlet_do_yield_p(th, &nresume); if (kev->fdw.revents) { list_del_init(&kev->fdw.w.timer.n.rnode); - return Qfalse; } - return rb_fiber_yield(0, 0); + else { + rb_fiber_yield(0, 0); + } } - return Qfalse; + return (VALUE)kev->fdw.revents; /* may be zero if timed out */ } static VALUE @@ -619,26 +620,27 @@ kev_done(VALUE ptr) struct kev *kev = (struct kev *)ptr; list_del(&kev->fdn.rfdnode); list_del(&kev->fdn.wfdnode); - return rb_iom_waiter_done((VALUE)&kev->fdw.w); + rb_iom_waiter_done(&kev->fdw.w); + if (!FIBER_USE_NATIVE) { + xfree(kev); + } + return Qfalse; } static int iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) { rb_iom_t *iom = rb_iom_get(th); - struct kev kev; - - kev.fdn.owner.th = th; /* read and cleared ASAP in kevxchg_yield */ - kev.fdw.fdp = fdp; - kev.fdw.events = (short)events; - kev.fdw.revents = 0; - list_add(&iom->kevs, &kev.fdw.w.wnode); - list_node_init(&kev.fdn.rfdnode); - list_node_init(&kev.fdn.wfdnode); - rb_iom_timer_add(th, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT); - rb_ensure(kevxchg_yield, (VALUE)&kev, kev_done, (VALUE)&kev); - - return kev.fdw.revents; /* may be zero if timed out */ + struct kev *kev = FIBER_USE_NATIVE ? ALLOCA_N(struct kev, 1) + : ALLOC(struct kev); + + kev->fdn.owner.th = th; /* read and cleared ASAP in kevxchg_yield */ + rb_iom_fdw_init(&kev->fdw, fdp, events); + list_add(&iom->kevs, &kev->fdw.w.wnode); + list_node_init(&kev->fdn.rfdnode); + list_node_init(&kev->fdn.wfdnode); + rb_iom_timer_add(th, &kev->fdw.w.timer, timeout, IOM_FIB|IOM_WAIT); + return (int)rb_ensure(kevxchg_yield, (VALUE)kev, kev_done, (VALUE)kev); } /* @@ -737,6 +739,8 @@ kset_yield(VALUE ptr) { struct kqw_set *kset = (struct kqw_set *)ptr; size_t nresume; + VALUE vchanges = kset->vchanges = rb_str_tmp_new(0); + kset->tbl = st_init_numtable(); /* may raise */ /* must not call kevent to retry events while this loop is running: */ @@ -758,14 +762,16 @@ kset_yield(VALUE ptr) /* OK to call kevent, now: */ ping_events(kset->fsw.th, kset); - rb_str_resize(kset->vchanges, 0); - rb_gc_force_recycle(kset->vchanges); + rb_str_resize(vchanges, 0); + rb_gc_force_recycle(vchanges); (void)rb_threadlet_do_yield_p(kset->fsw.th, &nresume); if (kset->fsw.ret) { list_del_init(&kset->fsw.w.timer.n.rnode); - return Qfalse; } - return rb_fiber_yield(0, 0); + else { + rb_fiber_yield(0, 0); + } + return (VALUE)kset->fsw.ret; } static rb_fdset_t * @@ -823,6 +829,9 @@ kset_done(VALUE ptr) rb_fd_term(res); } } + if (!FIBER_USE_NATIVE) { + xfree(kset); + } return Qfalse; } @@ -832,15 +841,14 @@ rb_iom_select(rb_thread_t *th, int maxfd, double *timeout) { rb_iom_t *iom = rb_iom_get(th); - struct kqw_set kset = { 0 }; - - kset.vchanges = rb_str_tmp_new(0); - rb_iom_fdset_waiter_init(th, &kset.fsw, maxfd, r, w, e); - list_add(&iom->ksets, &kset.fsw.w.wnode); - rb_iom_timer_add(th, &kset.fsw.w.timer, timeout, IOM_FIB|IOM_WAIT); - rb_ensure(kset_yield, (VALUE)&kset, kset_done, (VALUE)&kset); - - return kset.fsw.ret; + struct kqw_set *kset = FIBER_USE_NATIVE ? 
ALLOCA_N(struct kqw_set, 1) + : ALLOC(struct kqw_set); + kset->nchanges = 0; + rb_iom_fdset_waiter_init(th, &kset->fsw, maxfd, r, w, e); + list_add(&iom->ksets, &kset->fsw.w.wnode); + rb_iom_timer_add(th, &kset->fsw.w.timer, timeout, IOM_FIB|IOM_WAIT); + + return (int)rb_ensure(kset_yield, (VALUE)kset, kset_done, (VALUE)kset); } static void diff --git a/iom_pingable_common.h b/iom_pingable_common.h index 0058a9cfd0..dae4c0d4db 100644 --- a/iom_pingable_common.h +++ b/iom_pingable_common.h @@ -40,12 +40,7 @@ rb_iom_schedule(rb_thread_t *th, double *timeout) } } else if (timeout) { - double t0 = timeofday(); - struct rb_iom_timer t; - - rb_iom_timer_add(th, &t, timeout, 0); - rb_ensure(rb_iom_do_schedule, (VALUE)th, rb_iom_timer_done, (VALUE)&t); - *timeout -= timeofday() - t0; + rb_iom_timer_do(th, timeout, rb_iom_do_schedule, th); } else { rb_iom_timer_check(th); diff --git a/iom_select.h b/iom_select.h index f49b78e774..7f82203e22 100644 --- a/iom_select.h +++ b/iom_select.h @@ -137,7 +137,7 @@ iom_select_wait(struct select_do *sd) } list_for_each_safe(&iom->fds, fdw, next, w.wnode) { - int fd = *fdw->fdp; + int fd = rb_iom_fdw_get_fd(fdw); if (fd < 0) { /* closed */ fdw->revents = fdw->events; rb_iom_waiter_ready(&fdw->w); @@ -187,7 +187,7 @@ iom_select_wait(struct select_do *sd) } list_for_each_safe(&iom->fds, fdw, next, w.wnode) { - int fd = *fdw->fdp; + int fd = rb_iom_fdw_get_fd(fdw); if (fd < 0) { /* closed */ fdw->revents = fdw->events; @@ -291,12 +291,7 @@ rb_iom_schedule(rb_thread_t *th, double *timeout) sd.do_wait = !rb_threadlet_sched_p(th); if (timeout && sd.do_wait) { - double t0 = timeofday(); - struct rb_iom_timer t; - - rb_iom_timer_add(th, &t, timeout, 0); - rb_ensure(iom_do_select, (VALUE)&sd, rb_iom_timer_done, (VALUE)&t); - *timeout -= timeofday() - t0; + rb_iom_timer_do(th, timeout, iom_do_select, &sd); } else { rb_iom_timer_check(th); @@ -337,9 +332,12 @@ iom_schedule_fdw(VALUE ptr) iom_schedule_th(th); if (fdw->revents) { list_del_init(&fdw->w.timer.n.rnode); - return Qfalse; } - return rb_fiber_yield(0, 0); + else { + rb_fiber_yield(0, 0); + } + + return (VALUE)fdw->revents; /* may be zero if timed out */ } /* only epoll takes advantage of this (kqueue may for portability bugs) */ @@ -349,24 +347,44 @@ rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout) return rb_iom_waitfd(th, &fptr->fd, events, timeout); } +static VALUE +fdw_done(VALUE ptr) +{ + struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr; + + rb_iom_waiter_done(&fdw->w); + + if (!FIBER_USE_NATIVE) { + xfree(fdw); + } + return Qfalse; +} + int rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout) { rb_iom_t *iom = rb_iom_get(th); - struct rb_iom_fd_waiter fdw; + struct rb_iom_fd_waiter *fdw; if (*fdp < 0) return 0; - fdw.fdp = fdp; - fdw.events = (short)events; - fdw.revents = 0; + + fdw = FIBER_USE_NATIVE ? 
ALLOCA_N(struct rb_iom_fd_waiter, 1) + : ALLOC(struct rb_iom_fd_waiter); + rb_iom_fdw_init(fdw, fdp, events); /* use FIFO order for fairness */ - list_add_tail(&iom->fds, &fdw.w.wnode); - rb_iom_timer_add(th, &fdw.w.timer, timeout, IOM_FIB|IOM_WAIT); + list_add_tail(&iom->fds, &fdw->w.wnode); + rb_iom_timer_add(th, &fdw->w.timer, timeout, IOM_FIB|IOM_WAIT); iom->select_gen++; - rb_ensure(iom_schedule_fdw, (VALUE)&fdw, rb_iom_waiter_done, (VALUE)&fdw.w); + return (int)rb_ensure(iom_schedule_fdw, (VALUE)fdw, fdw_done, (VALUE)fdw); +} - return (int)fdw.revents; /* may be zero if timed out */ +static int fsw_on_heap(void) +{ + if (sizeof(struct rb_iom_fdset_waiter) > RUBY_ALLOCV_LIMIT) { + return 1; + } + return FIBER_USE_NATIVE ? 0 : 1; } static VALUE @@ -378,10 +396,20 @@ rb_iom_select_done(VALUE ptr) list_del(&fsw->w.timer.n.tnode); list_del(&fsw->w.wnode); for (i = 0; i < 3; i++) { +#if !FIBER_USE_NATIVE + if (fsw->in[i]) { + rb_fd_dup(fsw->onstack_dst[i], fsw->in[i]); + rb_fd_term(fsw->in[i]); + xfree(fsw->in[i]); + } +#endif if (fsw->out[i]) { rb_fd_term(fsw->out[i]); } } + if (fsw_on_heap()) { + xfree(fsw); + } return Qfalse; } @@ -394,9 +422,11 @@ iom_schedule_fsw(VALUE ptr) iom_schedule_th(fsw->th); if (fsw->ret) { list_del_init(&fsw->w.timer.n.rnode); - return Qfalse; } - return rb_fiber_yield(0, 0); + else { /* fsw->ret will be set in another execution context */ + rb_fiber_yield(0, 0); + } + return (VALUE)fsw->ret; } int @@ -405,21 +435,16 @@ rb_iom_select(rb_thread_t *th, int maxfd, double *timeout) { rb_iom_t *iom = rb_iom_get(th); - VALUE v; struct rb_iom_fdset_waiter *fsw; - int ret; - fsw = ALLOCV_N(struct rb_iom_fdset_waiter, v, 1); + fsw = fsw_on_heap() ? ALLOC(struct rb_iom_fdset_waiter) + : ALLOCA_N(struct rb_iom_fdset_waiter, 1); rb_iom_fdset_waiter_init(th, fsw, maxfd, r, w, e); list_add(&iom->fdsets, &fsw->w.wnode); rb_iom_timer_add(th, &fsw->w.timer, timeout, IOM_FIB|IOM_WAIT); iom->select_gen++; - rb_ensure(iom_schedule_fsw, (VALUE)fsw, rb_iom_select_done, (VALUE)fsw); - ret = fsw->ret; - if (v) { - ALLOCV_END(v); - } - return ret; + return (int)rb_ensure(iom_schedule_fsw, (VALUE)fsw, + rb_iom_select_done, (VALUE)fsw); } void -- EW
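
Illustrative note (not part of the patch): the allocation pattern repeated in
iom_common.h, iom_epoll.h, iom_kqueue.h and iom_select.h above can be read in
isolation.  The following is a minimal sketch, assuming the CRuby-internal
helpers used by the patch (FIBER_USE_NATIVE from the new fiber.h, ALLOCA_N,
ALLOC, xfree, rb_ensure); struct example_waiter and the example_* functions
are hypothetical names for illustration only.

    #include "ruby/ruby.h"   /* ALLOC, ALLOCA_N, xfree, rb_ensure */
    #include "fiber.h"       /* FIBER_USE_NATIVE (added by this patch) */

    /*
     * With native fibers, a yielded fiber keeps its machine stack, so a
     * waiter struct may live on that stack (ALLOCA_N).  Without native
     * fibers, cont.c copies fiber stacks around, so the waiter must be
     * heap-allocated and freed in the "done" callback run by rb_ensure.
     */
    struct example_waiter {   /* hypothetical waiter, for illustration only */
        int fd;
        short events;
        short revents;
    };

    static VALUE
    example_yield(VALUE ptr)
    {
        struct example_waiter *w = (struct example_waiter *)ptr;

        /* ... register w, yield until another context fills w->revents ... */
        return (VALUE)w->revents;
    }

    static VALUE
    example_done(VALUE ptr)
    {
        struct example_waiter *w = (struct example_waiter *)ptr;

        /* ... unlink w from any wait/timer lists ... */
        if (!FIBER_USE_NATIVE) {
            xfree(w);   /* heap path only; the stack path needs no free */
        }
        return Qfalse;
    }

    static int
    example_wait(int fd, int events)
    {
        struct example_waiter *w = FIBER_USE_NATIVE
            ? ALLOCA_N(struct example_waiter, 1)  /* stack: native fibers */
            : ALLOC(struct example_waiter);       /* heap: copied stacks */

        w->fd = fd;
        w->events = (short)events;
        w->revents = 0;

        /* rb_ensure guarantees example_done runs even if example_yield
         * raises; the result is returned through rb_ensure and cast back
         * from VALUE, as iom_waitfd and rb_iom_waitpid do above */
        return (int)rb_ensure(example_yield, (VALUE)w, example_done, (VALUE)w);
    }

The real waiters in the patch additionally carry list nodes and a timer
(struct rb_iom_waiter), but the native/non-native allocation split and the
rb_ensure cleanup are the same.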