* [PATCH 1/7] auto fiber scheduling for I/O
@ 2017-05-30 19:03 Eric Wong
2017-05-30 19:03 ` [PATCH 2/7] iom: implement waitpid generically Eric Wong
` (5 more replies)
0 siblings, 6 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
To: spew
Currently; basic IO scheduling works with a single FD per Fiber
works. The Ruby API changes for Fiber are named after existing
Thread methods.
main Ruby API:
Fiber#start -> enable auto-scheduling and run Fiber until it
automatically yields (due to EAGAIN/EWOULDBLOCK)
The following behave like their Thread counterparts:
Fiber.start - Fiber.new + Fiber#start (prelude.rb)
Fiber#join - run internal scheduler until Fiber is terminated
Fiber#value - ditto
Fiber#run - like Fiber#start (prelude.rb)
Right now, it takes over rb_wait_for_single_fd() function
if the running Fiber is auto-enabled (cont.c::rb_fiber_auto_sched_p)
Changes to existing functions are minimal.
New files (all new structs and relations should be documented):
iom.h - internal API for the rest of RubyVM (incomplete?)
iom_internal.h - internal header for iom_(select|epoll|kqueue).h
iom_epoll.h - epoll-specific pieces
iom_kqueue.h - kqueue-specific pieces
iom_select.h - select()-specific pieces
iom_pingable_common.h - common code for iom_(epoll|kqueue).h
iom_common.h - common footer for iom_(select|epoll|kqueue).h
Changes to existing data structures:
rb_thread_t.afhead - list of fibers to auto-resume
rb_fiber_t.afnode - link to rb_thread_t->afhead
rb_vm_t.iom - Ruby I/O Manager (rb_iom_t) :)
Besides rb_iom_t, all the new structs are stack-only and relies
extensively on ccan/list for branch-less O(1) insert/delete.
As usual, understanding the data structures first should help
you understand the code.
Right now, I reuse some static functions in thread.c,
so thread.c includes iom_(select|epoll|kqueue).h
TODO:
Hijack other blocking functions (waitpid, IO.select, ...)
I am using "double" for timeout since it is more convenient for
arithmetic like parts of thread.c. Most platforms have good FP,
I think. Also, all "blocking" functions (rb_iom_wait*) will
have timeout support.
kqueue support can be tested portably with libkqueue:
./configure cflags="$(pkg-config --cflags libkqueue)" \
LIBS="$(pkg-config --libs libkqueue)"
./configure gains a new --with-iom=(select|epoll|kqueue) switch
With libkqueue installed, I can test all 3 iom implementations on
Debian 8.x
Test script I used to download a file from my server:
----8<---
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'
url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx'
uri = URI(url)
use_ssl = "https" == uri.scheme
fibs = 10.times.map do
Fiber.start do
cur = Fiber.current.object_id
# XXX getaddrinfo() and connect() are blocking
# XXX resolv/replace + connect_nonblock
Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http|
req = Net::HTTP::Get.new(uri)
http.request(req) do |res|
dig = Digest::SHA1.new
res.read_body do |buf|
dig.update(buf)
#warn "#{cur} #{buf.bytesize}\n"
end
warn "#{cur} #{dig.hexdigest}\n"
end
end
warn "done\n"
:done
end
end
warn "joining #{Time.now}\n"
fibs[-1].join(4)
warn "joined #{Time.now}\n"
all = fibs.dup
warn "1 joined, wait for the rest\n"
until fibs.empty?
fibs.each(&:join)
fibs.keep_if(&:alive?)
warn fibs.inspect
end
p all.map(&:value)
Fiber.new do
puts 'HI'
end.run.join
__END__
---
common.mk | 7 +
configure.in | 32 ++++
cont.c | 156 ++++++++++++++-
include/ruby/io.h | 2 +
iom.h | 91 +++++++++
iom_common.h | 90 +++++++++
iom_epoll.h | 392 ++++++++++++++++++++++++++++++++++++++
iom_internal.h | 153 +++++++++++++++
iom_kqueue.h | 440 +++++++++++++++++++++++++++++++++++++++++++
iom_pingable_common.h | 55 ++++++
iom_select.h | 341 +++++++++++++++++++++++++++++++++
prelude.rb | 12 ++
test/lib/leakchecker.rb | 9 +
test/ruby/test_fiber_auto.rb | 126 +++++++++++++
thread.c | 42 +++++
thread_pthread.c | 5 +
vm.c | 9 +
vm_core.h | 4 +
18 files changed, 1961 insertions(+), 5 deletions(-)
create mode 100644 iom.h
create mode 100644 iom_common.h
create mode 100644 iom_epoll.h
create mode 100644 iom_internal.h
create mode 100644 iom_kqueue.h
create mode 100644 iom_pingable_common.h
create mode 100644 iom_select.h
create mode 100644 test/ruby/test_fiber_auto.rb
diff --git a/common.mk b/common.mk
index 654533ce8d..2a7c3866cf 100644
--- a/common.mk
+++ b/common.mk
@@ -2598,6 +2598,13 @@ thread.$(OBJEXT): {$(VPATH)}id.h
thread.$(OBJEXT): {$(VPATH)}intern.h
thread.$(OBJEXT): {$(VPATH)}internal.h
thread.$(OBJEXT): {$(VPATH)}io.h
+thread.$(OBJEXT): {$(VPATH)}iom.h
+thread.$(OBJEXT): {$(VPATH)}iom_internal.h
+thread.$(OBJEXT): {$(VPATH)}iom_common.h
+thread.$(OBJEXT): {$(VPATH)}iom_epoll.h
+thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h
+thread.$(OBJEXT): {$(VPATH)}iom_pingable_common.h
+thread.$(OBJEXT): {$(VPATH)}iom_select.h
thread.$(OBJEXT): {$(VPATH)}method.h
thread.$(OBJEXT): {$(VPATH)}missing.h
thread.$(OBJEXT): {$(VPATH)}node.h
diff --git a/configure.in b/configure.in
index 24035c7c1a..f6577a89dc 100644
--- a/configure.in
+++ b/configure.in
@@ -1389,6 +1389,8 @@ AC_CHECK_HEADERS(process.h)
AC_CHECK_HEADERS(pwd.h)
AC_CHECK_HEADERS(setjmpex.h)
AC_CHECK_HEADERS(sys/attr.h)
+AC_CHECK_HEADERS(sys/epoll.h)
+AC_CHECK_HEADERS(sys/event.h)
AC_CHECK_HEADERS(sys/fcntl.h)
AC_CHECK_HEADERS(sys/file.h)
AC_CHECK_HEADERS(sys/id.h)
@@ -2405,6 +2407,10 @@ AC_CHECK_FUNCS(dladdr)
AC_CHECK_FUNCS(dup)
AC_CHECK_FUNCS(dup3)
AC_CHECK_FUNCS(eaccess)
+AC_CHECK_FUNCS(epoll_create)
+AC_CHECK_FUNCS(epoll_create1)
+AC_CHECK_FUNCS(epoll_ctl)
+AC_CHECK_FUNCS(epoll_wait)
AC_CHECK_FUNCS(endgrent)
AC_CHECK_FUNCS(fchmod)
AC_CHECK_FUNCS(fchown)
@@ -2438,7 +2444,9 @@ AC_CHECK_FUNCS(initgroups)
AC_CHECK_FUNCS(ioctl)
AC_CHECK_FUNCS(isfinite)
AC_CHECK_FUNCS(issetugid)
+AC_CHECK_FUNCS(kevent)
AC_CHECK_FUNCS(killpg)
+AC_CHECK_FUNCS(kqueue)
AC_CHECK_FUNCS(lchmod)
AC_CHECK_FUNCS(lchown)
AC_CHECK_FUNCS(link)
@@ -3590,6 +3598,29 @@ AC_ARG_WITH(valgrind,
AS_IF([test x$with_valgrind != xno],
[AC_CHECK_HEADERS(valgrind/memcheck.h)])
+AC_DEFINE_UNQUOTED(IOM_SELECT, 0)
+AC_DEFINE_UNQUOTED(IOM_KQUEUE, 1)
+AC_DEFINE_UNQUOTED(IOM_EPOLL, 2)
+
+iom_default=select
+AS_CASE([$ac_cv_func_kqueue:$ac_cv_func_kevent:$ac_cv_header_sys_event_h],
+[yes:yes:yes], [iom_default=kqueue],
+[*],
+ [AS_CASE(
+ [$ac_cv_func_epoll_wait:$ac_cv_func_epoll_create:$ac_cv_header_sys_epoll_h],
+ [yes:yes:yes], [iom_default=epoll])]
+)
+
+AC_ARG_WITH(iom,
+ AS_HELP_STRING([--with-iom=XXXXX],
+ [I/O manager (select|kqueue|epoll)]),
+ [with_iom="$withval"], [with_iom="$iom_default"])
+AS_CASE(["$with_iom"],
+ [select], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_SELECT)],
+ [kqueue], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_KQUEUE)],
+ [epoll], [AC_DEFINE_UNQUOTED(RUBYVM_IOM, IOM_EPOLL)],
+ [AC_MSG_ERROR(unknown I/O manager: $with_iom)])
+
dln_a_out_works=no
if test "$ac_cv_header_a_out_h" = yes; then
if test "$with_dln_a_out" = yes || test "$rb_cv_dlopen" = unknown; then
@@ -4745,5 +4776,6 @@ config_summary "install doc" "$install_doc"
config_summary "man page type" "$MANTYPE"
config_summary "search path" "$search_path"
config_summary "static-linked-ext" ${EXTSTATIC:+"yes"}
+config_summary "I/O manager" ${with_iom}
echo ""
echo "---"
diff --git a/cont.c b/cont.c
index 4d6176f00c..ced496f61b 100644
--- a/cont.c
+++ b/cont.c
@@ -13,6 +13,7 @@
#include "vm_core.h"
#include "gc.h"
#include "eval_intern.h"
+#include "iom.h"
/* FIBER_USE_NATIVE enables Fiber performance improvement using system
* dependent method such as make/setcontext on POSIX system or
@@ -126,6 +127,14 @@ static machine_stack_cache_t terminated_machine_stack;
struct rb_fiber_struct {
rb_context_t cont;
struct rb_fiber_struct *prev;
+
+ /*
+ * afnode.next == 0 auto fiber disabled
+ * afnode.next == &afnode auto fiber enabled, but not enqueued (running)
+ * afnode.next != &afnode auto fiber runnable (enqueued in th->afhead)
+ */
+ struct list_node afnode;
+
enum fiber_status status;
/* If a fiber invokes "transfer",
* then this fiber can't "resume" any more after that.
@@ -1496,19 +1505,24 @@ rb_fiber_terminate(rb_fiber_t *fib)
fiber_switch(return_fiber(), 1, &value, 0);
}
-VALUE
-rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+static void
+fiber_check_resume(const rb_fiber_t *fib)
{
- rb_fiber_t *fib;
- GetFiberPtr(fibval, fib);
-
if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
rb_raise(rb_eFiberError, "double resume");
}
if (fib->transferred != 0) {
rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
}
+}
+VALUE
+rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+{
+ rb_fiber_t *fib;
+ GetFiberPtr(fibval, fib);
+
+ fiber_check_resume(fib);
return fiber_switch(fib, argc, argv, 1);
}
@@ -1651,7 +1665,136 @@ rb_fiber_s_current(VALUE klass)
return rb_fiber_current();
}
+/* enqueue given Fiber so it may be auto-resumed */
+void
+rb_fiber_auto_enqueue(VALUE fibval)
+{
+ rb_fiber_t *fib;
+ rb_thread_t *th;
+
+ GetFiberPtr(fibval, fib);
+ GetThreadPtr(fib->cont.saved_thread.self, th);
+ list_add_tail(&th->afhead, &fib->afnode);
+}
+
+/* for vm.c::rb_thread_mark */
+void
+rb_fiber_auto_schedule_mark(const rb_thread_t *th)
+{
+ rb_fiber_t *fib = 0;
+ list_for_each(&th->afhead, fib, afnode) {
+ rb_gc_mark(fib->cont.self);
+ }
+}
+
+/* Returns true if auto-fiber is enabled for current fiber */
+int
+rb_fiber_auto_sched_p(const rb_thread_t *th)
+{
+ const rb_fiber_t *cur = th->fiber;
+
+ return (cur && cur->afnode.next && th->root_fiber != cur);
+}
+
+/*
+ * Resumes any ready fibers in the auto-Fiber run queue
+ * returns true if yielding current Fiber is needed, false if not
+ */
+int
+rb_fiber_auto_do_yield_p(rb_thread_t *th)
+{
+ rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0;
+ rb_fiber_t *fib = 0, *next = 0;
+ LIST_HEAD(tmp);
+
+ /*
+ * do not infinite loop as new fibers get added to
+ * th->afhead, only work off a temporary list:
+ */
+ list_append_list(&tmp, &th->afhead);
+ list_for_each_safe(&tmp, fib, next, afnode) {
+ if (fib == current_auto || (fib->prev && fib != th->root_fiber)) {
+ /* tell the caller to yield */
+ list_prepend_list(&th->afhead, &tmp);
+ return 1;
+ }
+ list_del_init(&fib->afnode);
+ fiber_check_resume(fib);
+ fiber_switch(fib, 0, 0, 1);
+ }
+ return 0;
+}
+
+/*
+ * Enable auto-scheduling for the Fiber and resume it
+ */
+static VALUE
+rb_fiber_auto_start(int argc, VALUE *argv, VALUE self)
+{
+ rb_fiber_t *fib;
+ GetFiberPtr(self, fib);
+ if (fib->afnode.next) {
+ rb_raise(rb_eFiberError, "Fiber already started");
+ }
+ list_node_init(&fib->afnode);
+ fiber_check_resume(fib);
+ return fiber_switch(fib, argc, argv, 1);
+}
+
+static void
+fiber_auto_join(rb_fiber_t *fib, double *timeout)
+{
+ rb_thread_t *th = GET_THREAD();
+ rb_fiber_t *cur = fiber_current();
+ if (cur == fib) {
+ rb_raise(rb_eFiberError, "Target fiber must not be current fiber");
+ }
+ if (th->root_fiber == fib) {
+ rb_raise(rb_eFiberError, "Target fiber must not be root fiber");
+ }
+ if (fib->cont.saved_thread.self != th->self) {
+ rb_raise(rb_eFiberError, "Target fiber not owned by current thread");
+ }
+ if (!fib->afnode.next) {
+ rb_raise(rb_eFiberError, "Target fiber is not an auto-fiber");
+ }
+
+ while (fib->status != TERMINATED && (timeout == 0 || *timeout >= 0.0)) {
+ rb_iom_schedule(th, timeout);
+ }
+}
+
+static VALUE
+rb_fiber_auto_join(int argc, VALUE *argv, VALUE self)
+{
+ rb_fiber_t *fib;
+ double timeout, *t;
+ VALUE limit;
+
+ GetFiberPtr(self, fib);
+ rb_scan_args(argc, argv, "01", &limit);
+
+ if (NIL_P(limit)) {
+ t = 0;
+ } else {
+ timeout = rb_num2dbl(limit);
+ t = &timeout;
+ }
+
+ fiber_auto_join(fib, t);
+ return fib->status == TERMINATED ? fib->cont.self : Qnil;
+}
+
+static VALUE
+rb_fiber_auto_value(VALUE self)
+{
+ rb_fiber_t *fib;
+ GetFiberPtr(self, fib);
+
+ fiber_auto_join(fib, 0);
+ return fib->cont.value;
+}
/*
* Document-class: FiberError
@@ -1688,6 +1831,9 @@ Init_Cont(void)
rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0);
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
+ rb_define_method(rb_cFiber, "start", rb_fiber_auto_start, -1);
+ rb_define_method(rb_cFiber, "join", rb_fiber_auto_join, -1);
+ rb_define_method(rb_cFiber, "value", rb_fiber_auto_value, 0);
}
RUBY_SYMBOL_EXPORT_BEGIN
diff --git a/include/ruby/io.h b/include/ruby/io.h
index 60d6f6d32e..3bd6f06cf3 100644
--- a/include/ruby/io.h
+++ b/include/ruby/io.h
@@ -116,6 +116,8 @@ typedef struct rb_io_t {
/* #define FMODE_UNIX 0x00200000 */
/* #define FMODE_INET 0x00400000 */
/* #define FMODE_INET6 0x00800000 */
+/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */
+/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */
#define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
diff --git a/iom.h b/iom.h
new file mode 100644
index 0000000000..d412885baa
--- /dev/null
+++ b/iom.h
@@ -0,0 +1,91 @@
+/*
+ * iom -> I/O Manager for RubyVM (auto-Fiber-aware)
+ *
+ * On platforms with epoll or kqueue, this should be ready for multicore;
+ * even if the rest of the RubyVM is not.
+ *
+ * Some inspiration taken from Mio in GHC:
+ * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf
+ */
+#ifndef RUBY_IOM_H
+#define RUBY_IOM_H
+#include "ruby.h"
+#include "ruby/io.h"
+#include "ruby/intern.h"
+#include "vm_core.h"
+
+typedef struct rb_iom_struct rb_iom_t;
+
+/* WARNING: unstable API, only for Ruby internal use */
+
+/*
+ * Note: the first "rb_thread_t *" is a placeholder and may be replaced
+ * with "rb_execution_context_t *" in the future.
+ */
+
+/*
+ * All functions with "wait" in it take an optional double * +timeout+
+ * argument specifying the timeout in seconds. If NULL, it can wait
+ * forever until the event happens (or the fiber is explicitly resumed).
+ *
+ * (maybe) TODO: If non-NULL, the timeout will be updated to the
+ * remaining time upon returning. Not sure if useful, could just be
+ * a a waste of cycles; so not implemented, yet.
+ */
+
+/*
+ * Relinquish calling fiber while waiting for +events+ on the given
+ * +rb_io_t+
+ *
+ * Multiple native threads can enter this function at the same time.
+ *
+ * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI
+ *
+ * Returns a mask of events.
+ */
+
+int rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, double *timeout);
+
+/*
+ * Identical to rb_iom_waitio, but takes a pointer to an integer file
+ * descriptor, instead of rb_io_t. Use rb_iom_waitio when possible,
+ * since it allows us to optimize epoll (and perhaps avoid kqueue
+ * portability bugs across different *BSDs).
+ */
+int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout);
+
+/*
+ * Relinquish calling fiber to wait for the given PID to change status.
+ * Multiple native threads can enter this function at the same time.
+ * If timeout is negative, wait forever.
+ */
+rb_pid_t rb_iom_waitpid(rb_thread_t *,
+ rb_pid_t, int *status, int options, double *timeout);
+
+/*
+ * Relinquish calling fiber for at least the duration of given timeout
+ * in seconds. If timeout is negative, wait forever (until explicitly
+ * resumed).
+ * Multiple native threads can enter this function at the same time.
+ */
+void rb_iom_sleep(rb_thread_t *, double *timeout);
+
+/* callback for SIGCHLD, needed to implemented for rb_iom_waitpid */
+void rb_iom_sigchld(rb_vm_t *);
+
+/*
+ * there is no public create function, creation is lazy to avoid incurring
+ * overhead for small scripts which do not need fibers, we only need this
+ * at VM destruction
+ */
+void rb_iom_destroy(rb_vm_t *);
+
+/*
+ * schedule
+ */
+void rb_iom_schedule(rb_thread_t *th, double *timeout);
+
+/* cont.c */
+int rb_fiber_auto_sched_p(const rb_thread_t *);
+
+#endif /* RUBY_IOM_H */
diff --git a/iom_common.h b/iom_common.h
new file mode 100644
index 0000000000..c9e7f716d8
--- /dev/null
+++ b/iom_common.h
@@ -0,0 +1,90 @@
+
+/* we lazily create this, small scripts may never need iom */
+static rb_iom_t *
+iom_new(rb_thread_t *th)
+{
+ rb_iom_t *iom = ALLOC(rb_iom_t);
+ rb_iom_init(iom);
+ return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+ VM_ASSERT(th && th->vm);
+ if (!th->vm->iom) {
+ th->vm->iom = iom_new(th);
+ }
+ return th->vm->iom;
+}
+
+
+/* included by iom_(epoll|select|kqueue).h */
+#ifdef HAVE_SYS_TYPES_H
+# include <sys/types.h>
+#endif
+#ifdef HAVE_SYS_WAIT_H
+# include <sys/wait.h>
+#endif
+#if defined(WNOHANG) && WNOHANG != 0 && \
+ (defined(HAVE_WAITPID) || defined(HAVE_WAIT4))
+
+static VALUE
+iom_schedule_pid(VALUE ptr)
+{
+ struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr;
+ rb_thread_t *th = pw->th;
+
+ rb_fiber_auto_do_yield_p(th);
+ return rb_fiber_yield(0, 0);
+}
+
+static rb_iom_t *rb_iom_get(rb_thread_t *);
+
+rb_pid_t
+rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
+ double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_iom_pid_waiter pw;
+
+ VM_ASSERT((options & WNOHANG) == 0 &&
+ "WNOHANG should be handled in rb_waitpid");
+ pw.th = th;
+ pw.pid = pid;
+ pw.options = options;
+ pw.errnum = 0;
+ rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+
+ /* LIFO, to match Linux wait4() blocking behavior */
+ list_add(&iom->pids, &pw.w.wnode);
+ rb_ensure(iom_schedule_pid, (VALUE)&pw, rb_iom_waiter_done, (VALUE)&pw.w);
+
+ *status = pw.status;
+ rb_last_status_set(pw.status, pw.pid);
+ return pw.pid;
+}
+
+void
+rb_iom_sigchld(rb_vm_t *vm)
+{
+ rb_iom_t *iom = vm->iom;
+ if (iom) {
+ struct rb_iom_pid_waiter *pw = 0, *next = 0;
+
+ list_for_each_safe(&iom->pids, pw, next, w.wnode) {
+ pid_t r = rb_waitpid(pw->pid, &pw->status, pw->options | WNOHANG);
+
+ if (r < 0) {
+ pw->errnum = errno;
+ if (pw->errnum == ECHILD) {
+ continue;
+ }
+ }
+ pw->pid = r;
+ rb_iom_waiter_ready(&pw->w);
+ }
+ }
+}
+
+#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */
diff --git a/iom_epoll.h b/iom_epoll.h
new file mode 100644
index 0000000000..aa1ec087fa
--- /dev/null
+++ b/iom_epoll.h
@@ -0,0 +1,392 @@
+/*
+ * Linux-only epoll-based implementation of I/O Manager for RubyVM
+ *
+ * Notes:
+ *
+ * TODO: epoll_wait only has millisecond resolution; if we need higher
+ * resolution we can use timerfd or ppoll on the epoll_fd itself.
+ *
+ * Inside the Linux kernel, select/poll/ppoll/epoll_wait all use the
+ * same notification callback (struct file_operations)->poll.
+ * Unlike with kqueue across different *BSDs; we do not need to worry
+ * about inconsistencies between these syscalls.
+ *
+ * See also notes in iom_kqueue.h
+ */
+#include "iom_internal.h"
+#include <sys/epoll.h>
+#include <math.h> /* round() */
+#define FMODE_IOM_ADDED FMODE_IOM_PRIVATE1
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+ /*
+ * Everything here is protected by GVL at this time,
+ * URCU lists (LGPL-2.1+) may be used in the future
+ */
+ struct list_head epws; /* -epw.w.wnode, order agnostic */
+ struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
+ struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+ int epoll_fd;
+ int maxevents; /* auto-increases */
+
+ /*
+ * Any number of threads may use epoll_wait concurrently on
+ * the same epoll_fd, but we only allow one thread to use
+ * a non-zero timeout; since it is possible for a long sleeper
+ * to miss fibers queued by other threads for the sleeping thread.
+ */
+ rb_thread_t *waiter;
+};
+
+/* allocated on stack */
+struct epw {
+ struct rb_iom_waiter w;
+ union {
+ rb_thread_t *th;
+ short revents;
+ } as;
+ int *fdp;
+ int *flags; /* &fptr->mode */
+ struct epoll_event ev;
+};
+
+static void
+increase_maxevents(rb_iom_t *iom, int retries)
+{
+ /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */
+ const int max_alloca = 1024 / sizeof(struct epoll_event);
+ const int max = max_alloca * 2;
+
+ if (retries) {
+ iom->maxevents *= retries;
+ if (iom->maxevents > max || iom->maxevents <= 0) {
+ iom->maxevents = max;
+ }
+ }
+}
+
+static int
+double2msec(double sec)
+{
+ /*
+ * clamp timeout to workaround a Linux <= 2.6.37 bug,
+ * see epoll_wait(2) manpage
+ */
+ const int max_msec = 35 * 60 * 1000; /* floor(35.79 minutes) */
+ double msec = round(sec * 1000);
+
+ if (msec < (double)max_msec) {
+ int ret = (int)msec;
+ return ret < 0 ? 0 : ret;
+ }
+ return max_msec;
+}
+
+/* we can avoid branches when mapping RB_WAIT_* bits to EPOLL* bits */
+STATIC_ASSERT(epbits_matches_waitfd_bits,
+ RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT &&
+ RB_WAITFD_PRI == EPOLLPRI);
+
+/* what goes into epoll_ctl */
+static int
+rb_events2ep(int events)
+{
+ return EPOLLONESHOT | events;
+}
+
+/* what comes out of epoll_wait */
+static short
+rb_ep2revents(int revents)
+{
+ return (short)(revents & (EPOLLIN|EPOLLOUT|EPOLLPRI));
+}
+
+/* lazilly create epoll FD, since not everybody waits on I/O */
+static int
+iom_epfd(rb_iom_t *iom)
+{
+ if (iom->epoll_fd < 0) {
+#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+ iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (iom->epoll_fd < 0) {
+ int err = errno;
+ if (rb_gc_for_fd(err)) {
+ iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (iom->epoll_fd < 0) {
+ rb_sys_fail("epoll_create1");
+ }
+ }
+ else if (err != ENOSYS) {
+ rb_syserr_fail(err, "epoll_create1");
+ }
+ else { /* libc >= kernel || build-env > run-env */
+#endif /* HAVE_EPOLL_CREATE1 */
+ iom->epoll_fd = epoll_create(1);
+ if (iom->epoll_fd < 0) {
+ if (rb_gc_for_fd(errno)) {
+ iom->epoll_fd = epoll_create(1);
+ }
+ }
+ if (iom->epoll_fd < 0) {
+ rb_sys_fail("epoll_create");
+ }
+ rb_maygvl_fd_fix_cloexec(iom->epoll_fd);
+#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+ }
+ }
+#endif /* HAVE_EPOLL_CREATE1 */
+ rb_update_max_fd(iom->epoll_fd);
+ }
+ return iom->epoll_fd;
+}
+
+static void
+rb_iom_init(rb_iom_t *iom)
+{
+ list_head_init(&iom->timers);
+ list_head_init(&iom->epws);
+ list_head_init(&iom->pids);
+ iom->waiter = 0;
+ iom->maxevents = 8;
+ iom->epoll_fd = -1;
+}
+
+static int
+next_timeout(rb_iom_t *iom)
+{
+ struct rb_iom_timer *t;
+ t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+ if (t) {
+ double diff = t->expires_at - timeofday();
+ return double2msec(diff);
+ }
+ else {
+ return -1; /* forever */
+ }
+}
+
+static void
+check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev)
+{
+ if (nr >= 0) {
+ int i;
+
+ for (i = 0; i < nr; i++) {
+ struct epw *epw = ev[i].data.ptr;
+
+ epw->as.revents = rb_ep2revents(ev[i].events);
+ rb_iom_waiter_ready(&epw->w);
+ }
+
+ /* notify the waiter thread in case we enqueued fibers for them */
+ if (nr > 0 && th->vm->iom->waiter) {
+ ubf_select(th->vm->iom->waiter);
+ rb_thread_schedule();
+ }
+ }
+ else {
+ int err = errno;
+ if (err != EINTR) {
+ rb_syserr_fail(err, "epoll_wait");
+ }
+ }
+ rb_iom_timer_check(&th->vm->iom->timers);
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+}
+
+/* perform a non-blocking epoll_wait while holding GVL */
+static void
+ping_events(rb_thread_t *th)
+{
+ rb_iom_t *iom = th->vm->iom;
+ int epfd = iom->epoll_fd;
+
+ if (epfd >= 0) {
+ VALUE v;
+ int nr;
+ int maxevents = iom->maxevents;
+ struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
+ int retries = 0;
+
+ do {
+ nr = epoll_wait(epfd, ev, maxevents, 0);
+ check_epoll_wait(th, nr, ev);
+ } while (nr == maxevents && ++retries);
+ if (v) {
+ ALLOCV_END(v);
+ }
+ increase_maxevents(iom, retries);
+ }
+}
+
+/* for iom_pingable_common.h */
+static void
+rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
+{
+ int maxevents = iom->maxevents;
+ int nr = maxevents;
+
+ if (!iom->waiter) {
+ int msec = next_timeout(iom);
+
+ if (list_empty(&iom->epws)) {
+ if (msec == 0) {
+ rb_thread_schedule();
+ }
+ else {
+ struct timeval *tvp = 0;
+ struct timeval tv;
+ if (msec > 0) {
+ tv.tv_sec = msec / 1000;
+ tv.tv_usec = (msec % 1000) * 1000;
+ tvp = &tv;
+ }
+ native_sleep(th, tvp);
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+ }
+ }
+ else {
+ VALUE v;
+ struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
+ int epfd = iom->epoll_fd;
+
+ VM_ASSERT(epfd >= 0);
+ iom->waiter = th;
+ BLOCKING_REGION({
+ nr = epoll_wait(epfd, ev, maxevents, msec);
+ }, ubf_select, th, FALSE);
+ iom->waiter = 0;
+ check_epoll_wait(th, nr, ev);
+ if (v) {
+ ALLOCV_END(v);
+ }
+ }
+ }
+ if (nr == maxevents) { /* || msec == 0 */
+ ping_events(th);
+ }
+}
+
+static VALUE
+epmod_yield(VALUE ptr)
+{
+ struct epw *epw = (struct epw *)ptr;
+ rb_thread_t *th = epw->as.th;
+ rb_iom_t *iom = rb_iom_get(th);
+ int e;
+ int fd = *epw->fdp;
+ int epfd = iom_epfd(iom);
+
+ /*
+ * if we did not have GVL, revents may be set immediately
+ * upon epoll_ctl by another thread running epoll_wait,
+ * so we must initialize it before epoll_ctl:
+ */
+ epw->as.revents = 0;
+
+ /* we want to track if an FD is already being watched ourselves */
+ if (epw->flags) {
+ if (*epw->flags & FMODE_IOM_ADDED) {
+ e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+ }
+ else {
+ e = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epw->ev);
+ if (e == 0) {
+ *epw->flags |= FMODE_IOM_ADDED;
+ }
+ else if (e < 0 && errno == EEXIST) {
+ /*
+ * It's possible to get EEXIST if fptrs point to the same FD:
+ * f1 = Fiber.start { io1.read(1) }
+ * io2 = IO.for_fd(io1.fileno)
+ * f2 = Fiber.start { io2.read(1) }
+ */
+ *epw->flags |= FMODE_IOM_ADDED;
+ e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+ }
+ }
+ }
+ else { /* don't know if added or not, fall back to addiing on ENOENT */
+ e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+ if (e < 0 && errno == ENOENT) {
+ e = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epw->ev);
+ }
+ }
+ if (e < 0) {
+ rb_sys_fail("epoll_ctl");
+ }
+
+ ping_events(th);
+ (void)rb_fiber_auto_do_yield_p(th);
+ return rb_fiber_yield(0, 0);
+}
+
+static int
+iom_waitfd(rb_thread_t *th, int *fdp, int *flags, int events, double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct epw epw;
+
+ epw.as.th = th;
+ epw.fdp = fdp;
+ epw.flags = flags;
+ epw.ev.events = rb_events2ep(events);
+ epw.ev.data.ptr = &epw;
+
+ list_add(&iom->epws, &epw.w.wnode);
+ rb_iom_timer_add(&iom->timers, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+ rb_ensure(epmod_yield, (VALUE)&epw, rb_iom_waiter_done, (VALUE)&epw.w);
+
+ return (int)epw.as.revents; /* may be zero if timed out */
+}
+
+int
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
+{
+ return iom_waitfd(th, &fptr->fd, &fptr->mode, events, timeout);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+ return iom_waitfd(th, fdp, 0, events, timeout);
+}
+
+void
+rb_iom_destroy(rb_vm_t *vm)
+{
+ rb_iom_t *iom = vm->iom;
+ vm->iom = 0;
+ if (iom) {
+ /*
+ * it's confusing to share epoll FDs across processes
+ * (kqueue has a rather unique close-on-fork behavior)
+ */
+ if (iom->epoll_fd >= 0) {
+ close(iom->epoll_fd);
+ }
+ xfree(iom);
+ }
+}
+
+/* used by thread.c::rb_thread_atfork */
+static void
+rb_iom_atfork_child(rb_thread_t *th)
+{
+ rb_iom_destroy(th->vm);
+}
+
+/* used by thread_pthread.c */
+static int
+rb_iom_reserved_fd(int fd)
+{
+ rb_iom_t *iom = GET_VM()->iom;
+
+ return iom && fd == iom->epoll_fd;
+}
+
+#include "iom_pingable_common.h"
+#include "iom_common.h"
diff --git a/iom_internal.h b/iom_internal.h
new file mode 100644
index 0000000000..b5aa321274
--- /dev/null
+++ b/iom_internal.h
@@ -0,0 +1,153 @@
+#ifndef RB_IOM_COMMON_H
+#define RB_IOM_COMMON_H
+
+#include "internal.h"
+#include "iom.h"
+
+/* cont.c */
+void rb_fiber_auto_enqueue(VALUE fibval);
+int rb_fiber_auto_do_yield_p(rb_thread_t *);
+
+#define FMODE_IOM_PRIVATE1 0x01000000
+#define FMODE_IOM_PRIVATE2 0x02000000
+
+#define IOM_FIBMASK ((VALUE)0x1)
+#define IOM_FIB (0x2)
+#define IOM_WAIT (0x1) /* container_of(..., struct rb_iom_waiter, timer) */
+
+/* allocated on stack */
+struct rb_iom_timer {
+ struct list_node tnode; /* <=> rb_iom_struct.timers */
+ double expires_at; /* absolute monotonic time */
+ VALUE _fibval;
+};
+
+struct rb_iom_waiter {
+ struct rb_iom_timer timer;
+ struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
+};
+
+struct rb_iom_fd_waiter {
+ struct rb_iom_waiter w;
+ int *fdp;
+ short events;
+ short revents;
+};
+
+struct rb_iom_pid_waiter {
+ struct rb_iom_waiter w;
+ rb_thread_t *th;
+ rb_pid_t pid;
+ int status;
+ int options;
+ int errnum;
+};
+
+static VALUE
+rb_iom_timer_fibval(const struct rb_iom_timer *t)
+{
+ return t->_fibval & ~IOM_FIBMASK;
+}
+
+static struct rb_iom_waiter *
+rb_iom_waiter_of(struct rb_iom_timer *t)
+{
+ if (t->_fibval & IOM_FIBMASK) {
+ return 0;
+ }
+ return container_of(t, struct rb_iom_waiter, timer);
+}
+
+/* check for expired timers */
+static void
+rb_iom_timer_check(struct list_head *timers)
+{
+ struct rb_iom_timer *t = 0, *next = 0;
+ double now = timeofday();
+
+ list_for_each_safe(timers, t, next, tnode) {
+ if (t->expires_at <= now) {
+ struct rb_iom_waiter *w = rb_iom_waiter_of(t);
+ VALUE fibval = rb_iom_timer_fibval(t);
+
+ list_del_init(&t->tnode);
+ if (w) {
+ list_del_init(&w->wnode);
+ }
+ t->_fibval = Qfalse;
+
+ /* non-auto-fibers may set timer in rb_iom_schedule */
+ if (fibval != Qfalse) {
+ rb_fiber_auto_enqueue(fibval);
+ }
+ }
+ return; /* done, timers is a sorted list */
+ }
+}
+
+/* insert a new +timer+ into +timers+, maintain sort order by expires_at */
+static void
+rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
+ const double *timeout, int flags)
+{
+ rb_iom_timer_check(timers);
+
+ add->_fibval = flags & IOM_FIB ? rb_fiber_current() : Qfalse;
+ add->_fibval |= flags & IOM_WAIT ? 0 : IOM_FIBMASK;
+
+ if (timeout) {
+ struct rb_iom_timer *i = 0;
+ add->expires_at = timeofday() + *timeout;
+
+ /*
+ * search backwards: assume typical projects have multiple objects
+ * sharing the same timeout values, so new timers will expire later
+ * than existing timers
+ */
+ list_for_each_rev(timers, i, tnode) {
+ if (add->expires_at > i->expires_at) {
+ list_add_after(timers, &i->tnode, &add->tnode);
+ return;
+ }
+ }
+ list_add(timers, &add->tnode);
+ }
+ else {
+ /* not active, just allow list_del to function in cleanup */
+ list_node_init(&add->tnode);
+ }
+}
+
+static VALUE
+rb_iom_timer_done(VALUE ptr)
+{
+ struct rb_iom_timer *t = (struct rb_iom_timer *)ptr;
+ list_del(&t->tnode);
+ return Qfalse;
+}
+
+static void
+rb_iom_waiter_ready(struct rb_iom_waiter *w)
+{
+ VALUE fibval = rb_iom_timer_fibval(&w->timer);
+
+ w->timer._fibval = Qfalse;
+ list_del_init(&w->timer.tnode);
+ list_del_init(&w->wnode);
+ if (fibval != Qfalse) {
+ rb_fiber_auto_enqueue(fibval);
+ }
+}
+
+static VALUE
+rb_iom_waiter_done(VALUE ptr)
+{
+ struct rb_iom_waiter *w = (struct rb_iom_waiter *)ptr;
+ list_del(&w->timer.tnode);
+ list_del(&w->wnode);
+ return Qfalse;
+}
+
+static rb_iom_t *rb_iom_get(rb_thread_t *);
+
+#endif /* IOM_COMMON_H */
diff --git a/iom_kqueue.h b/iom_kqueue.h
new file mode 100644
index 0000000000..f0776b2f58
--- /dev/null
+++ b/iom_kqueue.h
@@ -0,0 +1,440 @@
+/*
+ * kqueue-based implementation of I/O Manager for RubyVM on *BSD
+ *
+ * The kevent API has an advantage over epoll_ctl+epoll_wait since
+ * it can simultaneously add filters and check for events with one
+ * syscall. It also allows EV_ADD to be used idempotently for
+ * enabling filters, where as epoll_ctl requires separate ADD and
+ * MOD operations.
+ *
+ * These are advantages in the common case.
+ *
+ * The epoll API has advantages in more esoteric cases:
+ *
+ * epoll has the advantage over kqueue when watching for multiple
+ * events (POLLIN|POLLOUT|POLLPRI) (which is rare). We also have
+ * to install two kevent filters to watch POLLIN|POLLOUT simutaneously.
+ *
+ * Finally, kevent does not support POLLPRI directly, we need to use
+ * select() (or perhaps poll() on some platforms) with a zero
+ * timeout to check for POLLPRI after EVFILT_READ returns.
+ *
+ * Finally, several *BSDs implement kqueue; and the quality of each
+ * implementation may vary. Anecdotally, *BSDs are not known to even
+ * support poll() consistently across different types of files.
+ * We will need to selective and careful about injecting them into
+ * kevent().
+ */
+#include "iom_internal.h"
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+ /*
+ * Everything here is protected by GVL at this time,
+ * URCU lists (LGPL-2.1+) may be used in the future
+ */
+ struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */
+ struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
+ struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+ int kqueue_fd;
+ int nevents; /* auto-increases */
+
+ /*
+ * Any number of threads may use kevent concurrently on
+ * the same kqueue_fd, but we only allow one thread to use
+ * a non-zero timeout. It is possible for a long sleeper
+ * to miss fibers queued by other threads for the sleeping thread.
+ */
+ rb_thread_t *waiter;
+};
+
+/* allocated on stack */
+struct kev {
+ struct rb_iom_fd_waiter fdw;
+ rb_thread_t *th; /* 0 - kev is disabled */
+};
+
+/*
+ * like our epoll implementation, we "ping" using kevent with zero-timeout
+ * and can do so on any thread.
+ */
+static const struct timespec zero;
+
+static void
+increase_nevents(rb_iom_t *iom, int retries)
+{
+ /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */
+ const int max_alloca = 1024 / sizeof(struct kevent);
+ const int max = max_alloca * 2;
+
+ if (retries) {
+ iom->nevents *= retries;
+ if (iom->nevents > max || iom->nevents <= 0) {
+ iom->nevents = max;
+ }
+ }
+}
+
+static int
+iom_kqfd(rb_iom_t *iom)
+{
+ if (iom->kqueue_fd >= 0) {
+ return iom->kqueue_fd;
+ }
+ iom->kqueue_fd = kqueue();
+ if (iom->kqueue_fd < 0) {
+ if (rb_gc_for_fd(errno)) {
+ iom->kqueue_fd = kqueue();
+ }
+ if (iom->kqueue_fd < 0) {
+ rb_sys_fail("kqueue");
+ }
+ }
+ rb_fd_fix_cloexec(iom->kqueue_fd);
+ return iom->kqueue_fd;
+}
+
+static void
+rb_iom_init(rb_iom_t *iom)
+{
+ list_head_init(&iom->timers);
+ list_head_init(&iom->kevs);
+ list_head_init(&iom->pids);
+ iom->waiter = 0;
+ iom->nevents = 8;
+ iom->kqueue_fd = -1;
+}
+
+static struct timespec *
+next_timeout(rb_iom_t *iom, struct timespec *ts)
+{
+ struct rb_iom_timer *t;
+ t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+ if (t) {
+ double diff = t->expires_at - timeofday();
+
+ ts->tv_sec = (long)diff;
+ ts->tv_nsec = (long)((diff - (double)ts->tv_sec) * 1e9);
+ if (ts->tv_sec < 0) {
+ ts->tv_sec = 0;
+ }
+ if (ts->tv_nsec < 0) {
+ ts->tv_nsec = 0;
+ }
+ return ts;
+ }
+ return NULL; /* forever */
+}
+
+/*
+ * kevent does not have an EVFILT_* flag for (rarely-used) urgent data,
+ * at least not on FreeBSD 11.0, so we call select() separately after
+ * EVFILT_READ returns to set the RB_WAITFD_PRI bit:
+ */
+static void
+check_pri(rb_thread_t *th, int fd, struct kev *kev)
+{
+ rb_fdset_t efds;
+ int r;
+ struct timeval tv = { 0 };
+
+ rb_fd_init(&efds);
+ rb_fd_set(fd, &efds);
+ r = native_fd_select(fd + 1, 0, 0, &efds, &tv, th);
+ rb_fd_term(&efds);
+ /* TODO: use poll() if possible */
+
+ if (r > 0) {
+ kev->fdw.revents |= RB_WAITFD_PRI;
+ }
+}
+
+static void
+check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
+{
+ int err = 0;
+ if (nr >= 0) {
+ int i;
+ struct kevent *changelist = eventlist;
+ int nchanges = 0;
+
+ for (i = 0; i < nr; i++) {
+ struct kevent *ev = &eventlist[i];
+ struct kev *kev = ev->udata;
+ int fd = *kev->fdw.fdp;
+
+ if (ev->filter == EVFILT_READ) {
+ int rd = kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI);
+ if (rd == RB_WAITFD_IN) { /* common */
+ kev->fdw.revents |= rd;
+ }
+ else if (rd & RB_WAITFD_PRI) { /* ugh... */
+ if (fd >= 0) {
+ check_pri(th, fd, kev);
+ }
+ }
+ assert(rd && "got EVFILT_READ unexpectedly");
+
+ /* disable the other filter if we waited on both read+write */
+ if (fd >= 0 && (kev->fdw.events & RB_WAITFD_OUT) && kev->th) {
+ struct kevent *chg = &changelist[nchanges++];
+
+ kev->th = 0;
+ EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+ }
+ }
+ else if (ev->filter == EVFILT_WRITE) {
+ kev->fdw.revents |= RB_WAITFD_OUT;
+
+ assert(kev->fdw.events & RB_WAITFD_OUT &&
+ "unexpected EVFILT_WRITE");
+
+ /* disable the other filter if we waited on both read+write */
+ if (fd >= 0 &&
+ kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI) &&
+ kev->th) {
+ struct kevent *chg = &changelist[nchanges++];
+
+ kev->th = 0;
+ EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ }
+ }
+ else {
+ rb_bug("unexpected filter: %d", ev->filter);
+ }
+ rb_iom_waiter_ready(&kev->fdw.w);
+ }
+
+ /*
+ * kqueue is a little more complicated because we install
+ * separate filters for simultaneously watching read and write
+ * on the same FD, and now we must clear shared filters
+ */
+ if (nchanges && kevent(th->vm->iom->kqueue_fd, changelist, nchanges,
+ 0, 0, &zero) < 0) {
+ err = errno;
+ }
+
+ /* notify the waiter thread in case we enqueued fibers for them */
+ if (nr > 0 && th->vm->iom->waiter) {
+ ubf_select(th->vm->iom->waiter);
+ rb_thread_schedule();
+ }
+ }
+ else {
+ err = errno;
+ }
+ if (err && err != EINTR) {
+ rb_syserr_fail(err, "kevent");
+ }
+ rb_iom_timer_check(&th->vm->iom->timers);
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+}
+
+/* perform a non-blocking kevent check while holding GVL */
+static void
+ping_events(rb_thread_t *th)
+{
+ rb_iom_t *iom = th->vm->iom;
+ int kqfd = iom->kqueue_fd;
+
+ if (kqfd >= 0) {
+ VALUE v;
+ int nr;
+ int nevents = iom->nevents;
+ struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
+ int retries = 0;
+
+ do {
+ nr = kevent(kqfd, 0, 0, eventlist, nevents, &zero);
+ check_kevent(th, nr, eventlist);
+ } while (nr == nevents && ++retries);
+ if (v) {
+ ALLOCV_END(v);
+ }
+ increase_nevents(iom, retries);
+ }
+}
+
+/* for iom_pingable_common.h */
+static void
+rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
+{
+ int nevents = iom->nevents;
+ int nr = nevents;
+
+ if (!iom->waiter) {
+ struct timespec ts;
+ const struct timespec *tsp = next_timeout(iom, &ts);
+
+ if (list_empty(&iom->kevs)) {
+ if (tsp) {
+ struct timeval tv;
+ tv.tv_sec = tsp->tv_sec;
+ tv.tv_usec = (int)(tsp->tv_nsec / 1000);
+ native_sleep(th, &tv);
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+ }
+ else {
+ rb_thread_schedule();
+ }
+ }
+ else {
+ VALUE v;
+ struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
+ int kqfd = iom->kqueue_fd;
+
+ VM_ASSERT(kqfd >= 0);
+ iom->waiter = th;
+ BLOCKING_REGION({
+ nr = kevent(kqfd, 0, 0, eventlist, nevents, tsp);
+ }, ubf_select, th, FALSE);
+ iom->waiter = 0;
+ check_kevent(th, nr, eventlist);
+ if (v) {
+ ALLOCV_END(v);
+ }
+ }
+ }
+ if (nr == nevents) { /* || tsp == &zero */
+ ping_events(th);
+ }
+}
+
+static void
+kevxchg_ping(struct kev *kev)
+{
+ rb_thread_t *th = kev->th;
+ rb_iom_t *iom = rb_iom_get(th);
+ int retries = 0;
+ int nchanges = 0;
+ int nr;
+ VALUE v;
+ int nevents = iom->nevents;
+ struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
+ struct kevent *changelist = eventlist; /* allowed, see kevent manpage */
+ int fd = *kev->fdw.fdp;
+ int kqfd = iom_kqfd(iom);
+
+ VM_ASSERT(nevents > 2);
+
+ /* EVFILT_READ handles urgent data (POLLPRI) */
+ if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
+ struct kevent *chg = &changelist[nchanges++];
+ EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, kev);
+ }
+
+ if (kev->fdw.events & RB_WAITFD_OUT) {
+ struct kevent *chg = &changelist[nchanges++];
+ EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, kev);
+ }
+
+ do {
+ int err;
+
+ nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero);
+
+ /* kevent may fail with ENOMEM when processing changelist */
+ if (nr < 0 && nchanges && rb_gc_for_fd(errno)) {
+ continue;
+ }
+ check_kevent(th, nr, eventlist);
+ } while (nr == nevents && ++retries && (nchanges = 0) == 0);
+ if (v) {
+ ALLOCV_END(v);
+ }
+ increase_nevents(iom, retries);
+}
+
+static VALUE
+kevxchg_yield(VALUE ptr)
+{
+ struct kev *kev = (struct kev *)ptr;
+ kevxchg_ping(kev);
+ (void)rb_fiber_auto_do_yield_p(kev->th);
+ return rb_fiber_yield(0, 0);
+}
+
+static int
+iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct kev kev;
+
+ kev.th = th;
+ kev.fdw.fdp = fdp;
+ kev.fdw.events = (short)events;
+ kev.fdw.revents = 0;
+ list_add(&iom->kevs, &kev.fdw.w.wnode);
+ rb_iom_timer_add(&iom->timers, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+ rb_ensure(kevxchg_yield, (VALUE)&kev,
+ rb_iom_waiter_done, (VALUE)&kev.fdw.w);
+
+ return kev.fdw.revents; /* may be zero if timed out */
+}
+
+/*
+ * XXX we may use fptr->mode to mark FDs which kqueue cannot handle,
+ * different BSDs may have different bugs which prevent certain
+ * FD types from being handled by kqueue...
+ */
+int
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
+{
+ return iom_waitfd(th, &fptr->fd, events, timeout);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+ return iom_waitfd(th, fdp, events, timeout);
+}
+
+void
+rb_iom_destroy(rb_vm_t *vm)
+{
+ rb_iom_t *iom = vm->iom;
+ vm->iom = 0;
+ if (iom) {
+ if (iom->kqueue_fd >= 0) {
+ close(iom->kqueue_fd);
+ }
+ xfree(iom);
+ }
+}
+
+/* used by thread.c::rb_thread_atfork */
+static void
+rb_iom_atfork_child(rb_thread_t *th)
+{
+#ifdef LIBKQUEUE
+ /*
+ * libkqueue uses epoll, /dev/poll FD, or poll/select on a pipe,
+ * so there is an FD.
+ * I guess we're going to hit an error in case somebody uses
+ * libkqueue with a real native kqueue backend, but nobody does
+ * that in production, hopefully.
+ */
+ rb_iom_destroy(th->vm);
+#else /* native kqueue is close-on-fork, so we do not close ourselves */
+ xfree(th->vm->iom);
+ th->vm->iom = 0;
+#endif
+}
+
+/* used by thread_pthread.c */
+static int
+rb_iom_reserved_fd(int fd)
+{
+ rb_iom_t *iom = GET_VM()->iom;
+
+ return iom && fd == iom->kqueue_fd;
+}
+
+#include "iom_pingable_common.h"
+#include "iom_common.h"
diff --git a/iom_pingable_common.h b/iom_pingable_common.h
new file mode 100644
index 0000000000..b3caf2f1df
--- /dev/null
+++ b/iom_pingable_common.h
@@ -0,0 +1,55 @@
+/* shared between "pingable" implementations (iom_kqueue.h and iom_epoll.h) */
+
+/* only for iom_kqueue.h and iom_epoll.h */
+static void rb_iom_do_wait(rb_thread_t *, rb_iom_t *);
+
+static VALUE
+rb_iom_do_schedule(VALUE ptr)
+{
+ rb_thread_t *th = (rb_thread_t *)ptr;
+ rb_iom_t *iom = th->vm->iom;
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ if (iom) {
+ rb_iom_do_wait(th, iom);
+ }
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ return Qfalse;
+}
+
+void
+rb_iom_schedule(rb_thread_t *th, double *timeout)
+{
+ if (rb_fiber_auto_sched_p(th)) {
+ rb_iom_t *iom = th->vm->iom;
+
+ if (iom) {
+ rb_iom_timer_check(&iom->timers);
+ ping_events(th);
+ }
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ }
+ else if (timeout) {
+ double t0 = timeofday();
+ struct rb_iom_timer t;
+ rb_iom_t *iom = rb_iom_get(th);
+
+ rb_iom_timer_add(&iom->timers, &t, timeout, 0);
+ rb_ensure(rb_iom_do_schedule, (VALUE)th, rb_iom_timer_done, (VALUE)&t);
+ *timeout -= timeofday() - t0;
+ }
+ else {
+ rb_iom_t *iom = th->vm->iom;
+
+ if (iom) {
+ rb_iom_timer_check(&iom->timers);
+ }
+ rb_iom_do_schedule((VALUE)th);
+ }
+}
diff --git a/iom_select.h b/iom_select.h
new file mode 100644
index 0000000000..d6a93e7555
--- /dev/null
+++ b/iom_select.h
@@ -0,0 +1,341 @@
+/*
+ * select()-based implementation of I/O Manager for RubyVM
+ *
+ * This is crippled and relies heavily on GVL compared to the
+ * epoll and kqueue versions
+ */
+#include "iom_internal.h"
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+ /*
+ * Everything here is protected by GVL at this time,
+ * URCU lists (LGPL-2.1+) may be used in the future
+ */
+ struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
+ struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
+ struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+ rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */
+ rb_serial_t select_gen; /* RDWR protected by GVL */
+ rb_thread_t *selector; /* rb_thread_fd_select owner (acts as lock w/ GVL */
+
+ /* these could be on stack, but they're huge on some platforms: */
+ rb_fdset_t rfds;
+ rb_fdset_t wfds;
+ rb_fdset_t efds;
+};
+
+/* allocated on stack */
+struct select_do {
+ rb_thread_t *th;
+ int do_wait;
+ int ret;
+};
+
+static void
+rb_iom_init(rb_iom_t *iom)
+{
+ iom->select_start = 0;
+ iom->select_gen = 0;
+ iom->selector = 0;
+ list_head_init(&iom->timers);
+ list_head_init(&iom->fds);
+ list_head_init(&iom->pids);
+}
+
+static struct timeval *
+next_timeout(rb_iom_t *iom, struct timeval *tv)
+{
+ struct rb_iom_timer *t;
+ t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+ if (t) {
+ double diff = t->expires_at - timeofday();
+ if (diff > 0) {
+ *tv = double2timeval(diff);
+ }
+ else {
+ tv->tv_sec = 0;
+ tv->tv_usec = 0;
+ }
+
+ return tv;
+ }
+ else {
+ return 0;
+ }
+}
+
+static VALUE
+iom_select_wait(VALUE ptr)
+{
+ struct select_do *sd = (struct select_do *)ptr;
+ rb_thread_t *th = sd->th;
+ rb_iom_t *iom = th->vm->iom;
+ int max = -1;
+ struct timeval tv, *tvp;
+ struct rb_iom_fd_waiter *fdw = 0, *next = 0;
+ rb_fdset_t *rfds, *wfds, *efds;
+
+ iom->select_start = iom->select_gen;
+ sd->ret = 0;
+ rfds = wfds = efds = 0;
+ rb_fd_init(&iom->rfds);
+ rb_fd_init(&iom->wfds);
+ rb_fd_init(&iom->efds);
+
+ list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
+ int fd = *fdw->fdp;
+ if (fd < 0) { /* closed */
+ fdw->revents = fdw->events;
+ rb_iom_waiter_ready(&fdw->w);
+ sd->do_wait = 0;
+ continue;
+ }
+ if (fd > max) {
+ max = fd;
+ }
+ if (fdw->events & RB_WAITFD_IN) {
+ rb_fd_set(fd, rfds = &iom->rfds);
+ }
+ if (fdw->events & RB_WAITFD_OUT) {
+ rb_fd_set(fd, wfds = &iom->wfds);
+ }
+ if (fdw->events & RB_WAITFD_PRI) {
+ rb_fd_set(fd, efds = &iom->efds);
+ }
+ }
+
+ if (sd->do_wait) {
+ tvp = next_timeout(iom, &tv);
+ if (tvp == 0 && max < 0) {
+ goto nowait;
+ }
+ }
+ else {
+ nowait:
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ tvp = &tv;
+ }
+ BLOCKING_REGION({
+ sd->ret = native_fd_select(max + 1, rfds, wfds, efds, tvp, th);
+ }, ubf_select, th, FALSE);
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+
+ return Qfalse;
+}
+
+static VALUE
+iom_select_done(VALUE ptr)
+{
+ struct select_do *sd = (struct select_do *)ptr;
+ rb_iom_t *iom = sd->th->vm->iom;
+ struct rb_iom_fd_waiter *fdw = 0, *next = 0;
+ int ret = sd->ret;
+
+ iom->selector = 0;
+ if (ret > 0) {
+ list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
+ int fd = *fdw->fdp;
+
+ if (fd < 0) { /* closed */
+ fdw->revents = fdw->events;
+ rb_iom_waiter_ready(&fdw->w);
+ continue;
+ }
+ if (fdw->events & RB_WAITFD_IN && rb_fd_isset(fd, &iom->rfds)) {
+ fdw->revents |= RB_WAITFD_IN;
+ }
+ if (fdw->events & RB_WAITFD_OUT && rb_fd_isset(fd, &iom->wfds)) {
+ fdw->revents |= RB_WAITFD_OUT;
+ }
+ if (fdw->events & RB_WAITFD_PRI && rb_fd_isset(fd, &iom->efds)) {
+ fdw->revents |= RB_WAITFD_PRI;
+ }
+
+ /* got revents? enqueue ourselves to be run! */
+ if (fdw->revents) {
+ rb_iom_waiter_ready(&fdw->w);
+ if (--ret <= 0) {
+ break;
+ }
+ }
+ }
+ }
+
+ rb_fd_term(&iom->rfds);
+ rb_fd_term(&iom->wfds);
+ rb_fd_term(&iom->efds);
+
+ return Qfalse;
+}
+
+static int
+retry_select_p(struct select_do *sd, rb_iom_t *iom)
+{
+ /*
+ * if somebody changed iom->fds while we were inside select,
+ * rerun it with zero timeout to avoid a race condition.
+ * This is not necessary for epoll or kqueue because the kernel
+ * is constantly monitoring the watched set.
+ */
+ if (iom->select_start != iom->select_gen) {
+ sd->do_wait = 0;
+ return 1;
+ }
+ return 0;
+}
+
+static VALUE
+iom_do_select(VALUE ptr)
+{
+ struct select_do *sd = (struct select_do *)ptr;
+ rb_thread_t *th = sd->th;
+ rb_iom_t *iom = th->vm->iom;
+ rb_thread_t *s;
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ if (!iom) {
+ return Qfalse;
+ }
+ s = iom->selector;
+ if (s) { /* somebody else is running select() */
+ if (iom->select_start != iom->select_gen) {
+ ubf_select(s);
+ }
+ if (rb_fiber_auto_sched_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ }
+ else {
+ iom->selector = th;
+
+ do {
+ rb_ensure(iom_select_wait, (VALUE)sd, iom_select_done, (VALUE)sd);
+ rb_iom_timer_check(&iom->timers);
+ } while (retry_select_p(sd, iom));
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ }
+ return Qfalse;
+}
+
+void
+rb_iom_schedule(rb_thread_t *th, double *timeout)
+{
+ struct select_do sd;
+
+ sd.th = th;
+ sd.do_wait = !rb_fiber_auto_sched_p(th);
+
+ if (timeout && sd.do_wait) {
+ double t0 = timeofday();
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_iom_timer t;
+
+ rb_iom_timer_add(&iom->timers, &t, timeout, 0);
+ rb_ensure(iom_do_select, (VALUE)&sd, rb_iom_timer_done, (VALUE)&t);
+ *timeout -= timeofday() - t0;
+ }
+ else {
+ rb_iom_t *iom = th->vm->iom;
+ if (iom) {
+ rb_iom_timer_check(&iom->timers);
+ }
+ iom_do_select((VALUE)&sd);
+ }
+}
+
+static VALUE
+iom_schedule_fd(VALUE ptr)
+{
+ rb_thread_t *th = (rb_thread_t *)ptr;
+ rb_iom_t *iom = rb_iom_get(th);
+ rb_thread_t *s;
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ return rb_fiber_yield(0, 0);
+ }
+
+ s = iom->selector;
+ if (s) {
+ /*
+ * kick the select thread, retry_select_p will return true since
+ * caller bumped iom->select_gen
+ */
+ ubf_select(s);
+ }
+ else {
+ struct select_do sd;
+
+ iom->selector = sd.th = th;
+ sd.do_wait = 0;
+ do {
+ rb_ensure(iom_select_wait, (VALUE)&sd, iom_select_done, (VALUE)&sd);
+ rb_iom_timer_check(&iom->timers);
+ } while (retry_select_p(&sd, iom));
+ (void)rb_fiber_auto_do_yield_p(th);
+ }
+ return rb_fiber_yield(0, 0);
+}
+
+
+/* only epoll takes advantage of this (kqueue may for portability bugs) */
+int
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
+{
+ return rb_iom_waitfd(th, &fptr->fd, events, timeout);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_iom_fd_waiter fdw;
+
+ if (*fdp < 0) return -1;
+ fdw.fdp = fdp;
+ fdw.events = (short)events;
+ fdw.revents = 0;
+
+ /* use FIFO order for fairness in iom_select_done */
+ list_add_tail(&iom->fds, &fdw.w.wnode);
+ rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+ iom->select_gen++;
+ rb_ensure(iom_schedule_fd, (VALUE)th, rb_iom_waiter_done, (VALUE)&fdw.w);
+
+ if (*fdp < 0) return -1;
+
+ return (int)fdw.revents; /* may be zero if timed out */
+}
+
+void
+rb_iom_destroy(rb_vm_t *vm)
+{
+ if (vm->iom) {
+ xfree(vm->iom);
+ vm->iom = 0;
+ }
+}
+
+/* used by thread.c::rb_thread_atfork */
+static void
+rb_iom_atfork_child(rb_thread_t *th)
+{
+ rb_iom_destroy(th->vm);
+}
+
+/* used by thread_pthread.c */
+static int
+rb_iom_reserved_fd(int fd)
+{
+ return 0;
+}
+
+#include "iom_common.h"
diff --git a/prelude.rb b/prelude.rb
index 7b98e28285..1e71a3f168 100644
--- a/prelude.rb
+++ b/prelude.rb
@@ -16,6 +16,18 @@ def self.exclusive
end
end
+class Fiber
+ def self.start(*args, &block)
+ # loses Fiber#resume return value, but maybe people do not care
+ new(&block).run
+ end
+
+ def run
+ start
+ self
+ end
+end
+
class IO
# call-seq:
diff --git a/test/lib/leakchecker.rb b/test/lib/leakchecker.rb
index c0cf718635..562f32efca 100644
--- a/test/lib/leakchecker.rb
+++ b/test/lib/leakchecker.rb
@@ -28,6 +28,15 @@ def find_fds
if d.respond_to? :fileno
a -= [d.fileno]
end
+
+ # only place we use epoll is internally, do not count it against us
+ a.delete_if do |fd|
+ begin
+ File.readlink("#{fd_dir}/#{fd}").match?(/\beventpoll\b/)
+ rescue
+ false
+ end
+ end
a
}
fds.sort
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
new file mode 100644
index 0000000000..0dd798644c
--- /dev/null
+++ b/test/ruby/test_fiber_auto.rb
@@ -0,0 +1,126 @@
+# frozen_string_literal: true
+require 'test/unit'
+require 'fiber'
+require 'io/nonblock'
+require 'io/wait'
+
+class TestFiberAuto < Test::Unit::TestCase
+ def test_value
+ nr = 0
+ f = Fiber.start { nr += 1 }
+ assert_equal 1, f.value, 'value returns as expected'
+ assert_equal 1, f.value, 'value is idempotent'
+ end
+
+ def test_join
+ nr = 0
+ f = Fiber.start { nr += 1 }
+ assert_equal f, f.join
+ assert_equal 1, f.value
+ assert_equal f, f.join, 'join is idempotent'
+ assert_equal f, f.join(10), 'join is idempotent w/ timeout'
+ end
+
+ def test_io_wait_read
+ tick = 0.1 # 100ms, TIME_QUANTUM_USEC in thread_pthread.c
+ IO.pipe do |r, w|
+ r.nonblock = w.nonblock = true
+ rdr = Fiber.start { r.read(1) }
+ t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ assert_nil rdr.join(tick), 'join returns nil on timeout'
+ diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
+ assert_operator diff, :>=, tick, 'Fiber still running'
+ w.write('a')
+ assert_equal 'a', rdr.value, 'finished'
+
+ t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ rwait = Fiber.start { r.wait_readable(tick) }
+ assert_nil rwait.value, 'IO returned no events after timeout'
+ diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
+ assert_operator diff, :>=, tick, 'Fiber waited for timeout'
+ end
+ end
+
+ # make sure we do not create extra FDs (from epoll/kqueue)
+ # for operations which do not need it
+ def test_fd_creation_and_fork
+ assert_separately(%w(-r io/nonblock), <<~'end;') # do
+ fdrange = (3..64)
+ reserved = -'The given fd is not accessible because RubyVM reserves it'
+ fdstats = lambda do
+ stats = Hash.new(0)
+ fdrange.map do |fd|
+ begin
+ IO.for_fd(fd, autoclose: false)
+ stats[:good] += 1
+ rescue Errno::EBADF # ignore
+ rescue ArgumentError => e
+ raise if e.message != reserved
+ stats[:reserved] += 1
+ end
+ end
+ stats.freeze
+ end
+
+ before = fdstats.call
+ Fiber.start { :fib }.join
+ assert_equal before, fdstats.call,
+ 'Fiber.start + Fiber#join does not create new FD'
+
+ # now, epoll/kqueue implementations create FDs, ensure they're reserved
+ IO.pipe do |r, w|
+ r.nonblock = w.nonblock = true
+
+ before_io = fdstats.call
+ assert_equal before[:reserved], before_io[:reserved],
+ 'no reserved FDs created before I/O attempted'
+
+ f1 = Fiber.start { r.read(1) }
+ after_io = fdstats.call
+ assert_equal before_io[:good], after_io[:good],
+ 'no change in unreserved FDs during I/O wait'
+
+ w.write('!')
+ assert_equal '!', f1.value, 'auto-Fiber IO#read works'
+ assert_operator before_io[:reserved], :<=, after_io[:reserved],
+ 'a reserved FD may be created on some implementations'
+
+ # ensure we do not share epoll/kqueue FDs across fork
+ if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2)
+ pid = fork do
+ after_fork = fdstats.call
+ w.write(after_fork.inspect == before_io.inspect ? '1' : '0')
+
+ # ensure auto-Fiber works properly after forking
+ IO.pipe do |a, b|
+ a.nonblock = b.nonblock = true
+ f1 = Fiber.start { a.read(1) }
+ f2 = Fiber.start { b.write('!') }
+ assert_equal 1, f2.value
+ w.write(f1.value == '!' ? '!' : '?')
+ end
+
+ exit!(0)
+ end
+ assert_equal '1', r.read(1), 'reserved FD cleared after fork'
+ assert_equal '!', r.read(1), 'auto-fiber works after forking'
+ _, status = Process.waitpid2(pid)
+ assert_predicate status, :success?, 'forked child exited properly'
+ end
+ end
+ end;
+ end
+
+ # As usual, cannot resume/run fibers across threads, but the
+ # scheduler works across threads
+ def test_cross_thread_schedule
+ IO.pipe do |r, w|
+ r.nonblock = w.nonblock = true
+ t = Thread.new { Fiber.start { r.read(1) }.value }
+ assert_nil t.join(0.1)
+ f = Fiber.start { w.write('a') }
+ assert_equal 'a', t.value
+ assert_equal 1, f.value
+ end
+ end
+end
diff --git a/thread.c b/thread.c
index 4d954c76de..7899aeae0b 100644
--- a/thread.c
+++ b/thread.c
@@ -70,6 +70,7 @@
#include "ruby/thread.h"
#include "ruby/thread_native.h"
#include "internal.h"
+#include "iom.h"
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
@@ -3484,6 +3485,18 @@ rb_thread_priority_set(VALUE thread, VALUE prio)
/* for IO */
+static int
+rb_iom_waitfd_tv(rb_thread_t *th, int *fdp, int events, struct timeval *tv)
+{
+ double *to = NULL;
+ double tout;
+ if (tv) {
+ tout = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
+ to = &tout;
+ }
+ return rb_iom_waitfd(th, fdp, events, to);
+}
+
#if defined(NFDBITS) && defined(HAVE_RB_FD_INIT)
/*
@@ -3919,6 +3932,10 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
struct timespec *timeout = NULL;
rb_thread_t *th = GET_THREAD();
+ if (rb_fiber_auto_sched_p(th)) {
+ return rb_iom_waitfd_tv(th, &fd, events, tv);
+ }
+
#define poll_update() \
(update_timespec(timeout, limit), \
TRUE)
@@ -4030,7 +4047,11 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
struct select_args args;
int r;
VALUE ptr = (VALUE)&args;
+ rb_thread_t *th = GET_THREAD();
+ if (rb_fiber_auto_sched_p(th)) {
+ return rb_iom_waitfd_tv(th, &fd, events, tv);
+ }
args.as.fd = fd;
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
@@ -4177,10 +4198,13 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
}
}
+static void rb_iom_atfork_child(rb_thread_t *);
+
void
rb_thread_atfork(void)
{
rb_thread_t *th = GET_THREAD();
+ rb_iom_atfork_child(th);
rb_thread_atfork_internal(th, terminate_atfork_i);
th->join_list = NULL;
@@ -5094,3 +5118,21 @@ ruby_kill(rb_pid_t pid, int sig)
rb_sys_fail(0);
}
}
+
+#ifndef RUBYVM_IOM
+# if defined(HAVE_SYS_EVENT_H) && defined(HAVE_KQUEUE) && defined(HAVE_KEVENT)
+# define RUBYVM_IOM IOM_KQUEUE
+# elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \
+ defined(HAVE_EPOLL_CTL) && defined(HAVE_EPOLL_WAIT)
+# define RUBYVM_IOM IOM_EPOLL
+# else
+# define RUBYVM_IOM IOM_SELECT
+# endif
+#endif
+#if RUBYVM_IOM == IOM_KQUEUE
+# include "iom_kqueue.h"
+#elif RUBYVM_IOM == IOM_EPOLL
+# include "iom_epoll.h"
+#else
+# include "iom_select.h"
+#endif
diff --git a/thread_pthread.c b/thread_pthread.c
index 437ff370d5..14945ff40c 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -1746,9 +1746,14 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
}
#endif
+static int rb_iom_reserved_fd(int); /* check kqueue or epoll FD */
+
int
rb_reserved_fd_p(int fd)
{
+ if (rb_iom_reserved_fd(fd)) {
+ return 1;
+ }
#if USE_SLEEPY_TIMER_THREAD
if ((fd == timer_thread_pipe.normal[0] ||
fd == timer_thread_pipe.normal[1] ||
diff --git a/vm.c b/vm.c
index 4810321d6f..44bc4e73f9 100644
--- a/vm.c
+++ b/vm.c
@@ -11,6 +11,7 @@
#include "internal.h"
#include "ruby/vm.h"
#include "ruby/st.h"
+#include "iom.h"
#include "gc.h"
#include "vm_core.h"
@@ -2179,6 +2180,7 @@ ruby_vm_destruct(rb_vm_t *vm)
rb_fiber_reset_root_local_storage(th->self);
thread_free(th);
}
+ rb_iom_destroy(vm);
rb_vm_living_threads_init(vm);
ruby_vm_run_at_exit_hooks(vm);
if (vm->loading_table) {
@@ -2345,6 +2347,7 @@ rb_thread_recycle_stack_release(VALUE *stack)
}
void rb_fiber_mark_self(rb_fiber_t *fib);
+void rb_fiber_auto_schedule_mark(const rb_thread_t *th);
void
rb_thread_mark(void *ptr)
@@ -2387,6 +2390,11 @@ rb_thread_mark(void *ptr)
RUBY_MARK_UNLESS_NULL(th->top_wrapper);
rb_fiber_mark_self(th->fiber);
rb_fiber_mark_self(th->root_fiber);
+
+ /* th->afhead may be 0 early on */
+ if (th->afhead.n.next && !list_empty(&th->afhead)) {
+ rb_fiber_auto_schedule_mark(th);
+ }
RUBY_MARK_UNLESS_NULL(th->stat_insn_usage);
RUBY_MARK_UNLESS_NULL(th->last_status);
@@ -2499,6 +2507,7 @@ static void
th_init(rb_thread_t *th, VALUE self)
{
th->self = self;
+ list_head_init(&th->afhead);
/* allocate thread stack */
#ifdef USE_SIGALTSTACK
diff --git a/vm_core.h b/vm_core.h
index 21136a8874..50a3f7d2fe 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -481,6 +481,8 @@ typedef struct rb_hook_list_struct {
int need_clean;
} rb_hook_list_t;
+struct rb_iom_struct;
+
typedef struct rb_vm_struct {
VALUE self;
@@ -489,6 +491,7 @@ typedef struct rb_vm_struct {
struct rb_thread_struct *main_thread;
struct rb_thread_struct *running_thread;
+ struct rb_iom_struct *iom;
struct list_head waiting_fds; /* <=> struct waiting_fd */
struct list_head living_threads;
@@ -808,6 +811,7 @@ typedef struct rb_thread_struct {
rb_fiber_t *fiber;
rb_fiber_t *root_fiber;
rb_jmpbuf_t root_jmpbuf;
+ struct list_head afhead; /* -rb_fiber_t.afnode */
/* ensure & callcc */
rb_ensure_list_t *ensure_list;
--
EW
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 2/7] iom: implement waitpid generically
2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
2017-05-30 19:03 ` [PATCH 3/7] iom: move to fdmap Eric Wong
` (4 subsequent siblings)
5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
To: spew
This requires us to constantly hijack the SIGCHLD handler,
so special values like nil, 'SYSTEM_DEFAULT', 'SIG_IGN',
'IGNORE', are always ignored in favor Ruby's internal
signal handling
---
iom_common.h | 19 +++++++++-----
process.c | 14 ++++++++---
signal.c | 40 ++++++++++++++++++++++++-----
test/ruby/test_fiber_auto.rb | 60 ++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 117 insertions(+), 16 deletions(-)
diff --git a/iom_common.h b/iom_common.h
index c9e7f716d8..af0646b6cb 100644
--- a/iom_common.h
+++ b/iom_common.h
@@ -1,4 +1,3 @@
-
/* we lazily create this, small scripts may never need iom */
static rb_iom_t *
iom_new(rb_thread_t *th)
@@ -18,8 +17,12 @@ rb_iom_get(rb_thread_t *th)
return th->vm->iom;
}
-
-/* included by iom_(epoll|select|kqueue).h */
+/*
+ * TODO: consider EVFILT_PROC for kqueue and netlink+epoll on Linux;
+ * see the "god" RubyGem for usage examples.
+ * However, I doubt rb_waitpid scalability will be a problem and
+ * the simplicity of a single implementation for all is appealing.
+ */
#ifdef HAVE_SYS_TYPES_H
# include <sys/types.h>
#endif
@@ -39,8 +42,6 @@ iom_schedule_pid(VALUE ptr)
return rb_fiber_yield(0, 0);
}
-static rb_iom_t *rb_iom_get(rb_thread_t *);
-
rb_pid_t
rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
double *timeout)
@@ -86,5 +87,11 @@ rb_iom_sigchld(rb_vm_t *vm)
}
}
}
-
+#else
+rb_pid_t
+rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
+ double *timeout)
+{
+ rb_bug("Should not get here, WNOHANG not implemented");
+}
#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */
diff --git a/process.c b/process.c
index 67f51db847..bd384e86e1 100644
--- a/process.c
+++ b/process.c
@@ -16,6 +16,7 @@
#include "ruby/thread.h"
#include "ruby/util.h"
#include "vm_core.h"
+#include "iom.h"
#include <stdio.h>
#include <errno.h>
@@ -907,10 +908,15 @@ rb_waitpid(rb_pid_t pid, int *st, int flags)
result = do_waitpid(pid, st, flags);
}
else {
- while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 &&
- (errno == EINTR)) {
- rb_thread_t *th = GET_THREAD();
- RUBY_VM_CHECK_INTS(th);
+ rb_thread_t *th = GET_THREAD();
+ if (rb_fiber_auto_sched_p(th) && WNOHANG) {
+ return rb_iom_waitpid(th, pid, st, flags, 0);
+ }
+ else {
+ while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 &&
+ (errno == EINTR)) {
+ RUBY_VM_CHECK_INTS(th);
+ }
}
}
if (result > 0) {
diff --git a/signal.c b/signal.c
index 8ee0963b8a..8f5dee375f 100644
--- a/signal.c
+++ b/signal.c
@@ -13,6 +13,7 @@
#include "internal.h"
#include "vm_core.h"
+#include "iom.h"
#include <signal.h>
#include <stdio.h>
#include <errno.h>
@@ -1028,6 +1029,17 @@ rb_trap_exit(void)
}
}
+static int
+sig_is_chld(int sig)
+{
+#if defined(SIGCLD)
+ return (sig == SIGCLD);
+#elif defined(SIGCHLD)
+ return (sig == SIGCHLD);
+#endif
+ return 0;
+}
+
void
rb_signal_exec(rb_thread_t *th, int sig)
{
@@ -1035,6 +1047,9 @@ rb_signal_exec(rb_thread_t *th, int sig)
VALUE cmd = vm->trap_list[sig].cmd;
int safe = vm->trap_list[sig].safe;
+ if (sig_is_chld(sig)) {
+ rb_iom_sigchld(vm);
+ }
if (cmd == 0) {
switch (sig) {
case SIGINT:
@@ -1094,6 +1109,11 @@ default_handler(int sig)
#ifdef SIGUSR2
case SIGUSR2:
#endif
+#ifdef SIGCLD
+ case SIGCLD:
+#elif defined(SIGCHLD)
+ case SIGCHLD:
+#endif
func = sighandler;
break;
#ifdef SIGBUS
@@ -1131,6 +1151,9 @@ trap_handler(VALUE *cmd, int sig)
VALUE command;
if (NIL_P(*cmd)) {
+ if (sig_is_chld(sig)) {
+ goto sig_dfl;
+ }
func = SIG_IGN;
}
else {
@@ -1151,6 +1174,9 @@ trap_handler(VALUE *cmd, int sig)
break;
case 14:
if (memcmp(cptr, "SYSTEM_DEFAULT", 14) == 0) {
+ if (sig_is_chld(sig)) {
+ goto sig_dfl;
+ }
func = SIG_DFL;
*cmd = 0;
}
@@ -1158,6 +1184,9 @@ trap_handler(VALUE *cmd, int sig)
case 7:
if (memcmp(cptr, "SIG_IGN", 7) == 0) {
sig_ign:
+ if (sig_is_chld(sig)) {
+ goto sig_dfl;
+ }
func = SIG_IGN;
*cmd = Qtrue;
}
@@ -1406,15 +1435,14 @@ static int
init_sigchld(int sig)
{
sighandler_t oldfunc;
+ sighandler_t func = sighandler;
oldfunc = ruby_signal(sig, SIG_DFL);
if (oldfunc == SIG_ERR) return -1;
- if (oldfunc != SIG_DFL && oldfunc != SIG_IGN) {
- ruby_signal(sig, oldfunc);
- }
- else {
- GET_VM()->trap_list[sig].cmd = 0;
- }
+
+ ruby_signal(sig, func);
+ GET_VM()->trap_list[sig].cmd = 0;
+
return 0;
}
# ifndef __native_client__
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
index 0dd798644c..67a9fe87f3 100644
--- a/test/ruby/test_fiber_auto.rb
+++ b/test/ruby/test_fiber_auto.rb
@@ -111,6 +111,66 @@ def test_fd_creation_and_fork
end;
end
+ def test_waitpid
+ assert_separately(%w(-r io/nonblock), <<~'end;') # do
+ t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ pid = fork do
+ sleep 0.1
+ exit!(0)
+ end
+ f = Fiber.start { Process.waitpid2(pid) }
+ wpid, st = f.value
+ diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
+ assert_operator diff, :>=, 0.1, 'child did not return immediately'
+ assert_equal pid, wpid, 'waited pid matches'
+ assert_predicate st, :success?, 'process exited successfully'
+
+ IO.pipe do |r, w|
+ r.nonblock = w.nonblock = true
+
+ # "blocking" Process.waitpid2 takes precedence over trap(:CHLD)
+ waited = []
+ chld_called = false
+ trap(:CHLD) { chld_called = true; waited.concat(Process.waitall) }
+ pid = fork do
+ r.read(1)
+ exit!(0)
+ end
+ assert_nil Process.waitpid2(pid, Process::WNOHANG), 'child is waiting'
+ assert_nil(Fiber.start do
+ Process.waitpid2(pid, Process::WNOHANG)
+ end.value, 'WNOHANG works normally in fiber')
+ f = Fiber.start { Process.waitpid2(pid) }
+ assert_equal 1, w.write('.'), 'woke child'
+ wpid, st = f.value
+ assert_equal pid, wpid, 'waited pid matches'
+ assert_predicate st, :success?, 'child exited successfully'
+ assert_empty waited, 'waitpid had predence'
+ assert chld_called, 'CHLD handler got called anyways'
+
+ chld_called = false
+ [ 'DEFAULT', 'SIG_DFL', 'IGNORE', 'SYSTEM_DEFAULT', nil
+ ].each do |handler|
+ trap(:CHLD, handler)
+ pid = fork do
+ r.read(1)
+ exit!(0)
+ end
+ t = "trap(:CHLD, #{handler.inspect})"
+ f = Fiber.start { Process.waitpid2(pid) }
+ assert_equal 1, w.write('.'), "woke child for #{t}"
+ wpid, st = f.value
+ assert_predicate st, :success?, "child exited successfully for #{t}"
+ assert_equal wpid, pid, "return value correct for #{t}"
+ assert_equal false, chld_called,
+ "old CHLD handler did not fire for #{t}"
+ end
+ end
+ end;
+ end if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) &&
+ Process.const_defined?(:WNOHANG)
+
+
# As usual, cannot resume/run fibers across threads, but the
# scheduler works across threads
def test_cross_thread_schedule
--
EW
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 3/7] iom: move to fdmap
2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
2017-05-30 19:03 ` [PATCH 2/7] iom: implement waitpid generically Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
2017-05-30 19:03 ` [PATCH 4/7] epoll fdmap inversion Eric Wong
` (3 subsequent siblings)
5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
To: spew
---
iom_epoll.h | 41 +++++++++-
iom_internal.h | 63 +++++++++++++++
iom_kqueue.h | 178 ++++++++++++++++++++++++++++++-------------
test/ruby/test_fiber_auto.rb | 14 ++++
4 files changed, 240 insertions(+), 56 deletions(-)
diff --git a/iom_epoll.h b/iom_epoll.h
index aa1ec087fa..888f8a696c 100644
--- a/iom_epoll.h
+++ b/iom_epoll.h
@@ -27,6 +27,7 @@ struct rb_iom_struct {
struct list_head epws; /* -epw.w.wnode, order agnostic */
struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+ struct rb_iom_fdmap fdmap; /* maps each FD to multiple epws */
int epoll_fd;
int maxevents; /* auto-increases */
@@ -47,7 +48,11 @@ struct epw {
rb_thread_t *th;
short revents;
} as;
- int *fdp;
+ struct list_node fdnode; /* self->fd.fdm->fdhead */
+ union {
+ int *fdp;
+ struct rb_iom_fdm *fdm;
+ } fd;
int *flags; /* &fptr->mode */
struct epoll_event ev;
};
@@ -151,6 +156,7 @@ rb_iom_init(rb_iom_t *iom)
iom->waiter = 0;
iom->maxevents = 8;
iom->epoll_fd = -1;
+ rb_iom_fdmap_init(&iom->fdmap);
}
static int
@@ -178,7 +184,18 @@ check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev)
struct epw *epw = ev[i].data.ptr;
epw->as.revents = rb_ep2revents(ev[i].events);
+
+ list_del_init(&epw->fdnode);
rb_iom_waiter_ready(&epw->w);
+
+ /* are there other epw using the same FD? */
+ if (!list_empty(&epw->fd.fdm->fdhead)) {
+ struct epw *cur = 0, *nxt = 0;
+ list_for_each_safe(&epw->fd.fdm->fdhead, cur, nxt, fdnode) {
+ list_del_init(&cur->fdnode);
+ rb_iom_waiter_ready(&cur->w);
+ }
+ }
}
/* notify the waiter thread in case we enqueued fibers for them */
@@ -277,7 +294,7 @@ epmod_yield(VALUE ptr)
rb_thread_t *th = epw->as.th;
rb_iom_t *iom = rb_iom_get(th);
int e;
- int fd = *epw->fdp;
+ int fd = *epw->fd.fdp;
int epfd = iom_epfd(iom);
/*
@@ -286,6 +303,12 @@ epmod_yield(VALUE ptr)
* so we must initialize it before epoll_ctl:
*/
epw->as.revents = 0;
+ if (fd < 0) {
+ list_node_init(&epw->fdnode); /* for list_del in ensure */
+ return Qfalse;
+ }
+ epw->fd.fdm = rb_iom_fdm_get(&iom->fdmap, fd);
+ list_add(&epw->fd.fdm->fdhead, &epw->fdnode);
/* we want to track if an FD is already being watched ourselves */
if (epw->flags) {
@@ -324,6 +347,15 @@ epmod_yield(VALUE ptr)
return rb_fiber_yield(0, 0);
}
+static VALUE
+epw_done(VALUE ptr)
+{
+ struct epw *epw = (struct epw *)ptr;
+
+ list_del(&epw->fdnode);
+ return rb_iom_waiter_done((VALUE)&epw->w);
+}
+
static int
iom_waitfd(rb_thread_t *th, int *fdp, int *flags, int events, double *timeout)
{
@@ -331,14 +363,14 @@ iom_waitfd(rb_thread_t *th, int *fdp, int *flags, int events, double *timeout)
struct epw epw;
epw.as.th = th;
- epw.fdp = fdp;
+ epw.fd.fdp = fdp;
epw.flags = flags;
epw.ev.events = rb_events2ep(events);
epw.ev.data.ptr = &epw;
list_add(&iom->epws, &epw.w.wnode);
rb_iom_timer_add(&iom->timers, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT);
- rb_ensure(epmod_yield, (VALUE)&epw, rb_iom_waiter_done, (VALUE)&epw.w);
+ rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw);
return (int)epw.as.revents; /* may be zero if timed out */
}
@@ -368,6 +400,7 @@ rb_iom_destroy(rb_vm_t *vm)
if (iom->epoll_fd >= 0) {
close(iom->epoll_fd);
}
+ rb_iom_fdmap_destroy(&iom->fdmap);
xfree(iom);
}
}
diff --git a/iom_internal.h b/iom_internal.h
index b5aa321274..45fda397db 100644
--- a/iom_internal.h
+++ b/iom_internal.h
@@ -15,6 +15,19 @@ int rb_fiber_auto_do_yield_p(rb_thread_t *);
#define IOM_FIB (0x2)
#define IOM_WAIT (0x1) /* container_of(..., struct rb_iom_waiter, timer) */
+#define RB_IOM_FD_PER_HEAP 256
+/* on-heap and persistent, keep as small as possible */
+struct rb_iom_fdm {
+ struct list_head fdhead; /* -rb_iom_fd_waiter.fdnode */
+};
+
+/* singleton (per-rb_iom_t) */
+struct rb_iom_fdmap {
+ struct rb_iom_fdm **map;
+ unsigned int heaps;
+ int max_fd;
+};
+
/* allocated on stack */
struct rb_iom_timer {
struct list_node tnode; /* <=> rb_iom_struct.timers */
@@ -29,6 +42,7 @@ struct rb_iom_waiter {
struct rb_iom_fd_waiter {
struct rb_iom_waiter w;
+ struct list_node fdnode; /* <=> &self->fdm->fdhead */
int *fdp;
short events;
short revents;
@@ -43,6 +57,55 @@ struct rb_iom_pid_waiter {
int errnum;
};
+#if (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL)
+static struct rb_iom_fdm *
+iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd)
+{
+ VM_ASSERT(fd >= 0);
+ return &fdmap->map[fd / RB_IOM_FD_PER_HEAP][fd % RB_IOM_FD_PER_HEAP];
+}
+
+static struct rb_iom_fdm *
+rb_iom_fdm_get(struct rb_iom_fdmap *fdmap, int fd)
+{
+ if (fd >= fdmap->max_fd) {
+ struct rb_iom_fdm *base, *h;
+ unsigned n = fdmap->heaps + 1;
+ unsigned i;
+
+ fdmap->map = xrealloc2(fdmap->map, n, sizeof(struct rb_iom_fdm *));
+ base = h = ALLOC_N(struct rb_iom_fdm, RB_IOM_FD_PER_HEAP);
+ for (i = 0; i < RB_IOM_FD_PER_HEAP; i++) {
+ list_head_init(&h->fdhead);
+ h++;
+ }
+ fdmap->map[fdmap->heaps] = base;
+ fdmap->max_fd += RB_IOM_FD_PER_HEAP;
+ }
+ return iom_fdhead_aref(fdmap, fd);
+}
+
+static void
+rb_iom_fdmap_init(struct rb_iom_fdmap *fdmap)
+{
+ fdmap->max_fd = 0;
+ fdmap->heaps = 0;
+ fdmap->map = 0;
+}
+
+static void
+rb_iom_fdmap_destroy(struct rb_iom_fdmap *fdmap)
+{
+ unsigned n;
+
+ for (n = 0; n < fdmap->heaps; n++) {
+ xfree(fdmap->map[n]);
+ }
+ xfree(fdmap->map);
+ rb_iom_fdmap_init(fdmap);
+}
+#endif /* (RUBYVM_IOM == IOM_KQUEUE || RUBYVM_IOM == IOM_EPOLL) */
+
static VALUE
rb_iom_timer_fibval(const struct rb_iom_timer *t)
{
diff --git a/iom_kqueue.h b/iom_kqueue.h
index f0776b2f58..52298b9055 100644
--- a/iom_kqueue.h
+++ b/iom_kqueue.h
@@ -26,6 +26,9 @@
* kevent().
*/
#include "iom_internal.h"
+
+/* LIST_HEAD (via ccan/list) conflicts with sys/queue.h (via sys/event.h) */
+#undef LIST_HEAD
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
@@ -39,6 +42,8 @@ struct rb_iom_struct {
struct list_head kevs; /* -kev.fdw.w.wnode, order agnostic */
struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+ struct rb_iom_fdmap rfdmap; /* holds fdm for EVFILT_READ */
+ struct rb_iom_fdmap wfdmap; /* holds fdm for EVFILT_WRITE */
int kqueue_fd;
int nevents; /* auto-increases */
@@ -56,6 +61,8 @@ struct rb_iom_struct {
struct kev {
struct rb_iom_fd_waiter fdw;
rb_thread_t *th; /* 0 - kev is disabled */
+ struct list_node rfdnode; /* -(ev.udata==fdm)->fdhead (EVFILT_READ) */
+ struct list_node wfdnode; /* -(ev.udata==fdm)->fdhead (EVFILT_WRITE) */
};
/*
@@ -107,6 +114,8 @@ rb_iom_init(rb_iom_t *iom)
iom->waiter = 0;
iom->nevents = 8;
iom->kqueue_fd = -1;
+ rb_iom_fdmap_init(&iom->rfdmap);
+ rb_iom_fdmap_init(&iom->wfdmap);
}
static struct timespec *
@@ -137,21 +146,40 @@ next_timeout(rb_iom_t *iom, struct timespec *ts)
* EVFILT_READ returns to set the RB_WAITFD_PRI bit:
*/
static void
-check_pri(rb_thread_t *th, int fd, struct kev *kev)
+check_pri(rb_thread_t *th, struct list_head *pri)
{
rb_fdset_t efds;
int r;
struct timeval tv = { 0 };
+ struct kev *kev = 0, *next = 0;
+ int max = -1;
rb_fd_init(&efds);
- rb_fd_set(fd, &efds);
- r = native_fd_select(fd + 1, 0, 0, &efds, &tv, th);
- rb_fd_term(&efds);
- /* TODO: use poll() if possible */
+ list_for_each(pri, kev, fdw.w.wnode) {
+ int fd = *kev->fdw.fdp;
+ if (fd >= 0) {
+ rb_fd_set(fd, &efds);
+ if (fd > max) {
+ max = fd;
+ }
+ }
+ }
+ r = native_fd_select(max + 1, 0, 0, &efds, &tv, th);
if (r > 0) {
- kev->fdw.revents |= RB_WAITFD_PRI;
+ list_for_each(pri, kev, fdw.w.wnode) {
+ int fd = *kev->fdw.fdp;
+ if (fd >= 0 && rb_fd_isset(fd, &efds)) {
+ kev->fdw.revents |= RB_WAITFD_PRI;
+ }
+ }
}
+
+ /* needed for rb_ensure */
+ list_for_each_safe(pri, kev, next, fdw.w.wnode) {
+ list_del_init(&kev->fdw.w.wnode);
+ }
+ rb_fd_term(&efds);
}
static void
@@ -162,53 +190,68 @@ check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
int i;
struct kevent *changelist = eventlist;
int nchanges = 0;
+ struct list_head pri = LIST_HEAD_INIT(pri);
for (i = 0; i < nr; i++) {
struct kevent *ev = &eventlist[i];
- struct kev *kev = ev->udata;
- int fd = *kev->fdw.fdp;
+ struct rb_iom_fdm *fdm = ev->udata;
+ struct kev *kev = 0, *next = 0;
+ int fd = (int)ev->ident;
+ int ok_fd = -1;
+ int paired = 0;
if (ev->filter == EVFILT_READ) {
- int rd = kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI);
- if (rd == RB_WAITFD_IN) { /* common */
- kev->fdw.revents |= rd;
- }
- else if (rd & RB_WAITFD_PRI) { /* ugh... */
- if (fd >= 0) {
- check_pri(th, fd, kev);
+ list_for_each_safe(&fdm->fdhead, kev, next, rfdnode) {
+ int rbits = kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI);
+
+ list_del_init(&kev->rfdnode);
+ assert(rbits && "unexpected EVFILT_READ");
+ rb_iom_waiter_ready(&kev->fdw.w);
+ paired |= (kev->fdw.events & RB_WAITFD_OUT);
+ if (rbits == RB_WAITFD_IN) { /* common */
+ kev->fdw.revents |= RB_WAITFD_IN;
+ }
+ else if (rbits & RB_WAITFD_PRI) { /* ugh... */
+ kev->fdw.revents |= rbits & RB_WAITFD_IN;
+ list_add_tail(&pri, &kev->fdw.w.wnode);
+ }
+ if (ok_fd != fd) {
+ ok_fd = *kev->fdw.fdp;
}
- }
- assert(rd && "got EVFILT_READ unexpectedly");
-
- /* disable the other filter if we waited on both read+write */
- if (fd >= 0 && (kev->fdw.events & RB_WAITFD_OUT) && kev->th) {
- struct kevent *chg = &changelist[nchanges++];
-
- kev->th = 0;
- EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
}
}
else if (ev->filter == EVFILT_WRITE) {
- kev->fdw.revents |= RB_WAITFD_OUT;
-
- assert(kev->fdw.events & RB_WAITFD_OUT &&
- "unexpected EVFILT_WRITE");
-
- /* disable the other filter if we waited on both read+write */
- if (fd >= 0 &&
- kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI) &&
- kev->th) {
- struct kevent *chg = &changelist[nchanges++];
-
- kev->th = 0;
- EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ list_for_each_safe(&fdm->fdhead, kev, next, wfdnode) {
+ list_del_init(&kev->wfdnode);
+ kev->fdw.revents |= RB_WAITFD_OUT;
+ assert(kev->fdw.events & RB_WAITFD_OUT &&
+ "unexpected EVFILT_WRITE");
+ rb_iom_waiter_ready(&kev->fdw.w);
+ paired |= (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI));
+ if (ok_fd != fd) {
+ ok_fd = *kev->fdw.fdp;
+ }
}
}
else {
rb_bug("unexpected filter: %d", ev->filter);
}
- rb_iom_waiter_ready(&kev->fdw.w);
+ /* delete the corresponding event when one FD has 2 filters */
+ if (ok_fd == fd) {
+ if (paired & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
+ struct kevent *chg = &changelist[nchanges++];
+ EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ }
+ if (paired & RB_WAITFD_OUT) {
+ struct kevent *chg = &changelist[nchanges++];
+ EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+ }
+ }
}
+ if (!list_empty(&pri)) {
+ check_pri(th, &pri);
+ }
+
/*
* kqueue is a little more complicated because we install
@@ -319,24 +362,32 @@ kevxchg_ping(struct kev *kev)
struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
struct kevent *changelist = eventlist; /* allowed, see kevent manpage */
int fd = *kev->fdw.fdp;
- int kqfd = iom_kqfd(iom);
+ int kqfd;
VM_ASSERT(nevents > 2);
+ if (fd < 0) {
+ return;
+ }
+ kqfd = iom_kqfd(iom);
/* EVFILT_READ handles urgent data (POLLPRI) */
if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
- struct kevent *chg = &changelist[nchanges++];
- EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, kev);
+ struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->rfdmap, fd);
+ if (list_empty(&fdm->fdhead)) {
+ struct kevent *chg = &changelist[nchanges++];
+ EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, fdm);
+ }
+ list_add(&fdm->fdhead, &kev->rfdnode);
}
-
if (kev->fdw.events & RB_WAITFD_OUT) {
- struct kevent *chg = &changelist[nchanges++];
- EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, kev);
+ struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->wfdmap, fd);
+ if (list_empty(&fdm->fdhead)) {
+ struct kevent *chg = &changelist[nchanges++];
+ EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, fdm);
+ }
+ list_add(&fdm->fdhead, &kev->wfdnode);
}
-
do {
- int err;
-
nr = kevent(kqfd, changelist, nchanges, eventlist, nevents, &zero);
/* kevent may fail with ENOMEM when processing changelist */
@@ -360,6 +411,19 @@ kevxchg_yield(VALUE ptr)
return rb_fiber_yield(0, 0);
}
+static VALUE
+kev_done(VALUE ptr)
+{
+ struct kev *kev = (struct kev *)ptr;
+ if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
+ list_del(&kev->rfdnode);
+ }
+ if (kev->fdw.events & RB_WAITFD_OUT) {
+ list_del(&kev->wfdnode);
+ }
+ return rb_iom_waiter_done((VALUE)&kev->fdw.w);
+}
+
static int
iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
{
@@ -372,8 +436,7 @@ iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
kev.fdw.revents = 0;
list_add(&iom->kevs, &kev.fdw.w.wnode);
rb_iom_timer_add(&iom->timers, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
- rb_ensure(kevxchg_yield, (VALUE)&kev,
- rb_iom_waiter_done, (VALUE)&kev.fdw.w);
+ rb_ensure(kevxchg_yield, (VALUE)&kev, kev_done, (VALUE)&kev);
return kev.fdw.revents; /* may be zero if timed out */
}
@@ -395,6 +458,14 @@ rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
return iom_waitfd(th, fdp, events, timeout);
}
+static void
+iom_free(rb_iom_t *iom)
+{
+ rb_iom_fdmap_destroy(&iom->rfdmap);
+ rb_iom_fdmap_destroy(&iom->wfdmap);
+ xfree(iom);
+}
+
void
rb_iom_destroy(rb_vm_t *vm)
{
@@ -404,7 +475,7 @@ rb_iom_destroy(rb_vm_t *vm)
if (iom->kqueue_fd >= 0) {
close(iom->kqueue_fd);
}
- xfree(iom);
+ iom_free(iom);
}
}
@@ -422,8 +493,11 @@ rb_iom_atfork_child(rb_thread_t *th)
*/
rb_iom_destroy(th->vm);
#else /* native kqueue is close-on-fork, so we do not close ourselves */
- xfree(th->vm->iom);
- th->vm->iom = 0;
+ rb_iom_t *iom = th->vm->iom;
+ if (iom) {
+ iom_free(iom);
+ th->vm->iom = 0;
+ }
#endif
}
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
index 67a9fe87f3..196361c674 100644
--- a/test/ruby/test_fiber_auto.rb
+++ b/test/ruby/test_fiber_auto.rb
@@ -183,4 +183,18 @@ def test_cross_thread_schedule
assert_equal 1, f.value
end
end
+
+ # tricky for kqueue and epoll implementations
+ def test_multi_readers
+ IO.pipe do |r, w|
+ r.nonblock = w.nonblock = true
+ nr = 30
+ fibs = nr.times.map { Fiber.start { r.read(1) } }
+ fibs.each { |f| assert_nil f.join(0.001) }
+
+ exp = nr.times.map { -'a' }.freeze
+ w.write(exp.join)
+ assert_equal exp, fibs.map(&:value);
+ end
+ end
end
--
EW
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 4/7] epoll fdmap inversion
2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
2017-05-30 19:03 ` [PATCH 2/7] iom: implement waitpid generically Eric Wong
2017-05-30 19:03 ` [PATCH 3/7] iom: move to fdmap Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
2017-05-30 19:03 ` [PATCH 5/7] kq-share Eric Wong
` (2 subsequent siblings)
5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
To: spew
---
iom_epoll.h | 130 ++++++++++++++++++++++++++++++++-------------------------
iom_internal.h | 3 +-
2 files changed, 75 insertions(+), 58 deletions(-)
diff --git a/iom_epoll.h b/iom_epoll.h
index 888f8a696c..3afb73325c 100644
--- a/iom_epoll.h
+++ b/iom_epoll.h
@@ -27,7 +27,7 @@ struct rb_iom_struct {
struct list_head epws; /* -epw.w.wnode, order agnostic */
struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
- struct rb_iom_fdmap fdmap; /* maps each FD to multiple epws */
+ struct rb_iom_fdmap fdmap; /* maps each FD to multiple epw */
int epoll_fd;
int maxevents; /* auto-increases */
@@ -45,16 +45,16 @@ struct rb_iom_struct {
struct epw {
struct rb_iom_waiter w;
union {
- rb_thread_t *th;
- short revents;
+ struct list_node fdnode;
+ struct {
+ rb_thread_t *th;
+ struct rb_iom_fdm *fdm;
+ } pre_ctl;
} as;
- struct list_node fdnode; /* self->fd.fdm->fdhead */
- union {
- int *fdp;
- struct rb_iom_fdm *fdm;
- } fd;
+ int fd; /* no need for "int *", here */
+ short events;
+ short revents;
int *flags; /* &fptr->mode */
- struct epoll_event ev;
};
static void
@@ -181,20 +181,14 @@ check_epoll_wait(rb_thread_t *th, int nr, struct epoll_event *ev)
int i;
for (i = 0; i < nr; i++) {
- struct epw *epw = ev[i].data.ptr;
-
- epw->as.revents = rb_ep2revents(ev[i].events);
-
- list_del_init(&epw->fdnode);
- rb_iom_waiter_ready(&epw->w);
-
- /* are there other epw using the same FD? */
- if (!list_empty(&epw->fd.fdm->fdhead)) {
- struct epw *cur = 0, *nxt = 0;
- list_for_each_safe(&epw->fd.fdm->fdhead, cur, nxt, fdnode) {
- list_del_init(&cur->fdnode);
- rb_iom_waiter_ready(&cur->w);
- }
+ struct rb_iom_fdm *fdm = ev[i].data.ptr;
+ struct epw *epw = 0, *next = 0;
+ short revents = rb_ep2revents(ev[i].events);
+
+ list_for_each_safe(&fdm->fdhead, epw, next, as.fdnode) {
+ epw->revents = epw->events & revents;
+ list_del_init(&epw->as.fdnode);
+ rb_iom_waiter_ready(&epw->w);
}
}
@@ -287,61 +281,74 @@ rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
}
}
-static VALUE
-epmod_yield(VALUE ptr)
+static void
+epoll_ctl_or_raise(rb_thread_t *th, struct epw *epw)
{
- struct epw *epw = (struct epw *)ptr;
- rb_thread_t *th = epw->as.th;
- rb_iom_t *iom = rb_iom_get(th);
int e;
- int fd = *epw->fd.fdp;
- int epfd = iom_epfd(iom);
+ int epfd;
+ struct epoll_event ev;
- /*
- * if we did not have GVL, revents may be set immediately
- * upon epoll_ctl by another thread running epoll_wait,
- * so we must initialize it before epoll_ctl:
- */
- epw->as.revents = 0;
- if (fd < 0) {
- list_node_init(&epw->fdnode); /* for list_del in ensure */
- return Qfalse;
+ /* we cannot raise until list_add: */
+ {
+ struct rb_iom_fdm *fdm = epw->as.pre_ctl.fdm;
+
+ ev.data.ptr = fdm;
+ ev.events = rb_events2ep(epw->events);
+ /*
+ * merge events from other threads/fibers waiting on the same
+ * [ descriptor (int fd), description (struct file *) ] tuplet
+ */
+ if (!list_empty(&fdm->fdhead)) { /* uncommon, I hope... */
+ struct epw *cur;
+ list_for_each(&fdm->fdhead, cur, as.fdnode) {
+ ev.events |= rb_events2ep(cur->events);
+ }
+ }
+ list_add(&fdm->fdhead, &epw->as.fdnode);
}
- epw->fd.fdm = rb_iom_fdm_get(&iom->fdmap, fd);
- list_add(&epw->fd.fdm->fdhead, &epw->fdnode);
+
+ epfd = iom_epfd(th->vm->iom); /* may raise */
/* we want to track if an FD is already being watched ourselves */
if (epw->flags) {
- if (*epw->flags & FMODE_IOM_ADDED) {
- e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+ if (*epw->flags & FMODE_IOM_ADDED) { /* ideal situation */
+ e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
}
else {
- e = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epw->ev);
+ e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
if (e == 0) {
*epw->flags |= FMODE_IOM_ADDED;
}
else if (e < 0 && errno == EEXIST) {
/*
- * It's possible to get EEXIST if fptrs point to the same FD:
+ * possible EEXIST if several fptrs point to the same FD:
* f1 = Fiber.start { io1.read(1) }
* io2 = IO.for_fd(io1.fileno)
* f2 = Fiber.start { io2.read(1) }
*/
*epw->flags |= FMODE_IOM_ADDED;
- e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+ e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
}
}
}
- else { /* don't know if added or not, fall back to addiing on ENOENT */
- e = epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epw->ev);
+ else { /* don't know if added or not, fall back to add on ENOENT */
+ e = epoll_ctl(epfd, EPOLL_CTL_MOD, epw->fd, &ev);
if (e < 0 && errno == ENOENT) {
- e = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epw->ev);
+ e = epoll_ctl(epfd, EPOLL_CTL_ADD, epw->fd, &ev);
}
}
if (e < 0) {
rb_sys_fail("epoll_ctl");
}
+}
+static VALUE
+epmod_yield(VALUE ptr)
+{
+ /* we must have no posibility of raising until list_add: */
+ struct epw *epw = (struct epw *)ptr;
+ rb_thread_t *th = epw->as.pre_ctl.th;
+ epoll_ctl_or_raise(th, epw);
ping_events(th);
(void)rb_fiber_auto_do_yield_p(th);
return rb_fiber_yield(0, 0);
@@ -351,8 +358,7 @@ static VALUE
epw_done(VALUE ptr)
{
struct epw *epw = (struct epw *)ptr;
-
- list_del(&epw->fdnode);
+ list_del(&epw->as.fdnode);
return rb_iom_waiter_done((VALUE)&epw->w);
}
@@ -362,17 +368,29 @@ iom_waitfd(rb_thread_t *th, int *fdp, int *flags, int events, double *timeout)
rb_iom_t *iom = rb_iom_get(th);
struct epw epw;
- epw.as.th = th;
- epw.fd.fdp = fdp;
+ /* unlike kqueue or select, we never need to reread fdp after yielding */
+ epw.fd = *fdp;
+ if (epw.fd < 0) {
+ return -1;
+ }
+
+ /* may raise on OOM: */
+ epw.as.pre_ctl.fdm = rb_iom_fdm_get(&iom->fdmap, epw.fd);
+ epw.as.pre_ctl.th = th;
epw.flags = flags;
- epw.ev.events = rb_events2ep(events);
- epw.ev.data.ptr = &epw;
+ /*
+ * if we did not have GVL, revents may be set immediately
+ * upon epoll_ctl by another thread running epoll_wait,
+ * so we must initialize it before epoll_ctl:
+ */
+ epw.revents = 0;
+ epw.events = (short)events;
list_add(&iom->epws, &epw.w.wnode);
rb_iom_timer_add(&iom->timers, &epw.w.timer, timeout, IOM_FIB|IOM_WAIT);
rb_ensure(epmod_yield, (VALUE)&epw, epw_done, (VALUE)&epw);
- return (int)epw.as.revents; /* may be zero if timed out */
+ return (int)epw.revents; /* may be zero if timed out */
}
int
diff --git a/iom_internal.h b/iom_internal.h
index 45fda397db..1a4050d62d 100644
--- a/iom_internal.h
+++ b/iom_internal.h
@@ -18,7 +18,7 @@ int rb_fiber_auto_do_yield_p(rb_thread_t *);
#define RB_IOM_FD_PER_HEAP 256
/* on-heap and persistent, keep as small as possible */
struct rb_iom_fdm {
- struct list_head fdhead; /* -rb_iom_fd_waiter.fdnode */
+ struct list_head fdhead; /* -kev.(rfdnode|wfdnode), epw.fdnode */
};
/* singleton (per-rb_iom_t) */
@@ -42,7 +42,6 @@ struct rb_iom_waiter {
struct rb_iom_fd_waiter {
struct rb_iom_waiter w;
- struct list_node fdnode; /* <=> &self->fdm->fdhead */
int *fdp;
short events;
short revents;
--
EW
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 5/7] kq-share
2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
` (2 preceding siblings ...)
2017-05-30 19:03 ` [PATCH 4/7] epoll fdmap inversion Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
2017-05-30 19:03 ` [PATCH 6/7] wip Eric Wong
2017-05-30 19:03 ` [PATCH 7/7] wip Eric Wong
5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
To: spew
---
iom_kqueue.h | 229 +++++++++++++++++++++++++++++++++++++++--------------------
1 file changed, 153 insertions(+), 76 deletions(-)
diff --git a/iom_kqueue.h b/iom_kqueue.h
index 52298b9055..31cddd54be 100644
--- a/iom_kqueue.h
+++ b/iom_kqueue.h
@@ -7,13 +7,14 @@
* enabling filters, where as epoll_ctl requires separate ADD and
* MOD operations.
*
- * These are advantages in the common case.
+ * These are advantages in the common case...
*
* The epoll API has advantages in more esoteric cases:
*
* epoll has the advantage over kqueue when watching for multiple
* events (POLLIN|POLLOUT|POLLPRI) (which is rare). We also have
* to install two kevent filters to watch POLLIN|POLLOUT simutaneously.
+ * See udata_set/udata_get functions below for more on this.
*
* Finally, kevent does not support POLLPRI directly, we need to use
* select() (or perhaps poll() on some platforms) with a zero
@@ -33,6 +34,9 @@
#include <sys/event.h>
#include <sys/time.h>
+/* We need to use EVFILT_READ to watch RB_WAITFD_PRI */
+#define WAITFD_READ (RB_WAITFD_IN|RB_WAITFD_PRI)
+
/* allocated on heap (rb_vm_t.iom) */
struct rb_iom_struct {
/*
@@ -154,6 +158,9 @@ check_pri(rb_thread_t *th, struct list_head *pri)
struct kev *kev = 0, *next = 0;
int max = -1;
+ if (list_empty(pri)) {
+ return;
+ }
rb_fd_init(&efds);
list_for_each(pri, kev, fdw.w.wnode) {
int fd = *kev->fdw.fdp;
@@ -166,20 +173,108 @@ check_pri(rb_thread_t *th, struct list_head *pri)
}
r = native_fd_select(max + 1, 0, 0, &efds, &tv, th);
- if (r > 0) {
- list_for_each(pri, kev, fdw.w.wnode) {
+
+ list_for_each_safe(pri, kev, next, fdw.w.wnode) {
+ list_del_init(&kev->fdw.w.wnode);
+ if (r > 0) {
int fd = *kev->fdw.fdp;
if (fd >= 0 && rb_fd_isset(fd, &efds)) {
kev->fdw.revents |= RB_WAITFD_PRI;
}
}
}
+ rb_fd_term(&efds);
+}
- /* needed for rb_ensure */
- list_for_each_safe(pri, kev, next, fdw.w.wnode) {
- list_del_init(&kev->fdw.w.wnode);
+/*
+ * kqueue is a more complicated because we install separate filters for
+ * simultaneously watching read and write on the same FD, and now we
+ * must clear shared filters if the event from the other side came in.
+ */
+static int
+drop_pairs(rb_thread_t *th, int max, struct kevent *changelist,
+ struct list_head *rdel, struct list_head *wdel)
+{
+ int nchanges = 0;
+ struct kev *kev = 0, *next = 0;
+
+ list_for_each_safe(rdel, kev, next, wfdnode) {
+ int fd = *kev->fdw.fdp;
+ list_del_init(&kev->wfdnode);
+ assert(kev->fdw.revents & RB_WAITFD_OUT);
+ if (fd >= 0) {
+ struct kevent *chg = &changelist[nchanges++];
+ assert(nchanges <= max);
+ EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ }
}
- rb_fd_term(&efds);
+ list_for_each_safe(wdel, kev, next, rfdnode) {
+ int fd = *kev->fdw.fdp;
+ list_del_init(&kev->rfdnode);
+ assert(kev->fdw.revents & WAITFD_READ);
+ if (fd >= 0) {
+ struct kevent *chg = &changelist[nchanges++];
+ assert(nchanges <= max);
+ EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+ }
+ }
+ if (nchanges) {
+ int kqfd = th->vm->iom->kqueue_fd;
+ if (kevent(kqfd, changelist, nchanges, 0, 0, &zero) < 0) {
+ return errno;
+ }
+ }
+ return 0;
+}
+
+/*
+ * kqueue requires separate filters when watching for simultaneously
+ * watching read and write events on the same FD. Different filters
+ * means they can have different udata pointers; at least with
+ * native kqueue.
+ *
+ * When emulating native kqueue behavior using epoll as libkqueue does,
+ * there is only space for one udata pointer. This is because epoll
+ * allows each "filter" (epitem in the kernel) to watch read and write
+ * simultaneously.
+ * epoll keys epitems using: [ fd, file description (struct file *) ].
+ * In contrast, kevent keys its filters using: [ fd (ident), filter ].
+ *
+ * So, with native kqueue, we can at least optimize away rb_iom_fdm_get
+ * function calls; but we cannot do that when libkqueue emulates
+ * kqueue with epoll and need to retrieve the fdm from the correct
+ * fdmap.
+ *
+ * Finally, the epoll implementation only needs one fdmap.
+ */
+
+static void *
+udata_set(rb_iom_t *iom, struct rb_iom_fdm *fdm)
+{
+#ifdef LIBKQUEUE
+ return iom;
+#else /* native kqueue */
+ return fdm;
+#endif
+}
+
+static struct rb_iom_fdm *
+udata_get(struct kevent *ev)
+{
+#ifdef LIBKQUEUE
+ rb_iom_t *iom = ev->udata;
+ int fd = ev->ident;
+
+ switch (ev->filter) {
+ case EVFILT_READ:
+ return rb_iom_fdm_get(&iom->rfdmap, fd);
+ case EVFILT_WRITE:
+ return rb_iom_fdm_get(&iom->wfdmap, fd);
+ default:
+ rb_bug("bad filter in libkqueue compatibility mode: %d", ev->filter);
+ }
+#endif
+ return ev->udata;
}
static void
@@ -187,81 +282,64 @@ check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
{
int err = 0;
if (nr >= 0) {
- int i;
- struct kevent *changelist = eventlist;
- int nchanges = 0;
struct list_head pri = LIST_HEAD_INIT(pri);
+ struct list_head wdel = LIST_HEAD_INIT(wdel);
+ struct list_head rdel = LIST_HEAD_INIT(rdel);
+ rb_iom_t *iom = th->vm->iom;
+ struct kev *kev = 0, *next = 0;
+ int i;
for (i = 0; i < nr; i++) {
struct kevent *ev = &eventlist[i];
- struct rb_iom_fdm *fdm = ev->udata;
- struct kev *kev = 0, *next = 0;
- int fd = (int)ev->ident;
- int ok_fd = -1;
- int paired = 0;
+ struct rb_iom_fdm *fdm = udata_get(ev);
if (ev->filter == EVFILT_READ) {
+ int pair_queued = 0;
+
list_for_each_safe(&fdm->fdhead, kev, next, rfdnode) {
- int rbits = kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI);
+ int rbits = kev->fdw.events & WAITFD_READ;
- list_del_init(&kev->rfdnode);
assert(rbits && "unexpected EVFILT_READ");
+ kev->fdw.revents |= (rbits & RB_WAITFD_IN);
rb_iom_waiter_ready(&kev->fdw.w);
- paired |= (kev->fdw.events & RB_WAITFD_OUT);
- if (rbits == RB_WAITFD_IN) { /* common */
- kev->fdw.revents |= RB_WAITFD_IN;
- }
- else if (rbits & RB_WAITFD_PRI) { /* ugh... */
- kev->fdw.revents |= rbits & RB_WAITFD_IN;
- list_add_tail(&pri, &kev->fdw.w.wnode);
+ list_del_init(&kev->rfdnode);
+
+ if (rbits & RB_WAITFD_PRI) {
+ list_add(&pri, &kev->fdw.w.wnode);
}
- if (ok_fd != fd) {
- ok_fd = *kev->fdw.fdp;
+
+ if ((kev->fdw.events & RB_WAITFD_OUT) &&
+ *kev->fdw.fdp >= 0 &&
+ !pair_queued) {
+ pair_queued = 1;
+ list_add(&wdel, &kev->rfdnode);
}
}
}
else if (ev->filter == EVFILT_WRITE) {
+ int pair_queued = 0;
+
list_for_each_safe(&fdm->fdhead, kev, next, wfdnode) {
- list_del_init(&kev->wfdnode);
- kev->fdw.revents |= RB_WAITFD_OUT;
assert(kev->fdw.events & RB_WAITFD_OUT &&
"unexpected EVFILT_WRITE");
+ kev->fdw.revents |= RB_WAITFD_OUT;
rb_iom_waiter_ready(&kev->fdw.w);
- paired |= (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI));
- if (ok_fd != fd) {
- ok_fd = *kev->fdw.fdp;
+ list_del_init(&kev->wfdnode);
+
+ if ((kev->fdw.events & WAITFD_READ) &&
+ *kev->fdw.fdp >= 0 &&
+ !pair_queued) {
+ pair_queued = 1;
+ list_add(&rdel, &kev->wfdnode);
}
}
}
else {
rb_bug("unexpected filter: %d", ev->filter);
}
- /* delete the corresponding event when one FD has 2 filters */
- if (ok_fd == fd) {
- if (paired & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
- struct kevent *chg = &changelist[nchanges++];
- EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
- }
- if (paired & RB_WAITFD_OUT) {
- struct kevent *chg = &changelist[nchanges++];
- EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
- }
- }
- }
- if (!list_empty(&pri)) {
- check_pri(th, &pri);
- }
-
-
- /*
- * kqueue is a little more complicated because we install
- * separate filters for simultaneously watching read and write
- * on the same FD, and now we must clear shared filters
- */
- if (nchanges && kevent(th->vm->iom->kqueue_fd, changelist, nchanges,
- 0, 0, &zero) < 0) {
- err = errno;
}
+ check_pri(th, &pri);
+ err = drop_pairs(th, nr, eventlist, &rdel, &wdel);
/* notify the waiter thread in case we enqueued fibers for them */
if (nr > 0 && th->vm->iom->waiter) {
@@ -350,7 +428,7 @@ rb_iom_do_wait(rb_thread_t *th, rb_iom_t *iom)
}
static void
-kevxchg_ping(struct kev *kev)
+kevxchg_ping(int fd, struct kev *kev)
{
rb_thread_t *th = kev->th;
rb_iom_t *iom = rb_iom_get(th);
@@ -360,21 +438,17 @@ kevxchg_ping(struct kev *kev)
VALUE v;
int nevents = iom->nevents;
struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
- struct kevent *changelist = eventlist; /* allowed, see kevent manpage */
- int fd = *kev->fdw.fdp;
- int kqfd;
+ struct kevent *changelist = eventlist;
+ int kqfd = iom_kqfd(iom);
- VM_ASSERT(nevents > 2);
- if (fd < 0) {
- return;
- }
+ VM_ASSERT(nevents >= 2);
- kqfd = iom_kqfd(iom);
- /* EVFILT_READ handles urgent data (POLLPRI) */
- if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
+ /* EVFILT_READ handles urgent data (POLLPRI)... hopefully */
+ if (kev->fdw.events & WAITFD_READ) {
struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->rfdmap, fd);
if (list_empty(&fdm->fdhead)) {
struct kevent *chg = &changelist[nchanges++];
+ fdm = udata_set(iom, fdm);
EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, fdm);
}
list_add(&fdm->fdhead, &kev->rfdnode);
@@ -383,6 +457,7 @@ kevxchg_ping(struct kev *kev)
struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->wfdmap, fd);
if (list_empty(&fdm->fdhead)) {
struct kevent *chg = &changelist[nchanges++];
+ fdm = udata_set(iom, fdm);
EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, fdm);
}
list_add(&fdm->fdhead, &kev->wfdnode);
@@ -406,21 +481,21 @@ static VALUE
kevxchg_yield(VALUE ptr)
{
struct kev *kev = (struct kev *)ptr;
- kevxchg_ping(kev);
- (void)rb_fiber_auto_do_yield_p(kev->th);
- return rb_fiber_yield(0, 0);
+ int fd = *kev->fdw.fdp;
+ if (fd >= 0) {
+ kevxchg_ping(fd, kev);
+ (void)rb_fiber_auto_do_yield_p(kev->th);
+ return rb_fiber_yield(0, 0);
+ }
+ return Qfalse;
}
static VALUE
kev_done(VALUE ptr)
{
struct kev *kev = (struct kev *)ptr;
- if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
- list_del(&kev->rfdnode);
- }
- if (kev->fdw.events & RB_WAITFD_OUT) {
- list_del(&kev->wfdnode);
- }
+ list_del(&kev->rfdnode);
+ list_del(&kev->wfdnode);
return rb_iom_waiter_done((VALUE)&kev->fdw.w);
}
@@ -435,6 +510,8 @@ iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
kev.fdw.events = (short)events;
kev.fdw.revents = 0;
list_add(&iom->kevs, &kev.fdw.w.wnode);
+ list_node_init(&kev.rfdnode);
+ list_node_init(&kev.wfdnode);
rb_iom_timer_add(&iom->timers, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
rb_ensure(kevxchg_yield, (VALUE)&kev, kev_done, (VALUE)&kev);
--
EW
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 6/7] wip
2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
` (3 preceding siblings ...)
2017-05-30 19:03 ` [PATCH 5/7] kq-share Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
2017-05-30 19:03 ` [PATCH 7/7] wip Eric Wong
5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
To: spew
---
iom_kqueue.h | 14 +++++++-------
test/ruby/test_fiber_auto.rb | 9 ++++++++-
2 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/iom_kqueue.h b/iom_kqueue.h
index 31cddd54be..cb008db3ff 100644
--- a/iom_kqueue.h
+++ b/iom_kqueue.h
@@ -259,7 +259,7 @@ udata_set(rb_iom_t *iom, struct rb_iom_fdm *fdm)
}
static struct rb_iom_fdm *
-udata_get(struct kevent *ev)
+udata_get(const struct kevent *ev)
{
#ifdef LIBKQUEUE
rb_iom_t *iom = ev->udata;
@@ -446,19 +446,19 @@ kevxchg_ping(int fd, struct kev *kev)
/* EVFILT_READ handles urgent data (POLLPRI)... hopefully */
if (kev->fdw.events & WAITFD_READ) {
struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->rfdmap, fd);
- if (list_empty(&fdm->fdhead)) {
+ if (1 || list_empty(&fdm->fdhead)) {
struct kevent *chg = &changelist[nchanges++];
- fdm = udata_set(iom, fdm);
- EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, fdm);
+ void *udata = udata_set(iom, fdm);
+ EV_SET(chg, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, udata);
}
list_add(&fdm->fdhead, &kev->rfdnode);
}
if (kev->fdw.events & RB_WAITFD_OUT) {
struct rb_iom_fdm *fdm = rb_iom_fdm_get(&iom->wfdmap, fd);
- if (list_empty(&fdm->fdhead)) {
+ if (1 || list_empty(&fdm->fdhead)) {
struct kevent *chg = &changelist[nchanges++];
- fdm = udata_set(iom, fdm);
- EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, fdm);
+ void *udata = udata_set(iom, fdm);
+ EV_SET(chg, fd, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, udata);
}
list_add(&fdm->fdhead, &kev->wfdnode);
}
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
index 196361c674..da758e8518 100644
--- a/test/ruby/test_fiber_auto.rb
+++ b/test/ruby/test_fiber_auto.rb
@@ -188,12 +188,19 @@ def test_cross_thread_schedule
def test_multi_readers
IO.pipe do |r, w|
r.nonblock = w.nonblock = true
- nr = 30
+ nr = 10
+ warn "AAAAAAAAAAAAAAAAAA\n"
+ warn "AAAAAAAAAAAAAAAAAA\n"
fibs = nr.times.map { Fiber.start { r.read(1) } }
fibs.each { |f| assert_nil f.join(0.001) }
exp = nr.times.map { -'a' }.freeze
w.write(exp.join)
+ fibs.each_with_index do |f, i|
+ warn i
+ warn f.value
+ end
+ warn "OK\n";
assert_equal exp, fibs.map(&:value);
end
end
--
EW
^ permalink raw reply related [flat|nested] 7+ messages in thread
* [PATCH 7/7] wip
2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
` (4 preceding siblings ...)
2017-05-30 19:03 ` [PATCH 6/7] wip Eric Wong
@ 2017-05-30 19:03 ` Eric Wong
5 siblings, 0 replies; 7+ messages in thread
From: Eric Wong @ 2017-05-30 19:03 UTC (permalink / raw)
To: spew
---
iom.h | 2 ++
iom_internal.h | 9 +++++++
iom_kqueue.h | 57 +++++++++++++++++++++++++++++++++++++++++++-
test/ruby/test_fiber_auto.rb | 14 +++++------
thread.c | 1 +
5 files changed, 74 insertions(+), 9 deletions(-)
diff --git a/iom.h b/iom.h
index d412885baa..82827868c5 100644
--- a/iom.h
+++ b/iom.h
@@ -85,6 +85,8 @@ void rb_iom_destroy(rb_vm_t *);
*/
void rb_iom_schedule(rb_thread_t *th, double *timeout);
+int rb_iom_notify_fd_close(rb_vm_t *, int fd);
+
/* cont.c */
int rb_fiber_auto_sched_p(const rb_thread_t *);
diff --git a/iom_internal.h b/iom_internal.h
index 1a4050d62d..d2e51ba6e3 100644
--- a/iom_internal.h
+++ b/iom_internal.h
@@ -65,6 +65,15 @@ iom_fdhead_aref(struct rb_iom_fdmap *fdmap, int fd)
}
static struct rb_iom_fdm *
+rb_iom_fdm_check(struct rb_iom_fdmap *fdmap, int fd)
+{
+ if (fd < fdmap->max_fd) {
+ return iom_fdhead_aref(fdmap, fd);
+ }
+ return 0;
+}
+
+static struct rb_iom_fdm *
rb_iom_fdm_get(struct rb_iom_fdmap *fdmap, int fd)
{
if (fd >= fdmap->max_fd) {
diff --git a/iom_kqueue.h b/iom_kqueue.h
index cb008db3ff..96998a0329 100644
--- a/iom_kqueue.h
+++ b/iom_kqueue.h
@@ -285,7 +285,6 @@ check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
struct list_head pri = LIST_HEAD_INIT(pri);
struct list_head wdel = LIST_HEAD_INIT(wdel);
struct list_head rdel = LIST_HEAD_INIT(rdel);
- rb_iom_t *iom = th->vm->iom;
struct kev *kev = 0, *next = 0;
int i;
@@ -587,5 +586,61 @@ rb_iom_reserved_fd(int fd)
return iom && fd == iom->kqueue_fd;
}
+int rb_iom_notify_fd_close(rb_vm_t *vm, int fd)
+{
+ rb_iom_t *iom = vm->iom;
+ struct rb_iom_fdm *fdm;
+ struct kev *kev = 0, *next = 0;
+ int n = 0;
+ int wr = 0;
+ int rd = 0;
+ struct kevent changes[2];
+
+ if (!iom) {
+ return 0;
+ }
+ fdm = rb_iom_fdm_check(&iom->rfdmap, fd);
+ if (fdm) {
+ list_for_each_safe(&fdm->fdhead, kev, next, rfdnode) {
+ rb_thread_t *th = kev->th;
+ VALUE err = th->vm->special_exceptions[ruby_error_stream_closed];
+ rb_threadptr_pending_interrupt_enque(th, err);
+ rb_threadptr_interrupt(th);
+
+ rb_iom_waiter_ready(&kev->fdw.w);
+ list_del_init(&kev->rfdnode);
+
+ rd = 1;
+ }
+ }
+ fdm = rb_iom_fdm_check(&iom->wfdmap, fd);
+ if (fdm) {
+ list_for_each_safe(&fdm->fdhead, kev, next, wfdnode) {
+ rb_thread_t *th = kev->th;
+ VALUE err = th->vm->special_exceptions[ruby_error_stream_closed];
+ rb_threadptr_pending_interrupt_enque(th, err);
+ rb_threadptr_interrupt(th);
+ rb_iom_waiter_ready(&kev->fdw.w);
+ list_del_init(&kev->wfdnode);
+ wr = 1;
+ }
+ }
+ if (wr) {
+ struct kevent *chg = &changes[n++];
+ EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+ }
+ if (rd) {
+ struct kevent *chg = &changes[n++];
+ EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ }
+ if (n) {
+ if (kevent(iom->kqueue_fd, changes, n, 0, 0, &zero) < 0) {
+ rb_sys_fail("kevent");
+ }
+ rb_fiber_auto_do_yield_p(GET_THREAD());
+ }
+ return n;
+}
+
#include "iom_pingable_common.h"
#include "iom_common.h"
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
index da758e8518..527472d28f 100644
--- a/test/ruby/test_fiber_auto.rb
+++ b/test/ruby/test_fiber_auto.rb
@@ -186,21 +186,19 @@ def test_cross_thread_schedule
# tricky for kqueue and epoll implementations
def test_multi_readers
+ if RUBY_PLATFORM =~ /linux/ &&
+ (RbConfig::CONFIG['LDFLAGS'] =~ /-lkqueue\b/ ||
+ RbConfig::CONFIG['DLDFLAGS'] =~ /-lkqueue\b/)
+ skip "FIXME libkqueue is buggy on this test"
+ end
IO.pipe do |r, w|
r.nonblock = w.nonblock = true
- nr = 10
- warn "AAAAAAAAAAAAAAAAAA\n"
- warn "AAAAAAAAAAAAAAAAAA\n"
+ nr = 30
fibs = nr.times.map { Fiber.start { r.read(1) } }
fibs.each { |f| assert_nil f.join(0.001) }
exp = nr.times.map { -'a' }.freeze
w.write(exp.join)
- fibs.each_with_index do |f, i|
- warn i
- warn f.value
- end
- warn "OK\n";
assert_equal exp, fibs.map(&:value);
end
end
diff --git a/thread.c b/thread.c
index 7899aeae0b..9de07f663e 100644
--- a/thread.c
+++ b/thread.c
@@ -2219,6 +2219,7 @@ rb_notify_fd_close(int fd)
busy = 1;
}
}
+ /* busy |= rb_iom_notify_fd_close(vm, fd); */
return busy;
}
--
EW
^ permalink raw reply related [flat|nested] 7+ messages in thread
end of thread, other threads:[~2017-05-30 19:04 UTC | newest]
Thread overview: 7+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2017-05-30 19:03 [PATCH 1/7] auto fiber scheduling for I/O Eric Wong
2017-05-30 19:03 ` [PATCH 2/7] iom: implement waitpid generically Eric Wong
2017-05-30 19:03 ` [PATCH 3/7] iom: move to fdmap Eric Wong
2017-05-30 19:03 ` [PATCH 4/7] epoll fdmap inversion Eric Wong
2017-05-30 19:03 ` [PATCH 5/7] kq-share Eric Wong
2017-05-30 19:03 ` [PATCH 6/7] wip Eric Wong
2017-05-30 19:03 ` [PATCH 7/7] wip Eric Wong
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).