* [PATCH 1/8] wip-iom_select
@ 2017-05-18 22:30 Eric Wong
2017-05-18 22:30 ` [PATCH 2/8] compiles: Eric Wong
` (6 more replies)
0 siblings, 7 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
To: spew
---
include/ruby/io.h | 1 +
iom.h | 62 ++++++++++++++++++++++++
iom_select.c | 140 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
vm.c | 2 +
vm_core.h | 5 ++
5 files changed, 210 insertions(+)
create mode 100644 iom.h
create mode 100644 iom_select.c
diff --git a/include/ruby/io.h b/include/ruby/io.h
index 60d6f6d32e..1f0b1d2dfa 100644
--- a/include/ruby/io.h
+++ b/include/ruby/io.h
@@ -116,6 +116,7 @@ typedef struct rb_io_t {
/* #define FMODE_UNIX 0x00200000 */
/* #define FMODE_INET 0x00400000 */
/* #define FMODE_INET6 0x00800000 */
+/* #define FMODE_EPOLL 0x01000000 */ /* Linux-only */
#define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
diff --git a/iom.h b/iom.h
new file mode 100644
index 0000000000..755dd73e40
--- /dev/null
+++ b/iom.h
@@ -0,0 +1,62 @@
+/*
+ * iom -> I/O Manager for RubyVM (auto-Fiber-aware)
+ *
+ * On platforms with epoll or kqueue, this should be ready for multicore;
+ * even if the rest of the RubyVM is not.
+ *
+ * Some inspiration taken from Mio in GHC:
+ * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf
+ */
+#ifndef RUBY_IOM_H
+#define RUBY_IOM_H
+#include "ruby.h"
+#include "ruby/intern.h"
+#include "vm_core.h"
+
+typedef struct rb_iom_struct rb_iom_t;
+
+/*
+ * there is no public create function, creation is lazy to avoid incurring
+ * overhead for small scripts which do not need fibers
+ */
+int rb_iom_destroy(rb_iom_t *);
+
+/*
+ * Relinquish calling fiber while waiting for +events+ on +rb_io_t.fd+.
+ * If timespec is NULL, it will wait forever, otherwise
+ * wait for up to the relative time given by timespec.
+ */
+void rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, struct timespec *);
+
+/*
+ * Relinquish calling fiber while waiting for +events+ on +fd+.
+ * If timespec is NULL, it will wait forever, otherwise
+ * wait for up to the relative time given by timespec.
+ * Use rb_iom_waitio instead of rb_iom_fdwait when possible,
+ * the former is more efficient on some platforms.
+ */
+void rb_iom_waitfd(rb_thread_t *, int fd, int events, struct timespec *);
+
+/*
+ * Relinquish calling fiber for at least the duration of given timespec,
+ * timespec is NULL, wait forever (until explicitly resumed).
+ */
+void rb_iom_sleep(rb_thread_t *, struct timespec *);
+
+/*
+ * Relinquish calling fiber to wait for the given PID to
+ * change status for least the duration of given timespec.
+ */
+void rb_iom_waitpid(rb_thread_t *, rb_pid_t, int *status, struct timespec *);
+
+/*
+ * callback for SIGCHLD, needed to implemented rb_iom_waitpid
+ */
+void rb_iom_sigchld(rb_iom_t *);
+
+/* callbacks for fork */
+void rb_iom_atfork_prepare(rb_thread_t *, rb_iom_t *);
+void rb_iom_atfork_parent(rb_thread_t *, rb_iom_t *);
+void rb_iom_atfork_child(rb_thread_t *, rb_iom_t *);
+
+#endif /* RUBY_IOM_H */
diff --git a/iom_select.c b/iom_select.c
new file mode 100644
index 0000000000..c1cdddb989
--- /dev/null
+++ b/iom_select.c
@@ -0,0 +1,140 @@
+#include "iom.h"
+#include "ccan/list/list.h"
+#include "ruby/thread_native.h"
+#include "ruby/st.h"
+
+/* on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+ rb_nativethread_lock_t lock;
+ struct list_head timers; /* <=> rb_select_timer.tnode, sort by expire_at */
+ int self_pipe[2];
+ st_table *watches;
+ rb_fdset_t rfds;
+ rb_fdset_t wfds;
+ rb_fdset_t efds;
+ struct list_head pids; /* <=> rb_pid_wait.wnode, FIFO order */
+ /* TODO: optimized structure for waitpid((-N|-1|0|N), ...); */
+};
+
+/* only allocated on stack */
+struct rb_select_timer {
+ struct list_node tnode;
+ struct timespec expires_at; /* absolute time */
+ union {
+ struct rb_fd_wait *fdw;
+ struct rb_pid_wait *pidw;
+ VALUE mask; /* (RB_IMMEDIATE_P(mask) ? pidw : fdw; 0 == sleep-only */
+ } as;
+};
+
+/* only allocated on stack */
+struct rb_iom_waiter {
+ VALUE fibval;
+ struct list_node afqn; /* <=> rb_thread_t.autofiberq */
+ rb_thread_t *th;
+ struct rb_select_timer *timer; /* may be NULL */
+};
+
+/* only allocated on stack */
+struct rb_fd_waiter {
+ struct rb_iom_waiter w;
+ int fd;
+ int events;
+};
+
+/* only allocated on stack */
+struct rb_pid_wait {
+ struct rb_iom_waiter w;
+ rb_pid_t pid;
+ int status;
+ int options;
+};
+
+/* we lazily create this, small scripts may never need this */
+static rb_iom_t *
+rb_iom_create(rb_thread_t *th)
+{
+ rb_iom_t *iom = ZALLOC(rb_iom_t);
+ rb_io_t io;
+
+ if (rb_cloexec_pipe(iom->self_pipe) < 0) {
+ xfree(iom);
+ return NULL;
+ }
+
+ io.pathv = Qnil;
+ io.fd = iom->self_pipe[0];
+ rb_io_set_nonblock(&io);
+ io.fd = iom->self_pipe[1];
+ rb_io_set_nonblock(&io);
+
+ list_head_init(&iom->timers);
+ list_head_init(&iom->pids);
+ rb_nativethread_lock_initialize(&iom->lock);
+ iom->watches = st_init_numtable();
+ rb_fd_init(&iom->rfds);
+ rb_fd_init(&iom->wfds);
+ rb_fd_init(&iom->efds);
+
+ return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+ VM_ASSERT(th);
+ VM_ASSERT(th->vm);
+ if (!th->vm->iom) {
+ th->vm->iom = rb_iom_create(th);
+ }
+ return th->vm->iom;
+}
+
+/* TODO: share portability code with Process.clock_gettime */
+static void now_ts(struct timespec *ts)
+{
+ clock_gettime(CLOCK_MONOTONIC, &timer->expires_at);
+}
+
+static void
+select_timer_init(struct rb_select_timer *timer, const struct timespec *rel)
+{
+ list_node_init(&timer->tnode); /* allow list_del_init to be idempotent */
+ now_ts(&timer->expires_at);
+
+ timer->expires_at.tv_sec += rel->tv_sec;
+ timer->expires_at.tv_nsec += rel->tv_nsec;
+ if (timer->expires_at.tv_nsec >= 1000000000) {
+ timer->expires_at.tv_nsec -= 1000000000;
+ timer->expires_at.tv_sec += 1;
+ }
+}
+
+static void waiter_init(struct rb_iom_waiter *w, const rb_thread *th)
+{
+ w->fibval = rb_fiber_current();
+ list_node_init(&w->afqn); /* allow list_del_init to be idempotent */
+ w->th = th;
+}
+
+void
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, struct timespec *ts)
+{
+ /* only epoll can take advantage of rb_iom_waitio... */
+ rb_iom_io_fd_wait(th, fptr->fd, events, ts);
+}
+
+void
+rb_iom_waitfd(rb_thread_t *th, int fd, int events, struct timespec *ts)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_fd_waiter fdw;
+
+ waiter_init(&fdw.w, th);
+ if (ts) {
+ fdw.w.timer = alloca(sizeof(struct rb_select_timer));
+ select_timer_init(fdw.w.timer, ts);
+ }
+
+
+}
diff --git a/vm.c b/vm.c
index 52d505ab7c..60b7bf070b 100644
--- a/vm.c
+++ b/vm.c
@@ -2513,6 +2513,8 @@ th_init(rb_thread_t *th, VALUE self)
th->ec.cfp = (void *)(th->ec.stack + th->ec.stack_size);
+ list_head_init(&th->auto_fiberq);
+
vm_push_frame(th, 0 /* dummy iseq */, VM_FRAME_MAGIC_DUMMY | VM_ENV_FLAG_LOCAL | VM_FRAME_FLAG_FINISH | VM_FRAME_FLAG_CFRAME /* dummy frame */,
Qnil /* dummy self */, VM_BLOCK_HANDLER_NONE /* dummy block ptr */,
0 /* dummy cref/me */,
diff --git a/vm_core.h b/vm_core.h
index 35b1748218..3ee3e5c0c5 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -481,6 +481,8 @@ typedef struct rb_hook_list_struct {
int need_clean;
} rb_hook_list_t;
+struct rb_iomgr_struct;
+
typedef struct rb_vm_struct {
VALUE self;
@@ -490,6 +492,8 @@ typedef struct rb_vm_struct {
struct rb_thread_struct *main_thread;
struct rb_thread_struct *running_thread;
+ struct rb_iomgr_struct *iomgr;
+
struct list_head living_threads;
size_t living_thread_num;
VALUE thgroup_default;
@@ -809,6 +813,7 @@ typedef struct rb_thread_struct {
rb_fiber_t *fiber;
rb_fiber_t *root_fiber;
rb_jmpbuf_t root_jmpbuf;
+ struct list_head auto_fiberq;
/* ensure & callcc */
rb_ensure_list_t *ensure_list;
--
EW
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 2/8] compiles:
2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
2017-05-18 22:30 ` [PATCH 3/8] wip Eric Wong
` (5 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
To: spew
# Demo for the experimental auto-Fiber support in this patch: ten fibers
# each download the same pack file over HTTPS and print its SHA-1.
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'
url = 'https://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.pack'
uri = URI(url)
# Fiber#auto= is the setter this patch registers in cont.c (Init_Cont)
Fiber.current.auto = true
fibs = 10.times.map do
Fiber.new do
# the auto flag is stored per fiber, so every worker opts in itself
Fiber.current.auto = true
# used only to tag the log lines below
cur = Fiber.current.object_id
Net::HTTP.start(uri.host, uri.port, use_ssl: true) do |http|
req = Net::HTTP::Get.new(uri)
http.request(req) do |res|
dig = Digest::SHA1.new
# stream the body in chunks; each chunk read is a point where the
# fiber may have to wait on the socket (presumably where the I/O
# manager switches fibers -- not verifiable from this script alone)
res.read_body do |buf|
dig.update(buf)
warn "#{cur} #{buf.bytesize}\n"
end
warn "#{cur} #{dig.hexdigest}\n"
end
end
# block value; it is what the final #resume of this fiber returns
:done
end
end
# keep resuming the fibers, dropping each one whose resume returned :done,
# until none are left
fibs.delete_if do |f|
f.resume == :done
end until fibs.empty?
---
common.mk | 3 +
cont.c | 27 ++++++
include/ruby/io.h | 3 +-
iom.h | 57 ++++++++-----
iom_common.h | 96 ++++++++++++++++++++++
iom_select.c | 140 -------------------------------
iom_select.h | 242 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
thread.c | 22 +++++
vm_core.h | 4 +-
9 files changed, 429 insertions(+), 165 deletions(-)
create mode 100644 iom_common.h
delete mode 100644 iom_select.c
create mode 100644 iom_select.h
diff --git a/common.mk b/common.mk
index 9e554753c6..5bdec64409 100644
--- a/common.mk
+++ b/common.mk
@@ -2588,6 +2588,9 @@ thread.$(OBJEXT): {$(VPATH)}id.h
thread.$(OBJEXT): {$(VPATH)}intern.h
thread.$(OBJEXT): {$(VPATH)}internal.h
thread.$(OBJEXT): {$(VPATH)}io.h
+thread.$(OBJEXT): {$(VPATH)}iom.h
+thread.$(OBJEXT): {$(VPATH)}iom_common.h
+thread.$(OBJEXT): {$(VPATH)}iom_select.h
thread.$(OBJEXT): {$(VPATH)}method.h
thread.$(OBJEXT): {$(VPATH)}missing.h
thread.$(OBJEXT): {$(VPATH)}node.h
diff --git a/cont.c b/cont.c
index 4d6176f00c..05e0a83c94 100644
--- a/cont.c
+++ b/cont.c
@@ -127,6 +127,7 @@ struct rb_fiber_struct {
rb_context_t cont;
struct rb_fiber_struct *prev;
enum fiber_status status;
+ unsigned int auto_fiber:1;
/* If a fiber invokes "transfer",
* then this fiber can't "resume" any more after that.
* You shouldn't mix "transfer" and "resume".
@@ -1502,6 +1503,9 @@ rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
rb_fiber_t *fib;
GetFiberPtr(fibval, fib);
+ if (fib->prev != 0) {
+ rb_raise(rb_eFiberError, "zdouble resume");
+ }
if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
rb_raise(rb_eFiberError, "double resume");
}
@@ -1651,7 +1655,28 @@ rb_fiber_s_current(VALUE klass)
return rb_fiber_current();
}
+static VALUE
+rb_fiber_auto_get(VALUE self)
+{
+ rb_fiber_t *fiber = fiber_current();
+ return fiber->auto_fiber ? Qtrue : Qfalse;
+}
+
+static VALUE
+rb_fiber_auto_set(VALUE self, VALUE val)
+{
+ rb_fiber_t *fiber = fiber_current();
+
+ return (fiber->auto_fiber = RTEST(val)) ? Qtrue : Qfalse;
+}
+
+int rb_fiber_auto_sched_p(const rb_thread_t *th)
+{
+ const rb_fiber_t *cur = th->fiber;
+
+ return (cur && cur->auto_fiber && th->root_fiber != cur);
+}
/*
* Document-class: FiberError
@@ -1688,6 +1713,8 @@ Init_Cont(void)
rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0);
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
+ rb_define_method(rb_cFiber, "auto", rb_fiber_auto_get, 0);
+ rb_define_method(rb_cFiber, "auto=", rb_fiber_auto_set, 1);
}
RUBY_SYMBOL_EXPORT_BEGIN
diff --git a/include/ruby/io.h b/include/ruby/io.h
index 1f0b1d2dfa..3bd6f06cf3 100644
--- a/include/ruby/io.h
+++ b/include/ruby/io.h
@@ -116,7 +116,8 @@ typedef struct rb_io_t {
/* #define FMODE_UNIX 0x00200000 */
/* #define FMODE_INET 0x00400000 */
/* #define FMODE_INET6 0x00800000 */
-/* #define FMODE_EPOLL 0x01000000 */ /* Linux-only */
+/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */
+/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */
#define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
diff --git a/iom.h b/iom.h
index 755dd73e40..070b610458 100644
--- a/iom.h
+++ b/iom.h
@@ -15,43 +15,49 @@
typedef struct rb_iom_struct rb_iom_t;
-/*
- * there is no public create function, creation is lazy to avoid incurring
- * overhead for small scripts which do not need fibers
- */
-int rb_iom_destroy(rb_iom_t *);
+/* WARNING: unstable API, only for Ruby internal use */
/*
- * Relinquish calling fiber while waiting for +events+ on +rb_io_t.fd+.
- * If timespec is NULL, it will wait forever, otherwise
- * wait for up to the relative time given by timespec.
+ * Note: the first "rb_thread_t *" is a placeholder and may be replaced
+ * with "rb_execution_context_t *" in the future.
*/
-void rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, struct timespec *);
/*
- * Relinquish calling fiber while waiting for +events+ on +fd+.
- * If timespec is NULL, it will wait forever, otherwise
- * wait for up to the relative time given by timespec.
- * Use rb_iom_waitio instead of rb_iom_fdwait when possible,
- * the former is more efficient on some platforms.
+ * All functions with "wait" in their name take an optional double * +timeout+
+ * argument specifying the timeout in seconds. If NULL, it can wait
+ * forever until the event happens (or the fiber is explicitly resumed).
+ * If non-NULL, the timeout will be updated to the remaining time
+ * upon return.
*/
-void rb_iom_waitfd(rb_thread_t *, int fd, int events, struct timespec *);
/*
- * Relinquish calling fiber for at least the duration of given timespec,
- * timespec is NULL, wait forever (until explicitly resumed).
+ * Relinquish calling fiber while waiting for +events+ on file descriptor
+ * pointed to by +fdp+.
+ * Multiple native threads can enter this function at the same time.
+ *
+ * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI
+ *
+ * Returns a mask of events.
*/
-void rb_iom_sleep(rb_thread_t *, struct timespec *);
+int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout);
/*
- * Relinquish calling fiber to wait for the given PID to
- * change status for least the duration of given timespec.
+ * Relinquish calling fiber to wait for the given PID to change status.
+ * Multiple native threads can enter this function at the same time.
+ * If timeout is negative, wait forever.
*/
-void rb_iom_waitpid(rb_thread_t *, rb_pid_t, int *status, struct timespec *);
+rb_pid_t rb_iom_waitpid(rb_thread_t *,
+ rb_pid_t, int *status, int options, double *timeout);
/*
- * callback for SIGCHLD, needed to implemented rb_iom_waitpid
+ * Relinquish calling fiber for at least the duration of given timeout
+ * in seconds. If timeout is negative, wait forever (until explicitly
+ * resumed).
+ * Multiple native threads can enter this function at the same time.
*/
+void rb_iom_sleep(rb_thread_t *, double *timeout);
+
+/* callback for SIGCHLD, needed to implement rb_iom_waitpid */
void rb_iom_sigchld(rb_iom_t *);
/* callbacks for fork */
@@ -59,4 +65,11 @@ void rb_iom_atfork_prepare(rb_thread_t *, rb_iom_t *);
void rb_iom_atfork_parent(rb_thread_t *, rb_iom_t *);
void rb_iom_atfork_child(rb_thread_t *, rb_iom_t *);
+/*
+ * there is no public create function, creation is lazy to avoid incurring
+ * overhead for small scripts which do not need fibers, we only need this
+ * at VM destruction
+ */
+int rb_iom_destroy(rb_iom_t *);
+
#endif /* RUBY_IOM_H */
diff --git a/iom_common.h b/iom_common.h
new file mode 100644
index 0000000000..03d4ac560a
--- /dev/null
+++ b/iom_common.h
@@ -0,0 +1,96 @@
+#ifndef RB_IOM_COMMON_H
+#define RB_IOM_COMMON_H
+
+#include "iom.h"
+
+/* allocated on stack */
+struct rb_iom_timer {
+ struct list_node tnode; /* <=> rb_iom_struct.timers */
+ double expires_at; /* absolute monotonic time */
+};
+
+struct rb_iom_waiter {
+ VALUE fibval;
+ struct list_node node; /* <=> (rb_thread_t.autofiberq|rb_iom_struct.fds) */
+ rb_thread_t *th;
+ struct rb_iom_timer timer;
+};
+
+struct rb_iom_fd_waiter {
+ struct rb_iom_waiter w;
+ int *fdp;
+ short events;
+ short revents;
+};
+
+struct rb_iom_pid_waiter {
+ struct rb_iom_waiter w;
+ rb_pid_t pid;
+ int status;
+ int options;
+};
+
+/* check for expired timers, requeue every expired waiter on its thread */
+static void
+rb_iom_timer_check(struct list_head *timers)
+{
+    struct rb_iom_timer *i = NULL, *next = NULL;
+    double now = timeofday();
+    struct rb_iom_waiter *w;
+
+    list_for_each_safe(timers, i, next, tnode) {
+        if (i->expires_at > now) {
+            break; /* done, timers is a sorted list: the rest expire later */
+        }
+        w = container_of(i, struct rb_iom_waiter, timer);
+        list_del_init(&i->tnode); /* cleanup may list_del this node again */
+        list_del(&w->node);
+        list_add_tail(&w->th->auto_fiberq, &w->node);
+        w->th = NULL; /* th == NULL tells the waiter it was scheduled */
+    }
+}
+
+/* insert a new +timer+ into +timers+, maintain sort order by expires_at */
+static void
+rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
+ const double *timeout)
+{
+ rb_iom_timer_check(timers);
+ if (timeout) {
+ struct rb_iom_timer *i = NULL;
+ add->expires_at = timeofday() + *timeout;
+
+ /*
+ * search backwards: assume typical projects have multiple objects
+ * sharing the same timeout values, so newly added timers will expire
+ * expire later than existing timers
+ */
+ list_for_each_rev(timers, i, tnode) {
+ if (add->expires_at > i->expires_at) {
+ list_add_after(timers, &i->tnode, &add->tnode);
+ return;
+ }
+ }
+ list_add(timers, &add->tnode);
+ }
+ else {
+ /* not active, just allow list_del_init to function in cleanup */
+ list_node_init(&add->tnode);
+ }
+}
+
+static void
+rb_iom_schedule(rb_thread_t *th, struct rb_iom_waiter *current)
+{
+ struct rb_iom_waiter *i = NULL, *next = NULL;
+
+ list_for_each_safe(&th->auto_fiberq, i, next, node) {
+ list_del_init(&i->node);
+ if (i == current) {
+ return;
+ }
+ rb_fiber_resume(i->fibval, 0, NULL);
+ }
+}
+
+#endif /* RB_IOM_COMMON_H */
diff --git a/iom_select.c b/iom_select.c
deleted file mode 100644
index c1cdddb989..0000000000
--- a/iom_select.c
+++ /dev/null
@@ -1,140 +0,0 @@
-#include "iom.h"
-#include "ccan/list/list.h"
-#include "ruby/thread_native.h"
-#include "ruby/st.h"
-
-/* on heap (rb_vm_t.iom) */
-struct rb_iom_struct {
- rb_nativethread_lock_t lock;
- struct list_head timers; /* <=> rb_select_timer.tnode, sort by expire_at */
- int self_pipe[2];
- st_table *watches;
- rb_fdset_t rfds;
- rb_fdset_t wfds;
- rb_fdset_t efds;
- struct list_head pids; /* <=> rb_pid_wait.wnode, FIFO order */
- /* TODO: optimized structure for waitpid((-N|-1|0|N), ...); */
-};
-
-/* only allocated on stack */
-struct rb_select_timer {
- struct list_node tnode;
- struct timespec expires_at; /* absolute time */
- union {
- struct rb_fd_wait *fdw;
- struct rb_pid_wait *pidw;
- VALUE mask; /* (RB_IMMEDIATE_P(mask) ? pidw : fdw; 0 == sleep-only */
- } as;
-};
-
-/* only allocated on stack */
-struct rb_iom_waiter {
- VALUE fibval;
- struct list_node afqn; /* <=> rb_thread_t.autofiberq */
- rb_thread_t *th;
- struct rb_select_timer *timer; /* may be NULL */
-};
-
-/* only allocated on stack */
-struct rb_fd_waiter {
- struct rb_iom_waiter w;
- int fd;
- int events;
-};
-
-/* only allocated on stack */
-struct rb_pid_wait {
- struct rb_iom_waiter w;
- rb_pid_t pid;
- int status;
- int options;
-};
-
-/* we lazily create this, small scripts may never need this */
-static rb_iom_t *
-rb_iom_create(rb_thread_t *th)
-{
- rb_iom_t *iom = ZALLOC(rb_iom_t);
- rb_io_t io;
-
- if (rb_cloexec_pipe(iom->self_pipe) < 0) {
- xfree(iom);
- return NULL;
- }
-
- io.pathv = Qnil;
- io.fd = iom->self_pipe[0];
- rb_io_set_nonblock(&io);
- io.fd = iom->self_pipe[1];
- rb_io_set_nonblock(&io);
-
- list_head_init(&iom->timers);
- list_head_init(&iom->pids);
- rb_nativethread_lock_initialize(&iom->lock);
- iom->watches = st_init_numtable();
- rb_fd_init(&iom->rfds);
- rb_fd_init(&iom->wfds);
- rb_fd_init(&iom->efds);
-
- return iom;
-}
-
-static rb_iom_t *
-rb_iom_get(rb_thread_t *th)
-{
- VM_ASSERT(th);
- VM_ASSERT(th->vm);
- if (!th->vm->iom) {
- th->vm->iom = rb_iom_create(th);
- }
- return th->vm->iom;
-}
-
-/* TODO: share portability code with Process.clock_gettime */
-static void now_ts(struct timespec *ts)
-{
- clock_gettime(CLOCK_MONOTONIC, &timer->expires_at);
-}
-
-static void
-select_timer_init(struct rb_select_timer *timer, const struct timespec *rel)
-{
- list_node_init(&timer->tnode); /* allow list_del_init to be idempotent */
- now_ts(&timer->expires_at);
-
- timer->expires_at.tv_sec += rel->tv_sec;
- timer->expires_at.tv_nsec += rel->tv_nsec;
- if (timer->expires_at.tv_nsec >= 1000000000) {
- timer->expires_at.tv_nsec -= 1000000000;
- timer->expires_at.tv_sec += 1;
- }
-}
-
-static void waiter_init(struct rb_iom_waiter *w, const rb_thread *th)
-{
- w->fibval = rb_fiber_current();
- list_node_init(&w->afqn); /* allow list_del_init to be idempotent */
- w->th = th;
-}
-
-void
-rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, struct timespec *ts)
-{
- /* only epoll can take advantage of rb_iom_waitio... */
- rb_iom_io_fd_wait(th, fptr->fd, events, ts);
-}
-
-void
-rb_iom_waitfd(rb_thread_t *th, int fd, int events, struct timespec *ts)
-{
- rb_iom_t *iom = rb_iom_get(th);
- struct rb_fd_waiter fdw;
-
- waiter_init(&fdw.w, th);
- if (ts) {
- fdw.w.timer = alloca(sizeof(struct rb_select_timer));
- select_timer_init(fdw.w.timer, ts);
- }
-
-
-}
diff --git a/iom_select.h b/iom_select.h
new file mode 100644
index 0000000000..21603a7e5a
--- /dev/null
+++ b/iom_select.h
@@ -0,0 +1,242 @@
+/*
+ * select()-based implementation of I/O Manager for RubyVM
+ *
+ * This is crippled and relies heavily on GVL compared to the
+ * epoll and kqueue versions
+ */
+#include "iom_common.h"
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+ /*
+ * lists are all protected by GVL at this time,
+ * URCU lists (LGPL-2.1+) may be used in the future
+ */
+ struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
+ struct list_head fds; /* -rb_iom_fd_waiter.w.node */
+ struct list_head pids; /* -rb_iom_pid_waiter.w.node, FIFO order */
+
+ rb_nativethread_lock_t lock; /* protects select() and fdsets */
+ rb_fdset_t rfds;
+ rb_fdset_t wfds;
+ rb_fdset_t efds;
+ rb_thread_t *scheduler;
+};
+
+/* we lazily create this, small scripts may never need iom */
+static rb_iom_t *
+rb_iom_create(rb_thread_t *th)
+{
+    rb_iom_t *iom = ZALLOC(rb_iom_t); /* zeroed: ->scheduler starts NULL */
+
+    list_head_init(&iom->timers);
+    list_head_init(&iom->fds);
+    list_head_init(&iom->pids);
+    rb_nativethread_lock_initialize(&iom->lock);
+
+    return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+ VM_ASSERT(th && th->vm);
+ if (!th->vm->iom) {
+ th->vm->iom = rb_iom_create(th);
+ }
+ return th->vm->iom;
+}
+
+static void
+waiter_cleanup(struct rb_iom_waiter *w)
+{
+ list_del(&w->timer.tnode);
+ if (w->th) {
+ list_del_init(&w->node);
+ }
+ else if (!list_empty((struct list_head *)&w->node)) {
+ /* we got scheduled in auto_fiberq, wait for another to resume us */
+ rb_fiber_yield(0, NULL);
+ }
+}
+
+static VALUE
+fd_waiter_cleanup(VALUE ptr)
+{
+ struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
+ waiter_cleanup(&fdw->w);
+ return Qfalse;
+}
+
+static void
+waiter_init(struct rb_iom_waiter *w, rb_thread_t *th)
+{
+ w->fibval = rb_fiber_current();
+ w->th = th;
+}
+
+static struct timeval *
+iom_timeout(rb_iom_t *iom, struct timeval *tv)
+{
+ struct rb_iom_timer *t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+ if (t) {
+ double diff = t->expires_at - timeofday();
+ if (diff > 0) {
+ *tv = double2timeval(diff);
+ }
+ else {
+ tv->tv_sec = 0;
+ tv->tv_usec = 0;
+ }
+ return tv;
+ }
+ return NULL;
+}
+
+static VALUE
+iom_select_begin(VALUE ptr)
+{
+ rb_thread_t *th = (rb_thread_t *)ptr;
+ rb_iom_t *iom = th->vm->iom;
+ int max = 0;
+ struct timeval tv;
+ struct rb_iom_fd_waiter *fdw = NULL;
+ rb_fdset_t *rfds = NULL, *wfds = NULL, *efds = NULL;
+
+ rb_fd_init(&iom->rfds);
+ rb_fd_init(&iom->wfds);
+ rb_fd_init(&iom->efds);
+
+ list_for_each(&iom->fds, fdw, w.node) {
+ int fd = *fdw->fdp;
+ if (fd < 0) {
+ continue; /* closed */
+ }
+ if (fd > max) {
+ max = fd;
+ }
+ if (fdw->events & RB_WAITFD_IN) {
+ rb_fd_set(fd, rfds = &iom->rfds);
+ }
+ if (fdw->events & RB_WAITFD_OUT) {
+ rb_fd_set(fd, wfds = &iom->wfds);
+ }
+ if (fdw->events & RB_WAITFD_PRI) {
+ rb_fd_set(fd, efds = &iom->efds);
+ }
+ }
+ rb_thread_fd_select(max + 1, rfds, wfds, efds, iom_timeout(iom, &tv));
+
+ return Qfalse;
+}
+
+static VALUE
+iom_select_end(VALUE ptr)
+{
+ rb_thread_t *th = (rb_thread_t *)ptr;
+ rb_iom_t *iom = th->vm->iom;
+ struct rb_iom_fd_waiter *fdw = NULL, *next = NULL;
+
+ iom->scheduler = NULL;
+ list_for_each_safe(&iom->fds, fdw, next, w.node) {
+ int fd = *fdw->fdp;
+ if (fd < 0) {
+ continue; /* closed */
+ }
+ if (fdw->events & RB_WAITFD_IN && rb_fd_isset(fd, &iom->rfds)) {
+ fdw->revents |= RB_WAITFD_IN;
+ }
+ if (fdw->events & RB_WAITFD_OUT && rb_fd_isset(fd, &iom->wfds)) {
+ fdw->revents |= RB_WAITFD_OUT;
+ }
+ if (fdw->events & RB_WAITFD_PRI && rb_fd_isset(fd, &iom->efds)) {
+ fdw->revents |= RB_WAITFD_PRI;
+ }
+
+ /* got revents? enqueue ourselves to be run! */
+ if (fdw->revents) {
+ list_del_init(&fdw->w.timer.tnode);
+ list_del(&fdw->w.node);
+ list_add_tail(&fdw->w.th->auto_fiberq, &fdw->w.node);
+ fdw->w.th = NULL;
+ }
+ }
+
+ rb_fd_term(&iom->rfds);
+ rb_fd_term(&iom->wfds);
+ rb_fd_term(&iom->efds);
+
+ native_mutex_unlock(&iom->lock);
+ return Qfalse;
+}
+
+static VALUE
+iom_schedule_fd(VALUE ptr)
+{
+ struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
+ rb_thread_t *th = fdw->w.th;
+ rb_iom_t *iom = th->vm->iom;
+
+ rb_iom_schedule(th, &fdw->w); /* try to get some work done elsewhere */
+
+ while (fdw->w.th != NULL) {
+ rb_thread_t *sched = iom->scheduler;
+ if (!sched) {
+ iom->scheduler = th;
+ /*
+ * nope, rb_iom_schedule did not do our work for us,
+ * maybe we can wait on select(), ourselves:
+ */
+ if (native_mutex_trylock(&iom->lock) == 0) {
+ rb_ensure(iom_select_begin, (VALUE)th, iom_select_end, (VALUE)th);
+ }
+ }
+ else {
+ /* or kick whoever is running select into doing work for us */
+ ubf_select(sched);
+ rb_thread_schedule_limits(0);
+ rb_iom_schedule(th, &fdw->w);
+ if (list_empty((struct list_head *)&fdw->w.node)) {
+ return Qfalse;
+ }
+ }
+ }
+ return Qfalse;
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_iom_fd_waiter fdw;
+
+ if (*fdp < 0) return -1;
+ fdw.fdp = fdp;
+ fdw.events = (short)events;
+ fdw.revents = 0;
+ waiter_init(&fdw.w, th);
+ list_add(&iom->fds, &fdw.w.node);
+ rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout);
+ rb_ensure(iom_schedule_fd, (VALUE)&fdw, fd_waiter_cleanup, (VALUE)&fdw);
+
+ if (*fdp < 0) return -1;
+
+ return (int)fdw.revents; /* may be zero if timed out */
+}
+
+rb_pid_t
+rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
+ double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_iom_pid_waiter pw;
+
+ VM_ASSERT((options & WNOHANG) == 0 &&
+ "WNOHANG should be handled in rb_waitpid");
+
+ waiter_init(&pw.w, th);
+ rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout);
+
+ return -1;
+}
diff --git a/thread.c b/thread.c
index fd3db3648f..4ff8510100 100644
--- a/thread.c
+++ b/thread.c
@@ -70,6 +70,10 @@
#include "ruby/thread.h"
#include "ruby/thread_native.h"
#include "internal.h"
+#include "iom.h"
+
+/* cont.c */
+int rb_fiber_auto_sched_p(const rb_thread_t *th);
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
@@ -3909,6 +3913,16 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
struct timespec *timeout = NULL;
rb_thread_t *th = GET_THREAD();
+ if (rb_fiber_auto_sched_p(th)) {
+ double *to = NULL;
+ double tout;
+ if (tv) {
+ tout = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
+ to = &tout;
+ }
+ return rb_iom_waitfd(th, &fd, events, to);
+ }
+
#define poll_update() \
(update_timespec(timeout, limit), \
TRUE)
@@ -5060,6 +5074,12 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
}
void
+rb_thread_ubf_select(rb_thread_t *th)
+{
+ ubf_select(th);
+}
+
+void
ruby_kill(rb_pid_t pid, int sig)
{
int err;
@@ -5081,3 +5101,5 @@ ruby_kill(rb_pid_t pid, int sig)
rb_sys_fail(0);
}
}
+
+#include "iom_select.h"
diff --git a/vm_core.h b/vm_core.h
index 3ee3e5c0c5..dade714b2a 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -481,7 +481,7 @@ typedef struct rb_hook_list_struct {
int need_clean;
} rb_hook_list_t;
-struct rb_iomgr_struct;
+struct rb_iom_struct;
typedef struct rb_vm_struct {
VALUE self;
@@ -492,7 +492,7 @@ typedef struct rb_vm_struct {
struct rb_thread_struct *main_thread;
struct rb_thread_struct *running_thread;
- struct rb_iomgr_struct *iomgr;
+ struct rb_iom_struct *iom;
struct list_head living_threads;
size_t living_thread_num;
--
EW
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 3/8] wip
2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
2017-05-18 22:30 ` [PATCH 2/8] compiles: Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
2017-05-18 22:30 ` [PATCH 4/8] omg Eric Wong
` (4 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
To: spew
---
iom_select.h | 34 ++++++++++++++++++++++------------
1 file changed, 22 insertions(+), 12 deletions(-)
diff --git a/iom_select.h b/iom_select.h
index 21603a7e5a..ebc5af87ff 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -15,8 +15,8 @@ struct rb_iom_struct {
struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
struct list_head fds; /* -rb_iom_fd_waiter.w.node */
struct list_head pids; /* -rb_iom_pid_waiter.w.node, FIFO order */
+ int gen;
- rb_nativethread_lock_t lock; /* protects select() and fdsets */
rb_fdset_t rfds;
rb_fdset_t wfds;
rb_fdset_t efds;
@@ -32,7 +32,6 @@ rb_iom_create(rb_thread_t *th)
list_head_init(&iom->timers);
list_head_init(&iom->fds);
list_head_init(&iom->pids);
- rb_nativethread_lock_initialize(&iom->lock);
return iom;
}
@@ -54,9 +53,11 @@ waiter_cleanup(struct rb_iom_waiter *w)
if (w->th) {
list_del_init(&w->node);
}
- else if (!list_empty((struct list_head *)&w->node)) {
- /* we got scheduled in auto_fiberq, wait for another to resume us */
- rb_fiber_yield(0, NULL);
+ else {
+ while (!list_empty((struct list_head *)&w->node)) {
+ /* we got scheduled in auto_fiberq, wait for another to resume us */
+ rb_fiber_yield(0, NULL);
+ }
}
}
@@ -99,10 +100,15 @@ iom_select_begin(VALUE ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
rb_iom_t *iom = th->vm->iom;
- int max = 0;
+ int max, gen;
struct timeval tv;
struct rb_iom_fd_waiter *fdw = NULL;
- rb_fdset_t *rfds = NULL, *wfds = NULL, *efds = NULL;
+ rb_fdset_t *rfds, *wfds, *efds;
+
+again:
+ rfds = wfds = efds = NULL;
+ gen = iom->gen;
+ max = 0;
rb_fd_init(&iom->rfds);
rb_fd_init(&iom->wfds);
@@ -126,7 +132,14 @@ iom_select_begin(VALUE ptr)
rb_fd_set(fd, efds = &iom->efds);
}
}
- rb_thread_fd_select(max + 1, rfds, wfds, efds, iom_timeout(iom, &tv));
+
+ r = rb_thread_fd_select(max + 1, rfds, wfds, efds, iom_timeout(iom, &tv));
+ if (r < 0 && errno == EINTR && iom->gen != gen) {
+ rb_fd_term(&iom->rfds);
+ rb_fd_term(&iom->wfds);
+ rb_fd_term(&iom->efds);
+ goto again;
+ }
return Qfalse;
}
@@ -167,7 +180,6 @@ iom_select_end(VALUE ptr)
rb_fd_term(&iom->wfds);
rb_fd_term(&iom->efds);
- native_mutex_unlock(&iom->lock);
return Qfalse;
}
@@ -188,9 +200,7 @@ iom_schedule_fd(VALUE ptr)
* nope, rb_iom_schedule did not do our work for us,
* maybe we can wait on select(), ourselves:
*/
- if (native_mutex_trylock(&iom->lock) == 0) {
- rb_ensure(iom_select_begin, (VALUE)th, iom_select_end, (VALUE)th);
- }
+ rb_ensure(iom_select_begin, (VALUE)th, iom_select_end, (VALUE)th);
}
else {
/* or kick whoever is running select into doing work for us */
--
EW
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 4/8] omg
2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
2017-05-18 22:30 ` [PATCH 2/8] compiles: Eric Wong
2017-05-18 22:30 ` [PATCH 3/8] wip Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
2017-05-18 22:30 ` [PATCH 5/8] ok Eric Wong
` (3 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
To: spew
---
cont.c | 94 +++++++++++++++++++++++++++++++++++++++++++++++++-----------
iom_common.h | 43 ++++++++++++---------------
iom_select.h | 86 ++++++++++++++++++++++--------------------------------
thread.c | 2 +-
vm.c | 2 +-
vm_core.h | 2 +-
6 files changed, 133 insertions(+), 96 deletions(-)
diff --git a/cont.c b/cont.c
index 05e0a83c94..8282eabd81 100644
--- a/cont.c
+++ b/cont.c
@@ -126,8 +126,15 @@ static machine_stack_cache_t terminated_machine_stack;
struct rb_fiber_struct {
rb_context_t cont;
struct rb_fiber_struct *prev;
+
+ /*
+ * afnode.next == NULL auto fiber disabled
+ * afnode.next == &afnode auto fiber enabled, but not enqueued (running)
+ * afnode.next != &afnode auto fiber runnable (enqueued in th->afhead)
+ */
+ struct list_node afnode;
+
enum fiber_status status;
- unsigned int auto_fiber:1;
/* If a fiber invokes "transfer",
* then this fiber can't "resume" any more after that.
* You shouldn't mix "transfer" and "resume".
@@ -1497,22 +1504,28 @@ rb_fiber_terminate(rb_fiber_t *fib)
fiber_switch(return_fiber(), 1, &value, 0);
}
-VALUE
-rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+static void
+fiber_check_resume(const rb_fiber_t *fib)
{
- rb_fiber_t *fib;
- GetFiberPtr(fibval, fib);
-
if (fib->prev != 0) {
- rb_raise(rb_eFiberError, "zdouble resume");
+ rb_raise(rb_eFiberError, "double resume (no prev)");
}
- if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
- rb_raise(rb_eFiberError, "double resume");
+ if (fib->cont.type == ROOT_FIBER_CONTEXT) {
+ rb_raise(rb_eFiberError, "double resume (root)");
}
if (fib->transferred != 0) {
rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
}
+}
+VALUE
+rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+{
+ rb_fiber_t *fib;
+ GetFiberPtr(fibval, fib);
+
+ fprintf(stderr, "resume\n");
+ fiber_check_resume(fib);
return fiber_switch(fib, argc, argv, 1);
}
@@ -1655,27 +1668,76 @@ rb_fiber_s_current(VALUE klass)
return rb_fiber_current();
}
+void
+rb_fiber_auto_enqueue(VALUE fibval)
+{
+ rb_fiber_t *fib;
+ rb_thread_t *th;
+
+ GetFiberPtr(fibval, fib);
+ GetThreadPtr(fib->cont.saved_thread.self, th);
+ fprintf(stderr, "fibenq: %p\n", fib);
+ fiber_check_resume(fib);
+ list_add_tail(&th->afhead, &fib->afnode);
+}
+
+void
+rb_fiber_auto_schedule(rb_thread_t *th)
+{
+ rb_fiber_t *fib = NULL, *next = NULL;
+ rb_fiber_t *current = fiber_current();
+
+ list_for_each_safe(&th->afhead, fib, next, afnode) {
+ list_del_init(&fib->afnode);
+ if (fib == current) {
+ write(2, "self\n", 5);
+ return;
+ }
+ fprintf(stderr, "fibswitch: %p\n", fib);
+ fiber_check_resume(fib);
+ fiber_switch(fib, 0, NULL, 1);
+ }
+}
+
static VALUE
rb_fiber_auto_get(VALUE self)
{
- rb_fiber_t *fiber = fiber_current();
+ rb_fiber_t *fib;
+ GetFiberPtr(self, fib);
- return fiber->auto_fiber ? Qtrue : Qfalse;
+ return fib->afnode.next ? Qtrue : Qfalse;
}
static VALUE
rb_fiber_auto_set(VALUE self, VALUE val)
{
- rb_fiber_t *fiber = fiber_current();
-
- return (fiber->auto_fiber = RTEST(val)) ? Qtrue : Qfalse;
+ rb_fiber_t *fib;
+ GetFiberPtr(self, fib);
+ if (RTEST(val)) {
+ if (fib->afnode.next == NULL) {
+ rb_thread_t *th;
+ GetThreadPtr(fib->cont.saved_thread.self, th);
+ fprintf(stderr, "fibset: %p\n", fib);
+ fiber_check_resume(fib);
+ list_add_tail(&th->afhead, &fib->afnode);
+ }
+ return Qtrue;
+ }
+ else {
+ if (fib->afnode.next) {
+ list_del_init(&fib->afnode);
+ fib->afnode.next = NULL;
+ }
+ return Qfalse;
+ }
}
-int rb_fiber_auto_sched_p(const rb_thread_t *th)
+int
+rb_fiber_auto_sched_p(const rb_thread_t *th)
{
const rb_fiber_t *cur = th->fiber;
- return (cur && cur->auto_fiber && th->root_fiber != cur);
+ return (cur && cur->afnode.next && th->root_fiber != cur);
}
/*
diff --git a/iom_common.h b/iom_common.h
index 03d4ac560a..c70b45499f 100644
--- a/iom_common.h
+++ b/iom_common.h
@@ -3,6 +3,10 @@
#include "iom.h"
+/* cont.c */
+void rb_fiber_auto_enqueue(VALUE fibval);
+void rb_fiber_auto_schedule(rb_thread_t *);
+
/* allocated on stack */
struct rb_iom_timer {
struct list_node tnode; /* <=> rb_iom_struct.timers */
@@ -10,10 +14,10 @@ struct rb_iom_timer {
};
struct rb_iom_waiter {
- VALUE fibval;
- struct list_node node; /* <=> (rb_thread_t.autofiberq|rb_iom_struct.fds) */
- rb_thread_t *th;
+ struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
struct rb_iom_timer timer;
+ rb_thread_t *th;
+ VALUE fibval;
};
struct rb_iom_fd_waiter {
@@ -36,17 +40,20 @@ rb_iom_timer_check(struct list_head *timers)
{
struct rb_iom_timer *i = NULL, *next = NULL;
double now = timeofday();
- struct rb_iom_waiter *w;
list_for_each_safe(timers, i, next, tnode) {
if (i->expires_at <= now) {
+ struct rb_iom_waiter *w;
+ VALUE fibval;
+
w = container_of(i, struct rb_iom_waiter, timer);
list_del_init(&i->tnode);
- list_del(&w->node);
- list_add_tail(&w->th->auto_fiberq, &w->node);
- w->th = NULL;
+ list_del_init(&w->wnode);
+ fibval = w->fibval;
+ w->fibval = Qfalse;
+ rb_fiber_auto_enqueue(fibval);
}
- break; /* done, timers is a sorted list */
+ return; /* done, timers is a sorted list */
}
}
@@ -62,8 +69,8 @@ rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
/*
* search backwards: assume typical projects have multiple objects
- * sharing the same timeout values, so newly added timers will expire
- * expire later than existing timers
+ * sharing the same timeout values, so new timers will expire later
+ * than existing timers
*/
list_for_each_rev(timers, i, tnode) {
if (add->expires_at > i->expires_at) {
@@ -74,23 +81,9 @@ rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
list_add(timers, &add->tnode);
}
else {
- /* not active, just allow list_del_init to function in cleanup */
+ /* not active, just allow list_del to function in cleanup */
list_node_init(&add->tnode);
}
}
-static void
-rb_iom_schedule(rb_thread_t *th, struct rb_iom_waiter *current)
-{
- struct rb_iom_waiter *i = NULL, *next = NULL;
-
- list_for_each_safe(&th->auto_fiberq, i, next, node) {
- list_del_init(&i->node);
- if (i == current) {
- return;
- }
- rb_fiber_resume(i->fibval, 0, NULL);
- }
-}
-
#endif /* IOM_COMMON_H */
diff --git a/iom_select.h b/iom_select.h
index ebc5af87ff..c178c8881e 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -9,18 +9,17 @@
/* allocated on heap (rb_vm_t.iom) */
struct rb_iom_struct {
/*
- * lists are all protected by GVL at this time,
+ * Everything here is protected by GVL at this time,
* URCU lists (LGPL-2.1+) may be used in the future
*/
struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
- struct list_head fds; /* -rb_iom_fd_waiter.w.node */
- struct list_head pids; /* -rb_iom_pid_waiter.w.node, FIFO order */
- int gen;
+ struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
+ struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+ rb_thread_t *selector; /* thread blocked on rb_thread_fd_select */
rb_fdset_t rfds;
rb_fdset_t wfds;
rb_fdset_t efds;
- rb_thread_t *scheduler;
};
/* we lazily create this, small scripts may never need iom */
@@ -29,6 +28,7 @@ rb_iom_create(rb_thread_t *th)
{
rb_iom_t *iom = ALLOC(rb_iom_t);
+ iom->selector = NULL;
list_head_init(&iom->timers);
list_head_init(&iom->fds);
list_head_init(&iom->pids);
@@ -50,15 +50,7 @@ static void
waiter_cleanup(struct rb_iom_waiter *w)
{
list_del(&w->timer.tnode);
- if (w->th) {
- list_del_init(&w->node);
- }
- else {
- while (!list_empty((struct list_head *)&w->node)) {
- /* we got scheduled in auto_fiberq, wait for another to resume us */
- rb_fiber_yield(0, NULL);
- }
- }
+ list_del(&w->wnode);
}
static VALUE
@@ -69,15 +61,8 @@ fd_waiter_cleanup(VALUE ptr)
return Qfalse;
}
-static void
-waiter_init(struct rb_iom_waiter *w, rb_thread_t *th)
-{
- w->fibval = rb_fiber_current();
- w->th = th;
-}
-
static struct timeval *
-iom_timeout(rb_iom_t *iom, struct timeval *tv)
+next_timeout(rb_iom_t *iom, struct timeval *tv)
{
struct rb_iom_timer *t = list_top(&iom->timers, struct rb_iom_timer, tnode);
@@ -100,21 +85,17 @@ iom_select_begin(VALUE ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
rb_iom_t *iom = th->vm->iom;
- int max, gen;
- struct timeval tv;
+ int max = -1;
+ struct timeval tv, *tvp;
struct rb_iom_fd_waiter *fdw = NULL;
rb_fdset_t *rfds, *wfds, *efds;
-again:
rfds = wfds = efds = NULL;
- gen = iom->gen;
- max = 0;
-
rb_fd_init(&iom->rfds);
rb_fd_init(&iom->wfds);
rb_fd_init(&iom->efds);
- list_for_each(&iom->fds, fdw, w.node) {
+ list_for_each(&iom->fds, fdw, w.wnode) {
int fd = *fdw->fdp;
if (fd < 0) {
continue; /* closed */
@@ -133,13 +114,11 @@ again:
}
}
- r = rb_thread_fd_select(max + 1, rfds, wfds, efds, iom_timeout(iom, &tv));
- if (r < 0 && errno == EINTR && iom->gen != gen) {
- rb_fd_term(&iom->rfds);
- rb_fd_term(&iom->wfds);
- rb_fd_term(&iom->efds);
- goto again;
- }
+ tvp = next_timeout(iom, &tv);
+
+ /* release GVL.. */
+ rb_thread_fd_select(max + 1, rfds, wfds, efds, tvp);
+ /* .. we have GVL again */
return Qfalse;
}
@@ -151,8 +130,8 @@ iom_select_end(VALUE ptr)
rb_iom_t *iom = th->vm->iom;
struct rb_iom_fd_waiter *fdw = NULL, *next = NULL;
- iom->scheduler = NULL;
- list_for_each_safe(&iom->fds, fdw, next, w.node) {
+ iom->selector = NULL;
+ list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
int fd = *fdw->fdp;
if (fd < 0) {
continue; /* closed */
@@ -169,10 +148,12 @@ iom_select_end(VALUE ptr)
/* got revents? enqueue ourselves to be run! */
if (fdw->revents) {
+ VALUE fibval = fdw->w.fibval;
+
+ fdw->w.fibval = Qfalse;
list_del_init(&fdw->w.timer.tnode);
- list_del(&fdw->w.node);
- list_add_tail(&fdw->w.th->auto_fiberq, &fdw->w.node);
- fdw->w.th = NULL;
+ list_del_init(&fdw->w.wnode);
+ rb_fiber_auto_enqueue(fibval);
}
}
@@ -190,12 +171,14 @@ iom_schedule_fd(VALUE ptr)
rb_thread_t *th = fdw->w.th;
rb_iom_t *iom = th->vm->iom;
- rb_iom_schedule(th, &fdw->w); /* try to get some work done elsewhere */
+ write(2, "a\n", 2);
+ rb_fiber_auto_schedule(th);
+ write(2, "b\n", 2);
- while (fdw->w.th != NULL) {
- rb_thread_t *sched = iom->scheduler;
+ while (fdw->w.fibval != Qfalse) {
+ rb_thread_t *sched = iom->selector;
if (!sched) {
- iom->scheduler = th;
+ iom->selector = th;
/*
* nope, rb_iom_schedule did not do our work for us,
* maybe we can wait on select(), ourselves:
@@ -206,11 +189,8 @@ iom_schedule_fd(VALUE ptr)
/* or kick whoever is running select into doing work for us */
ubf_select(sched);
rb_thread_schedule_limits(0);
- rb_iom_schedule(th, &fdw->w);
- if (list_empty((struct list_head *)&fdw->w.node)) {
- return Qfalse;
- }
}
+ rb_fiber_auto_schedule(th);
}
return Qfalse;
}
@@ -225,8 +205,9 @@ rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
fdw.fdp = fdp;
fdw.events = (short)events;
fdw.revents = 0;
- waiter_init(&fdw.w, th);
- list_add(&iom->fds, &fdw.w.node);
+ fdw.w.th = th;
+ fdw.w.fibval = rb_fiber_current();
+ list_add(&iom->fds, &fdw.w.wnode);
rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout);
rb_ensure(iom_schedule_fd, (VALUE)&fdw, fd_waiter_cleanup, (VALUE)&fdw);
@@ -245,7 +226,8 @@ rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
VM_ASSERT((options & WNOHANG) == 0 &&
"WNOHANG should be handled in rb_waitpid");
- waiter_init(&pw.w, th);
+ pw.w.th = th;
+ pw.w.fibval = rb_fiber_current();
rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout);
return -1;
diff --git a/thread.c b/thread.c
index 4ff8510100..0bffa9c022 100644
--- a/thread.c
+++ b/thread.c
@@ -3913,7 +3913,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
struct timespec *timeout = NULL;
rb_thread_t *th = GET_THREAD();
- if (rb_fiber_auto_sched_p(th)) {
+ if (0 && rb_fiber_auto_sched_p(th)) {
double *to = NULL;
double tout;
if (tv) {
diff --git a/vm.c b/vm.c
index 60b7bf070b..16d0c3d647 100644
--- a/vm.c
+++ b/vm.c
@@ -2513,7 +2513,7 @@ th_init(rb_thread_t *th, VALUE self)
th->ec.cfp = (void *)(th->ec.stack + th->ec.stack_size);
- list_head_init(&th->auto_fiberq);
+ list_head_init(&th->afhead);
vm_push_frame(th, 0 /* dummy iseq */, VM_FRAME_MAGIC_DUMMY | VM_ENV_FLAG_LOCAL | VM_FRAME_FLAG_FINISH | VM_FRAME_FLAG_CFRAME /* dummy frame */,
Qnil /* dummy self */, VM_BLOCK_HANDLER_NONE /* dummy block ptr */,
diff --git a/vm_core.h b/vm_core.h
index dade714b2a..05f72c3527 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -813,7 +813,7 @@ typedef struct rb_thread_struct {
rb_fiber_t *fiber;
rb_fiber_t *root_fiber;
rb_jmpbuf_t root_jmpbuf;
- struct list_head auto_fiberq;
+ struct list_head afhead; /* -rb_fiber_t.afnode */
/* ensure & callcc */
rb_ensure_list_t *ensure_list;
--
EW
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 5/8] ok
2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
` (2 preceding siblings ...)
2017-05-18 22:30 ` [PATCH 4/8] omg Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
2017-05-18 22:30 ` [PATCH 6/8] wip Eric Wong
` (2 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
To: spew
---
cont.c | 5 -----
iom_select.h | 52 +++++++++++++++++++++++++++++-----------------------
thread.c | 2 +-
3 files changed, 30 insertions(+), 29 deletions(-)
diff --git a/cont.c b/cont.c
index 8282eabd81..7730ddd6e2 100644
--- a/cont.c
+++ b/cont.c
@@ -1524,7 +1524,6 @@ rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
rb_fiber_t *fib;
GetFiberPtr(fibval, fib);
- fprintf(stderr, "resume\n");
fiber_check_resume(fib);
return fiber_switch(fib, argc, argv, 1);
}
@@ -1676,7 +1675,6 @@ rb_fiber_auto_enqueue(VALUE fibval)
GetFiberPtr(fibval, fib);
GetThreadPtr(fib->cont.saved_thread.self, th);
- fprintf(stderr, "fibenq: %p\n", fib);
fiber_check_resume(fib);
list_add_tail(&th->afhead, &fib->afnode);
}
@@ -1690,10 +1688,8 @@ rb_fiber_auto_schedule(rb_thread_t *th)
list_for_each_safe(&th->afhead, fib, next, afnode) {
list_del_init(&fib->afnode);
if (fib == current) {
- write(2, "self\n", 5);
return;
}
- fprintf(stderr, "fibswitch: %p\n", fib);
fiber_check_resume(fib);
fiber_switch(fib, 0, NULL, 1);
}
@@ -1717,7 +1713,6 @@ rb_fiber_auto_set(VALUE self, VALUE val)
if (fib->afnode.next == NULL) {
rb_thread_t *th;
GetThreadPtr(fib->cont.saved_thread.self, th);
- fprintf(stderr, "fibset: %p\n", fib);
fiber_check_resume(fib);
list_add_tail(&th->afhead, &fib->afnode);
}
diff --git a/iom_select.h b/iom_select.h
index c178c8881e..cec1939c66 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -5,6 +5,7 @@
* epoll and kqueue versions
*/
#include "iom_common.h"
+#include "internal.h"
/* allocated on heap (rb_vm_t.iom) */
struct rb_iom_struct {
@@ -16,6 +17,7 @@ struct rb_iom_struct {
struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+ rb_serial_t gen;
rb_thread_t *selector; /* thread blocked on rb_thread_fd_select */
rb_fdset_t rfds;
rb_fdset_t wfds;
@@ -28,6 +30,7 @@ rb_iom_create(rb_thread_t *th)
{
rb_iom_t *iom = ALLOC(rb_iom_t);
+ iom->gen = 0;
iom->selector = NULL;
list_head_init(&iom->timers);
list_head_init(&iom->fds);
@@ -81,7 +84,7 @@ next_timeout(rb_iom_t *iom, struct timeval *tv)
}
static VALUE
-iom_select_begin(VALUE ptr)
+iom_select_do(VALUE ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
rb_iom_t *iom = th->vm->iom;
@@ -89,8 +92,8 @@ iom_select_begin(VALUE ptr)
struct timeval tv, *tvp;
struct rb_iom_fd_waiter *fdw = NULL;
rb_fdset_t *rfds, *wfds, *efds;
-
rfds = wfds = efds = NULL;
+
rb_fd_init(&iom->rfds);
rb_fd_init(&iom->wfds);
rb_fd_init(&iom->efds);
@@ -124,7 +127,7 @@ iom_select_begin(VALUE ptr)
}
static VALUE
-iom_select_end(VALUE ptr)
+iom_select_done(VALUE ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
rb_iom_t *iom = th->vm->iom;
@@ -170,29 +173,32 @@ iom_schedule_fd(VALUE ptr)
struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
rb_thread_t *th = fdw->w.th;
rb_iom_t *iom = th->vm->iom;
+ rb_thread_t *s;
- write(2, "a\n", 2);
+ /* kick off any other idle fibers, first */
rb_fiber_auto_schedule(th);
- write(2, "b\n", 2);
-
- while (fdw->w.fibval != Qfalse) {
- rb_thread_t *sched = iom->selector;
- if (!sched) {
- iom->selector = th;
- /*
- * nope, rb_iom_schedule did not do our work for us,
- * maybe we can wait on select(), ourselves:
- */
- rb_ensure(iom_select_begin, (VALUE)th, iom_select_end, (VALUE)th);
- }
- else {
- /* or kick whoever is running select into doing work for us */
- ubf_select(sched);
- rb_thread_schedule_limits(0);
- }
- rb_fiber_auto_schedule(th);
+ if (fdw->w.fibval == Qfalse) {
+ return rb_fiber_yield(0, NULL);
}
- return Qfalse;
+
+ /* rb_fiber_auto_schedule did not activate us... */
+ s = iom->selector;
+ if (s) {
+ iom->gen++;
+ /* get current selector to watch us */
+ ubf_select(s);
+ }
+ else {
+ /* go to sleep in select() ourselves */
+ rb_serial_t gen;
+ do {
+ gen = iom->gen;
+ iom->selector = th;
+ rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
+ rb_fiber_auto_schedule(th);
+ } while (gen != iom->gen);
+ }
+ return rb_fiber_yield(0, NULL);
}
int
diff --git a/thread.c b/thread.c
index 0bffa9c022..4ff8510100 100644
--- a/thread.c
+++ b/thread.c
@@ -3913,7 +3913,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
struct timespec *timeout = NULL;
rb_thread_t *th = GET_THREAD();
- if (0 && rb_fiber_auto_sched_p(th)) {
+ if (rb_fiber_auto_sched_p(th)) {
double *to = NULL;
double tout;
if (tv) {
--
EW
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 6/8] wip
2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
` (3 preceding siblings ...)
2017-05-18 22:30 ` [PATCH 5/8] ok Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
2017-05-18 22:30 ` [PATCH 7/8] wip-fib-join Eric Wong
2017-05-18 22:30 ` [PATCH 8/8] seems to work Eric Wong
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
To: spew
---
cont.c | 11 ++++++++++-
iom_select.h | 42 ++++++++++++++++++++++++++++++++----------
2 files changed, 42 insertions(+), 11 deletions(-)
diff --git a/cont.c b/cont.c
index 7730ddd6e2..52f34500c6 100644
--- a/cont.c
+++ b/cont.c
@@ -1684,10 +1684,19 @@ rb_fiber_auto_schedule(rb_thread_t *th)
{
rb_fiber_t *fib = NULL, *next = NULL;
rb_fiber_t *current = fiber_current();
+ struct list_head tmp;
- list_for_each_safe(&th->afhead, fib, next, afnode) {
+ /*
+ * do not infinite loop as new fibers get added to
+ * th->afhead, only work off a temporary list:
+ */
+ list_append_list(&tmp, &th->afhead);
+ list_for_each_safe(&tmp, fib, next, afnode) {
list_del_init(&fib->afnode);
+
+ /* resume current fiber naturally without switch */
if (fib == current) {
+ list_prepend_list(&th->afhead, &tmp);
return;
}
fiber_check_resume(fib);
diff --git a/iom_select.h b/iom_select.h
index cec1939c66..4da443de34 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -24,6 +24,15 @@ struct rb_iom_struct {
rb_fdset_t efds;
};
+
+struct wait_select {
+ struct list_node wsnode;
+ union {
+ rb_thread_t *th;
+ VALUE run_select; /* or yield */
+ } as;
+};
+
/* we lazily create this, small scripts may never need iom */
static rb_iom_t *
rb_iom_create(rb_thread_t *th)
@@ -177,28 +186,41 @@ iom_schedule_fd(VALUE ptr)
/* kick off any other idle fibers, first */
rb_fiber_auto_schedule(th);
+
if (fdw->w.fibval == Qfalse) {
+ /* inside th->afhead, wait for somebody to schedule us */
return rb_fiber_yield(0, NULL);
}
/* rb_fiber_auto_schedule did not activate us... */
+try_select:
s = iom->selector;
if (s) {
- iom->gen++;
- /* get current selector to watch us */
+ /* another thread is calling select(), kick them: */
+ struct wait_select ws;
+
+ ws.as.th = th;
+ list_add_tail(&iom->swaitq, &ws.wsnode);
ubf_select(s);
+ do {
+ rb_thread_sleep_deadly();
+ } while (ws.as.th == th);
+
+ if (ws.as.run_select == Qfalse) {
+ return rb_fiber_yield(0, NULL);
+ }
+ goto try_select;
}
else {
/* go to sleep in select() ourselves */
- rb_serial_t gen;
- do {
- gen = iom->gen;
- iom->selector = th;
- rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
- rb_fiber_auto_schedule(th);
- } while (gen != iom->gen);
+ iom->selector = th;
+ rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
+ list_for_each_safe(&iom->swaitq,
+
+ rb_fiber_auto_schedule(th);
+ if (fdw->w.fibval == Qfalse) {
+ return rb_fiber_yield(0, NULL);
}
- return rb_fiber_yield(0, NULL);
}
int
--
EW
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 7/8] wip-fib-join
2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
` (4 preceding siblings ...)
2017-05-18 22:30 ` [PATCH 6/8] wip Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
2017-05-18 22:30 ` [PATCH 8/8] seems to work Eric Wong
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
To: spew
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'
url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.pack'
url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx'
uri = URI(url)
use_ssl = "https" == uri.scheme
fibs = 10.times.map do
Fiber.new do
cur = Fiber.current.object_id
Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http|
req = Net::HTTP::Get.new(uri)
http.request(req) do |res|
dig = Digest::SHA1.new
res.read_body do |buf|
dig.update(buf)
#warn "#{cur} #{buf.bytesize}\n"
end
warn "#{cur} #{dig.hexdigest}\n"
end
end
:done
end
end
fibs.each { |f| f.auto = true }
while true
fibs.each { |f| f.join }
fibs.keep_if(&:alive?)
end until fibs.empty?
---
configure.in | 2 +-
cont.c | 71 +++++++++++++++++++++++++---
iom_common.h | 4 +-
iom_select.h | 149 ++++++++++++++++++++++++++++++++++++++++++++---------------
vm.c | 5 +-
vm_core.h | 2 -
6 files changed, 182 insertions(+), 51 deletions(-)
diff --git a/configure.in b/configure.in
index cc008dbebf..9c5def88ed 100644
--- a/configure.in
+++ b/configure.in
@@ -523,7 +523,7 @@ AS_CASE(["$target_os"],
AC_SUBST(LD)
if test "$GCC" = yes; then
linker_flag=-Wl,
- : ${optflags=-O3}
+ : ${optflags=-O0}
gcc_major=`echo =__GNUC__ | $CC -E -xc - | sed '/^=/!d;s///'`
gcc_minor=`echo =__GNUC_MINOR__ | $CC -E -xc - | sed '/^=/!d;s///'`
test -n "$gcc_major" || gcc_major=0
diff --git a/cont.c b/cont.c
index 52f34500c6..bf06eac355 100644
--- a/cont.c
+++ b/cont.c
@@ -1524,6 +1524,10 @@ rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
rb_fiber_t *fib;
GetFiberPtr(fibval, fib);
+ if (fib->afnode.next) {
+ list_del_init(&fib->afnode);
+ }
+ /* fprintf(stderr, "resume: %p\n", fib); */
fiber_check_resume(fib);
return fiber_switch(fib, argc, argv, 1);
}
@@ -1675,33 +1679,47 @@ rb_fiber_auto_enqueue(VALUE fibval)
GetFiberPtr(fibval, fib);
GetThreadPtr(fib->cont.saved_thread.self, th);
- fiber_check_resume(fib);
+ /* fprintf(stderr, "autoenq: %p\n", fib); */
list_add_tail(&th->afhead, &fib->afnode);
}
void
+rb_fiber_auto_schedule_mark(const rb_thread_t *th)
+{
+ rb_fiber_t *fib = NULL;
+ list_for_each(&th->afhead, fib, afnode) {
+ rb_gc_mark(fib->cont.self);
+ }
+}
+
+VALUE
rb_fiber_auto_schedule(rb_thread_t *th)
{
rb_fiber_t *fib = NULL, *next = NULL;
rb_fiber_t *current = fiber_current();
- struct list_head tmp;
+ LIST_HEAD(tmp);
/*
* do not infinite loop as new fibers get added to
* th->afhead, only work off a temporary list:
*/
list_append_list(&tmp, &th->afhead);
+ /* fprintf(stderr, "current: %p\n", current); */
+ /* list_for_each_safe(&tmp, fib, next, afnode) { */
+ /* fprintf(stderr, "fib: %p\n", fib); */
+ /* } */
list_for_each_safe(&tmp, fib, next, afnode) {
list_del_init(&fib->afnode);
- /* resume current fiber naturally without switch */
if (fib == current) {
list_prepend_list(&th->afhead, &tmp);
- return;
+ return Qtrue;
}
+ /* fprintf(stderr, "autosched: %p\n", fib); */
fiber_check_resume(fib);
fiber_switch(fib, 0, NULL, 1);
}
+ return Qfalse;
}
static VALUE
@@ -1720,10 +1738,9 @@ rb_fiber_auto_set(VALUE self, VALUE val)
GetFiberPtr(self, fib);
if (RTEST(val)) {
if (fib->afnode.next == NULL) {
- rb_thread_t *th;
- GetThreadPtr(fib->cont.saved_thread.self, th);
+ list_head_init(&fib->afnode);
fiber_check_resume(fib);
- list_add_tail(&th->afhead, &fib->afnode);
+ return fiber_switch(fib, 0, NULL, 1);
}
return Qtrue;
}
@@ -1736,6 +1753,45 @@ rb_fiber_auto_set(VALUE self, VALUE val)
}
}
+void rb_iom_schedule(rb_thread_t *);
+
+static VALUE
+fiber_join(rb_fiber_t *fib, double delay)
+{
+ rb_thread_t *th = GET_THREAD();
+ rb_fiber_t *cur = fiber_current();
+
+ if (cur == fib) {
+ rb_raise(rb_eFiberError, "Target fiber must not be current fiber");
+ }
+ if (th->root_fiber == fib) {
+ rb_raise(rb_eFiberError, "Target fiber must not be root fiber");
+ }
+ if (fib->cont.saved_thread.self != th->self) {
+ rb_raise(rb_eFiberError, "Target fiber not owned by current thread");
+ }
+ while (fib->status != TERMINATED) {
+ rb_iom_schedule(th);
+ }
+ return fib->cont.self;
+}
+
+static VALUE
+rb_fiber_join(int argc, VALUE *argv, VALUE self)
+{
+ rb_fiber_t *fib;
+ double delay = -1;
+ VALUE limit;
+
+ GetFiberPtr(self, fib);
+ rb_scan_args(argc, argv, "01", &limit);
+ if (!NIL_P(limit)) {
+ delay = rb_num2dbl(limit);
+ }
+
+ return fiber_join(fib, delay);
+}
+
int
rb_fiber_auto_sched_p(const rb_thread_t *th)
{
@@ -1781,6 +1837,7 @@ Init_Cont(void)
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
rb_define_method(rb_cFiber, "auto", rb_fiber_auto_get, 0);
rb_define_method(rb_cFiber, "auto=", rb_fiber_auto_set, 1);
+ rb_define_method(rb_cFiber, "join", rb_fiber_join, -1);
}
RUBY_SYMBOL_EXPORT_BEGIN
diff --git a/iom_common.h b/iom_common.h
index c70b45499f..5a586100b3 100644
--- a/iom_common.h
+++ b/iom_common.h
@@ -5,7 +5,7 @@
/* cont.c */
void rb_fiber_auto_enqueue(VALUE fibval);
-void rb_fiber_auto_schedule(rb_thread_t *);
+VALUE rb_fiber_auto_schedule(rb_thread_t *);
/* allocated on stack */
struct rb_iom_timer {
@@ -17,7 +17,7 @@ struct rb_iom_waiter {
struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
struct rb_iom_timer timer;
rb_thread_t *th;
- VALUE fibval;
+ VALUE fibval; /* Qfalse: enqueued in afhead */
};
struct rb_iom_fd_waiter {
diff --git a/iom_select.h b/iom_select.h
index 4da443de34..a42643d73d 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -16,19 +16,20 @@ struct rb_iom_struct {
struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+ struct list_head swaitq; /* select_wait.swnode */
- rb_serial_t gen;
+ rb_serial_t select_start;
+ rb_serial_t select_gen;
rb_thread_t *selector; /* thread blocked on rb_thread_fd_select */
rb_fdset_t rfds;
rb_fdset_t wfds;
rb_fdset_t efds;
};
-
-struct wait_select {
- struct list_node wsnode;
+struct select_wait {
+ struct list_node swnode;
union {
- rb_thread_t *th;
+ VALUE thval;
VALUE run_select; /* or yield */
} as;
};
@@ -39,11 +40,13 @@ rb_iom_create(rb_thread_t *th)
{
rb_iom_t *iom = ALLOC(rb_iom_t);
- iom->gen = 0;
+ iom->select_start = 0;
+ iom->select_gen = 0;
iom->selector = NULL;
list_head_init(&iom->timers);
list_head_init(&iom->fds);
list_head_init(&iom->pids);
+ list_head_init(&iom->swaitq);
return iom;
}
@@ -76,7 +79,9 @@ fd_waiter_cleanup(VALUE ptr)
static struct timeval *
next_timeout(rb_iom_t *iom, struct timeval *tv)
{
- struct rb_iom_timer *t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+ struct rb_iom_timer *t;
+
+ t = list_top(&iom->timers, struct rb_iom_timer, tnode);
if (t) {
double diff = t->expires_at - timeofday();
@@ -141,6 +146,9 @@ iom_select_done(VALUE ptr)
rb_thread_t *th = (rb_thread_t *)ptr;
rb_iom_t *iom = th->vm->iom;
struct rb_iom_fd_waiter *fdw = NULL, *next = NULL;
+ struct select_wait *sw;
+ VALUE thselect;
+ VALUE current = rb_fiber_current();
iom->selector = NULL;
list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
@@ -165,7 +173,9 @@ iom_select_done(VALUE ptr)
fdw->w.fibval = Qfalse;
list_del_init(&fdw->w.timer.tnode);
list_del_init(&fdw->w.wnode);
- rb_fiber_auto_enqueue(fibval);
+ if (fibval != current) {
+ rb_fiber_auto_enqueue(fibval);
+ }
}
}
@@ -173,53 +183,99 @@ iom_select_done(VALUE ptr)
rb_fd_term(&iom->wfds);
rb_fd_term(&iom->efds);
+ /*
+ * did we have anybody write to `fds` while we were inside select?
+ * if so, designate the last `fds` writer to call select() again:
+ */
+ do {
+ sw = list_tail(&iom->swaitq, struct select_wait, swnode);
+
+ if (!sw) {
+ return Qfalse;
+ }
+ /* designate the next select() thread: */
+ thselect = sw->as.thval;
+ list_del_init(&sw->swnode);
+ sw->as.run_select = Qtrue;
+ /* it's gotta be alive to run select(), otherwise find another */
+ } while (NIL_P(rb_thread_wakeup_alive(thselect)));
+
+ if (sw) {
+ struct select_wait *swnext = NULL;
+
+ /* everybody else will do rb_fiber_yield: */
+ list_for_each_safe(&iom->swaitq, sw, swnext, swnode) {
+ VALUE thyield = sw->as.thval;
+ list_del_init(&sw->swnode);
+ sw->as.run_select = Qfalse;
+ rb_thread_wakeup_alive(thyield);
+ }
+ }
+
+ return Qfalse;
+}
+
+static VALUE
+select_wait_sleep(VALUE ptr)
+{
+ struct select_wait *sw = (struct select_wait *)ptr;
+ VALUE th = sw->as.thval;
+
+ do {
+ rb_thread_sleep_deadly();
+ } while (sw->as.thval == th);
+ return Qfalse;
+}
+
+static VALUE
+select_wait_done(VALUE ptr)
+{
+ struct select_wait *sw = (struct select_wait *)ptr;
+ list_del(&sw->swnode);
return Qfalse;
}
static VALUE
iom_schedule_fd(VALUE ptr)
{
- struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
- rb_thread_t *th = fdw->w.th;
- rb_iom_t *iom = th->vm->iom;
- rb_thread_t *s;
+ return rb_fiber_yield(0, NULL);
+}
- /* kick off any other idle fibers, first */
+void
+rb_iom_schedule(rb_thread_t *th)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ rb_thread_t *s;
rb_fiber_auto_schedule(th);
+ /* fprintf(stderr, "empty? %d\n", !!list_empty(&th->afhead)); */
- if (fdw->w.fibval == Qfalse) {
- /* inside th->afhead, wait for somebody to schedule us */
- return rb_fiber_yield(0, NULL);
- }
-
- /* rb_fiber_auto_schedule did not activate us... */
try_select:
s = iom->selector;
- if (s) {
- /* another thread is calling select(), kick them: */
- struct wait_select ws;
-
- ws.as.th = th;
- list_add_tail(&iom->swaitq, &ws.wsnode);
- ubf_select(s);
- do {
- rb_thread_sleep_deadly();
- } while (ws.as.th == th);
-
- if (ws.as.run_select == Qfalse) {
- return rb_fiber_yield(0, NULL);
+ if (s) { /* somebody else is running select() */
+ /*
+ * if our watch set changed after select() started,
+ * we need to kick it and restart with the new set
+ */
+ int need_kick = iom->select_start != iom->select_gen;
+ struct select_wait sw;
+
+ sw.as.thval = th->self;
+ list_add_tail(&iom->swaitq, &sw.swnode);
+
+ if (need_kick) {
+ ubf_select(s);
+ }
+ rb_ensure(select_wait_sleep, (VALUE)&sw, select_wait_done, (VALUE)&sw);
+ if (need_kick) {
+ goto try_select;
}
- goto try_select;
}
else {
- /* go to sleep in select() ourselves */
iom->selector = th;
+ iom->select_start = iom->select_gen;
rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
- list_for_each_safe(&iom->swaitq,
-
+ rb_iom_timer_check(&iom->timers);
rb_fiber_auto_schedule(th);
- if (fdw->w.fibval == Qfalse) {
- return rb_fiber_yield(0, NULL);
}
}
@@ -235,8 +291,16 @@ rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
fdw.revents = 0;
fdw.w.th = th;
fdw.w.fibval = rb_fiber_current();
+
+ /*
+ * order probably doesn't matter for FDs, but maybe LIFO is better
+ * since later Fibers tend to have higher FDs, and putting higher
+ * FDs first means rb_fd_set() needs to do fewer reallocs
+ */
list_add(&iom->fds, &fdw.w.wnode);
+
rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout);
+ iom->select_gen++;
rb_ensure(iom_schedule_fd, (VALUE)&fdw, fd_waiter_cleanup, (VALUE)&fdw);
if (*fdp < 0) return -1;
@@ -254,9 +318,18 @@ rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
VM_ASSERT((options & WNOHANG) == 0 &&
"WNOHANG should be handled in rb_waitpid");
+ pw.pid = pid;
+ pw.options = options;
pw.w.th = th;
pw.w.fibval = rb_fiber_current();
rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout);
+ /* LIFO, matches Linux wait4() behavior */
+ list_add(&iom->pids, &pw.w.wnode);
+
+ /* do not touch select_gen, this has nothing to do with select() */
+
+ /* rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw); */
+
return -1;
}
diff --git a/vm.c b/vm.c
index 16d0c3d647..90a24b4a9d 100644
--- a/vm.c
+++ b/vm.c
@@ -2345,6 +2345,7 @@ rb_thread_recycle_stack_release(VALUE *stack)
}
void rb_fiber_mark_self(rb_fiber_t *fib);
+void rb_fiber_auto_schedule_mark(const rb_thread_t *th);
void
rb_thread_mark(void *ptr)
@@ -2387,6 +2388,9 @@ rb_thread_mark(void *ptr)
RUBY_MARK_UNLESS_NULL(th->top_wrapper);
rb_fiber_mark_self(th->fiber);
rb_fiber_mark_self(th->root_fiber);
+ if (th->afhead.n.next && !list_empty(&th->afhead)) {
+ rb_fiber_auto_schedule_mark(th);
+ }
RUBY_MARK_UNLESS_NULL(th->stat_insn_usage);
RUBY_MARK_UNLESS_NULL(th->last_status);
@@ -3105,7 +3109,6 @@ Init_BareVM(void)
}
MEMZERO(th, rb_thread_t, 1);
rb_thread_set_current_raw(th);
-
vm_init2(vm);
vm->objspace = rb_objspace_alloc();
ruby_current_vm = vm;
diff --git a/vm_core.h b/vm_core.h
index 05f72c3527..c74314a743 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -491,9 +491,7 @@ typedef struct rb_vm_struct {
struct rb_thread_struct *main_thread;
struct rb_thread_struct *running_thread;
-
struct rb_iom_struct *iom;
-
struct list_head living_threads;
size_t living_thread_num;
VALUE thgroup_default;
--
EW
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 8/8] seems to work...
2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
` (5 preceding siblings ...)
2017-05-18 22:30 ` [PATCH 7/8] wip-fib-join Eric Wong
@ 2017-05-18 22:30 ` Eric Wong
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2017-05-18 22:30 UTC (permalink / raw)
To: spew
# Demo script for the auto-Fiber I/O manager in this patch series:
# spawn 30 fibers that each download the same URL and print the
# SHA-1 of the response body, then wait for all of them to finish.
#
# Fiber#auto= and Fiber#join come from this series, not stock Ruby.
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'

url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx'
uri = URI(url)
use_ssl = uri.scheme == "https"

workers = Array.new(30) do
  Fiber.new do
    fid = Fiber.current.object_id
    Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |conn|
      get = Net::HTTP::Get.new(uri)
      conn.request(get) do |response|
        sha1 = Digest::SHA1.new
        # stream the body so the digest is computed chunk by chunk
        response.read_body do |chunk|
          sha1 << chunk
          #warn "#{fid} #{chunk.bytesize}\n"
        end
        warn "#{fid} #{sha1.hexdigest}\n"
      end
    end
    warn "done\n"
    :done
  end
end

# Per rb_fiber_auto_set in cont.c, enabling auto mode on a fiber that
# did not have it switches into that fiber right away.
workers.each { |fiber| fiber.auto = true }

workers.first.join
warn "the rest\n"

# keep joining until no fiber is left alive
until workers.empty?
  workers.each(&:join)
  workers.keep_if(&:alive?)
  warn workers.inspect
end
---
cont.c | 63 ++++++++++++++++-----------------
iom_common.h | 8 ++---
iom_select.h | 112 ++++++++++++++++++++++++++++++++++++++++++-----------------
3 files changed, 116 insertions(+), 67 deletions(-)
diff --git a/cont.c b/cont.c
index bf06eac355..4d4927a0fa 100644
--- a/cont.c
+++ b/cont.c
@@ -128,7 +128,7 @@ struct rb_fiber_struct {
struct rb_fiber_struct *prev;
/*
- * afnode.next == NULL auto fiber disabled
+ * afnode.next == 0 auto fiber disabled
* afnode.next == &afnode auto fiber enabled, but not enqueued (running)
* afnode.next != &afnode auto fiber runnable (enqueued in th->afhead)
*/
@@ -1527,7 +1527,6 @@ rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
if (fib->afnode.next) {
list_del_init(&fib->afnode);
}
- /* fprintf(stderr, "resume: %p\n", fib); */
fiber_check_resume(fib);
return fiber_switch(fib, argc, argv, 1);
}
@@ -1679,24 +1678,36 @@ rb_fiber_auto_enqueue(VALUE fibval)
GetFiberPtr(fibval, fib);
GetThreadPtr(fib->cont.saved_thread.self, th);
- /* fprintf(stderr, "autoenq: %p\n", fib); */
list_add_tail(&th->afhead, &fib->afnode);
}
void
rb_fiber_auto_schedule_mark(const rb_thread_t *th)
{
- rb_fiber_t *fib = NULL;
+ rb_fiber_t *fib = 0;
list_for_each(&th->afhead, fib, afnode) {
rb_gc_mark(fib->cont.self);
}
}
-VALUE
-rb_fiber_auto_schedule(rb_thread_t *th)
+/* Returns true if auto-fiber is enabled for current fiber */
+int
+rb_fiber_auto_sched_p(const rb_thread_t *th)
+{
+ const rb_fiber_t *cur = th->fiber;
+
+ return (cur && cur->afnode.next && th->root_fiber != cur);
+}
+
+/*
+ * Resumes any ready fibers in the auto-Fiber run queue
+ * returns true if yielding current Fiber is needed, false if not
+ */
+int
+rb_fiber_auto_do_yield_p(rb_thread_t *th)
{
- rb_fiber_t *fib = NULL, *next = NULL;
- rb_fiber_t *current = fiber_current();
+ rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0;
+ rb_fiber_t *fib = 0, *next = 0;
LIST_HEAD(tmp);
/*
@@ -1704,22 +1715,17 @@ rb_fiber_auto_schedule(rb_thread_t *th)
* th->afhead, only work off a temporary list:
*/
list_append_list(&tmp, &th->afhead);
- /* fprintf(stderr, "current: %p\n", current); */
- /* list_for_each_safe(&tmp, fib, next, afnode) { */
- /* fprintf(stderr, "fib: %p\n", fib); */
- /* } */
list_for_each_safe(&tmp, fib, next, afnode) {
- list_del_init(&fib->afnode);
-
- if (fib == current) {
+ if (fib == current_auto || (fib->prev && fib != th->root_fiber)) {
+ /* tell the caller to yield */
list_prepend_list(&th->afhead, &tmp);
- return Qtrue;
+ return 1;
}
- /* fprintf(stderr, "autosched: %p\n", fib); */
+ list_del_init(&fib->afnode);
fiber_check_resume(fib);
- fiber_switch(fib, 0, NULL, 1);
+ fiber_switch(fib, 0, 0, 1);
}
- return Qfalse;
+ return 0;
}
static VALUE
@@ -1737,17 +1743,20 @@ rb_fiber_auto_set(VALUE self, VALUE val)
rb_fiber_t *fib;
GetFiberPtr(self, fib);
if (RTEST(val)) {
- if (fib->afnode.next == NULL) {
- list_head_init(&fib->afnode);
+ if (fib->afnode.next == 0) {
+ /* rb_thread_t *th = GET_THREAD(); */
+ /* fprintf(stderr, "auto=: %p\n", fib); */
+ /* list_add_tail(&th->afhead, &fib->afnode); */
+ list_node_init(&fib->afnode);
fiber_check_resume(fib);
- return fiber_switch(fib, 0, NULL, 1);
+ return fiber_switch(fib, 0, 0, 1);
}
return Qtrue;
}
else {
if (fib->afnode.next) {
list_del_init(&fib->afnode);
- fib->afnode.next = NULL;
+ fib->afnode.next = 0;
}
return Qfalse;
}
@@ -1792,14 +1801,6 @@ rb_fiber_join(int argc, VALUE *argv, VALUE self)
return fiber_join(fib, delay);
}
-int
-rb_fiber_auto_sched_p(const rb_thread_t *th)
-{
- const rb_fiber_t *cur = th->fiber;
-
- return (cur && cur->afnode.next && th->root_fiber != cur);
-}
-
/*
* Document-class: FiberError
*
diff --git a/iom_common.h b/iom_common.h
index 5a586100b3..e832944ff4 100644
--- a/iom_common.h
+++ b/iom_common.h
@@ -5,7 +5,7 @@
/* cont.c */
void rb_fiber_auto_enqueue(VALUE fibval);
-VALUE rb_fiber_auto_schedule(rb_thread_t *);
+int rb_fiber_auto_do_yield_p(rb_thread_t *);
/* allocated on stack */
struct rb_iom_timer {
@@ -14,10 +14,10 @@ struct rb_iom_timer {
};
struct rb_iom_waiter {
- struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
struct rb_iom_timer timer;
rb_thread_t *th;
VALUE fibval; /* Qfalse: enqueued in afhead */
+ struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
};
struct rb_iom_fd_waiter {
@@ -38,7 +38,7 @@ struct rb_iom_pid_waiter {
static void
rb_iom_timer_check(struct list_head *timers)
{
- struct rb_iom_timer *i = NULL, *next = NULL;
+ struct rb_iom_timer *i = 0, *next = 0;
double now = timeofday();
list_for_each_safe(timers, i, next, tnode) {
@@ -64,7 +64,7 @@ rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
{
rb_iom_timer_check(timers);
if (timeout) {
- struct rb_iom_timer *i = NULL;
+ struct rb_iom_timer *i = 0;
add->expires_at = timeofday() + *timeout;
/*
diff --git a/iom_select.h b/iom_select.h
index a42643d73d..4913b4e378 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -16,16 +16,21 @@ struct rb_iom_struct {
struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+ /* threads waiting for selector thread to finish */
struct list_head swaitq; /* select_wait.swnode */
- rb_serial_t select_start;
- rb_serial_t select_gen;
- rb_thread_t *selector; /* thread blocked on rb_thread_fd_select */
+ rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */
+ rb_serial_t select_gen; /* RDWR protected by GVL */
+ rb_thread_t *selector; /* rb_thread_fd_select owner (acts as lock w/ GVL) */
+
+ /* these could be on stack, but they're huge on some platforms: */
rb_fdset_t rfds;
rb_fdset_t wfds;
rb_fdset_t efds;
};
+/* allocated on stack */
struct select_wait {
struct list_node swnode;
union {
@@ -42,7 +47,7 @@ rb_iom_create(rb_thread_t *th)
iom->select_start = 0;
iom->select_gen = 0;
- iom->selector = NULL;
+ iom->selector = 0;
list_head_init(&iom->timers);
list_head_init(&iom->fds);
list_head_init(&iom->pids);
@@ -94,7 +99,7 @@ next_timeout(rb_iom_t *iom, struct timeval *tv)
}
return tv;
}
- return NULL;
+ return 0;
}
static VALUE
@@ -104,10 +109,15 @@ iom_select_do(VALUE ptr)
rb_iom_t *iom = th->vm->iom;
int max = -1;
struct timeval tv, *tvp;
- struct rb_iom_fd_waiter *fdw = NULL;
+ struct rb_iom_fd_waiter *fdw = 0;
rb_fdset_t *rfds, *wfds, *efds;
- rfds = wfds = efds = NULL;
+ if (list_empty(&iom->fds) && list_empty(&iom->timers)) {
+ rb_thread_schedule();
+ return Qfalse;
+ }
+
+ rfds = wfds = efds = 0;
rb_fd_init(&iom->rfds);
rb_fd_init(&iom->wfds);
rb_fd_init(&iom->efds);
@@ -145,12 +155,11 @@ iom_select_done(VALUE ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
rb_iom_t *iom = th->vm->iom;
- struct rb_iom_fd_waiter *fdw = NULL, *next = NULL;
+ struct rb_iom_fd_waiter *fdw = 0, *next = 0;
struct select_wait *sw;
VALUE thselect;
- VALUE current = rb_fiber_current();
- iom->selector = NULL;
+ iom->selector = 0;
list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
int fd = *fdw->fdp;
if (fd < 0) {
@@ -173,9 +182,7 @@ iom_select_done(VALUE ptr)
fdw->w.fibval = Qfalse;
list_del_init(&fdw->w.timer.tnode);
list_del_init(&fdw->w.wnode);
- if (fibval != current) {
- rb_fiber_auto_enqueue(fibval);
- }
+ rb_fiber_auto_enqueue(fibval);
}
}
@@ -201,12 +208,11 @@ iom_select_done(VALUE ptr)
} while (NIL_P(rb_thread_wakeup_alive(thselect)));
if (sw) {
- struct select_wait *swnext = NULL;
+ struct select_wait *swnext = 0;
/* everybody else will do rb_fiber_yield: */
list_for_each_safe(&iom->swaitq, sw, swnext, swnode) {
VALUE thyield = sw->as.thval;
- list_del_init(&sw->swnode);
sw->as.run_select = Qfalse;
rb_thread_wakeup_alive(thyield);
}
@@ -235,19 +241,14 @@ select_wait_done(VALUE ptr)
return Qfalse;
}
-static VALUE
-iom_schedule_fd(VALUE ptr)
-{
- return rb_fiber_yield(0, NULL);
-}
-
void
rb_iom_schedule(rb_thread_t *th)
{
rb_iom_t *iom = rb_iom_get(th);
rb_thread_t *s;
- rb_fiber_auto_schedule(th);
- /* fprintf(stderr, "empty? %d\n", !!list_empty(&th->afhead)); */
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
try_select:
s = iom->selector;
@@ -275,8 +276,57 @@ try_select:
iom->select_start = iom->select_gen;
rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
rb_iom_timer_check(&iom->timers);
- rb_fiber_auto_schedule(th);
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ }
+}
+
+static VALUE
+iom_schedule_fd(VALUE ptr)
+{
+ struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
+ rb_thread_t *th = fdw->w.th;
+ rb_iom_t *iom = rb_iom_get(th);
+ rb_thread_t *s;
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ return rb_fiber_yield(0, 0);
+ }
+
+try_select:
+ s = iom->selector;
+ if (s) { /* somebody else is running select() */
+ /*
+ * if our watch set changed after select() started,
+ * we need to kick it and restart with the new set
+ */
+ struct select_wait sw;
+
+ return rb_fiber_yield(0, 0);
+ sw.as.thval = th->self;
+ list_add_tail(&iom->swaitq, &sw.swnode);
+ ubf_select(s);
+ rb_ensure(select_wait_sleep, (VALUE)&sw, select_wait_done, (VALUE)&sw);
+ if (sw.as.run_select == Qfalse) {
+ return rb_fiber_yield(0, 0);
+ }
+ goto try_select;
+ }
+ else {
+ iom->selector = th;
+ iom->select_start = iom->select_gen;
+ rb_ensure(iom_select_do, (VALUE)th, iom_select_done, (VALUE)th);
+ rb_iom_timer_check(&iom->timers);
+ if (rb_fiber_auto_do_yield_p(th)) {
+ return rb_fiber_yield(0, 0);
+ }
+ }
+ return rb_fiber_yield(0, 0);
+ if (fdw->w.fibval != Qfalse && th->fiber != th->root_fiber) {
+ return rb_fiber_yield(0, 0);
}
+ return Qfalse;
}
int
@@ -292,12 +342,8 @@ rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
fdw.w.th = th;
fdw.w.fibval = rb_fiber_current();
- /*
- * order probably doesn't matter for FDs, but maybe LIFO is better
- * since later Fibers tend to have higher FDs, and putting higher
- * FDs first means rb_fd_set() needs to do fewer reallocs
- */
- list_add(&iom->fds, &fdw.w.wnode);
+ /* use FIFO order for fairness in iom_select_done */
+ list_add_tail(&iom->fds, &fdw.w.wnode);
rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout);
iom->select_gen++;
@@ -324,12 +370,14 @@ rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
pw.w.fibval = rb_fiber_current();
rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout);
- /* LIFO, matches Linux wait4() behavior */
+ /* LIFO, to match Linux wait4() blocking behavior */
list_add(&iom->pids, &pw.w.wnode);
/* do not touch select_gen, this has nothing to do with select() */
- /* rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw); */
+ /* TODO
+ * rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw);
+ */
return -1;
}
--
EW
^ permalink raw reply related [flat|nested] 8+ messages in thread
end of thread, other threads:[~2017-05-18 22:31 UTC | newest]
Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-05-18 22:30 [PATCH 1/8] wip-iom_select Eric Wong
2017-05-18 22:30 ` [PATCH 2/8] compiles: Eric Wong
2017-05-18 22:30 ` [PATCH 3/8] wip Eric Wong
2017-05-18 22:30 ` [PATCH 4/8] omg Eric Wong
2017-05-18 22:30 ` [PATCH 5/8] ok Eric Wong
2017-05-18 22:30 ` [PATCH 6/8] wip Eric Wong
2017-05-18 22:30 ` [PATCH 7/8] wip-fib-join Eric Wong
2017-05-18 22:30 ` [PATCH 8/8] seems to work Eric Wong
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).