dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH 1/8] wip-iom_select
@ 2017-05-18 22:30 Eric Wong
  2017-05-18 22:30 ` [PATCH 2/8] compiles: Eric Wong
                   ` (6 more replies)
  0 siblings, 7 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
  To: spew

---
 include/ruby/io.h |   1 +
 iom.h             |  62 ++++++++++++++++++++++++
 iom_select.c      | 140 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 vm.c              |   2 +
 vm_core.h         |   5 ++
 5 files changed, 210 insertions(+)
 create mode 100644 iom.h
 create mode 100644 iom_select.c

diff --git a/include/ruby/io.h b/include/ruby/io.h
index 60d6f6d32e..1f0b1d2dfa 100644
--- a/include/ruby/io.h
+++ b/include/ruby/io.h
@@ -116,6 +116,7 @@ typedef struct rb_io_t {
 /* #define FMODE_UNIX                  0x00200000 */
 /* #define FMODE_INET                  0x00400000 */
 /* #define FMODE_INET6                 0x00800000 */
+/* #define FMODE_EPOLL                 0x01000000 */ /* Linux-only */
 
 #define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
 
diff --git a/iom.h b/iom.h
new file mode 100644
index 0000000000..755dd73e40
--- /dev/null
+++ b/iom.h
@@ -0,0 +1,62 @@
+/*
+ * iom -> I/O Manager for RubyVM (auto-Fiber-aware)
+ *
+ * On platforms with epoll or kqueue, this should be ready for multicore;
+ * even if the rest of the RubyVM is not.
+ *
+ * Some inspiration taken from Mio in GHC:
+ * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf
+ */
+#ifndef RUBY_IOM_H
+#define RUBY_IOM_H
+#include "ruby.h"
+#include "ruby/intern.h"
+#include "vm_core.h"
+
+typedef struct rb_iom_struct rb_iom_t;
+
+/*
+ * there is no public create function, creation is lazy to avoid incurring
+ * overhead for small scripts which do not need fibers
+ */
+int rb_iom_destroy(rb_iom_t *);
+
+/*
+ * Relinquish calling fiber while waiting for +events+ on +rb_io_t.fd+.
+ * If timespec is NULL, it will wait forever, otherwise
+ * wait for up to the relative time given by timespec.
+ */
+void rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, struct timespec *);
+
+/*
+ * Relinquish calling fiber while waiting for +events+ on +fd+.
+ * If timespec is NULL, it will wait forever, otherwise
+ * wait for up to the relative time given by timespec.
+ * Use rb_iom_waitio instead of rb_iom_fdwait when possible,
+ * the former is more efficient on some platforms.
+ */
+void rb_iom_waitfd(rb_thread_t *, int fd, int events, struct timespec *);
+
+/*
+ * Relinquish calling fiber for at least the duration of given timespec,
+ * timespec is NULL, wait forever (until explicitly resumed).
+ */
+void rb_iom_sleep(rb_thread_t *, struct timespec *);
+
+/*
+ * Relinquish calling fiber to wait for the given PID to
+ * change status for least the duration of given timespec.
+ */
+void rb_iom_waitpid(rb_thread_t *, rb_pid_t, int *status, struct timespec *);
+
+/*
+ * callback for SIGCHLD, needed to implemented rb_iom_waitpid
+ */
+void rb_iom_sigchld(rb_iom_t *);
+
+/* callbacks for fork */
+void rb_iom_atfork_prepare(rb_thread_t *, rb_iom_t *);
+void rb_iom_atfork_parent(rb_thread_t *, rb_iom_t *);
+void rb_iom_atfork_child(rb_thread_t *, rb_iom_t *);
+
+#endif /* RUBY_IOM_H */
diff --git a/iom_select.c b/iom_select.c
new file mode 100644
index 0000000000..c1cdddb989
--- /dev/null
+++ b/iom_select.c
@@ -0,0 +1,140 @@
+#include "iom.h"
+#include "ccan/list/list.h"
+#include "ruby/thread_native.h"
+#include "ruby/st.h"
+
+/* on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+    rb_nativethread_lock_t lock;
+    struct list_head timers; /* <=> rb_select_timer.tnode, sort by expire_at */
+    int self_pipe[2];
+    st_table *watches;
+    rb_fdset_t rfds;
+    rb_fdset_t wfds;
+    rb_fdset_t efds;
+    struct list_head pids; /* <=> rb_pid_wait.wnode, FIFO order */
+    /* TODO: optimized structure for waitpid((-N|-1|0|N), ...); */
+};
+
+/* only allocated on stack */
+struct rb_select_timer {
+    struct list_node tnode;
+    struct timespec expires_at; /* absolute time */
+    union {
+	struct rb_fd_wait *fdw;
+	struct rb_pid_wait *pidw;
+	VALUE mask; /* (RB_IMMEDIATE_P(mask) ? pidw : fdw; 0 == sleep-only */
+    } as;
+};
+
+/* only allocated on stack */
+struct rb_iom_waiter {
+    VALUE fibval;
+    struct list_node afqn; /* <=> rb_thread_t.autofiberq */
+    rb_thread_t *th;
+    struct rb_select_timer *timer; /* may be NULL */
+};
+
+/* only allocated on stack */
+struct rb_fd_waiter {
+    struct rb_iom_waiter w;
+    int fd;
+    int events;
+};
+
+/* only allocated on stack */
+struct rb_pid_wait {
+    struct rb_iom_waiter w;
+    rb_pid_t pid;
+    int status;
+    int options;
+};
+
+/* we lazily create this, small scripts may never need this */
+static rb_iom_t *
+rb_iom_create(rb_thread_t *th)
+{
+    rb_iom_t *iom = ZALLOC(rb_iom_t);
+    rb_io_t io;
+
+    if (rb_cloexec_pipe(iom->self_pipe) < 0) {
+	xfree(iom);
+	return NULL;
+    }
+
+    io.pathv = Qnil;
+    io.fd = iom->self_pipe[0];
+    rb_io_set_nonblock(&io);
+    io.fd = iom->self_pipe[1];
+    rb_io_set_nonblock(&io);
+
+    list_head_init(&iom->timers);
+    list_head_init(&iom->pids);
+    rb_nativethread_lock_initialize(&iom->lock);
+    iom->watches = st_init_numtable();
+    rb_fd_init(&iom->rfds);
+    rb_fd_init(&iom->wfds);
+    rb_fd_init(&iom->efds);
+
+    return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+    VM_ASSERT(th);
+    VM_ASSERT(th->vm);
+    if (!th->vm->iom) {
+	th->vm->iom = rb_iom_create(th);
+    }
+    return th->vm->iom;
+}
+
+/* TODO: share portability code with Process.clock_gettime */
+static void now_ts(struct timespec *ts)
+{
+    clock_gettime(CLOCK_MONOTONIC, &timer->expires_at);
+}
+
+static void
+select_timer_init(struct rb_select_timer *timer, const struct timespec *rel)
+{
+    list_node_init(&timer->tnode); /* allow list_del_init to be idempotent */
+    now_ts(&timer->expires_at);
+
+    timer->expires_at.tv_sec += rel->tv_sec;
+    timer->expires_at.tv_nsec += rel->tv_nsec;
+    if (timer->expires_at.tv_nsec >= 1000000000) {
+	timer->expires_at.tv_nsec -= 1000000000;
+	timer->expires_at.tv_sec += 1;
+    }
+}
+
+static void waiter_init(struct rb_iom_waiter *w, const rb_thread *th)
+{
+    w->fibval = rb_fiber_current();
+    list_node_init(&w->afqn); /* allow list_del_init to be idempotent */
+    w->th = th;
+}
+
+void
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, struct timespec *ts)
+{
+    /* only epoll can take advantage of rb_iom_waitio... */
+    rb_iom_io_fd_wait(th, fptr->fd, events, ts);
+}
+
+void
+rb_iom_waitfd(rb_thread_t *th, int fd, int events, struct timespec *ts)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    struct rb_fd_waiter fdw;
+
+    waiter_init(&fdw.w, th);
+    if (ts) {
+	fdw.w.timer = alloca(sizeof(struct rb_select_timer));
+	select_timer_init(fdw.w.timer, ts);
+    }
+
+
+}
diff --git a/vm.c b/vm.c
index 52d505ab7c..60b7bf070b 100644
--- a/vm.c
+++ b/vm.c
@@ -2513,6 +2513,8 @@ th_init(rb_thread_t *th, VALUE self)
 
     th->ec.cfp = (void *)(th->ec.stack + th->ec.stack_size);
 
+    list_head_init(&th->auto_fiberq);
+
     vm_push_frame(th, 0 /* dummy iseq */, VM_FRAME_MAGIC_DUMMY | VM_ENV_FLAG_LOCAL | VM_FRAME_FLAG_FINISH | VM_FRAME_FLAG_CFRAME /* dummy frame */,
 		  Qnil /* dummy self */, VM_BLOCK_HANDLER_NONE /* dummy block ptr */,
 		  0 /* dummy cref/me */,
diff --git a/vm_core.h b/vm_core.h
index 35b1748218..3ee3e5c0c5 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -481,6 +481,8 @@ typedef struct rb_hook_list_struct {
     int need_clean;
 } rb_hook_list_t;
 
+struct rb_iomgr_struct;
+
 typedef struct rb_vm_struct {
     VALUE self;
 
@@ -490,6 +492,8 @@ typedef struct rb_vm_struct {
     struct rb_thread_struct *main_thread;
     struct rb_thread_struct *running_thread;
 
+    struct rb_iomgr_struct *iomgr;
+
     struct list_head living_threads;
     size_t living_thread_num;
     VALUE thgroup_default;
@@ -809,6 +813,7 @@ typedef struct rb_thread_struct {
     rb_fiber_t *fiber;
     rb_fiber_t *root_fiber;
     rb_jmpbuf_t root_jmpbuf;
+    struct list_head auto_fiberq;
 
     /* ensure & callcc */
     rb_ensure_list_t *ensure_list;
-- 
EW


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 2/8] compiles:
  2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
  2017-05-18 22:30 ` [PATCH 3/8] wip Eric Wong
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
  To: spew

require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'
url = 'https://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.pack'
uri = URI(url)
Fiber.current.auto = true
fibs = 10.times.map do
  Fiber.new do
    Fiber.current.auto = true
    cur = Fiber.current.object_id
    Net::HTTP.start(uri.host, uri.port, use_ssl: true) do |http|
      req = Net::HTTP::Get.new(uri)
      http.request(req) do |res|
        dig = Digest::SHA1.new
        res.read_body do |buf|
          dig.update(buf)
          warn "#{cur} #{buf.bytesize}\n"
        end
        warn "#{cur} #{dig.hexdigest}\n"
      end
    end
    :done
  end
end

fibs.delete_if do |f|
  f.resume == :done
end until fibs.empty?
---
 common.mk         |   3 +
 cont.c            |  27 ++++++
 include/ruby/io.h |   3 +-
 iom.h             |  57 ++++++++-----
 iom_common.h      |  96 ++++++++++++++++++++++
 iom_select.c      | 140 -------------------------------
 iom_select.h      | 242 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 thread.c          |  22 +++++
 vm_core.h         |   4 +-
 9 files changed, 429 insertions(+), 165 deletions(-)
 create mode 100644 iom_common.h
 delete mode 100644 iom_select.c
 create mode 100644 iom_select.h

diff --git a/common.mk b/common.mk
index 9e554753c6..5bdec64409 100644
--- a/common.mk
+++ b/common.mk
@@ -2588,6 +2588,9 @@ thread.$(OBJEXT): {$(VPATH)}id.h
 thread.$(OBJEXT): {$(VPATH)}intern.h
 thread.$(OBJEXT): {$(VPATH)}internal.h
 thread.$(OBJEXT): {$(VPATH)}io.h
+thread.$(OBJEXT): {$(VPATH)}iom.h
+thread.$(OBJEXT): {$(VPATH)}iom_common.h
+thread.$(OBJEXT): {$(VPATH)}iom_select.h
 thread.$(OBJEXT): {$(VPATH)}method.h
 thread.$(OBJEXT): {$(VPATH)}missing.h
 thread.$(OBJEXT): {$(VPATH)}node.h
diff --git a/cont.c b/cont.c
index 4d6176f00c..05e0a83c94 100644
--- a/cont.c
+++ b/cont.c
@@ -127,6 +127,7 @@ struct rb_fiber_struct {
     rb_context_t cont;
     struct rb_fiber_struct *prev;
     enum fiber_status status;
+    unsigned int auto_fiber:1;
     /* If a fiber invokes "transfer",
      * then this fiber can't "resume" any more after that.
      * You shouldn't mix "transfer" and "resume".
@@ -1502,6 +1503,9 @@ rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
     rb_fiber_t *fib;
     GetFiberPtr(fibval, fib);
 
+    if (fib->prev != 0) {
+	rb_raise(rb_eFiberError, "zdouble resume");
+    }
     if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
 	rb_raise(rb_eFiberError, "double resume");
     }
@@ -1651,7 +1655,28 @@ rb_fiber_s_current(VALUE klass)
     return rb_fiber_current();
 }
 
+static VALUE
+rb_fiber_auto_get(VALUE self)
+{
+    rb_fiber_t *fiber = fiber_current();
 
+    return fiber->auto_fiber ? Qtrue : Qfalse;
+}
+
+static VALUE
+rb_fiber_auto_set(VALUE self, VALUE val)
+{
+    rb_fiber_t *fiber = fiber_current();
+
+    return (fiber->auto_fiber = RTEST(val)) ? Qtrue : Qfalse;
+}
+
+int rb_fiber_auto_sched_p(const rb_thread_t *th)
+{
+    const rb_fiber_t *cur = th->fiber;
+
+    return (cur && cur->auto_fiber && th->root_fiber != cur);
+}
 
 /*
  *  Document-class: FiberError
@@ -1688,6 +1713,8 @@ Init_Cont(void)
     rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
     rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0);
     rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
+    rb_define_method(rb_cFiber, "auto", rb_fiber_auto_get, 0);
+    rb_define_method(rb_cFiber, "auto=", rb_fiber_auto_set, 1);
 }
 
 RUBY_SYMBOL_EXPORT_BEGIN
diff --git a/include/ruby/io.h b/include/ruby/io.h
index 1f0b1d2dfa..3bd6f06cf3 100644
--- a/include/ruby/io.h
+++ b/include/ruby/io.h
@@ -116,7 +116,8 @@ typedef struct rb_io_t {
 /* #define FMODE_UNIX                  0x00200000 */
 /* #define FMODE_INET                  0x00400000 */
 /* #define FMODE_INET6                 0x00800000 */
-/* #define FMODE_EPOLL                 0x01000000 */ /* Linux-only */
+/* #define FMODE_IOM_PRIVATE1          0x01000000 */ /* OS-dependent */
+/* #define FMODE_IOM_PRIVATE2          0x02000000 */ /* OS-dependent */
 
 #define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
 
diff --git a/iom.h b/iom.h
index 755dd73e40..070b610458 100644
--- a/iom.h
+++ b/iom.h
@@ -15,43 +15,49 @@
 
 typedef struct rb_iom_struct rb_iom_t;
 
-/*
- * there is no public create function, creation is lazy to avoid incurring
- * overhead for small scripts which do not need fibers
- */
-int rb_iom_destroy(rb_iom_t *);
+/* WARNING: unstable API, only for Ruby internal use */
 
 /*
- * Relinquish calling fiber while waiting for +events+ on +rb_io_t.fd+.
- * If timespec is NULL, it will wait forever, otherwise
- * wait for up to the relative time given by timespec.
+ * Note: the first "rb_thread_t *" is a placeholder and may be replaced
+ * with "rb_execution_context_t *" in the future.
  */
-void rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, struct timespec *);
 
 /*
- * Relinquish calling fiber while waiting for +events+ on +fd+.
- * If timespec is NULL, it will wait forever, otherwise
- * wait for up to the relative time given by timespec.
- * Use rb_iom_waitio instead of rb_iom_fdwait when possible,
- * the former is more efficient on some platforms.
+ * All functions with "wait" in it take an optional double * +timeout+
+ * argument specifying the timeout in seconds.  If NULL, it can wait
+ * forever until the event happens (or the fiber is explicitly resumed).
+ * If non-NULL, the timeout will be updated to the remaining time
+ * upon return.
  */
-void rb_iom_waitfd(rb_thread_t *, int fd, int events, struct timespec *);
 
 /*
- * Relinquish calling fiber for at least the duration of given timespec,
- * timespec is NULL, wait forever (until explicitly resumed).
+ * Relinquish calling fiber while waiting for +events+ on file descriptor
+ * pointed to by +fdp+.
+ * Multiple native threads can enter this function at the same time.
+ *
+ * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI
+ *
+ * Returns a mask of events.
  */
-void rb_iom_sleep(rb_thread_t *, struct timespec *);
+int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout);
 
 /*
- * Relinquish calling fiber to wait for the given PID to
- * change status for least the duration of given timespec.
+ * Relinquish calling fiber to wait for the given PID to change status.
+ * Multiple native threads can enter this function at the same time.
+ * If timeout is negative, wait forever.
  */
-void rb_iom_waitpid(rb_thread_t *, rb_pid_t, int *status, struct timespec *);
+rb_pid_t rb_iom_waitpid(rb_thread_t *,
+			rb_pid_t, int *status, int options, double *timeout);
 
 /*
- * callback for SIGCHLD, needed to implemented rb_iom_waitpid
+ * Relinquish calling fiber for at least the duration of given timeout
+ * in seconds.  If timeout is negative, wait forever (until explicitly
+ * resumed).
+ * Multiple native threads can enter this function at the same time.
  */
+void rb_iom_sleep(rb_thread_t *, double *timeout);
+
+/* callback for SIGCHLD, needed to implement rb_iom_waitpid */
 void rb_iom_sigchld(rb_iom_t *);
 
 /* callbacks for fork */
@@ -59,4 +65,11 @@ void rb_iom_atfork_prepare(rb_thread_t *, rb_iom_t *);
 void rb_iom_atfork_parent(rb_thread_t *, rb_iom_t *);
 void rb_iom_atfork_child(rb_thread_t *, rb_iom_t *);
 
+/*
+ * there is no public create function, creation is lazy to avoid incurring
+ * overhead for small scripts which do not need fibers, we only need this
+ * at VM destruction
+ */
+int rb_iom_destroy(rb_iom_t *);
+
 #endif /* RUBY_IOM_H */
diff --git a/iom_common.h b/iom_common.h
new file mode 100644
index 0000000000..03d4ac560a
--- /dev/null
+++ b/iom_common.h
@@ -0,0 +1,96 @@
+#ifndef RB_IOM_COMMON_H
+#define RB_IOM_COMMON_H
+
+#include "iom.h"
+
+/* allocated on stack */
+struct rb_iom_timer {
+    struct list_node tnode; /* <=> rb_iom_struct.timers */
+    double expires_at; /* absolute monotonic time */
+};
+
+struct rb_iom_waiter {
+    VALUE fibval; /* fiber that is waiting, taken from rb_fiber_current() */
+    struct list_node node; /* <=> (rb_thread_t.auto_fiberq|rb_iom_struct.fds) */
+    rb_thread_t *th; /* set to NULL once the waiter is moved to auto_fiberq */
+    struct rb_iom_timer timer; /* embedded, container_of() maps it back to us */
+};
+
+struct rb_iom_fd_waiter {
+    struct rb_iom_waiter w;
+    int *fdp;
+    short events;
+    short revents;
+};
+
+struct rb_iom_pid_waiter {
+    struct rb_iom_waiter w;
+    rb_pid_t pid;
+    int status;
+    int options;
+};
+
+/* queue the waiter of every expired timer onto its thread's auto_fiberq */
+static void
+rb_iom_timer_check(struct list_head *timers)
+{
+    struct rb_iom_timer *i = NULL, *next = NULL;
+    double now = timeofday();
+    struct rb_iom_waiter *w;
+
+    list_for_each_safe(timers, i, next, tnode) {
+	if (i->expires_at > now) {
+	    break; /* done, timers is a sorted list */
+	}
+	w = container_of(i, struct rb_iom_waiter, timer);
+	list_del_init(&i->tnode); /* self-linked, so unlinking again is harmless */
+	list_del(&w->node);
+	list_add_tail(&w->th->auto_fiberq, &w->node);
+	w->th = NULL; /* marks the waiter as already queued to run */
+    }
+}
+
+/* link +add+ into +timers+ (kept sorted by expires_at) if +timeout+ is given */
+static void
+rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
+		const double *timeout)
+{
+    rb_iom_timer_check(timers); /* handle already-expired timers first */
+    if (timeout) {
+	struct rb_iom_timer *i = NULL;
+	add->expires_at = timeofday() + *timeout; /* relative -> absolute */
+
+	/*
+	 * search backwards: assume typical projects have multiple objects
+	 * sharing the same timeout values, so newly added timers will
+	 * expire later than existing timers
+	 */
+	list_for_each_rev(timers, i, tnode) {
+	    if (add->expires_at > i->expires_at) {
+		list_add_after(timers, &i->tnode, &add->tnode);
+		return;
+	    }
+	}
+	list_add(timers, &add->tnode); /* nothing expires earlier: goes to head */
+    }
+    else {
+	/* not active, self-link the node so unlinking it in cleanup is harmless */
+	list_node_init(&add->tnode);
+    }
+}
+
+static void
+rb_iom_schedule(rb_thread_t *th, struct rb_iom_waiter *current)
+{
+    struct rb_iom_waiter *i = NULL, *next = NULL;
+
+    list_for_each_safe(&th->auto_fiberq, i, next, node) {
+	list_del_init(&i->node);
+        if (i == current) {
+          return;
+        }
+	rb_fiber_resume(i->fibval, 0, NULL);
+    }
+}
+
+#endif /* RB_IOM_COMMON_H */
diff --git a/iom_select.c b/iom_select.c
deleted file mode 100644
index c1cdddb989..0000000000
--- a/iom_select.c
+++ /dev/null
@@ -1,140 +0,0 @@
-#include "iom.h"
-#include "ccan/list/list.h"
-#include "ruby/thread_native.h"
-#include "ruby/st.h"
-
-/* on heap (rb_vm_t.iom) */
-struct rb_iom_struct {
-    rb_nativethread_lock_t lock;
-    struct list_head timers; /* <=> rb_select_timer.tnode, sort by expire_at */
-    int self_pipe[2];
-    st_table *watches;
-    rb_fdset_t rfds;
-    rb_fdset_t wfds;
-    rb_fdset_t efds;
-    struct list_head pids; /* <=> rb_pid_wait.wnode, FIFO order */
-    /* TODO: optimized structure for waitpid((-N|-1|0|N), ...); */
-};
-
-/* only allocated on stack */
-struct rb_select_timer {
-    struct list_node tnode;
-    struct timespec expires_at; /* absolute time */
-    union {
-	struct rb_fd_wait *fdw;
-	struct rb_pid_wait *pidw;
-	VALUE mask; /* (RB_IMMEDIATE_P(mask) ? pidw : fdw; 0 == sleep-only */
-    } as;
-};
-
-/* only allocated on stack */
-struct rb_iom_waiter {
-    VALUE fibval;
-    struct list_node afqn; /* <=> rb_thread_t.autofiberq */
-    rb_thread_t *th;
-    struct rb_select_timer *timer; /* may be NULL */
-};
-
-/* only allocated on stack */
-struct rb_fd_waiter {
-    struct rb_iom_waiter w;
-    int fd;
-    int events;
-};
-
-/* only allocated on stack */
-struct rb_pid_wait {
-    struct rb_iom_waiter w;
-    rb_pid_t pid;
-    int status;
-    int options;
-};
-
-/* we lazily create this, small scripts may never need this */
-static rb_iom_t *
-rb_iom_create(rb_thread_t *th)
-{
-    rb_iom_t *iom = ZALLOC(rb_iom_t);
-    rb_io_t io;
-
-    if (rb_cloexec_pipe(iom->self_pipe) < 0) {
-	xfree(iom);
-	return NULL;
-    }
-
-    io.pathv = Qnil;
-    io.fd = iom->self_pipe[0];
-    rb_io_set_nonblock(&io);
-    io.fd = iom->self_pipe[1];
-    rb_io_set_nonblock(&io);
-
-    list_head_init(&iom->timers);
-    list_head_init(&iom->pids);
-    rb_nativethread_lock_initialize(&iom->lock);
-    iom->watches = st_init_numtable();
-    rb_fd_init(&iom->rfds);
-    rb_fd_init(&iom->wfds);
-    rb_fd_init(&iom->efds);
-
-    return iom;
-}
-
-static rb_iom_t *
-rb_iom_get(rb_thread_t *th)
-{
-    VM_ASSERT(th);
-    VM_ASSERT(th->vm);
-    if (!th->vm->iom) {
-	th->vm->iom = rb_iom_create(th);
-    }
-    return th->vm->iom;
-}
-
-/* TODO: share portability code with Process.clock_gettime */
-static void now_ts(struct timespec *ts)
-{
-    clock_gettime(CLOCK_MONOTONIC, &timer->expires_at);
-}
-
-static void
-select_timer_init(struct rb_select_timer *timer, const struct timespec *rel)
-{
-    list_node_init(&timer->tnode); /* allow list_del_init to be idempotent */
-    now_ts(&timer->expires_at);
-
-    timer->expires_at.tv_sec += rel->tv_sec;
-    timer->expires_at.tv_nsec += rel->tv_nsec;
-    if (timer->expires_at.tv_nsec >= 1000000000) {
-	timer->expires_at.tv_nsec -= 1000000000;
-	timer->expires_at.tv_sec += 1;
-    }
-}
-
-static void waiter_init(struct rb_iom_waiter *w, const rb_thread *th)
-{
-    w->fibval = rb_fiber_current();
-    list_node_init(&w->afqn); /* allow list_del_init to be idempotent */
-    w->th = th;
-}
-
-void
-rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, struct timespec *ts)
-{
-    /* only epoll can take advantage of rb_iom_waitio... */
-    rb_iom_io_fd_wait(th, fptr->fd, events, ts);
-}
-
-void
-rb_iom_waitfd(rb_thread_t *th, int fd, int events, struct timespec *ts)
-{
-    rb_iom_t *iom = rb_iom_get(th);
-    struct rb_fd_waiter fdw;
-
-    waiter_init(&fdw.w, th);
-    if (ts) {
-	fdw.w.timer = alloca(sizeof(struct rb_select_timer));
-	select_timer_init(fdw.w.timer, ts);
-    }
-
-
-}
diff --git a/iom_select.h b/iom_select.h
new file mode 100644
index 0000000000..21603a7e5a
--- /dev/null
+++ b/iom_select.h
@@ -0,0 +1,242 @@
+/*
+ * select()-based implementation of I/O Manager for RubyVM
+ *
+ * This is crippled and relies heavily on GVL compared to the
+ * epoll and kqueue versions
+ */
+#include "iom_common.h"
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+    /*
+     * lists are all protected by GVL at this time,
+     * URCU lists (LGPL-2.1+) may be used in the future
+     */
+    struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
+    struct list_head fds; /* -rb_iom_fd_waiter.w.node */
+    struct list_head pids; /* -rb_iom_pid_waiter.w.node, FIFO order */
+
+    rb_nativethread_lock_t lock; /* protects select() and fdsets */
+    rb_fdset_t rfds;
+    rb_fdset_t wfds;
+    rb_fdset_t efds;
+    rb_thread_t *scheduler;
+};
+
+/* lazily allocate rb_vm_t.iom; small scripts may never need iom */
+static rb_iom_t *
+rb_iom_create(rb_thread_t *th)
+{
+    rb_iom_t *iom = ZALLOC(rb_iom_t); /* zeroed so ->scheduler starts NULL */
+
+    list_head_init(&iom->timers);
+    list_head_init(&iom->fds);
+    list_head_init(&iom->pids);
+    rb_nativethread_lock_initialize(&iom->lock);
+
+    return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+    VM_ASSERT(th && th->vm);
+    if (!th->vm->iom) {
+	th->vm->iom = rb_iom_create(th);
+    }
+    return th->vm->iom;
+}
+
+static void
+waiter_cleanup(struct rb_iom_waiter *w)
+{
+    list_del(&w->timer.tnode);
+    if (w->th) {
+	list_del_init(&w->node);
+    }
+    else if (!list_empty((struct list_head *)&w->node)) {
+      /* we got scheduled in auto_fiberq, wait for another to resume us */
+      rb_fiber_yield(0, NULL);
+    }
+}
+
+static VALUE
+fd_waiter_cleanup(VALUE ptr)
+{
+    struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
+    waiter_cleanup(&fdw->w);
+    return Qfalse;
+}
+
+static void
+waiter_init(struct rb_iom_waiter *w, rb_thread_t *th)
+{
+    w->fibval = rb_fiber_current();
+    w->th = th;
+}
+
+static struct timeval *
+iom_timeout(rb_iom_t *iom, struct timeval *tv)
+{
+    struct rb_iom_timer *t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+    if (t) {
+	double diff = t->expires_at - timeofday();
+	if (diff > 0) {
+	    *tv = double2timeval(diff);
+	}
+	else {
+	    tv->tv_sec = 0;
+	    tv->tv_usec = 0;
+	}
+	return tv;
+    }
+    return NULL;
+}
+
+static VALUE
+iom_select_begin(VALUE ptr)
+{
+    rb_thread_t *th = (rb_thread_t *)ptr;
+    rb_iom_t *iom = th->vm->iom;
+    int max = 0;
+    struct timeval tv;
+    struct rb_iom_fd_waiter *fdw = NULL;
+    rb_fdset_t *rfds = NULL, *wfds = NULL, *efds = NULL;
+
+    rb_fd_init(&iom->rfds);
+    rb_fd_init(&iom->wfds);
+    rb_fd_init(&iom->efds);
+
+    list_for_each(&iom->fds, fdw, w.node) {
+	int fd = *fdw->fdp;
+	if (fd < 0) {
+	    continue; /* closed */
+	}
+	if (fd > max) {
+	    max = fd;
+	}
+	if (fdw->events & RB_WAITFD_IN) {
+	    rb_fd_set(fd, rfds = &iom->rfds);
+	}
+	if (fdw->events & RB_WAITFD_OUT) {
+	    rb_fd_set(fd, wfds = &iom->wfds);
+	}
+	if (fdw->events & RB_WAITFD_PRI) {
+	    rb_fd_set(fd, efds = &iom->efds);
+	}
+    }
+    rb_thread_fd_select(max + 1, rfds, wfds, efds, iom_timeout(iom, &tv));
+
+    return Qfalse;
+}
+
+static VALUE
+iom_select_end(VALUE ptr)
+{
+    rb_thread_t *th = (rb_thread_t *)ptr;
+    rb_iom_t *iom = th->vm->iom;
+    struct rb_iom_fd_waiter *fdw = NULL, *next = NULL;
+
+    iom->scheduler = NULL;
+    list_for_each_safe(&iom->fds, fdw, next, w.node) {
+	int fd = *fdw->fdp;
+	if (fd < 0) {
+	    continue; /* closed */
+	}
+	if (fdw->events & RB_WAITFD_IN && rb_fd_isset(fd, &iom->rfds)) {
+	    fdw->revents |= RB_WAITFD_IN;
+	}
+	if (fdw->events & RB_WAITFD_OUT && rb_fd_isset(fd, &iom->wfds)) {
+	    fdw->revents |= RB_WAITFD_OUT;
+	}
+	if (fdw->events & RB_WAITFD_PRI && rb_fd_isset(fd, &iom->efds)) {
+	    fdw->revents |= RB_WAITFD_PRI;
+	}
+
+	/* got revents? enqueue ourselves to be run! */
+	if (fdw->revents) {
+	    list_del_init(&fdw->w.timer.tnode);
+	    list_del(&fdw->w.node);
+	    list_add_tail(&fdw->w.th->auto_fiberq, &fdw->w.node);
+	    fdw->w.th = NULL;
+	}
+    }
+
+    rb_fd_term(&iom->rfds);
+    rb_fd_term(&iom->wfds);
+    rb_fd_term(&iom->efds);
+
+    native_mutex_unlock(&iom->lock);
+    return Qfalse;
+}
+
+static VALUE
+iom_schedule_fd(VALUE ptr)
+{
+    struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
+    rb_thread_t *th = fdw->w.th;
+    rb_iom_t *iom = th->vm->iom;
+
+    rb_iom_schedule(th, &fdw->w); /* try to get some work done elsewhere */
+
+    while (fdw->w.th != NULL) {
+	rb_thread_t *sched = iom->scheduler;
+        if (!sched) {
+            iom->scheduler = th;
+            /*
+             * nope, rb_iom_schedule did not do our work for us,
+             * maybe we can wait on select(), ourselves:
+             */
+            if (native_mutex_trylock(&iom->lock) == 0) {
+                rb_ensure(iom_select_begin, (VALUE)th, iom_select_end, (VALUE)th);
+            }
+	}
+	else {
+	    /* or kick whoever is running select into doing work for us */
+	    ubf_select(sched);
+	    rb_thread_schedule_limits(0);
+	    rb_iom_schedule(th, &fdw->w);
+	    if (list_empty((struct list_head *)&fdw->w.node)) {
+		return Qfalse;
+	    }
+	}
+    }
+    return Qfalse;
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    struct rb_iom_fd_waiter fdw;
+
+    if (*fdp < 0) return -1;
+    fdw.fdp = fdp;
+    fdw.events = (short)events;
+    fdw.revents = 0;
+    waiter_init(&fdw.w, th);
+    list_add(&iom->fds, &fdw.w.node);
+    rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout);
+    rb_ensure(iom_schedule_fd, (VALUE)&fdw, fd_waiter_cleanup, (VALUE)&fdw);
+
+    if (*fdp < 0) return -1;
+
+    return (int)fdw.revents; /* may be zero if timed out */
+}
+
+rb_pid_t
+rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
+		double *timeout)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    struct rb_iom_pid_waiter pw;
+
+    VM_ASSERT((options & WNOHANG) == 0 &&
+		"WNOHANG should be handled in rb_waitpid");
+
+    waiter_init(&pw.w, th);
+    rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout);
+
+    return -1;
+}
diff --git a/thread.c b/thread.c
index fd3db3648f..4ff8510100 100644
--- a/thread.c
+++ b/thread.c
@@ -70,6 +70,10 @@
 #include "ruby/thread.h"
 #include "ruby/thread_native.h"
 #include "internal.h"
+#include "iom.h"
+
+/* cont.c */
+int rb_fiber_auto_sched_p(const rb_thread_t *th);
 
 #ifndef USE_NATIVE_THREAD_PRIORITY
 #define USE_NATIVE_THREAD_PRIORITY 0
@@ -3909,6 +3913,16 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
     struct timespec *timeout = NULL;
     rb_thread_t *th = GET_THREAD();
 
+    if (rb_fiber_auto_sched_p(th)) {
+	double *to = NULL;
+	double tout;
+	if (tv) {
+	    tout = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
+	    to = &tout;
+	}
+	return rb_iom_waitfd(th, &fd, events, to);
+    }
+
 #define poll_update() \
     (update_timespec(timeout, limit), \
      TRUE)
@@ -5060,6 +5074,12 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
 }
 
 void
+rb_thread_ubf_select(rb_thread_t *th)
+{
+    ubf_select(th);
+}
+
+void
 ruby_kill(rb_pid_t pid, int sig)
 {
     int err;
@@ -5081,3 +5101,5 @@ ruby_kill(rb_pid_t pid, int sig)
 	rb_sys_fail(0);
     }
 }
+
+#include "iom_select.h"
diff --git a/vm_core.h b/vm_core.h
index 3ee3e5c0c5..dade714b2a 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -481,7 +481,7 @@ typedef struct rb_hook_list_struct {
     int need_clean;
 } rb_hook_list_t;
 
-struct rb_iomgr_struct;
+struct rb_iom_struct;
 
 typedef struct rb_vm_struct {
     VALUE self;
@@ -492,7 +492,7 @@ typedef struct rb_vm_struct {
     struct rb_thread_struct *main_thread;
     struct rb_thread_struct *running_thread;
 
-    struct rb_iomgr_struct *iomgr;
+    struct rb_iom_struct *iom;
 
     struct list_head living_threads;
     size_t living_thread_num;
-- 
EW


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 3/8] wip
  2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
  2017-05-18 22:30 ` [PATCH 2/8] compiles: Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
  2017-05-18 22:30 ` [PATCH 4/8] omg Eric Wong
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
  To: spew

---
 iom_select.h | 34 ++++++++++++++++++++++------------
 1 file changed, 22 insertions(+), 12 deletions(-)

diff --git a/iom_select.h b/iom_select.h
index 21603a7e5a..ebc5af87ff 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -15,8 +15,8 @@ struct rb_iom_struct {
     struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
     struct list_head fds; /* -rb_iom_fd_waiter.w.node */
     struct list_head pids; /* -rb_iom_pid_waiter.w.node, FIFO order */
+    int gen;
 
-    rb_nativethread_lock_t lock; /* protects select() and fdsets */
     rb_fdset_t rfds;
     rb_fdset_t wfds;
     rb_fdset_t efds;
@@ -32,7 +32,6 @@ rb_iom_create(rb_thread_t *th)
     list_head_init(&iom->timers);
     list_head_init(&iom->fds);
     list_head_init(&iom->pids);
-    rb_nativethread_lock_initialize(&iom->lock);
 
     return iom;
 }
@@ -54,9 +53,11 @@ waiter_cleanup(struct rb_iom_waiter *w)
     if (w->th) {
 	list_del_init(&w->node);
     }
-    else if (!list_empty((struct list_head *)&w->node)) {
-      /* we got scheduled in auto_fiberq, wait for another to resume us */
-      rb_fiber_yield(0, NULL);
+    else {
+	while (!list_empty((struct list_head *)&w->node)) {
+	    /* we got scheduled in auto_fiberq, wait for another to resume us */
+	    rb_fiber_yield(0, NULL);
+	}
     }
 }
 
@@ -99,10 +100,15 @@ iom_select_begin(VALUE ptr)
 {
     rb_thread_t *th = (rb_thread_t *)ptr;
     rb_iom_t *iom = th->vm->iom;
-    int max = 0;
+    int max, gen;
     struct timeval tv;
     struct rb_iom_fd_waiter *fdw = NULL;
-    rb_fdset_t *rfds = NULL, *wfds = NULL, *efds = NULL;
+    rb_fdset_t *rfds, *wfds, *efds;
+
+again:
+    rfds = wfds = efds = NULL;
+    gen = iom->gen;
+    max = 0;
 
     rb_fd_init(&iom->rfds);
     rb_fd_init(&iom->wfds);
@@ -126,7 +132,14 @@ iom_select_begin(VALUE ptr)
 	    rb_fd_set(fd, efds = &iom->efds);
 	}
     }
-    rb_thread_fd_select(max + 1, rfds, wfds, efds, iom_timeout(iom, &tv));
+
+    r = rb_thread_fd_select(max + 1, rfds, wfds, efds, iom_timeout(iom, &tv));
+    if (r < 0 && errno == EINTR && iom->gen != gen) {
+	rb_fd_term(&iom->rfds);
+	rb_fd_term(&iom->wfds);
+	rb_fd_term(&iom->efds);
+	goto again;
+    }
 
     return Qfalse;
 }
@@ -167,7 +180,6 @@ iom_select_end(VALUE ptr)
     rb_fd_term(&iom->wfds);
     rb_fd_term(&iom->efds);
 
-    native_mutex_unlock(&iom->lock);
     return Qfalse;
 }
 
@@ -188,9 +200,7 @@ iom_schedule_fd(VALUE ptr)
              * nope, rb_iom_schedule did not do our work for us,
              * maybe we can wait on select(), ourselves:
              */
-            if (native_mutex_trylock(&iom->lock) == 0) {
-                rb_ensure(iom_select_begin, (VALUE)th, iom_select_end, (VALUE)th);
-            }
+	    rb_ensure(iom_select_begin, (VALUE)th, iom_select_end, (VALUE)th);
 	}
 	else {
 	    /* or kick whoever is running select into doing work for us */
-- 
EW


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 4/8] omg
  2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
  2017-05-18 22:30 ` [PATCH 2/8] compiles: Eric Wong
  2017-05-18 22:30 ` [PATCH 3/8] wip Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
  2017-05-18 22:30 ` [PATCH 5/8] ok Eric Wong
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
  To: spew

---
 cont.c       | 94 +++++++++++++++++++++++++++++++++++++++++++++++++-----------
 iom_common.h | 43 ++++++++++++---------------
 iom_select.h | 86 ++++++++++++++++++++++--------------------------------
 thread.c     |  2 +-
 vm.c         |  2 +-
 vm_core.h    |  2 +-
 6 files changed, 133 insertions(+), 96 deletions(-)

diff --git a/cont.c b/cont.c
index 05e0a83c94..8282eabd81 100644
--- a/cont.c
+++ b/cont.c
@@ -126,8 +126,15 @@ static machine_stack_cache_t terminated_machine_stack;
 struct rb_fiber_struct {
     rb_context_t cont;
     struct rb_fiber_struct *prev;
+
+    /*
+     * afnode.next == NULL       auto fiber disabled
+     * afnode.next == &afnode    auto fiber enabled, but not enqueued (running)
+     * afnode.next != &afnode    auto fiber runnable (enqueued in th->afhead)
+     */
+    struct list_node afnode;
+
     enum fiber_status status;
-    unsigned int auto_fiber:1;
     /* If a fiber invokes "transfer",
      * then this fiber can't "resume" any more after that.
      * You shouldn't mix "transfer" and "resume".
@@ -1497,22 +1504,28 @@ rb_fiber_terminate(rb_fiber_t *fib)
     fiber_switch(return_fiber(), 1, &value, 0);
 }
 
-VALUE
-rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+static void
+fiber_check_resume(const rb_fiber_t *fib)
 {
-    rb_fiber_t *fib;
-    GetFiberPtr(fibval, fib);
-
     if (fib->prev != 0) {
-	rb_raise(rb_eFiberError, "zdouble resume");
+	rb_raise(rb_eFiberError, "double resume (no prev)");
     }
-    if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
-	rb_raise(rb_eFiberError, "double resume");
+    if (fib->cont.type == ROOT_FIBER_CONTEXT) {
+	rb_raise(rb_eFiberError, "double resume (root)");
     }
     if (fib->transferred != 0) {
 	rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
     }
+}
 
+VALUE
+rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+{
+    rb_fiber_t *fib;
+    GetFiberPtr(fibval, fib);
+
+    fprintf(stderr, "resume\n");
+    fiber_check_resume(fib);
     return fiber_switch(fib, argc, argv, 1);
 }
 
@@ -1655,27 +1668,76 @@ rb_fiber_s_current(VALUE klass)
     return rb_fiber_current();
 }
 
+void
+rb_fiber_auto_enqueue(VALUE fibval)
+{
+    rb_fiber_t *fib;
+    rb_thread_t *th;
+
+    GetFiberPtr(fibval, fib);
+    GetThreadPtr(fib->cont.saved_thread.self, th);
+    fprintf(stderr, "fibenq: %p\n", fib);
+    fiber_check_resume(fib);
+    list_add_tail(&th->afhead, &fib->afnode);
+}
+
+void
+rb_fiber_auto_schedule(rb_thread_t *th)
+{
+    rb_fiber_t *fib = NULL, *next = NULL;
+    rb_fiber_t *current = fiber_current();
+
+    list_for_each_safe(&th->afhead, fib, next, afnode) {
+	list_del_init(&fib->afnode);
+	if (fib == current) {
+	    write(2, "self\n", 5);
+	    return;
+	}
+	fprintf(stderr, "fibswitch: %p\n", fib);
+	fiber_check_resume(fib);
+	fiber_switch(fib, 0, NULL, 1);
+    }
+}
+
 static VALUE
 rb_fiber_auto_get(VALUE self)
 {
-    rb_fiber_t *fiber = fiber_current();
+    rb_fiber_t *fib;
+    GetFiberPtr(self, fib);
 
-    return fiber->auto_fiber ? Qtrue : Qfalse;
+    return fib->afnode.next ? Qtrue : Qfalse;
 }
 
 static VALUE
 rb_fiber_auto_set(VALUE self, VALUE val)
 {
-    rb_fiber_t *fiber = fiber_current();
-
-    return (fiber->auto_fiber = RTEST(val)) ? Qtrue : Qfalse;
+    rb_fiber_t *fib;
+    GetFiberPtr(self, fib);
+    if (RTEST(val)) {
+	if (fib->afnode.next == NULL) {
+	    rb_thread_t *th;
+	    GetThreadPtr(fib->cont.saved_thread.self, th);
+	    fprintf(stderr, "fibset: %p\n", fib);
+	    fiber_check_resume(fib);
+	    list_add_tail(&th->afhead, &fib->afnode);
+	}
+	return Qtrue;
+    }
+    else {
+	if (fib->afnode.next) {
+	    list_del_init(&fib->afnode);
+	    fib->afnode.next = NULL;
+	}
+	return Qfalse;
+    }
 }
 
-int rb_fiber_auto_sched_p(const rb_thread_t *th)
+int
+rb_fiber_auto_sched_p(const rb_thread_t *th)
 {
     const rb_fiber_t *cur = th->fiber;
 
-    return (cur && cur->auto_fiber && th->root_fiber != cur);
+    return (cur && cur->afnode.next && th->root_fiber != cur);
 }
 
 /*
diff --git a/iom_common.h b/iom_common.h
index 03d4ac560a..c70b45499f 100644
--- a/iom_common.h
+++ b/iom_common.h
@@ -3,6 +3,10 @@
 
 #include "iom.h"
 
+/* cont.c */
+void rb_fiber_auto_enqueue(VALUE fibval);
+void rb_fiber_auto_schedule(rb_thread_t *);
+
 /* allocated on stack */
 struct rb_iom_timer {
     struct list_node tnode; /* <=> rb_iom_struct.timers */
@@ -10,10 +14,10 @@ struct rb_iom_timer {
 };
 
 struct rb_iom_waiter {
-    VALUE fibval;
-    struct list_node node; /* <=> (rb_thread_t.autofiberq|rb_iom_struct.fds) */
-    rb_thread_t *th;
+    struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
     struct rb_iom_timer timer;
+    rb_thread_t *th;
+    VALUE fibval;
 };
 
 struct rb_iom_fd_waiter {
@@ -36,17 +40,20 @@ rb_iom_timer_check(struct list_head *timers)
 {
     struct rb_iom_timer *i = NULL, *next = NULL;
     double now = timeofday();
-    struct rb_iom_waiter *w;
 
     list_for_each_safe(timers, i, next, tnode) {
 	if (i->expires_at <= now) {
+	    struct rb_iom_waiter *w;
+	    VALUE fibval;
+
 	    w = container_of(i, struct rb_iom_waiter, timer);
 	    list_del_init(&i->tnode);
-	    list_del(&w->node);
-	    list_add_tail(&w->th->auto_fiberq, &w->node);
-	    w->th = NULL;
+	    list_del_init(&w->wnode);
+	    fibval = w->fibval;
+	    w->fibval = Qfalse;
+	    rb_fiber_auto_enqueue(fibval);
 	}
-	break; /* done, timers is a sorted list */
+	return; /* done, timers is a sorted list */
     }
 }
 
@@ -62,8 +69,8 @@ rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
 
 	/*
 	 * search backwards: assume typical projects have multiple objects
-	 * sharing the same timeout values, so newly added timers will expire
-	 * expire later than existing timers
+	 * sharing the same timeout values, so new timers will expire later
+	 * than existing timers
 	 */
 	list_for_each_rev(timers, i, tnode) {
 	    if (add->expires_at > i->expires_at) {
@@ -74,23 +81,9 @@ rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
 	list_add(timers, &add->tnode);
     }
     else {
-	/* not active, just allow list_del_init to function in cleanup */
+	/* not active, just allow list_del to function in cleanup */
 	list_node_init(&add->tnode);
     }
 }
 
-static void
-rb_iom_schedule(rb_thread_t *th, struct rb_iom_waiter *current)
-{
-    struct rb_iom_waiter *i = NULL, *next = NULL;
-
-    list_for_each_safe(&th->auto_fiberq, i, next, node) {
-	list_del_init(&i->node);
-        if (i == current) {
-          return;
-        }
-	rb_fiber_resume(i->fibval, 0, NULL);
-    }
-}
-
 #endif /* IOM_COMMON_H */
diff --git a/iom_select.h b/iom_select.h
index ebc5af87ff..c178c8881e 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -9,18 +9,17 @@
 /* allocated on heap (rb_vm_t.iom) */
 struct rb_iom_struct {
     /*
-     * lists are all protected by GVL at this time,
+     * Everything here is protected by GVL at this time,
      * URCU lists (LGPL-2.1+) may be used in the future
      */
     struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
-    struct list_head fds; /* -rb_iom_fd_waiter.w.node */
-    struct list_head pids; /* -rb_iom_pid_waiter.w.node, FIFO order */
-    int gen;
+    struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
+    struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
 
+    rb_thread_t *selector; /* thread blocked on rb_thread_fd_select */
     rb_fdset_t rfds;
     rb_fdset_t wfds;
     rb_fdset_t efds;
-    rb_thread_t *scheduler;
 };
 
 /* we lazily create this, small scripts may never need iom */
@@ -29,6 +28,7 @@ rb_iom_create(rb_thread_t *th)
 {
     rb_iom_t *iom = ALLOC(rb_iom_t);
 
+    iom->selector = NULL;
     list_head_init(&iom->timers);
     list_head_init(&iom->fds);
     list_head_init(&iom->pids);
@@ -50,15 +50,7 @@ static void
 waiter_cleanup(struct rb_iom_waiter *w)
 {
     list_del(&w->timer.tnode);
-    if (w->th) {
-	list_del_init(&w->node);
-    }
-    else {
-	while (!list_empty((struct list_head *)&w->node)) {
-	    /* we got scheduled in auto_fiberq, wait for another to resume us */
-	    rb_fiber_yield(0, NULL);
-	}
-    }
+    list_del(&w->wnode);
 }
 
 static VALUE
@@ -69,15 +61,8 @@ fd_waiter_cleanup(VALUE ptr)
     return Qfalse;
 }
 
-static void
-waiter_init(struct rb_iom_waiter *w, rb_thread_t *th)
-{
-    w->fibval = rb_fiber_current();
-    w->th = th;
-}
-
 static struct timeval *
-iom_timeout(rb_iom_t *iom, struct timeval *tv)
+next_timeout(rb_iom_t *iom, struct timeval *tv)
 {
     struct rb_iom_timer *t = list_top(&iom->timers, struct rb_iom_timer, tnode);
 
@@ -100,21 +85,17 @@ iom_select_begin(VALUE ptr)
 {
     rb_thread_t *th = (rb_thread_t *)ptr;
     rb_iom_t *iom = th->vm->iom;
-    int max, gen;
-    struct timeval tv;
+    int max = -1;
+    struct timeval tv, *tvp;
     struct rb_iom_fd_waiter *fdw = NULL;
     rb_fdset_t *rfds, *wfds, *efds;
 
-again:
     rfds = wfds = efds = NULL;
-    gen = iom->gen;
-    max = 0;
-
     rb_fd_init(&iom->rfds);
     rb_fd_init(&iom->wfds);
     rb_fd_init(&iom->efds);
 
-    list_for_each(&iom->fds, fdw, w.node) {
+    list_for_each(&iom->fds, fdw, w.wnode) {
 	int fd = *fdw->fdp;
 	if (fd < 0) {
 	    continue; /* closed */
@@ -133,13 +114,11 @@ again:
 	}
     }
 
-    r = rb_thread_fd_select(max + 1, rfds, wfds, efds, iom_timeout(iom, &tv));
-    if (r < 0 && errno == EINTR && iom->gen != gen) {
-	rb_fd_term(&iom->rfds);
-	rb_fd_term(&iom->wfds);
-	rb_fd_term(&iom->efds);
-	goto again;
-    }
+    tvp = next_timeout(iom, &tv);
+
+    /* release GVL.. */
+    rb_thread_fd_select(max + 1, rfds, wfds, efds, tvp);
+    /* .. we have GVL again */
 
     return Qfalse;
 }
@@ -151,8 +130,8 @@ iom_select_end(VALUE ptr)
     rb_iom_t *iom = th->vm->iom;
     struct rb_iom_fd_waiter *fdw = NULL, *next = NULL;
 
-    iom->scheduler = NULL;
-    list_for_each_safe(&iom->fds, fdw, next, w.node) {
+    iom->selector = NULL;
+    list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
 	int fd = *fdw->fdp;
 	if (fd < 0) {
 	    continue; /* closed */
@@ -169,10 +148,12 @@ iom_select_end(VALUE ptr)
 
 	/* got revents? enqueue ourselves to be run! */
 	if (fdw->revents) {
+	    VALUE fibval = fdw->w.fibval;
+
+	    fdw->w.fibval = Qfalse;
 	    list_del_init(&fdw->w.timer.tnode);
-	    list_del(&fdw->w.node);
-	    list_add_tail(&fdw->w.th->auto_fiberq, &fdw->w.node);
-	    fdw->w.th = NULL;
+	    list_del_init(&fdw->w.wnode);
+	    rb_fiber_auto_enqueue(fibval);
 	}
     }
 
@@ -190,12 +171,14 @@ iom_schedule_fd(VALUE ptr)
     rb_thread_t *th = fdw->w.th;
     rb_iom_t *iom = th->vm->iom;
 
-    rb_iom_schedule(th, &fdw->w); /* try to get some work done elsewhere */
+    write(2, "a\n", 2);
+    rb_fiber_auto_schedule(th);
+    write(2, "b\n", 2);
 
-    while (fdw->w.th != NULL) {
-	rb_thread_t *sched = iom->scheduler;
+    while (fdw->w.fibval != Qfalse) {
+	rb_thread_t *sched = iom->selector;
         if (!sched) {
-            iom->scheduler = th;
+            iom->selector = th;
             /*
              * nope, rb_iom_schedule did not do our work for us,
              * maybe we can wait on select(), ourselves:
@@ -206,11 +189,8 @@ iom_schedule_fd(VALUE ptr)
 	    /* or kick whoever is running select into doing work for us */
 	    ubf_select(sched);
 	    rb_thread_schedule_limits(0);
-	    rb_iom_schedule(th, &fdw->w);
-	    if (list_empty((struct list_head *)&fdw->w.node)) {
-		return Qfalse;
-	    }
 	}
+	rb_fiber_auto_schedule(th);
     }
     return Qfalse;
 }
@@ -225,8 +205,9 @@ rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
     fdw.fdp = fdp;
     fdw.events = (short)events;
     fdw.revents = 0;
-    waiter_init(&fdw.w, th);
-    list_add(&iom->fds, &fdw.w.node);
+    fdw.w.th = th;
+    fdw.w.fibval = rb_fiber_current();
+    list_add(&iom->fds, &fdw.w.wnode);
     rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout);
     rb_ensure(iom_schedule_fd, (VALUE)&fdw, fd_waiter_cleanup, (VALUE)&fdw);
 
@@ -245,7 +226,8 @@ rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
     VM_ASSERT((options & WNOHANG) == 0 &&
 		"WNOHANG should be handled in rb_waitpid");
 
-    waiter_init(&pw.w, th);
+    pw.w.th = th;
+    pw.w.fibval = rb_fiber_current();
     rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout);
 
     return -1;
diff --git a/thread.c b/thread.c
index 4ff8510100..0bffa9c022 100644
--- a/thread.c
+++ b/thread.c
@@ -3913,7 +3913,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
     struct timespec *timeout = NULL;
     rb_thread_t *th = GET_THREAD();
 
-    if (rb_fiber_auto_sched_p(th)) {
+    if (0 && rb_fiber_auto_sched_p(th)) {
 	double *to = NULL;
 	double tout;
 	if (tv) {
diff --git a/vm.c b/vm.c
index 60b7bf070b..16d0c3d647 100644
--- a/vm.c
+++ b/vm.c
@@ -2513,7 +2513,7 @@ th_init(rb_thread_t *th, VALUE self)
 
     th->ec.cfp = (void *)(th->ec.stack + th->ec.stack_size);
 
-    list_head_init(&th->auto_fiberq);
+    list_head_init(&th->afhead);
 
     vm_push_frame(th, 0 /* dummy iseq */, VM_FRAME_MAGIC_DUMMY | VM_ENV_FLAG_LOCAL | VM_FRAME_FLAG_FINISH | VM_FRAME_FLAG_CFRAME /* dummy frame */,
 		  Qnil /* dummy self */, VM_BLOCK_HANDLER_NONE /* dummy block ptr */,
diff --git a/vm_core.h b/vm_core.h
index dade714b2a..05f72c3527 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -813,7 +813,7 @@ typedef struct rb_thread_struct {
     rb_fiber_t *fiber;
     rb_fiber_t *root_fiber;
     rb_jmpbuf_t root_jmpbuf;
-    struct list_head auto_fiberq;
+    struct list_head afhead; /* -rb_fiber_t.afnode */
 
     /* ensure & callcc */
     rb_ensure_list_t *ensure_list;
-- 
EW


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 5/8] ok
  2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
                   ` (2 preceding siblings ...)
  2017-05-18 22:30 ` [PATCH 4/8] omg Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
  2017-05-18 22:30 ` [PATCH 6/8] wip Eric Wong
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
  To: spew

---
 cont.c       |  5 -----
 iom_select.h | 52 +++++++++++++++++++++++++++++-----------------------
 thread.c     |  2 +-
 3 files changed, 30 insertions(+), 29 deletions(-)

diff --git a/cont.c b/cont.c
index 8282eabd81..7730ddd6e2 100644
--- a/cont.c
+++ b/cont.c
@@ -1524,7 +1524,6 @@ rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
     rb_fiber_t *fib;
     GetFiberPtr(fibval, fib);
 
-    fprintf(stderr, "resume\n");
     fiber_check_resume(fib);
     return fiber_switch(fib, argc, argv, 1);
 }
@@ -1676,7 +1675,6 @@ rb_fiber_auto_enqueue(VALUE fibval)
 
     GetFiberPtr(fibval, fib);
     GetThreadPtr(fib->cont.saved_thread.self, th);
-    fprintf(stderr, "fibenq: %p\n", fib);
     fiber_check_resume(fib);
     list_add_tail(&th->afhead, &fib->afnode);
 }
@@ -1690,10 +1688,8 @@ rb_fiber_auto_schedule(rb_thread_t *th)
     list_for_each_safe(&th->afhead, fib, next, afnode) {
 	list_del_init(&fib->afnode);
 	if (fib == current) {
-	    write(2, "self\n", 5);
 	    return;
 	}
-	fprintf(stderr, "fibswitch: %p\n", fib);
 	fiber_check_resume(fib);
 	fiber_switch(fib, 0, NULL, 1);
     }
@@ -1717,7 +1713,6 @@ rb_fiber_auto_set(VALUE self, VALUE val)
 	if (fib->afnode.next == NULL) {
 	    rb_thread_t *th;
 	    GetThreadPtr(fib->cont.saved_thread.self, th);
-	    fprintf(stderr, "fibset: %p\n", fib);
 	    fiber_check_resume(fib);
 	    list_add_tail(&th->afhead, &fib->afnode);
 	}
diff --git a/iom_select.h b/iom_select.h
index c178c8881e..cec1939c66 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -5,6 +5,7 @@
  * epoll and kqueue versions
  */
 #include "iom_common.h"
+#include "internal.h"
 
 /* allocated on heap (rb_vm_t.iom) */
 struct rb_iom_struct {
@@ -16,6 +17,7 @@ struct rb_iom_struct {
     struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
     struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
 
+    rb_serial_t gen;
     rb_thread_t *selector; /* thread blocked on rb_thread_fd_select */
     rb_fdset_t rfds;
     rb_fdset_t wfds;
@@ -28,6 +30,7 @@ rb_iom_create(rb_thread_t *th)
 {
     rb_iom_t *iom = ALLOC(rb_iom_t);
 
+    iom->gen = 0;
     iom->selector = NULL;
     list_head_init(&iom->timers);
     list_head_init(&iom->fds);
@@ -81,7 +84,7 @@ next_timeout(rb_iom_t *iom, struct timeval *tv)
 }
 
 static VALUE
-iom_select_begin(VALUE ptr)
+iom_select_do(VALUE ptr)
 {
     rb_thread_t *th = (rb_thread_t *)ptr;
     rb_iom_t *iom = th->vm->iom;
@@ -89,8 +92,8 @@ iom_select_begin(VALUE ptr)
     struct timeval tv, *tvp;
     struct rb_iom_fd_waiter *fdw = NULL;
     rb_fdset_t *rfds, *wfds, *efds;
-
     rfds = wfds = efds = NULL;
+
     rb_fd_init(&iom->rfds);
     rb_fd_init(&iom->wfds);
     rb_fd_init(&iom->efds);
@@ -124,7 +127,7 @@ iom_select_begin(VALUE ptr)
 }
 
 static VALUE
-iom_select_end(VALUE ptr)
+iom_select_done(VALUE ptr)
 {
     rb_thread_t *th = (rb_thread_t *)ptr;
     rb_iom_t *iom = th->vm->iom;
@@ -170,29 +173,32 @@ iom_schedule_fd(VALUE ptr)
     struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
     rb_thread_t *th = fdw->w.th;
     rb_iom_t *iom = th->vm->iom;
+    rb_thread_t *s;
 
-    write(2, "a\n", 2);
+    /* kick off any other idle fibers, first */
     rb_fiber_auto_schedule(th);
-    write(2, "b\n", 2);
-
-    while (fdw->w.fibval != Qfalse) {
-	rb_thread_t *sched = iom->selector;
-        if (!sched) {
-            iom->selector = th;
-            /*
-             * nope, rb_iom_schedule did not do our work for us,
-             * maybe we can wait on select(), ourselves:
-             */
-	    rb_ensure(iom_select_begin, (VALUE)th, iom_select_end, (VALUE)th);
-	}
-	else {
-	    /* or kick whoever is running select into doing work for us */
-	    ubf_select(sched);
-	    rb_thread_schedule_limits(0);
-	}
-	rb_fiber_auto_schedule(th);
+    if (fdw->w.fibval == Qfalse) {
+	return rb_fiber_yield(0, NULL);
     }
-    return Qfalse;
+
+    /* rb_fiber_auto_schedule did not activate us... */
+    s = iom->selector;
+    if (s) {
+	iom->gen++;
+	/* get current selector to watch us */
+	ubf_select(s);
+    }
+    else {
+	/* go to sleep in select() ourselves */
+	rb_serial_t gen;
+	do {
+	    gen = iom->gen;
+	    iom->selector = th;
+	    rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
+	    rb_fiber_auto_schedule(th);
+	} while (gen != iom->gen);
+    }
+    return rb_fiber_yield(0, NULL);
 }
 
 int
diff --git a/thread.c b/thread.c
index 0bffa9c022..4ff8510100 100644
--- a/thread.c
+++ b/thread.c
@@ -3913,7 +3913,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
     struct timespec *timeout = NULL;
     rb_thread_t *th = GET_THREAD();
 
-    if (0 && rb_fiber_auto_sched_p(th)) {
+    if (rb_fiber_auto_sched_p(th)) {
 	double *to = NULL;
 	double tout;
 	if (tv) {
-- 
EW


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 6/8] wip
  2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
                   ` (3 preceding siblings ...)
  2017-05-18 22:30 ` [PATCH 5/8] ok Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
  2017-05-18 22:30 ` [PATCH 7/8] wip-fib-join Eric Wong
  2017-05-18 22:30 ` [PATCH 8/8] seems to work Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
  To: spew

---
 cont.c       | 11 ++++++++++-
 iom_select.h | 42 ++++++++++++++++++++++++++++++++----------
 2 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/cont.c b/cont.c
index 7730ddd6e2..52f34500c6 100644
--- a/cont.c
+++ b/cont.c
@@ -1684,10 +1684,19 @@ rb_fiber_auto_schedule(rb_thread_t *th)
 {
     rb_fiber_t *fib = NULL, *next = NULL;
     rb_fiber_t *current = fiber_current();
+    struct list_head tmp;
 
-    list_for_each_safe(&th->afhead, fib, next, afnode) {
+    /*
+     * do not infinite loop as new fibers get added to
+     * th->afhead, only work off a temporary list:
+     */
+    list_append_list(&tmp, &th->afhead);
+    list_for_each_safe(&tmp, fib, next, afnode) {
 	list_del_init(&fib->afnode);
+
+	/* resume current fiber naturally without switch */
 	if (fib == current) {
+	    list_prepend_list(&th->afhead, &tmp);
 	    return;
 	}
 	fiber_check_resume(fib);
diff --git a/iom_select.h b/iom_select.h
index cec1939c66..4da443de34 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -24,6 +24,15 @@ struct rb_iom_struct {
     rb_fdset_t efds;
 };
 
+
+struct wait_select {
+    struct list_node wsnode;
+    union {
+	rb_thread_t *th;
+	VALUE run_select; /* or yield */
+    } as;
+};
+
 /* we lazily create this, small scripts may never need iom */
 static rb_iom_t *
 rb_iom_create(rb_thread_t *th)
@@ -177,28 +186,41 @@ iom_schedule_fd(VALUE ptr)
 
     /* kick off any other idle fibers, first */
     rb_fiber_auto_schedule(th);
+
     if (fdw->w.fibval == Qfalse) {
+	/* inside th->afhead, wait for somebody to schedule us */
 	return rb_fiber_yield(0, NULL);
     }
 
     /* rb_fiber_auto_schedule did not activate us... */
+try_select:
     s = iom->selector;
     if (s) {
-	iom->gen++;
-	/* get current selector to watch us */
+	/* another thread is calling select(), kick them: */
+	struct wait_select ws;
+
+	ws.as.th = th;
+	list_add_tail(&iom->swaitq, &ws.wsnode);
 	ubf_select(s);
+	do {
+	    rb_thread_sleep_deadly();
+	} while (ws.as.th == th);
+
+	if (ws.as.run_select == Qfalse) {
+	    return rb_fiber_yield(0, NULL);
+	}
+	goto try_select;
     }
     else {
 	/* go to sleep in select() ourselves */
-	rb_serial_t gen;
-	do {
-	    gen = iom->gen;
-	    iom->selector = th;
-	    rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
-	    rb_fiber_auto_schedule(th);
-	} while (gen != iom->gen);
+	iom->selector = th;
+	rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
+	list_for_each_safe(&iom->swaitq,
+
+	rb_fiber_auto_schedule(th);
+	if (fdw->w.fibval == Qfalse) {
+	    return rb_fiber_yield(0, NULL);
     }
-    return rb_fiber_yield(0, NULL);
 }
 
 int
-- 
EW


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 7/8] wip-fib-join
  2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
                   ` (4 preceding siblings ...)
  2017-05-18 22:30 ` [PATCH 6/8] wip Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
  2017-05-18 22:30 ` [PATCH 8/8] seems to work Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
  To: spew

# Smoke test for this patch: ten auto-fibers each fetch the same URL over
# Net::HTTP and print the SHA-1 of the body they received, then the main
# fiber joins them all.
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'
url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.pack'
# the second assignment wins, so the .idx URL is the one actually fetched
url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx'
uri = URI(url)
use_ssl = "https" == uri.scheme
fibs = 10.times.map do
  Fiber.new do
    # tag every line of output with the fiber that produced it
    cur = Fiber.current.object_id
    Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http|
      req = Net::HTTP::Get.new(uri)
      http.request(req) do |res|
        dig = Digest::SHA1.new
        # stream the body in chunks instead of buffering it whole
        res.read_body do |buf|
          dig.update(buf)
          #warn "#{cur} #{buf.bytesize}\n"
        end
        warn "#{cur} #{dig.hexdigest}\n"
      end
    end
    :done
  end
end

# Fiber#auto= and Fiber#join are the methods registered in cont.c below
fibs.each { |f| f.auto = true }
# `begin ... end until` runs the body first and then tests the condition.
# The previous `while true ... end until fibs.empty?` tested the condition
# once and then entered an inner loop with no exit, so the script kept
# spinning after every fiber had finished.
begin
  fibs.each { |f| f.join }
  fibs.keep_if(&:alive?)
end until fibs.empty?
---
 configure.in |   2 +-
 cont.c       |  71 +++++++++++++++++++++++++---
 iom_common.h |   4 +-
 iom_select.h | 149 ++++++++++++++++++++++++++++++++++++++++++++---------------
 vm.c         |   5 +-
 vm_core.h    |   2 -
 6 files changed, 182 insertions(+), 51 deletions(-)

diff --git a/configure.in b/configure.in
index cc008dbebf..9c5def88ed 100644
--- a/configure.in
+++ b/configure.in
@@ -523,7 +523,7 @@ AS_CASE(["$target_os"],
 AC_SUBST(LD)
 if test "$GCC" = yes; then
     linker_flag=-Wl,
-    : ${optflags=-O3}
+    : ${optflags=-O0}
     gcc_major=`echo =__GNUC__ | $CC -E -xc - | sed '/^=/!d;s///'`
     gcc_minor=`echo =__GNUC_MINOR__ | $CC -E -xc - | sed '/^=/!d;s///'`
     test -n "$gcc_major" || gcc_major=0
diff --git a/cont.c b/cont.c
index 52f34500c6..bf06eac355 100644
--- a/cont.c
+++ b/cont.c
@@ -1524,6 +1524,10 @@ rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
     rb_fiber_t *fib;
     GetFiberPtr(fibval, fib);
 
+    if (fib->afnode.next) {
+	list_del_init(&fib->afnode);
+    }
+    /* fprintf(stderr, "resume: %p\n", fib); */
     fiber_check_resume(fib);
     return fiber_switch(fib, argc, argv, 1);
 }
@@ -1675,33 +1679,47 @@ rb_fiber_auto_enqueue(VALUE fibval)
 
     GetFiberPtr(fibval, fib);
     GetThreadPtr(fib->cont.saved_thread.self, th);
-    fiber_check_resume(fib);
+    /* fprintf(stderr, "autoenq: %p\n", fib); */
     list_add_tail(&th->afhead, &fib->afnode);
 }
 
 void
+rb_fiber_auto_schedule_mark(const rb_thread_t *th)
+{
+    rb_fiber_t *fib = NULL;
+    list_for_each(&th->afhead, fib, afnode) {
+	rb_gc_mark(fib->cont.self);
+    }
+}
+
+VALUE
 rb_fiber_auto_schedule(rb_thread_t *th)
 {
     rb_fiber_t *fib = NULL, *next = NULL;
     rb_fiber_t *current = fiber_current();
-    struct list_head tmp;
+    LIST_HEAD(tmp);
 
     /*
      * do not infinite loop as new fibers get added to
      * th->afhead, only work off a temporary list:
      */
     list_append_list(&tmp, &th->afhead);
+    /* fprintf(stderr, "current: %p\n", current); */
+    /* list_for_each_safe(&tmp, fib, next, afnode) { */
+	/* fprintf(stderr, "fib: %p\n", fib); */
+    /* } */
     list_for_each_safe(&tmp, fib, next, afnode) {
 	list_del_init(&fib->afnode);
 
-	/* resume current fiber naturally without switch */
 	if (fib == current) {
 	    list_prepend_list(&th->afhead, &tmp);
-	    return;
+	    return Qtrue;
 	}
+	/* fprintf(stderr, "autosched: %p\n", fib); */
 	fiber_check_resume(fib);
 	fiber_switch(fib, 0, NULL, 1);
     }
+    return Qfalse;
 }
 
 static VALUE
@@ -1720,10 +1738,9 @@ rb_fiber_auto_set(VALUE self, VALUE val)
     GetFiberPtr(self, fib);
     if (RTEST(val)) {
 	if (fib->afnode.next == NULL) {
-	    rb_thread_t *th;
-	    GetThreadPtr(fib->cont.saved_thread.self, th);
+	    list_head_init(&fib->afnode);
 	    fiber_check_resume(fib);
-	    list_add_tail(&th->afhead, &fib->afnode);
+	    return fiber_switch(fib, 0, NULL, 1);
 	}
 	return Qtrue;
     }
@@ -1736,6 +1753,45 @@ rb_fiber_auto_set(VALUE self, VALUE val)
     }
 }
 
+void rb_iom_schedule(rb_thread_t *);
+
+static VALUE
+fiber_join(rb_fiber_t *fib, double delay)
+{
+    rb_thread_t *th = GET_THREAD();
+    rb_fiber_t *cur = fiber_current();
+
+    if (cur == fib) {
+	rb_raise(rb_eFiberError, "Target fiber must not be current fiber");
+    }
+    if (th->root_fiber == fib) {
+	rb_raise(rb_eFiberError, "Target fiber must not be root fiber");
+    }
+    if (fib->cont.saved_thread.self != th->self) {
+	rb_raise(rb_eFiberError, "Target fiber not owned by current thread");
+    }
+    while (fib->status != TERMINATED) {
+	rb_iom_schedule(th);
+    }
+    return fib->cont.self;
+}
+
+static VALUE
+rb_fiber_join(int argc, VALUE *argv, VALUE self)
+{
+    rb_fiber_t *fib;
+    double delay = -1;
+    VALUE limit;
+
+    GetFiberPtr(self, fib);
+    rb_scan_args(argc, argv, "01", &limit);
+    if (!NIL_P(limit)) {
+	delay = rb_num2dbl(limit);
+    }
+
+    return fiber_join(fib, delay);
+}
+
 int
 rb_fiber_auto_sched_p(const rb_thread_t *th)
 {
@@ -1781,6 +1837,7 @@ Init_Cont(void)
     rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
     rb_define_method(rb_cFiber, "auto", rb_fiber_auto_get, 0);
     rb_define_method(rb_cFiber, "auto=", rb_fiber_auto_set, 1);
+    rb_define_method(rb_cFiber, "join", rb_fiber_join, -1);
 }
 
 RUBY_SYMBOL_EXPORT_BEGIN
diff --git a/iom_common.h b/iom_common.h
index c70b45499f..5a586100b3 100644
--- a/iom_common.h
+++ b/iom_common.h
@@ -5,7 +5,7 @@
 
 /* cont.c */
 void rb_fiber_auto_enqueue(VALUE fibval);
-void rb_fiber_auto_schedule(rb_thread_t *);
+VALUE rb_fiber_auto_schedule(rb_thread_t *);
 
 /* allocated on stack */
 struct rb_iom_timer {
@@ -17,7 +17,7 @@ struct rb_iom_waiter {
     struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
     struct rb_iom_timer timer;
     rb_thread_t *th;
-    VALUE fibval;
+    VALUE fibval; /* Qfalse: enqueued in afhead */
 };
 
 struct rb_iom_fd_waiter {
diff --git a/iom_select.h b/iom_select.h
index 4da443de34..a42643d73d 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -16,19 +16,20 @@ struct rb_iom_struct {
     struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
     struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
     struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+    struct list_head swaitq; /* select_wait.swnode */
 
-    rb_serial_t gen;
+    rb_serial_t select_start;
+    rb_serial_t select_gen;
     rb_thread_t *selector; /* thread blocked on rb_thread_fd_select */
     rb_fdset_t rfds;
     rb_fdset_t wfds;
     rb_fdset_t efds;
 };
 
-
-struct wait_select {
-    struct list_node wsnode;
+struct select_wait {
+    struct list_node swnode;
     union {
-	rb_thread_t *th;
+	VALUE thval;
 	VALUE run_select; /* or yield */
     } as;
 };
@@ -39,11 +40,13 @@ rb_iom_create(rb_thread_t *th)
 {
     rb_iom_t *iom = ALLOC(rb_iom_t);
 
-    iom->gen = 0;
+    iom->select_start = 0;
+    iom->select_gen = 0;
     iom->selector = NULL;
     list_head_init(&iom->timers);
     list_head_init(&iom->fds);
     list_head_init(&iom->pids);
+    list_head_init(&iom->swaitq);
 
     return iom;
 }
@@ -76,7 +79,9 @@ fd_waiter_cleanup(VALUE ptr)
 static struct timeval *
 next_timeout(rb_iom_t *iom, struct timeval *tv)
 {
-    struct rb_iom_timer *t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+    struct rb_iom_timer *t;
+
+    t = list_top(&iom->timers, struct rb_iom_timer, tnode);
 
     if (t) {
 	double diff = t->expires_at - timeofday();
@@ -141,6 +146,9 @@ iom_select_done(VALUE ptr)
     rb_thread_t *th = (rb_thread_t *)ptr;
     rb_iom_t *iom = th->vm->iom;
     struct rb_iom_fd_waiter *fdw = NULL, *next = NULL;
+    struct select_wait *sw;
+    VALUE thselect;
+    VALUE current = rb_fiber_current();
 
     iom->selector = NULL;
     list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
@@ -165,7 +173,9 @@ iom_select_done(VALUE ptr)
 	    fdw->w.fibval = Qfalse;
 	    list_del_init(&fdw->w.timer.tnode);
 	    list_del_init(&fdw->w.wnode);
-	    rb_fiber_auto_enqueue(fibval);
+	    if (fibval != current) {
+		rb_fiber_auto_enqueue(fibval);
+	    }
 	}
     }
 
@@ -173,53 +183,99 @@ iom_select_done(VALUE ptr)
     rb_fd_term(&iom->wfds);
     rb_fd_term(&iom->efds);
 
+    /*
+     * did we have anybody write to `fds` while we were inside select?
+     * if so, designate the last `fds` writer to call select() again:
+     */
+    do {
+	sw = list_tail(&iom->swaitq, struct select_wait, swnode);
+
+	if (!sw) {
+	    return Qfalse;
+	}
+	/* designate the next select() thread: */
+	thselect = sw->as.thval;
+	list_del_init(&sw->swnode);
+	sw->as.run_select = Qtrue;
+	/* it's gotta be alive to run select(), otherwise find another */
+    } while (NIL_P(rb_thread_wakeup_alive(thselect)));
+
+    if (sw) {
+	struct select_wait *swnext = NULL;
+
+	/* everybody else will do rb_fiber_yield: */
+	list_for_each_safe(&iom->swaitq, sw, swnext, swnode) {
+	    VALUE thyield = sw->as.thval;
+	    list_del_init(&sw->swnode);
+	    sw->as.run_select = Qfalse;
+	    rb_thread_wakeup_alive(thyield);
+	}
+    }
+
+    return Qfalse;
+}
+
+static VALUE
+select_wait_sleep(VALUE ptr)
+{
+    struct select_wait *sw = (struct select_wait *)ptr;
+    VALUE th = sw->as.thval;
+
+    do {
+	rb_thread_sleep_deadly();
+    } while (sw->as.thval == th);
+    return Qfalse;
+}
+
+static VALUE
+select_wait_done(VALUE ptr)
+{
+    struct select_wait *sw = (struct select_wait *)ptr;
+    list_del(&sw->swnode);
     return Qfalse;
 }
 
 static VALUE
 iom_schedule_fd(VALUE ptr)
 {
-    struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
-    rb_thread_t *th = fdw->w.th;
-    rb_iom_t *iom = th->vm->iom;
-    rb_thread_t *s;
+    return rb_fiber_yield(0, NULL);
+}
 
-    /* kick off any other idle fibers, first */
+void
+rb_iom_schedule(rb_thread_t *th)
+{
+    rb_iom_t *iom = rb_iom_get(th);
+    rb_thread_t *s;
     rb_fiber_auto_schedule(th);
+    /* fprintf(stderr, "empty? %d\n", !!list_empty(&th->afhead)); */
 
-    if (fdw->w.fibval == Qfalse) {
-	/* inside th->afhead, wait for somebody to schedule us */
-	return rb_fiber_yield(0, NULL);
-    }
-
-    /* rb_fiber_auto_schedule did not activate us... */
 try_select:
     s = iom->selector;
-    if (s) {
-	/* another thread is calling select(), kick them: */
-	struct wait_select ws;
-
-	ws.as.th = th;
-	list_add_tail(&iom->swaitq, &ws.wsnode);
-	ubf_select(s);
-	do {
-	    rb_thread_sleep_deadly();
-	} while (ws.as.th == th);
-
-	if (ws.as.run_select == Qfalse) {
-	    return rb_fiber_yield(0, NULL);
+    if (s) { /* somebody else is running select() */
+	/*
+	 * if our watch set changed after select() started,
+	 * we need to kick it and restart with the new set
+	 */
+	int need_kick = iom->select_start != iom->select_gen;
+	struct select_wait sw;
+
+	sw.as.thval = th->self;
+	list_add_tail(&iom->swaitq, &sw.swnode);
+
+	if (need_kick) {
+	    ubf_select(s);
+	}
+	rb_ensure(select_wait_sleep, (VALUE)&sw, select_wait_done, (VALUE)&sw);
+	if (need_kick) {
+	    goto try_select;
 	}
-	goto try_select;
     }
     else {
-	/* go to sleep in select() ourselves */
 	iom->selector = th;
+	iom->select_start = iom->select_gen;
 	rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
-	list_for_each_safe(&iom->swaitq,
-
+	rb_iom_timer_check(&iom->timers);
 	rb_fiber_auto_schedule(th);
-	if (fdw->w.fibval == Qfalse) {
-	    return rb_fiber_yield(0, NULL);
     }
 }
 
@@ -235,8 +291,16 @@ rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
     fdw.revents = 0;
     fdw.w.th = th;
     fdw.w.fibval = rb_fiber_current();
+
+    /*
+     * order probably doesn't matter for FDs, but maybe LIFO is better
+     * since later Fibers tend to have higher FDs, and putting higher
+     * FDs first means rb_fd_set() needs to do fewer reallocs
+     */
     list_add(&iom->fds, &fdw.w.wnode);
+
     rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout);
+    iom->select_gen++;
     rb_ensure(iom_schedule_fd, (VALUE)&fdw, fd_waiter_cleanup, (VALUE)&fdw);
 
     if (*fdp < 0) return -1;
@@ -254,9 +318,18 @@ rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
     VM_ASSERT((options & WNOHANG) == 0 &&
 		"WNOHANG should be handled in rb_waitpid");
 
+    pw.pid = pid;
+    pw.options = options;
     pw.w.th = th;
     pw.w.fibval = rb_fiber_current();
     rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout);
 
+    /* LIFO, matches Linux wait4() behavior */
+    list_add(&iom->pids, &pw.w.wnode);
+
+    /* do not touch select_gen, this has nothing to do with select() */
+
+    /* rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw); */
+
     return -1;
 }
diff --git a/vm.c b/vm.c
index 16d0c3d647..90a24b4a9d 100644
--- a/vm.c
+++ b/vm.c
@@ -2345,6 +2345,7 @@ rb_thread_recycle_stack_release(VALUE *stack)
 }
 
 void rb_fiber_mark_self(rb_fiber_t *fib);
+void rb_fiber_auto_schedule_mark(const rb_thread_t *th);
 
 void
 rb_thread_mark(void *ptr)
@@ -2387,6 +2388,9 @@ rb_thread_mark(void *ptr)
     RUBY_MARK_UNLESS_NULL(th->top_wrapper);
     rb_fiber_mark_self(th->fiber);
     rb_fiber_mark_self(th->root_fiber);
+    if (th->afhead.n.next && !list_empty(&th->afhead)) {
+	rb_fiber_auto_schedule_mark(th);
+    }
     RUBY_MARK_UNLESS_NULL(th->stat_insn_usage);
     RUBY_MARK_UNLESS_NULL(th->last_status);
 
@@ -3105,7 +3109,6 @@ Init_BareVM(void)
     }
     MEMZERO(th, rb_thread_t, 1);
     rb_thread_set_current_raw(th);
-
     vm_init2(vm);
     vm->objspace = rb_objspace_alloc();
     ruby_current_vm = vm;
diff --git a/vm_core.h b/vm_core.h
index 05f72c3527..c74314a743 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -491,9 +491,7 @@ typedef struct rb_vm_struct {
 
     struct rb_thread_struct *main_thread;
     struct rb_thread_struct *running_thread;
-
     struct rb_iom_struct *iom;
-
     struct list_head living_threads;
     size_t living_thread_num;
     VALUE thgroup_default;
-- 
EW


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 8/8] seems to work...
  2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
                   ` (5 preceding siblings ...)
  2017-05-18 22:30 ` [PATCH 7/8] wip-fib-join Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
  To: spew

# Smoke test for the auto-Fiber I/O manager in this series: 30 fibers each
# fetch the same URL over HTTP and print the SHA-1 of the response body.
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'
url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx'
uri = URI(url)
# only enable TLS for https:// URLs (false for the http:// URL above)
use_ssl = "https" == uri.scheme
fibs = 30.times.map do
  Fiber.new do
    # tag every line of output with this fiber's object_id
    cur = Fiber.current.object_id
    Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http|
      req = Net::HTTP::Get.new(uri)
      http.request(req) do |res|
	dig = Digest::SHA1.new
	# hash the body chunk by chunk as it is read
	res.read_body do |buf|
	  dig.update(buf)
	  #warn "#{cur} #{buf.bytesize}\n"
	end
	warn "#{cur} #{dig.hexdigest}\n"
      end
    end
    warn "done\n"
    # value of the fiber's block once the download has finished
    :done
  end
end

# Fiber#auto= comes from this patch series (rb_fiber_auto_set in cont.c);
# turning it on for a fiber that did not have it enabled switches into that
# fiber immediately.
fibs.each { |f| f.auto = true }

# Fiber#join is also from this series (rb_fiber_join in cont.c); presumably
# it waits for the fiber to finish -- its implementation is not shown here.
fibs[0].join

warn "the rest\n"
# keep joining until no fiber is alive, printing the stragglers after each pass
until fibs.empty?
  fibs.each { |f| f.join }
  fibs.keep_if(&:alive?)
  warn fibs.inspect
end
---
 cont.c       |  63 ++++++++++++++++-----------------
 iom_common.h |   8 ++---
 iom_select.h | 112 ++++++++++++++++++++++++++++++++++++++++++-----------------
 3 files changed, 116 insertions(+), 67 deletions(-)

diff --git a/cont.c b/cont.c
index bf06eac355..4d4927a0fa 100644
--- a/cont.c
+++ b/cont.c
@@ -128,7 +128,7 @@ struct rb_fiber_struct {
     struct rb_fiber_struct *prev;
 
     /*
-     * afnode.next == NULL       auto fiber disabled
+     * afnode.next == 0          auto fiber disabled
      * afnode.next == &afnode    auto fiber enabled, but not enqueued (running)
      * afnode.next != &afnode    auto fiber runnable (enqueued in th->afhead)
      */
@@ -1527,7 +1527,6 @@ rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
     if (fib->afnode.next) {
 	list_del_init(&fib->afnode);
     }
-    /* fprintf(stderr, "resume: %p\n", fib); */
     fiber_check_resume(fib);
     return fiber_switch(fib, argc, argv, 1);
 }
@@ -1679,24 +1678,36 @@ rb_fiber_auto_enqueue(VALUE fibval)
 
     GetFiberPtr(fibval, fib);
     GetThreadPtr(fib->cont.saved_thread.self, th);
-    /* fprintf(stderr, "autoenq: %p\n", fib); */
     list_add_tail(&th->afhead, &fib->afnode);
 }
 
 void
 rb_fiber_auto_schedule_mark(const rb_thread_t *th)
 {
-    rb_fiber_t *fib = NULL;
+    rb_fiber_t *fib = 0;
     list_for_each(&th->afhead, fib, afnode) {
 	rb_gc_mark(fib->cont.self);
     }
 }
 
-VALUE
-rb_fiber_auto_schedule(rb_thread_t *th)
+/* Returns true if auto-fiber is enabled for current fiber */
+int
+rb_fiber_auto_sched_p(const rb_thread_t *th)
+{
+    const rb_fiber_t *cur = th->fiber;
+
+    return (cur && cur->afnode.next && th->root_fiber != cur);
+}
+
+/*
+ * Resumes any ready fibers in the auto-Fiber run queue
+ * returns true if yielding current Fiber is needed, false if not
+ */
+int
+rb_fiber_auto_do_yield_p(rb_thread_t *th)
 {
-    rb_fiber_t *fib = NULL, *next = NULL;
-    rb_fiber_t *current = fiber_current();
+    rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0;
+    rb_fiber_t *fib = 0, *next = 0;
     LIST_HEAD(tmp);
 
     /*
@@ -1704,22 +1715,17 @@ rb_fiber_auto_schedule(rb_thread_t *th)
      * th->afhead, only work off a temporary list:
      */
     list_append_list(&tmp, &th->afhead);
-    /* fprintf(stderr, "current: %p\n", current); */
-    /* list_for_each_safe(&tmp, fib, next, afnode) { */
-	/* fprintf(stderr, "fib: %p\n", fib); */
-    /* } */
     list_for_each_safe(&tmp, fib, next, afnode) {
-	list_del_init(&fib->afnode);
-
-	if (fib == current) {
+	if (fib == current_auto || (fib->prev && fib != th->root_fiber)) {
+	    /* tell the caller to yield */
 	    list_prepend_list(&th->afhead, &tmp);
-	    return Qtrue;
+	    return 1;
 	}
-	/* fprintf(stderr, "autosched: %p\n", fib); */
+	list_del_init(&fib->afnode);
 	fiber_check_resume(fib);
-	fiber_switch(fib, 0, NULL, 1);
+	fiber_switch(fib, 0, 0, 1);
     }
-    return Qfalse;
+    return 0;
 }
 
 static VALUE
@@ -1737,17 +1743,20 @@ rb_fiber_auto_set(VALUE self, VALUE val)
     rb_fiber_t *fib;
     GetFiberPtr(self, fib);
     if (RTEST(val)) {
-	if (fib->afnode.next == NULL) {
-	    list_head_init(&fib->afnode);
+	if (fib->afnode.next == 0) {
+	    /* rb_thread_t *th = GET_THREAD(); */
+	    /* fprintf(stderr, "auto=: %p\n", fib); */
+	    /* list_add_tail(&th->afhead, &fib->afnode); */
+	    list_node_init(&fib->afnode);
 	    fiber_check_resume(fib);
-	    return fiber_switch(fib, 0, NULL, 1);
+	    return fiber_switch(fib, 0, 0, 1);
 	}
 	return Qtrue;
     }
     else {
 	if (fib->afnode.next) {
 	    list_del_init(&fib->afnode);
-	    fib->afnode.next = NULL;
+	    fib->afnode.next = 0;
 	}
 	return Qfalse;
     }
@@ -1792,14 +1801,6 @@ rb_fiber_join(int argc, VALUE *argv, VALUE self)
     return fiber_join(fib, delay);
 }
 
-int
-rb_fiber_auto_sched_p(const rb_thread_t *th)
-{
-    const rb_fiber_t *cur = th->fiber;
-
-    return (cur && cur->afnode.next && th->root_fiber != cur);
-}
-
 /*
  *  Document-class: FiberError
  *
diff --git a/iom_common.h b/iom_common.h
index 5a586100b3..e832944ff4 100644
--- a/iom_common.h
+++ b/iom_common.h
@@ -5,7 +5,7 @@
 
 /* cont.c */
 void rb_fiber_auto_enqueue(VALUE fibval);
-VALUE rb_fiber_auto_schedule(rb_thread_t *);
+int rb_fiber_auto_do_yield_p(rb_thread_t *);
 
 /* allocated on stack */
 struct rb_iom_timer {
@@ -14,10 +14,10 @@ struct rb_iom_timer {
 };
 
 struct rb_iom_waiter {
-    struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
     struct rb_iom_timer timer;
     rb_thread_t *th;
     VALUE fibval; /* Qfalse: enqueued in afhead */
+    struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
 };
 
 struct rb_iom_fd_waiter {
@@ -38,7 +38,7 @@ struct rb_iom_pid_waiter {
 static void
 rb_iom_timer_check(struct list_head *timers)
 {
-    struct rb_iom_timer *i = NULL, *next = NULL;
+    struct rb_iom_timer *i = 0, *next = 0;
     double now = timeofday();
 
     list_for_each_safe(timers, i, next, tnode) {
@@ -64,7 +64,7 @@ rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
 {
     rb_iom_timer_check(timers);
     if (timeout) {
-	struct rb_iom_timer *i = NULL;
+	struct rb_iom_timer *i = 0;
 	add->expires_at = timeofday() + *timeout;
 
 	/*
diff --git a/iom_select.h b/iom_select.h
index a42643d73d..4913b4e378 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -16,16 +16,21 @@ struct rb_iom_struct {
     struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
     struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
     struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+    /* threads waiting for selector thread to finish */
     struct list_head swaitq; /* select_wait.swnode */
 
-    rb_serial_t select_start;
-    rb_serial_t select_gen;
-    rb_thread_t *selector; /* thread blocked on rb_thread_fd_select */
+    rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */
+    rb_serial_t select_gen; /* RDWR protected by GVL */
+    rb_thread_t *selector; /* rb_thread_fd_select owner (acts as lock w/ GVL) */
+
+    /* these could be on stack, but they're huge on some platforms: */
     rb_fdset_t rfds;
     rb_fdset_t wfds;
     rb_fdset_t efds;
 };
 
+/* allocated on stack */
 struct select_wait {
     struct list_node swnode;
     union {
@@ -42,7 +47,7 @@ rb_iom_create(rb_thread_t *th)
 
     iom->select_start = 0;
     iom->select_gen = 0;
-    iom->selector = NULL;
+    iom->selector = 0;
     list_head_init(&iom->timers);
     list_head_init(&iom->fds);
     list_head_init(&iom->pids);
@@ -94,7 +99,7 @@ next_timeout(rb_iom_t *iom, struct timeval *tv)
 	}
 	return tv;
     }
-    return NULL;
+    return 0;
 }
 
 static VALUE
@@ -104,10 +109,15 @@ iom_select_do(VALUE ptr)
     rb_iom_t *iom = th->vm->iom;
     int max = -1;
     struct timeval tv, *tvp;
-    struct rb_iom_fd_waiter *fdw = NULL;
+    struct rb_iom_fd_waiter *fdw = 0;
     rb_fdset_t *rfds, *wfds, *efds;
-    rfds = wfds = efds = NULL;
 
+    if (list_empty(&iom->fds) && list_empty(&iom->timers)) {
+	rb_thread_schedule();
+	return Qfalse;
+    }
+
+    rfds = wfds = efds = 0;
     rb_fd_init(&iom->rfds);
     rb_fd_init(&iom->wfds);
     rb_fd_init(&iom->efds);
@@ -145,12 +155,11 @@ iom_select_done(VALUE ptr)
 {
     rb_thread_t *th = (rb_thread_t *)ptr;
     rb_iom_t *iom = th->vm->iom;
-    struct rb_iom_fd_waiter *fdw = NULL, *next = NULL;
+    struct rb_iom_fd_waiter *fdw = 0, *next = 0;
     struct select_wait *sw;
     VALUE thselect;
-    VALUE current = rb_fiber_current();
 
-    iom->selector = NULL;
+    iom->selector = 0;
     list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
 	int fd = *fdw->fdp;
 	if (fd < 0) {
@@ -173,9 +182,7 @@ iom_select_done(VALUE ptr)
 	    fdw->w.fibval = Qfalse;
 	    list_del_init(&fdw->w.timer.tnode);
 	    list_del_init(&fdw->w.wnode);
-	    if (fibval != current) {
-		rb_fiber_auto_enqueue(fibval);
-	    }
+	    rb_fiber_auto_enqueue(fibval);
 	}
     }
 
@@ -201,12 +208,11 @@ iom_select_done(VALUE ptr)
     } while (NIL_P(rb_thread_wakeup_alive(thselect)));
 
     if (sw) {
-	struct select_wait *swnext = NULL;
+	struct select_wait *swnext = 0;
 
 	/* everybody else will do rb_fiber_yield: */
 	list_for_each_safe(&iom->swaitq, sw, swnext, swnode) {
 	    VALUE thyield = sw->as.thval;
-	    list_del_init(&sw->swnode);
 	    sw->as.run_select = Qfalse;
 	    rb_thread_wakeup_alive(thyield);
 	}
@@ -235,19 +241,14 @@ select_wait_done(VALUE ptr)
     return Qfalse;
 }
 
-static VALUE
-iom_schedule_fd(VALUE ptr)
-{
-    return rb_fiber_yield(0, NULL);
-}
-
 void
 rb_iom_schedule(rb_thread_t *th)
 {
     rb_iom_t *iom = rb_iom_get(th);
     rb_thread_t *s;
-    rb_fiber_auto_schedule(th);
-    /* fprintf(stderr, "empty? %d\n", !!list_empty(&th->afhead)); */
+    if (rb_fiber_auto_do_yield_p(th)) {
+	rb_fiber_yield(0, 0);
+    }
 
 try_select:
     s = iom->selector;
@@ -275,8 +276,57 @@ try_select:
 	iom->select_start = iom->select_gen;
 	rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
 	rb_iom_timer_check(&iom->timers);
-	rb_fiber_auto_schedule(th);
+	if (rb_fiber_auto_do_yield_p(th)) {
+	    rb_fiber_yield(0, 0);
+	}
+    }
+}
+
+static VALUE
+iom_schedule_fd(VALUE ptr)
+{
+    struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
+    rb_thread_t *th = fdw->w.th;
+    rb_iom_t *iom = rb_iom_get(th);
+    rb_thread_t *s;
+
+    if (rb_fiber_auto_do_yield_p(th)) {
+	return rb_fiber_yield(0, 0);
+    }
+
+try_select:
+    s = iom->selector;
+    if (s) { /* somebody else is running select() */
+	/*
+	 * if our watch set changed after select() started,
+	 * we need to kick it and restart with the new set
+	 */
+	struct select_wait sw;
+
+	return rb_fiber_yield(0, 0);
+	sw.as.thval = th->self;
+	list_add_tail(&iom->swaitq, &sw.swnode);
+	ubf_select(s);
+	rb_ensure(select_wait_sleep, (VALUE)&sw, select_wait_done, (VALUE)&sw);
+	if (sw.as.run_select == Qfalse) {
+	    return rb_fiber_yield(0, 0);
+	}
+	goto try_select;
+    }
+    else {
+	iom->selector = th;
+	iom->select_start = iom->select_gen;
+	rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
+	rb_iom_timer_check(&iom->timers);
+	if (rb_fiber_auto_do_yield_p(th)) {
+	    return rb_fiber_yield(0, 0);
+	}
+    }
+    return rb_fiber_yield(0, 0);
+    if (fdw->w.fibval != Qfalse && th->fiber != th->root_fiber) {
+	return rb_fiber_yield(0, 0);
     }
+    return Qfalse;
 }
 
 int
@@ -292,12 +342,8 @@ rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
     fdw.w.th = th;
     fdw.w.fibval = rb_fiber_current();
 
-    /*
-     * order probably doesn't matter for FDs, but maybe LIFO is better
-     * since later Fibers tend to have higher FDs, and putting higher
-     * FDs first means rb_fd_set() needs to do fewer reallocs
-     */
-    list_add(&iom->fds, &fdw.w.wnode);
+    /* use FIFO order for fairness in iom_select_done */
+    list_add_tail(&iom->fds, &fdw.w.wnode);
 
     rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout);
     iom->select_gen++;
@@ -324,12 +370,14 @@ rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
     pw.w.fibval = rb_fiber_current();
     rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout);
 
-    /* LIFO, matches Linux wait4() behavior */
+    /* LIFO, to match Linux wait4() blocking behavior */
     list_add(&iom->pids, &pw.w.wnode);
 
     /* do not touch select_gen, this has nothing to do with select() */
 
-    /* rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw); */
+    /* TODO
+     * rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw);
+     */
 
     return -1;
 }
-- 
EW


^ permalink raw reply related	[flat|nested] 8+ messages in thread

end of thread, other threads:[~2017-05-18 22:31 UTC | newest]

Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
2017-05-18 22:30 ` [PATCH 2/8] compiles: Eric Wong
2017-05-18 22:30 ` [PATCH 3/8] wip Eric Wong
2017-05-18 22:30 ` [PATCH 4/8] omg Eric Wong
2017-05-18 22:30 ` [PATCH 5/8] ok Eric Wong
2017-05-18 22:30 ` [PATCH 6/8] wip Eric Wong
2017-05-18 22:30 ` [PATCH 7/8] wip-fib-join Eric Wong
2017-05-18 22:30 ` [PATCH 8/8] seems to work Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).