* [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r
@ 2017-05-24 9:07 Eric Wong
2017-05-24 9:07 ` [PATCH 02/10] Add debug counters Eric Wong
` (8 more replies)
0 siblings, 9 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
From: normal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>
"struct crypt_data" is 131232 bytes on x86-64 GNU/Linux,
making it unsafe to use tiny Fiber stack sizes.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58864 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
---
string.c | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git a/string.c b/string.c
index c0483c9471..4bd519787c 100644
--- a/string.c
+++ b/string.c
@@ -8713,7 +8713,7 @@ static VALUE
rb_str_crypt(VALUE str, VALUE salt)
{
#ifdef HAVE_CRYPT_R
- struct crypt_data data;
+ struct crypt_data *data = ALLOC(struct crypt_data);
#else
extern char *crypt(const char *, const char *);
#endif
@@ -8745,16 +8745,24 @@ rb_str_crypt(VALUE str, VALUE salt)
#endif
#ifdef HAVE_CRYPT_R
# ifdef HAVE_STRUCT_CRYPT_DATA_INITIALIZED
- data.initialized = 0;
+ data->initialized = 0;
# endif
- res = crypt_r(s, saltp, &data);
+ res = crypt_r(s, saltp, data);
#else
res = crypt(s, saltp);
#endif
if (!res) {
+#ifdef HAVE_CRYPT_R
+ int err = errno;
+ xfree(data);
+ errno = err;
+#endif
rb_sys_fail("crypt");
}
result = rb_str_new_cstr(res);
+#ifdef HAVE_CRYPT_R
+ xfree(data);
+#endif
FL_SET_RAW(result, OBJ_TAINTED_RAW(str) | OBJ_TAINTED_RAW(salt));
return result;
}
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH 02/10] Add debug counters.
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
@ 2017-05-24 9:07 ` Eric Wong
2017-05-24 9:07 ` [PATCH 03/10] string.c: for small crypt_data Eric Wong
` (7 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
From: ko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>
* debug_counter.h: add the following counters to measure object types.
obj_free: freed count
obj_str_ptr: freed count of Strings that have an extra buffer.
obj_str_embed: freed count of Strings that don't have an extra buffer.
obj_str_shared: freed count of Strings that have a shared extra buffer.
obj_str_nofree: freed count of Strings marked as nofree.
obj_str_fstr: freed count of Strings marked as fstr.
obj_ary_ptr: freed count of Arrays that have an extra buffer.
obj_ary_embed: freed count of Arrays that don't have an extra buffer.
obj_obj_ptr: freed count of Objects (T_OBJECT) that have an extra buffer.
obj_obj_embed: freed count of Objects that don't have an extra buffer.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58865 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
---
array.c | 5 +++++
common.mk | 3 +++
debug_counter.h | 16 ++++++++++++++++
gc.c | 7 +++++++
string.c | 12 +++++++++++-
5 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/array.c b/array.c
index cff6d69ad3..0775e82999 100644
--- a/array.c
+++ b/array.c
@@ -16,6 +16,7 @@
#include "ruby/st.h"
#include "probes.h"
#include "id.h"
+#include "debug_counter.h"
#ifndef ARRAY_DEBUG
# define NDEBUG
@@ -553,8 +554,12 @@ void
rb_ary_free(VALUE ary)
{
if (ARY_OWNS_HEAP_P(ary)) {
+ RB_DEBUG_COUNTER_INC(obj_ary_ptr);
ruby_sized_xfree((void *)ARY_HEAP_PTR(ary), ARY_HEAP_SIZE(ary));
}
+ else {
+ RB_DEBUG_COUNTER_INC(obj_ary_embed);
+ }
}
RUBY_FUNC_EXPORTED size_t
diff --git a/common.mk b/common.mk
index bde5613564..654533ce8d 100644
--- a/common.mk
+++ b/common.mk
@@ -1303,6 +1303,7 @@ array.$(OBJEXT): $(hdrdir)/ruby/ruby.h
array.$(OBJEXT): $(top_srcdir)/include/ruby.h
array.$(OBJEXT): {$(VPATH)}array.c
array.$(OBJEXT): {$(VPATH)}config.h
+array.$(OBJEXT): {$(VPATH)}debug_counter.h
array.$(OBJEXT): {$(VPATH)}defines.h
array.$(OBJEXT): {$(VPATH)}encoding.h
array.$(OBJEXT): {$(VPATH)}id.h
@@ -1735,6 +1736,7 @@ gc.$(OBJEXT): $(top_srcdir)/include/ruby.h
gc.$(OBJEXT): {$(VPATH)}config.h
gc.$(OBJEXT): {$(VPATH)}constant.h
gc.$(OBJEXT): {$(VPATH)}debug.h
+gc.$(OBJEXT): {$(VPATH)}debug_counter.h
gc.$(OBJEXT): {$(VPATH)}defines.h
gc.$(OBJEXT): {$(VPATH)}encoding.h
gc.$(OBJEXT): {$(VPATH)}eval_intern.h
@@ -2505,6 +2507,7 @@ string.$(OBJEXT): $(top_srcdir)/include/ruby.h
string.$(OBJEXT): {$(VPATH)}config.h
string.$(OBJEXT): {$(VPATH)}crypt.h
string.$(OBJEXT): {$(VPATH)}defines.h
+string.$(OBJEXT): {$(VPATH)}debug_counter.h
string.$(OBJEXT): {$(VPATH)}encindex.h
string.$(OBJEXT): {$(VPATH)}encoding.h
string.$(OBJEXT): {$(VPATH)}gc.h
diff --git a/debug_counter.h b/debug_counter.h
index be2138d3bb..250d0a568d 100644
--- a/debug_counter.h
+++ b/debug_counter.h
@@ -22,6 +22,7 @@ RB_DEBUG_COUNTER(mc_class_serial_miss)
RB_DEBUG_COUNTER(mc_cme_complement)
RB_DEBUG_COUNTER(mc_cme_complement_hit)
RB_DEBUG_COUNTER(mc_search_super)
+
RB_DEBUG_COUNTER(ivar_get_ic_hit)
RB_DEBUG_COUNTER(ivar_get_ic_miss)
RB_DEBUG_COUNTER(ivar_get_ic_miss_serial)
@@ -35,6 +36,21 @@ RB_DEBUG_COUNTER(ivar_set_ic_miss_oorange)
RB_DEBUG_COUNTER(ivar_set_ic_miss_noobject)
RB_DEBUG_COUNTER(ivar_get_base)
RB_DEBUG_COUNTER(ivar_set_base)
+
+/* object counts */
+RB_DEBUG_COUNTER(obj_free)
+
+RB_DEBUG_COUNTER(obj_str_ptr)
+RB_DEBUG_COUNTER(obj_str_embed)
+RB_DEBUG_COUNTER(obj_str_shared)
+RB_DEBUG_COUNTER(obj_str_nofree)
+RB_DEBUG_COUNTER(obj_str_fstr)
+
+RB_DEBUG_COUNTER(obj_ary_ptr)
+RB_DEBUG_COUNTER(obj_ary_embed)
+
+RB_DEBUG_COUNTER(obj_obj_ptr)
+RB_DEBUG_COUNTER(obj_obj_embed)
#endif
#ifndef RUBY_DEBUG_COUNTER_H
diff --git a/gc.c b/gc.c
index f0fb88d320..77d84a7f91 100644
--- a/gc.c
+++ b/gc.c
@@ -33,6 +33,7 @@
#include <setjmp.h>
#include <sys/types.h>
#include "ruby_assert.h"
+#include "debug_counter.h"
#undef rb_data_object_wrap
@@ -2103,6 +2104,8 @@ make_io_zombie(rb_objspace_t *objspace, VALUE obj)
static int
obj_free(rb_objspace_t *objspace, VALUE obj)
{
+ RB_DEBUG_COUNTER_INC(obj_free);
+
gc_event_hook(objspace, RUBY_INTERNAL_EVENT_FREEOBJ, obj);
switch (BUILTIN_TYPE(obj)) {
@@ -2137,6 +2140,10 @@ obj_free(rb_objspace_t *objspace, VALUE obj)
if (!(RANY(obj)->as.basic.flags & ROBJECT_EMBED) &&
RANY(obj)->as.object.as.heap.ivptr) {
xfree(RANY(obj)->as.object.as.heap.ivptr);
+ RB_DEBUG_COUNTER_INC(obj_obj_ptr);
+ }
+ else {
+ RB_DEBUG_COUNTER_INC(obj_obj_embed);
}
break;
case T_MODULE:
diff --git a/string.c b/string.c
index 4bd519787c..bb86cb6b7f 100644
--- a/string.c
+++ b/string.c
@@ -18,6 +18,7 @@
#include "gc.h"
#include "ruby_assert.h"
#include "id.h"
+#include "debug_counter.h"
#define BEG(no) (regs->beg[(no)])
#define END(no) (regs->end[(no)])
@@ -1310,9 +1311,18 @@ rb_str_free(VALUE str)
if (FL_TEST(str, RSTRING_FSTR)) {
st_data_t fstr = (st_data_t)str;
st_delete(rb_vm_fstring_table(), &fstr, NULL);
+ RB_DEBUG_COUNTER_INC(obj_str_fstr);
}
- if (!STR_EMBED_P(str) && !FL_TEST(str, STR_SHARED|STR_NOFREE)) {
+ if (STR_EMBED_P(str)) {
+ RB_DEBUG_COUNTER_INC(obj_str_embed);
+ }
+ else if (FL_TEST(str, STR_SHARED | STR_NOFREE)) {
+ (void)RB_DEBUG_COUNTER_INC_IF(obj_str_shared, FL_TEST(str, STR_SHARED));
+ (void)RB_DEBUG_COUNTER_INC_IF(obj_str_nofree, FL_TEST(str, STR_NOFREE));
+ }
+ else {
+ RB_DEBUG_COUNTER_INC(obj_str_ptr);
ruby_sized_xfree(STR_HEAP_PTR(str), STR_HEAP_SIZE(str));
}
}
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH 03/10] string.c: for small crypt_data
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
2017-05-24 9:07 ` [PATCH 02/10] Add debug counters Eric Wong
@ 2017-05-24 9:07 ` Eric Wong
2017-05-24 9:07 ` [PATCH 04/10] Improve performance in where push the element into non shared Array object Eric Wong
` (6 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
From: nobu <nobu@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>
* string.c (rb_str_crypt): struct crypt_data defined in
missing/crypt.h is small enough.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58866 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
---
missing/crypt.h | 2 ++
string.c | 10 ++++++++--
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/missing/crypt.h b/missing/crypt.h
index 7c2642f593..7a78767931 100644
--- a/missing/crypt.h
+++ b/missing/crypt.h
@@ -237,6 +237,8 @@ struct crypt_data {
char cryptresult[1+4+4+11+1]; /* encrypted result */
};
+#define SIZEOF_CRYPT_DATA (KS_SIZE*8+(1+4+4+11+1))
+
char *crypt(const char *key, const char *setting);
void setkey(const char *key);
void encrypt(char *block, int flag);
diff --git a/string.c b/string.c
index bb86cb6b7f..bd4a481203 100644
--- a/string.c
+++ b/string.c
@@ -8722,8 +8722,14 @@ rb_str_oct(VALUE str)
static VALUE
rb_str_crypt(VALUE str, VALUE salt)
{
+#undef LARGE_CRYPT_DATA
#ifdef HAVE_CRYPT_R
+# if defined SIZEOF_CRYPT_DATA && SIZEOF_CRYPT_DATA <= 256
+ struct crypt_data cdata, *const data = &cdata;
+# else
+# define LARGE_CRYPT_DATA 1
struct crypt_data *data = ALLOC(struct crypt_data);
+# endif
#else
extern char *crypt(const char *, const char *);
#endif
@@ -8762,7 +8768,7 @@ rb_str_crypt(VALUE str, VALUE salt)
res = crypt(s, saltp);
#endif
if (!res) {
-#ifdef HAVE_CRYPT_R
+#ifdef LARGE_CRYPT_DATA
int err = errno;
xfree(data);
errno = err;
@@ -8770,7 +8776,7 @@ rb_str_crypt(VALUE str, VALUE salt)
rb_sys_fail("crypt");
}
result = rb_str_new_cstr(res);
-#ifdef HAVE_CRYPT_R
+#ifdef LARGE_CRYPT_DATA
xfree(data);
#endif
FL_SET_RAW(result, OBJ_TAINTED_RAW(str) | OBJ_TAINTED_RAW(salt));
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH 04/10] Improve performance in where push the element into non shared Array object
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
2017-05-24 9:07 ` [PATCH 02/10] Add debug counters Eric Wong
2017-05-24 9:07 ` [PATCH 03/10] string.c: for small crypt_data Eric Wong
@ 2017-05-24 9:07 ` Eric Wong
2017-05-24 9:07 ` [PATCH 05/10] auto fiber scheduling and friends (VERY LIGHTLY TESTED) Eric Wong
` (5 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
From: watson1978 <watson1978@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>
* array.c (ary_ensure_room_for_push): use rb_ary_modify_check() instead of
rb_ary_modify() to check whether the object can be modified for non shared
Array object. rb_ary_modify() has the codes for shared Array object too.
In here, it has condition branch for shared / non shared Array object and
it can use rb_ary_modify_check() which is smaller function than
rb_ary_modify() for non shared object.
rb_ary_modify_check() will be expand as inline function.
If it will compile with GCC, Array#<< will be faster around 8%.
[ruby-core:81082] [Bug #13553] [Fix GH-1609]
## Clang 802.0.42
### Before
Array#<< 9.353M (± 1.7%) i/s - 46.787M in 5.004123s
Array#push 7.702M (± 1.1%) i/s - 38.577M in 5.009338s
Array#values_at 6.133M (± 1.9%) i/s - 30.699M in 5.007772s
### After
Array#<< 9.458M (± 2.0%) i/s - 47.357M in 5.009069s
Array#push 7.921M (± 1.8%) i/s - 39.665M in 5.009151s
Array#values_at 6.377M (± 2.3%) i/s - 31.881M in 5.001888s
### Result
Array#<< -> 1.2% faster
Array#push -> 2.8% faster
Array#values_at -> 3.9% faster
## GCC 7.1.0
### Before
Array#<< 10.497M (± 1.1%) i/s - 52.665M in 5.017601s
Array#push 8.527M (± 1.6%) i/s - 42.777M in 5.018003s
Array#values_at 7.621M (± 1.7%) i/s - 38.152M in 5.007910s
### After
Array#<< 11.403M (± 1.3%) i/s - 57.028M in 5.001849s
Array#push 8.924M (± 1.3%) i/s - 44.609M in 4.999940s
Array#values_at 8.291M (± 1.4%) i/s - 41.487M in 5.004727s
### Result
Array#<< -> 8.3% faster
Array#push -> 4.3% faster
Array#values_at -> 8.7% faster
## Test code
require 'benchmark/ips'
Benchmark.ips do |x|
x.report "Array#<<" do |i|
i.times { [1,2] << 3 }
end
x.report "Array#push" do |i|
i.times { [1,2].push(3) }
end
x.report "Array#values_at" do |i|
ary = [1, 2, 3, 4, 5]
i.times { ary.values_at(0, 2, 4) }
end
end
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58867 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
---
array.c | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/array.c b/array.c
index 0775e82999..6d7838b9d3 100644
--- a/array.c
+++ b/array.c
@@ -380,8 +380,11 @@ ary_ensure_room_for_push(VALUE ary, long add_len)
}
}
}
+ rb_ary_modify(ary);
+ }
+ else {
+ rb_ary_modify_check(ary);
}
- rb_ary_modify(ary);
capa = ARY_CAPA(ary);
if (new_len > capa) {
ary_double_capa(ary, new_len);
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH 05/10] auto fiber scheduling and friends (VERY LIGHTLY TESTED)
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
` (2 preceding siblings ...)
2017-05-24 9:07 ` [PATCH 04/10] Improve performance in where push the element into non shared Array object Eric Wong
@ 2017-05-24 9:07 ` Eric Wong
2017-05-24 9:07 ` [PATCH 06/10] iom: implement waitpid Eric Wong
` (4 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
Currently, IO scheduling seems to work; waitpid/sleep/other scheduling
is not done, yet; but we do not need to support everything at
once during dev.
main API:
Fiber#start -> enable auto-scheduling and run Fiber until it
automatically yields (due to EAGAIN/EWOULDBLOCK)
The following behave like their Thread counterparts:
Fiber#join - run internal scheduler until Fiber is terminated
Fiber#value - ditto
Fiber#run (in prelude.rb)
Fiber.start (ditto)
I think we can iron out the internal APIs and behavior, first,
and gradually add support for auto-Fiber.yield points.
Right now, it takes over rb_wait_for_single_fd() function
if the running Fiber is auto-enabled (cont.c::rb_fiber_auto_sched_p)
Changes to existing functions are minimal.
New files (important structs and relations should be documented):
iom.h - internal API for the rest of RubyVM (incomplete?)
iom_common.h - common stuff internal to iom_*.h
iom_select.h - select()-specific pieces
iom_epoll.h - epoll()-specific pieces
Changes to existing data structures:
rb_thread_t.afhead - list of fibers to auto-resume
rb_fiber_t.afnode - link to rb_thread_t->afhead
rb_vm_t.iom - Ruby I/O Manager (rb_iom_t) :)
Besides rb_iom_t, all the new structs are stack-only and relies
extensively on ccan/list for branch-less O(1) insert/delete.
Right now, I reuse some static functions in thread.c,
so thread.c includes iom_select.h / iom_epoll.h
TODO:
Hijack other blocking functions (waitpid, IO.select, ...)
iom_kqueue.h (easy once iom.h definitions are done)
I am using "double" for timeout since it is more convenient for
arithmetic like parts of thread.c. Most platforms have good FP,
I think. Also, all "blocking" functions (rb_iom_wait*) will
have timeout support
Test script I used to download a script from my server:
----8<---
require 'net/http'
require 'uri'
require 'digest/sha1'
require 'fiber'
url = 'http://80x24.org/git-i-forgot-to-pack/objects/pack/pack-97b25a76c03b489d4cbbd85b12d0e1ad28717e55.idx'
uri = URI(url)
use_ssl = "https" == uri.scheme
fibs = 10.times.map do
Fiber.start do
cur = Fiber.current.object_id
# XXX getaddrinfo() and connect() are blocking
# XXX resolv/replace + connect_nonblock
Net::HTTP.start(uri.host, uri.port, use_ssl: use_ssl) do |http|
req = Net::HTTP::Get.new(uri)
http.request(req) do |res|
dig = Digest::SHA1.new
res.read_body do |buf|
dig.update(buf)
#warn "#{cur} #{buf.bytesize}\n"
end
warn "#{cur} #{dig.hexdigest}\n"
end
end
warn "done\n"
:done
end
end
warn "joining #{Time.now}\n"
fibs[-1].join(4)
warn "joined #{Time.now}\n"
all = fibs.dup
warn "1 joined, wait for the rest\n"
until fibs.empty?
fibs.each(&:join)
fibs.keep_if(&:alive?)
warn fibs.inspect
end
p all.map(&:value)
Fiber.new do
puts 'HI'
end.run.join
END
zero timeout select for auto-fibers
cross thread test
simplify iom_select, slightly
iom_common: define FMODE_IOM_PRIVATE flags
epoll WIP
more wip epoll
O3
compact waiter structure, timer holds fibval
wip
epworks
ep cleanups - need testing
epoll pingers need to notify the waiter
In case we enqueue events for a waiter thread, they need to know.
less mem
epoll: do not sleep with no timers or waiters
epoll: track FD list anyways
We need idempotent delete, and this is the simplest way to
implement it.
fork + retry
iom_select: fix empty case
epoll: do not set false-positive on event timeout
iom_select: simplify nowait check for zero FDs
use native_fd_select so we can wait for single FDs
support auto-scheduled fibers on systems without usable poll
simplify
---
common.mk | 4 +
configure.in | 5 +
cont.c | 156 +++++++++++++++-
include/ruby/io.h | 2 +
iom.h | 88 +++++++++
iom_common.h | 118 ++++++++++++
iom_epoll.h | 427 +++++++++++++++++++++++++++++++++++++++++++
iom_select.h | 411 +++++++++++++++++++++++++++++++++++++++++
lib/net/http/response.rb | 13 +-
prelude.rb | 12 ++
test/lib/leakchecker.rb | 9 +
test/ruby/test_fiber_auto.rb | 126 +++++++++++++
thread.c | 40 ++++
thread_pthread.c | 4 +
vm.c | 9 +-
vm_core.h | 4 +
16 files changed, 1417 insertions(+), 11 deletions(-)
create mode 100644 iom.h
create mode 100644 iom_common.h
create mode 100644 iom_epoll.h
create mode 100644 iom_select.h
create mode 100644 test/ruby/test_fiber_auto.rb
diff --git a/common.mk b/common.mk
index 654533ce8d..78ed9d09f6 100644
--- a/common.mk
+++ b/common.mk
@@ -2598,6 +2598,10 @@ thread.$(OBJEXT): {$(VPATH)}id.h
thread.$(OBJEXT): {$(VPATH)}intern.h
thread.$(OBJEXT): {$(VPATH)}internal.h
thread.$(OBJEXT): {$(VPATH)}io.h
+thread.$(OBJEXT): {$(VPATH)}iom.h
+thread.$(OBJEXT): {$(VPATH)}iom_common.h
+thread.$(OBJEXT): {$(VPATH)}iom_epoll.h
+thread.$(OBJEXT): {$(VPATH)}iom_select.h
thread.$(OBJEXT): {$(VPATH)}method.h
thread.$(OBJEXT): {$(VPATH)}missing.h
thread.$(OBJEXT): {$(VPATH)}node.h
diff --git a/configure.in b/configure.in
index 24035c7c1a..48b4817f63 100644
--- a/configure.in
+++ b/configure.in
@@ -1389,6 +1389,7 @@ AC_CHECK_HEADERS(process.h)
AC_CHECK_HEADERS(pwd.h)
AC_CHECK_HEADERS(setjmpex.h)
AC_CHECK_HEADERS(sys/attr.h)
+AC_CHECK_HEADERS(sys/epoll.h)
AC_CHECK_HEADERS(sys/fcntl.h)
AC_CHECK_HEADERS(sys/file.h)
AC_CHECK_HEADERS(sys/id.h)
@@ -2405,6 +2406,10 @@ AC_CHECK_FUNCS(dladdr)
AC_CHECK_FUNCS(dup)
AC_CHECK_FUNCS(dup3)
AC_CHECK_FUNCS(eaccess)
+AC_CHECK_FUNCS(epoll_create)
+AC_CHECK_FUNCS(epoll_create1)
+AC_CHECK_FUNCS(epoll_ctl)
+AC_CHECK_FUNCS(epoll_wait)
AC_CHECK_FUNCS(endgrent)
AC_CHECK_FUNCS(fchmod)
AC_CHECK_FUNCS(fchown)
diff --git a/cont.c b/cont.c
index 4d6176f00c..ced496f61b 100644
--- a/cont.c
+++ b/cont.c
@@ -13,6 +13,7 @@
#include "vm_core.h"
#include "gc.h"
#include "eval_intern.h"
+#include "iom.h"
/* FIBER_USE_NATIVE enables Fiber performance improvement using system
* dependent method such as make/setcontext on POSIX system or
@@ -126,6 +127,14 @@ static machine_stack_cache_t terminated_machine_stack;
struct rb_fiber_struct {
rb_context_t cont;
struct rb_fiber_struct *prev;
+
+ /*
+ * afnode.next == 0 auto fiber disabled
+ * afnode.next == &afnode auto fiber enabled, but not enqueued (running)
+ * afnode.next != &afnode auto fiber runnable (enqueued in th->afhead)
+ */
+ struct list_node afnode;
+
enum fiber_status status;
/* If a fiber invokes "transfer",
* then this fiber can't "resume" any more after that.
@@ -1496,19 +1505,24 @@ rb_fiber_terminate(rb_fiber_t *fib)
fiber_switch(return_fiber(), 1, &value, 0);
}
-VALUE
-rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+static void
+fiber_check_resume(const rb_fiber_t *fib)
{
- rb_fiber_t *fib;
- GetFiberPtr(fibval, fib);
-
if (fib->prev != 0 || fib->cont.type == ROOT_FIBER_CONTEXT) {
rb_raise(rb_eFiberError, "double resume");
}
if (fib->transferred != 0) {
rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
}
+}
+VALUE
+rb_fiber_resume(VALUE fibval, int argc, const VALUE *argv)
+{
+ rb_fiber_t *fib;
+ GetFiberPtr(fibval, fib);
+
+ fiber_check_resume(fib);
return fiber_switch(fib, argc, argv, 1);
}
@@ -1651,7 +1665,136 @@ rb_fiber_s_current(VALUE klass)
return rb_fiber_current();
}
+/* enqueue given Fiber so it may be auto-resumed */
+void
+rb_fiber_auto_enqueue(VALUE fibval)
+{
+ rb_fiber_t *fib;
+ rb_thread_t *th;
+
+ GetFiberPtr(fibval, fib);
+ GetThreadPtr(fib->cont.saved_thread.self, th);
+ list_add_tail(&th->afhead, &fib->afnode);
+}
+
+/* for vm.c::rb_thread_mark */
+void
+rb_fiber_auto_schedule_mark(const rb_thread_t *th)
+{
+ rb_fiber_t *fib = 0;
+ list_for_each(&th->afhead, fib, afnode) {
+ rb_gc_mark(fib->cont.self);
+ }
+}
+
+/* Returns true if auto-fiber is enabled for current fiber */
+int
+rb_fiber_auto_sched_p(const rb_thread_t *th)
+{
+ const rb_fiber_t *cur = th->fiber;
+
+ return (cur && cur->afnode.next && th->root_fiber != cur);
+}
+
+/*
+ * Resumes any ready fibers in the auto-Fiber run queue
+ * returns true if yielding current Fiber is needed, false if not
+ */
+int
+rb_fiber_auto_do_yield_p(rb_thread_t *th)
+{
+ rb_fiber_t *current_auto = rb_fiber_auto_sched_p(th) ? th->fiber : 0;
+ rb_fiber_t *fib = 0, *next = 0;
+ LIST_HEAD(tmp);
+
+ /*
+ * do not infinite loop as new fibers get added to
+ * th->afhead, only work off a temporary list:
+ */
+ list_append_list(&tmp, &th->afhead);
+ list_for_each_safe(&tmp, fib, next, afnode) {
+ if (fib == current_auto || (fib->prev && fib != th->root_fiber)) {
+ /* tell the caller to yield */
+ list_prepend_list(&th->afhead, &tmp);
+ return 1;
+ }
+ list_del_init(&fib->afnode);
+ fiber_check_resume(fib);
+ fiber_switch(fib, 0, 0, 1);
+ }
+ return 0;
+}
+
+/*
+ * Enable auto-scheduling for the Fiber and resume it
+ */
+static VALUE
+rb_fiber_auto_start(int argc, VALUE *argv, VALUE self)
+{
+ rb_fiber_t *fib;
+ GetFiberPtr(self, fib);
+ if (fib->afnode.next) {
+ rb_raise(rb_eFiberError, "Fiber already started");
+ }
+ list_node_init(&fib->afnode);
+ fiber_check_resume(fib);
+ return fiber_switch(fib, argc, argv, 1);
+}
+
+static void
+fiber_auto_join(rb_fiber_t *fib, double *timeout)
+{
+ rb_thread_t *th = GET_THREAD();
+ rb_fiber_t *cur = fiber_current();
+ if (cur == fib) {
+ rb_raise(rb_eFiberError, "Target fiber must not be current fiber");
+ }
+ if (th->root_fiber == fib) {
+ rb_raise(rb_eFiberError, "Target fiber must not be root fiber");
+ }
+ if (fib->cont.saved_thread.self != th->self) {
+ rb_raise(rb_eFiberError, "Target fiber not owned by current thread");
+ }
+ if (!fib->afnode.next) {
+ rb_raise(rb_eFiberError, "Target fiber is not an auto-fiber");
+ }
+
+ while (fib->status != TERMINATED && (timeout == 0 || *timeout >= 0.0)) {
+ rb_iom_schedule(th, timeout);
+ }
+}
+
+static VALUE
+rb_fiber_auto_join(int argc, VALUE *argv, VALUE self)
+{
+ rb_fiber_t *fib;
+ double timeout, *t;
+ VALUE limit;
+
+ GetFiberPtr(self, fib);
+ rb_scan_args(argc, argv, "01", &limit);
+
+ if (NIL_P(limit)) {
+ t = 0;
+ } else {
+ timeout = rb_num2dbl(limit);
+ t = &timeout;
+ }
+
+ fiber_auto_join(fib, t);
+ return fib->status == TERMINATED ? fib->cont.self : Qnil;
+}
+
+static VALUE
+rb_fiber_auto_value(VALUE self)
+{
+ rb_fiber_t *fib;
+ GetFiberPtr(self, fib);
+
+ fiber_auto_join(fib, 0);
+ return fib->cont.value;
+}
/*
* Document-class: FiberError
@@ -1688,6 +1831,9 @@ Init_Cont(void)
rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0);
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
+ rb_define_method(rb_cFiber, "start", rb_fiber_auto_start, -1);
+ rb_define_method(rb_cFiber, "join", rb_fiber_auto_join, -1);
+ rb_define_method(rb_cFiber, "value", rb_fiber_auto_value, 0);
}
RUBY_SYMBOL_EXPORT_BEGIN
diff --git a/include/ruby/io.h b/include/ruby/io.h
index 60d6f6d32e..3bd6f06cf3 100644
--- a/include/ruby/io.h
+++ b/include/ruby/io.h
@@ -116,6 +116,8 @@ typedef struct rb_io_t {
/* #define FMODE_UNIX 0x00200000 */
/* #define FMODE_INET 0x00400000 */
/* #define FMODE_INET6 0x00800000 */
+/* #define FMODE_IOM_PRIVATE1 0x01000000 */ /* OS-dependent */
+/* #define FMODE_IOM_PRIVATE2 0x02000000 */ /* OS-dependent */
#define GetOpenFile(obj,fp) rb_io_check_closed((fp) = RFILE(rb_io_taint_check(obj))->fptr)
diff --git a/iom.h b/iom.h
new file mode 100644
index 0000000000..b14723895c
--- /dev/null
+++ b/iom.h
@@ -0,0 +1,88 @@
+/*
+ * iom -> I/O Manager for RubyVM (auto-Fiber-aware)
+ *
+ * On platforms with epoll or kqueue, this should be ready for multicore;
+ * even if the rest of the RubyVM is not.
+ *
+ * Some inspiration taken from Mio in GHC:
+ * http://haskell.cs.yale.edu/wp-content/uploads/2013/08/hask035-voellmy.pdf
+ */
+#ifndef RUBY_IOM_H
+#define RUBY_IOM_H
+#include "ruby.h"
+#include "ruby/intern.h"
+#include "vm_core.h"
+
+typedef struct rb_iom_struct rb_iom_t;
+typedef struct rb_io_t rb_io_t;
+
+/* WARNING: unstable API, only for Ruby internal use */
+
+/*
+ * Note: the first "rb_thread_t *" is a placeholder and may be replaced
+ * with "rb_execution_context_t *" in the future.
+ */
+
+/*
+ * All functions with "wait" in it take an optional double * +timeout+
+ * argument specifying the timeout in seconds. If NULL, it can wait
+ * forever until the event happens (or the fiber is explicitly resumed).
+ *
+ * (maybe) TODO: If non-NULL, the timeout will be updated to the
+ * remaining time upon returning. Not sure if useful, could just be
+ * a waste of cycles; so not implemented, yet.
+ */
+
+/*
+ * Relinquish calling fiber while waiting for +events+ on the given
+ * +rb_io_t+
+ *
+ * Multiple native threads can enter this function at the same time.
+ *
+ * Events are RB_WAITFD_IN, RB_WAITFD_OUT, RB_WAITFD_PRI
+ *
+ * Returns a mask of events.
+ */
+
+int rb_iom_waitio(rb_thread_t *, rb_io_t *, int events, double *timeout);
+
+/*
+ * Identical to rb_iom_waitio, but takes a pointer to an integer file
+ * descriptor, instead of rb_io_t. Use rb_iom_waitio when possible,
+ * since it allows us to optimize epoll (and perhaps avoid kqueue
+ * portability bugs across different *BSDs).
+ */
+int rb_iom_waitfd(rb_thread_t *, int *fdp, int events, double *timeout);
+
+/*
+ * Relinquish calling fiber to wait for the given PID to change status.
+ * Multiple native threads can enter this function at the same time.
+ * If timeout is negative, wait forever.
+ */
+rb_pid_t rb_iom_waitpid(rb_thread_t *,
+ rb_pid_t, int *status, int options, double *timeout);
+
+/*
+ * Relinquish calling fiber for at least the duration of given timeout
+ * in seconds. If timeout is negative, wait forever (until explicitly
+ * resumed).
+ * Multiple native threads can enter this function at the same time.
+ */
+void rb_iom_sleep(rb_thread_t *, double *timeout);
+
+/* callback for SIGCHLD, needed to implement rb_iom_waitpid */
+void rb_iom_sigchld(rb_iom_t *);
+
+/*
+ * there is no public create function, creation is lazy to avoid incurring
+ * overhead for small scripts which do not need fibers, we only need this
+ * at VM destruction
+ */
+void rb_iom_destroy(rb_vm_t *);
+
+/*
+ * schedule
+ */
+void rb_iom_schedule(rb_thread_t *th, double *timeout);
+
+#endif /* RUBY_IOM_H */
diff --git a/iom_common.h b/iom_common.h
new file mode 100644
index 0000000000..d4b99ba12a
--- /dev/null
+++ b/iom_common.h
@@ -0,0 +1,118 @@
+#ifndef RB_IOM_COMMON_H
+#define RB_IOM_COMMON_H
+
+#include "iom.h"
+
+/* cont.c */
+void rb_fiber_auto_enqueue(VALUE fibval);
+int rb_fiber_auto_do_yield_p(rb_thread_t *);
+
+#define FMODE_IOM_PRIVATE1 0x01000000
+#define FMODE_IOM_PRIVATE2 0x02000000
+
+#define IOM_FIBMASK ((VALUE)0x1)
+#define IOM_FIB (0x2)
+#define IOM_WAIT (0x1)
+
+/* allocated on stack */
+struct rb_iom_timer {
+ struct list_node tnode; /* <=> rb_iom_struct.timers */
+ double expires_at; /* absolute monotonic time */
+ VALUE _fibval;
+};
+
+struct rb_iom_waiter {
+ struct rb_iom_timer timer;
+ struct list_node wnode; /* <=> rb_iom_struct.(fds|pids) */
+};
+
+struct rb_iom_fd_waiter {
+ struct rb_iom_waiter w;
+ int *fdp;
+ short events;
+ short revents;
+};
+
+struct rb_iom_pid_waiter {
+ struct rb_iom_waiter w;
+ rb_pid_t pid;
+ int status;
+ int options;
+};
+
+static VALUE
+rb_iom_timer_fibval(const struct rb_iom_timer *t)
+{
+ return t->_fibval & ~IOM_FIBMASK;
+}
+
+static struct rb_iom_waiter *
+rb_iom_waiter_of(struct rb_iom_timer *t)
+{
+ if (t->_fibval & IOM_FIBMASK) {
+ return 0;
+ }
+ return container_of(t, struct rb_iom_waiter, timer);
+}
+
+/* check for expired timers */
+static void
+rb_iom_timer_check(struct list_head *timers)
+{
+ struct rb_iom_timer *t = 0, *next = 0;
+ double now = timeofday();
+
+ list_for_each_safe(timers, t, next, tnode) {
+ if (t->expires_at <= now) {
+ struct rb_iom_waiter *w = rb_iom_waiter_of(t);
+ VALUE fibval = rb_iom_timer_fibval(t);
+
+ list_del_init(&t->tnode);
+ if (w) {
+ list_del_init(&w->wnode);
+ }
+ t->_fibval = Qfalse;
+
+ /* non-auto-fibers may set timer in rb_iom_schedule */
+ if (fibval != Qfalse) {
+ rb_fiber_auto_enqueue(fibval);
+ }
+ }
+ return; /* done, timers is a sorted list */
+ }
+}
+
+/* insert a new +timer+ into +timers+, maintain sort order by expires_at */
+static void
+rb_iom_timer_add(struct list_head *timers, struct rb_iom_timer *add,
+ const double *timeout, int flags)
+{
+ rb_iom_timer_check(timers);
+
+ add->_fibval = flags & IOM_FIB ? rb_fiber_current() : Qfalse;
+ add->_fibval |= flags & IOM_WAIT ? 0 : IOM_FIBMASK;
+
+ if (timeout) {
+ struct rb_iom_timer *i = 0;
+ add->expires_at = timeofday() + *timeout;
+
+ /*
+ * search backwards: assume typical projects have multiple objects
+ * sharing the same timeout values, so new timers will expire later
+ * than existing timers
+ */
+ list_for_each_rev(timers, i, tnode) {
+ if (add->expires_at > i->expires_at) {
+ list_add_after(timers, &i->tnode, &add->tnode);
+ return;
+ }
+ }
+ list_add(timers, &add->tnode);
+ }
+ else {
+ /* not active, just allow list_del to function in cleanup */
+ list_node_init(&add->tnode);
+ }
+}
+
+#endif /* RB_IOM_COMMON_H */
diff --git a/iom_epoll.h b/iom_epoll.h
new file mode 100644
index 0000000000..76347e3f77
--- /dev/null
+++ b/iom_epoll.h
@@ -0,0 +1,427 @@
+/*
+ * Linux-only epoll-based implementation of I/O Manager for RubyVM
+ */
+#include "iom_common.h"
+#include "internal.h"
+#include <sys/epoll.h>
+#include <math.h> /* round() */
+#define FMODE_IOM_ADDED FMODE_IOM_PRIVATE1
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+ /*
+ * Everything here is protected by GVL at this time,
+ * URCU lists (LGPL-2.1+) may be used in the future
+ */
+ struct list_head fds; /* -epmod.w.wnode, order agnostic */
+ struct list_head timers; /* -rb_iom_timer.tnode, sort by expires_at */
+ struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+ int epoll_fd;
+ int maxevents; /* auto-increases */
+
+ /*
+ * Any number of threads may use epoll_wait concurrently on
+ * the same epoll_fd, but we only allow one thread to use
+ * a non-zero timeout; since it is possible for a long sleeper
+ * to miss fibers queued by other threads for the sleeping thread.
+ */
+ rb_thread_t *waiter;
+};
+
+struct epmod {
+ struct rb_iom_waiter w;
+ rb_thread_t *th;
+ int *fdp;
+ int *flags; /* fptr->mode */
+ struct epoll_event ev;
+};
+
+/* 1024 is the common RUBY_ALLOCV_LIMIT: */
+
+static void
+increase_events(rb_iom_t *iom, int retries)
+{
+ /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */
+ const int max_alloca = 1024 / sizeof(struct epoll_event);
+ const int max = max_alloca * 2;
+
+ if (retries) {
+ iom->maxevents *= retries;
+ if (iom->maxevents > max || iom->maxevents <= 0) {
+ iom->maxevents = max;
+ }
+ }
+}
+
+static int
+double2msec(double sec)
+{
+ /*
+ * clamp timeout to work around a Linux <= 2.6.37 bug,
+ * see epoll_wait(2) manpage
+ */
+ const int max_msec = 35 * 60 * 1000; /* floor(35.79 minutes) */
+ double msec = round(sec * 1000);
+
+ if (msec < (double)max_msec) {
+ int ret = (int)msec;
+ return ret < 0 ? 0 : ret;
+ }
+ return max_msec;
+}
+
+STATIC_ASSERT(epbits_matches_waitfd_bits,
+ RB_WAITFD_IN == EPOLLIN && RB_WAITFD_OUT == EPOLLOUT &&
+ RB_WAITFD_PRI == EPOLLPRI);
+
+static int
+rb_events2ep(int events)
+{
+ return EPOLLONESHOT | events;
+}
+
+static int
+rb_ep2events(int revents)
+{
+ return revents & (EPOLLIN|EPOLLOUT|EPOLLPRI);
+}
+
+static void
+rb_iom_init(rb_iom_t *iom)
+{
+ int err;
+
+ list_head_init(&iom->timers);
+ list_head_init(&iom->fds);
+ list_head_init(&iom->pids);
+ iom->waiter = 0;
+ iom->maxevents = 8;
+#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+ iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (iom->epoll_fd < 0) {
+ err = errno;
+ if (rb_gc_for_fd(err)) {
+ iom->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (iom->epoll_fd < 0) {
+ rb_sys_fail("epoll_create1");
+ }
+ }
+ else if (err != ENOSYS) {
+ rb_syserr_fail(err, "epoll_create1");
+ }
+ else { /* libc >= kernel || build-env > run-env */
+#endif /* HAVE_EPOLL_CREATE1 */
+ iom->epoll_fd = epoll_create(1);
+ if (iom->epoll_fd < 0) {
+ err = errno;
+ if (rb_gc_for_fd(err)) {
+ iom->epoll_fd = epoll_create(1);
+ }
+ }
+ if (iom->epoll_fd < 0) {
+ rb_sys_fail("epoll_create");
+ }
+ rb_maygvl_fd_fix_cloexec(iom->epoll_fd);
+#if defined(EPOLL_CLOEXEC) && defined(HAVE_EPOLL_CREATE1)
+ }
+ }
+#endif /* HAVE_EPOLL_CREATE1 */
+ rb_update_max_fd(iom->epoll_fd);
+}
+
+/* we lazily create this, small scripts may never need iom */
+static rb_iom_t *
+rb_iom_create(rb_thread_t *th)
+{
+ rb_iom_t *iom = ALLOC(rb_iom_t);
+
+ rb_iom_init(iom);
+ return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+ VM_ASSERT(th && th->vm);
+ if (!th->vm->iom) {
+ th->vm->iom = rb_iom_create(th);
+ }
+ return th->vm->iom;
+}
+
+static int
+next_timeout(rb_iom_t *iom)
+{
+ struct rb_iom_timer *t;
+ t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+ if (t) {
+ double diff = t->expires_at - timeofday();
+ return double2msec(diff);
+ }
+ else {
+ return -1; /* forever */
+ }
+}
+
+static void
+check_epwait(rb_thread_t *th, int nr, struct epoll_event *ev, int msec)
+{
+ if (nr >= 0) {
+ int i;
+
+ for (i = 0; i < nr; i++) {
+ struct epmod *epm = ev[i].data.ptr;
+ VALUE fibval = rb_iom_timer_fibval(&epm->w.timer);
+
+ epm->w.timer._fibval = Qfalse;
+ /* set return value for rb_wait(io|fd): */
+ epm->ev.events = rb_ep2events(ev[i].events);
+ list_del_init(&epm->w.timer.tnode);
+ list_del_init(&epm->w.wnode);
+ if (fibval != Qfalse) {
+ rb_fiber_auto_enqueue(fibval);
+ }
+ }
+ /* epoll_wait will not hit EINTR if msec is 0 */
+ if (msec) {
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+ }
+ /* notify the waiter thread in case we enqueued fibers for them */
+ else if (nr > 0 && th->vm->iom->waiter) {
+ ubf_select(th->vm->iom->waiter);
+ rb_thread_schedule();
+ }
+ }
+ else {
+ int err = errno;
+ if (err == EINTR) {
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+ }
+ else {
+ rb_syserr_fail(err, "epoll_wait");
+ }
+ }
+ rb_iom_timer_check(&th->vm->iom->timers);
+}
+
+/* perform a non-blocking epoll_wait while holding GVL */
+static void
+ping_events(rb_thread_t *th)
+{
+ rb_iom_t *iom = th->vm->iom;
+ VALUE v;
+ int nr;
+ struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, iom->maxevents);
+ int retries = 0;
+
+ do {
+ nr = epoll_wait(iom->epoll_fd, ev, iom->maxevents, 0);
+ check_epwait(th, nr, ev, 0);
+ } while (nr == iom->maxevents && ++retries);
+ if (v) {
+ ALLOCV_END(v);
+ }
+ increase_events(iom, retries);
+ (void)rb_fiber_auto_do_yield_p(th);
+}
+
+static VALUE
+iom_schedule_any(VALUE ptr)
+{
+ rb_thread_t *th = (rb_thread_t *)ptr;
+ rb_iom_t *iom = th->vm->iom;
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ if (iom) {
+ int maxevents = iom->maxevents;
+ int nr = maxevents;
+ int msec = iom->waiter ? 0 : next_timeout(iom);
+
+ /* only need to release GVL when we have a non-zero timeout */
+ if (msec) {
+ VALUE v;
+ struct epoll_event *ev = ALLOCV_N(struct epoll_event, v, maxevents);
+
+ /* release GVL anyways if no FDs and no timers */
+ if (msec < 0 && list_empty(&iom->fds)) {
+ msec = 0;
+ }
+ else {
+ iom->waiter = th;
+ }
+
+ BLOCKING_REGION({
+ nr = epoll_wait(iom->epoll_fd, ev, maxevents, msec);
+ }, ubf_select, th, FALSE);
+ iom->waiter = 0;
+ check_epwait(th, nr, ev, msec);
+ if (v) {
+ ALLOCV_END(v);
+ }
+ }
+ if (nr == maxevents) { /* msec == 0 */
+ ping_events(th);
+ }
+ }
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ return Qfalse;
+}
+
+static VALUE
+cancel_timer(VALUE ptr)
+{
+ struct rb_iom_timer *t = (struct rb_iom_timer *)ptr;
+ list_del(&t->tnode);
+ return Qfalse;
+}
+
+void
+rb_iom_schedule(rb_thread_t *th, double *timeout)
+{
+ if (rb_fiber_auto_sched_p(th)) {
+ rb_iom_t *iom = th->vm->iom;
+
+ if (iom) {
+ rb_iom_timer_check(&iom->timers);
+ ping_events(th);
+ }
+ else if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ }
+ else if (timeout) {
+ double t0 = timeofday();
+ struct rb_iom_timer t;
+ rb_iom_t *iom = rb_iom_get(th);
+
+ rb_iom_timer_add(&iom->timers, &t, timeout, 0);
+ rb_ensure(iom_schedule_any, (VALUE)th, cancel_timer, (VALUE)&t);
+ *timeout -= timeofday() - t0;
+ }
+ else {
+ rb_iom_t *iom = th->vm->iom;
+
+ if (iom) {
+ rb_iom_timer_check(&iom->timers);
+ }
+ iom_schedule_any((VALUE)th);
+ }
+}
+
+static VALUE
+iom_epmod(VALUE ptr)
+{
+ struct epmod *epm = (struct epmod *)ptr;
+ rb_thread_t *th = epm->th;
+ rb_iom_t *iom = rb_iom_get(th);
+ int e;
+ int fd = *epm->fdp;
+
+ if (epm->flags) {
+ if (*epm->flags & FMODE_IOM_ADDED) {
+ e = epoll_ctl(iom->epoll_fd, EPOLL_CTL_MOD, fd, &epm->ev);
+ }
+ else {
+ e = epoll_ctl(iom->epoll_fd, EPOLL_CTL_ADD, fd, &epm->ev);
+ if (e == 0) {
+ *epm->flags |= FMODE_IOM_ADDED;
+ }
+ }
+ }
+ else { /* don't know if added or not: */
+ e = epoll_ctl(iom->epoll_fd, EPOLL_CTL_MOD, fd, &epm->ev);
+ if (e < 0 && errno == ENOENT) {
+ e = epoll_ctl(iom->epoll_fd, EPOLL_CTL_ADD, fd, &epm->ev);
+ }
+ }
+ if (e < 0) {
+ rb_sys_fail("epoll_ctl");
+ }
+ epm->ev.events = 0; /* to be set on return value */
+ if (rb_fiber_auto_do_yield_p(th)) {
+ return rb_fiber_yield(0, 0);
+ }
+ ping_events(th);
+ return rb_fiber_yield(0, 0);
+}
+
+static VALUE
+cancel_fdwait(VALUE ptr)
+{
+ struct epmod *epm = (struct epmod *)ptr;
+
+ list_del(&epm->w.wnode);
+ list_del(&epm->w.timer.tnode);
+
+ return Qfalse;
+}
+
+static int
+iom_waitfd(rb_thread_t *th, int *fdp, int *flags, int events, double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct epmod epm;
+
+ epm.th = th;
+ epm.fdp = fdp;
+ epm.flags = flags;
+ epm.ev.events = rb_events2ep(events);
+ epm.ev.data.ptr = &epm;
+
+ list_add(&iom->fds, &epm.w.wnode);
+ rb_iom_timer_add(&iom->timers, &epm.w.timer, timeout, IOM_FIB|IOM_WAIT);
+ rb_ensure(iom_epmod, (VALUE)&epm, cancel_fdwait, (VALUE)&epm);
+
+ return epm.ev.events; /* may be zero if timed out */
+}
+
+int
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
+{
+ return iom_waitfd(th, &fptr->fd, &fptr->mode, events, timeout);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+ return iom_waitfd(th, fdp, 0, events, timeout);
+}
+
+void
+rb_iom_destroy(rb_vm_t *vm)
+{
+ rb_iom_t *iom = vm->iom;
+ vm->iom = 0;
+ if (iom) {
+ /*
+ * it's confusing to share epoll FDs across processes
+ * (kqueue has a rather unique close-on-fork behavior)
+ */
+ if (iom->epoll_fd >= 0) {
+ close(iom->epoll_fd);
+ }
+ xfree(iom);
+ }
+}
+
+/* used by thread.c::rb_thread_atfork */
+static void
+rb_iom_atfork_child(rb_thread_t *th)
+{
+ rb_iom_destroy(th->vm);
+}
+
+/* used by thread_pthread.c */
+static int
+rb_iom_reserved_fd(int fd)
+{
+ rb_iom_t *iom = GET_VM()->iom;
+
+ return iom && fd == iom->epoll_fd;
+}
diff --git a/iom_select.h b/iom_select.h
new file mode 100644
index 0000000000..06d3f8e13a
--- /dev/null
+++ b/iom_select.h
@@ -0,0 +1,411 @@
+/*
+ * select()-based implementation of I/O Manager for RubyVM
+ *
+ * This is crippled and relies heavily on GVL compared to the
+ * epoll and kqueue versions
+ */
+#include "iom_common.h"
+#include "internal.h"
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+ /*
+ * Everything here is protected by GVL at this time,
+ * URCU lists (LGPL-2.1+) may be used in the future
+ */
+ struct list_head timers; /* -rb_iom_timer.tnode, sort by expires_at */
+ struct list_head fds; /* -rb_iom_fd_waiter.w.wnode, FIFO order */
+ struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+ rb_serial_t select_start; /* WR protected by GVL (no need to protect RD) */
+ rb_serial_t select_gen; /* RDWR protected by GVL */
+ rb_thread_t *selector; /* rb_thread_fd_select owner (acts as lock w/ GVL) */
+
+ /* these could be on stack, but they're huge on some platforms: */
+ rb_fdset_t rfds;
+ rb_fdset_t wfds;
+ rb_fdset_t efds;
+};
+
+/* allocated on stack */
+struct select_do {
+ rb_thread_t *th;
+ int do_wait;
+ int ret;
+};
+
+static void
+rb_iom_init(rb_iom_t *iom)
+{
+ iom->select_start = 0;
+ iom->select_gen = 0;
+ iom->selector = 0;
+ list_head_init(&iom->timers);
+ list_head_init(&iom->fds);
+ list_head_init(&iom->pids);
+}
+
+/* we lazily create this, small scripts may never need iom */
+static rb_iom_t *
+rb_iom_create(rb_thread_t *th)
+{
+ rb_iom_t *iom = ALLOC(rb_iom_t);
+ rb_iom_init(iom);
+ return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+ VM_ASSERT(th && th->vm);
+ if (!th->vm->iom) {
+ th->vm->iom = rb_iom_create(th);
+ }
+ return th->vm->iom;
+}
+
+static void
+waiter_cleanup(struct rb_iom_waiter *w)
+{
+ list_del(&w->timer.tnode);
+ list_del(&w->wnode);
+}
+
+static VALUE
+fd_waiter_cleanup(VALUE ptr)
+{
+ struct rb_iom_fd_waiter *fdw = (struct rb_iom_fd_waiter *)ptr;
+ waiter_cleanup(&fdw->w);
+ return Qfalse;
+}
+
+static struct timeval *
+next_timeout(rb_iom_t *iom, struct timeval *tv)
+{
+ struct rb_iom_timer *t;
+ t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+ if (t) {
+ double diff = t->expires_at - timeofday();
+ if (diff > 0) {
+ *tv = double2timeval(diff);
+ }
+ else {
+ tv->tv_sec = 0;
+ tv->tv_usec = 0;
+ }
+
+ return tv;
+ }
+ else {
+ return 0;
+ }
+}
+
+static VALUE
+iom_select_do(VALUE ptr)
+{
+ struct select_do *sd = (struct select_do *)ptr;
+ rb_thread_t *th = sd->th;
+ rb_iom_t *iom = th->vm->iom;
+ int max = -1;
+ struct timeval tv, *tvp;
+ struct rb_iom_fd_waiter *fdw = 0;
+ rb_fdset_t *rfds, *wfds, *efds;
+
+ iom->select_start = iom->select_gen;
+ sd->ret = 0;
+ rfds = wfds = efds = 0;
+ rb_fd_init(&iom->rfds);
+ rb_fd_init(&iom->wfds);
+ rb_fd_init(&iom->efds);
+
+ list_for_each(&iom->fds, fdw, w.wnode) {
+ int fd = *fdw->fdp;
+ if (fd < 0) {
+ continue; /* closed */
+ }
+ if (fd > max) {
+ max = fd;
+ }
+ if (fdw->events & RB_WAITFD_IN) {
+ rb_fd_set(fd, rfds = &iom->rfds);
+ }
+ if (fdw->events & RB_WAITFD_OUT) {
+ rb_fd_set(fd, wfds = &iom->wfds);
+ }
+ if (fdw->events & RB_WAITFD_PRI) {
+ rb_fd_set(fd, efds = &iom->efds);
+ }
+ }
+
+ if (sd->do_wait) {
+ tvp = next_timeout(iom, &tv);
+ if (tvp == 0 && max < 0) {
+ goto nowait;
+ }
+ }
+ else {
+ nowait:
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ tvp = &tv;
+ }
+ BLOCKING_REGION({
+ sd->ret = native_fd_select(max + 1, rfds, wfds, efds, tvp, th);
+ }, ubf_select, th, FALSE);
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+
+ return Qfalse;
+}
+
+static VALUE
+iom_select_done(VALUE ptr)
+{
+ struct select_do *sd = (struct select_do *)ptr;
+ rb_iom_t *iom = sd->th->vm->iom;
+ struct rb_iom_fd_waiter *fdw = 0, *next = 0;
+ int ret = sd->ret;
+
+ iom->selector = 0;
+ if (ret > 0) {
+ list_for_each_safe(&iom->fds, fdw, next, w.wnode) {
+ int fd = *fdw->fdp;
+ if (fd < 0) {
+ continue; /* closed */
+ }
+ if (fdw->events & RB_WAITFD_IN && rb_fd_isset(fd, &iom->rfds)) {
+ fdw->revents |= RB_WAITFD_IN;
+ }
+ if (fdw->events & RB_WAITFD_OUT && rb_fd_isset(fd, &iom->wfds)) {
+ fdw->revents |= RB_WAITFD_OUT;
+ }
+ if (fdw->events & RB_WAITFD_PRI && rb_fd_isset(fd, &iom->efds)) {
+ fdw->revents |= RB_WAITFD_PRI;
+ }
+
+ /* got revents? enqueue ourselves to be run! */
+ if (fdw->revents) {
+ VALUE fibval = rb_iom_timer_fibval(&fdw->w.timer);
+
+ /* drop IOM_FIBMASK bit, leaving timers list */
+ fdw->w.timer._fibval = Qfalse;
+ list_del_init(&fdw->w.timer.tnode);
+ list_del_init(&fdw->w.wnode);
+ VM_ASSERT(fibval != Qfalse && "fibval invalid in fds list");
+ rb_fiber_auto_enqueue(fibval);
+ if (--ret <= 0) {
+ break;
+ }
+ }
+ }
+ }
+
+ rb_fd_term(&iom->rfds);
+ rb_fd_term(&iom->wfds);
+ rb_fd_term(&iom->efds);
+
+ return Qfalse;
+}
+
+static int
+retry_select_p(struct select_do *sd, rb_iom_t *iom)
+{
+ /*
+ * if somebody changed iom->fds while we were inside select,
+ * rerun it with zero timeout to avoid a race condition.
+ * This is not necessary for epoll or kqueue because the kernel
+ * is constantly monitoring the watched set.
+ */
+ if (iom->select_start != iom->select_gen) {
+ sd->do_wait = 0;
+ return 1;
+ }
+ return 0;
+}
+
+static VALUE
+iom_schedule_any(VALUE ptr)
+{
+ struct select_do *sd = (struct select_do *)ptr;
+ rb_thread_t *th = sd->th;
+ rb_iom_t *iom = th->vm->iom;
+ rb_thread_t *s;
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ if (!iom) {
+ return Qfalse;
+ }
+ s = iom->selector;
+ if (s) { /* somebody else is running select() */
+ if (iom->select_start != iom->select_gen) {
+ ubf_select(s);
+ }
+ if (rb_fiber_auto_sched_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ }
+ else {
+ iom->selector = th;
+
+ do {
+ rb_ensure(iom_select_do, (VALUE)sd, iom_select_done, (VALUE)sd);
+ rb_iom_timer_check(&iom->timers);
+ } while (retry_select_p(sd, iom));
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ }
+ return Qfalse;
+}
+
+static VALUE
+schedule_done(VALUE ptr)
+{
+ struct rb_iom_timer *t = (struct rb_iom_timer *)ptr;
+ list_del(&t->tnode);
+ return Qfalse;
+}
+
+void
+rb_iom_schedule(rb_thread_t *th, double *timeout)
+{
+ struct select_do sd;
+
+ sd.th = th;
+ sd.do_wait = !rb_fiber_auto_sched_p(th);
+
+ if (timeout && sd.do_wait) {
+ double t0 = timeofday();
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_iom_timer t;
+
+ rb_iom_timer_add(&iom->timers, &t, timeout, 0);
+ rb_ensure(iom_schedule_any, (VALUE)&sd, schedule_done, (VALUE)&t);
+ *timeout -= timeofday() - t0;
+ }
+ else {
+ rb_iom_t *iom = th->vm->iom;
+ if (iom) {
+ rb_iom_timer_check(&iom->timers);
+ }
+ iom_schedule_any((VALUE)&sd);
+ }
+}
+
+static VALUE
+iom_schedule_fd(VALUE ptr)
+{
+ rb_thread_t *th = (rb_thread_t *)ptr;
+ rb_iom_t *iom = rb_iom_get(th);
+ rb_thread_t *s;
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ return rb_fiber_yield(0, 0);
+ }
+
+ s = iom->selector;
+ if (s) {
+ /*
+ * kick the select thread, retry_select_p will return true since
+ * caller bumped select_gen
+ */
+ ubf_select(s);
+ return rb_fiber_yield(0, 0);
+ }
+ else {
+ struct select_do sd;
+
+ iom->selector = sd.th = th;
+ sd.do_wait = 0;
+ do {
+ rb_ensure(iom_select_do, (VALUE)&sd, iom_select_done, (VALUE)&sd);
+ rb_iom_timer_check(&iom->timers);
+ } while (retry_select_p(&sd, iom));
+ (void)rb_fiber_auto_do_yield_p(th);
+ }
+ return rb_fiber_yield(0, 0);
+}
+
+
+/* only epoll takes advantage of this */
+int
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
+{
+ return rb_iom_waitfd(th, &fptr->fd, events, timeout);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_iom_fd_waiter fdw;
+
+ if (*fdp < 0) return -1;
+ fdw.fdp = fdp;
+ fdw.events = (short)events;
+ fdw.revents = 0;
+
+ /* use FIFO order for fairness in iom_select_done */
+ list_add_tail(&iom->fds, &fdw.w.wnode);
+
+ rb_iom_timer_add(&iom->timers, &fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+ iom->select_gen++;
+ rb_ensure(iom_schedule_fd, (VALUE)th, fd_waiter_cleanup, (VALUE)&fdw);
+
+ if (*fdp < 0) return -1;
+
+ return (int)fdw.revents; /* may be zero if timed out */
+}
+
+rb_pid_t
+rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
+ double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_iom_pid_waiter pw;
+
+ VM_ASSERT((options & WNOHANG) == 0 &&
+ "WNOHANG should be handled in rb_waitpid");
+
+ pw.pid = pid;
+ pw.options = options;
+ rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+
+ /* LIFO, to match Linux wait4() blocking behavior */
+ list_add(&iom->pids, &pw.w.wnode);
+
+ /* do not touch select_gen, this has nothing to do with select() */
+
+ /* TODO
+ * rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw);
+ */
+
+ return -1;
+}
+
+void
+rb_iom_destroy(rb_vm_t *vm)
+{
+ if (vm->iom) {
+ xfree(vm->iom);
+ vm->iom = 0;
+ }
+}
+
+/* used by thread.c::rb_thread_atfork */
+static void
+rb_iom_atfork_child(rb_thread_t *th)
+{
+ rb_iom_destroy(th->vm);
+}
+
+/* used by thread_pthread.c */
+static int
+rb_iom_reserved_fd(int fd)
+{
+ return 0;
+}
diff --git a/lib/net/http/response.rb b/lib/net/http/response.rb
index 1351d7b2d2..92c5c6d88a 100644
--- a/lib/net/http/response.rb
+++ b/lib/net/http/response.rb
@@ -52,16 +52,19 @@ def response_class(code)
def each_response_header(sock)
key = value = nil
while true
- line = sock.readuntil("\n", true).sub(/\s+\z/, '')
+ line = sock.readuntil(-"\n", true).sub(/\s+\z/, -'')
break if line.empty?
- if line[0] == ?\s or line[0] == ?\t and value
- value << ' ' unless value.empty?
- value << line.strip
+ if line.start_with?(-' ', -"\t") and value
+ value << -' ' unless value.empty?
+ line.strip!
+ value << line
else
yield key, value if key
- key, value = line.strip.split(/\s*:\s*/, 2)
+ line.strip!
+ key, value = line.split(/\s*:\s*/, 2)
raise Net::HTTPBadResponse, 'wrong header line format' if value.nil?
end
+ line.clear
end
yield key, value if key
end
diff --git a/prelude.rb b/prelude.rb
index 7b98e28285..1e71a3f168 100644
--- a/prelude.rb
+++ b/prelude.rb
@@ -16,6 +16,18 @@ def self.exclusive
end
end
+class Fiber
+ def self.start(*args, &block)
+ # loses Fiber#resume return value, but maybe people do not care
+ new(&block).run
+ end
+
+ def run
+ start
+ self
+ end
+end
+
class IO
# call-seq:
diff --git a/test/lib/leakchecker.rb b/test/lib/leakchecker.rb
index c0cf718635..562f32efca 100644
--- a/test/lib/leakchecker.rb
+++ b/test/lib/leakchecker.rb
@@ -28,6 +28,15 @@ def find_fds
if d.respond_to? :fileno
a -= [d.fileno]
end
+
+ # only place we use epoll is internally, do not count it against us
+ a.delete_if do |fd|
+ begin
+ File.readlink("#{fd_dir}/#{fd}").match?(/\beventpoll\b/)
+ rescue
+ false
+ end
+ end
a
}
fds.sort
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
new file mode 100644
index 0000000000..0dd798644c
--- /dev/null
+++ b/test/ruby/test_fiber_auto.rb
@@ -0,0 +1,126 @@
+# frozen_string_literal: true
+require 'test/unit'
+require 'fiber'
+require 'io/nonblock'
+require 'io/wait'
+
+class TestFiberAuto < Test::Unit::TestCase
+ def test_value
+ nr = 0
+ f = Fiber.start { nr += 1 }
+ assert_equal 1, f.value, 'value returns as expected'
+ assert_equal 1, f.value, 'value is idempotent'
+ end
+
+ def test_join
+ nr = 0
+ f = Fiber.start { nr += 1 }
+ assert_equal f, f.join
+ assert_equal 1, f.value
+ assert_equal f, f.join, 'join is idempotent'
+ assert_equal f, f.join(10), 'join is idempotent w/ timeout'
+ end
+
+ def test_io_wait_read
+ tick = 0.1 # 100ms, TIME_QUANTUM_USEC in thread_pthread.c
+ IO.pipe do |r, w|
+ r.nonblock = w.nonblock = true
+ rdr = Fiber.start { r.read(1) }
+ t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ assert_nil rdr.join(tick), 'join returns nil on timeout'
+ diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
+ assert_operator diff, :>=, tick, 'Fiber still running'
+ w.write('a')
+ assert_equal 'a', rdr.value, 'finished'
+
+ t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ rwait = Fiber.start { r.wait_readable(tick) }
+ assert_nil rwait.value, 'IO returned no events after timeout'
+ diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
+ assert_operator diff, :>=, tick, 'Fiber waited for timeout'
+ end
+ end
+
+ # make sure we do not create extra FDs (from epoll/kqueue)
+ # for operations which do not need it
+ def test_fd_creation_and_fork
+ assert_separately(%w(-r io/nonblock), <<~'end;') # do
+ fdrange = (3..64)
+ reserved = -'The given fd is not accessible because RubyVM reserves it'
+ fdstats = lambda do
+ stats = Hash.new(0)
+ fdrange.map do |fd|
+ begin
+ IO.for_fd(fd, autoclose: false)
+ stats[:good] += 1
+ rescue Errno::EBADF # ignore
+ rescue ArgumentError => e
+ raise if e.message != reserved
+ stats[:reserved] += 1
+ end
+ end
+ stats.freeze
+ end
+
+ before = fdstats.call
+ Fiber.start { :fib }.join
+ assert_equal before, fdstats.call,
+ 'Fiber.start + Fiber#join does not create new FD'
+
+ # now, epoll/kqueue implementations create FDs, ensure they're reserved
+ IO.pipe do |r, w|
+ r.nonblock = w.nonblock = true
+
+ before_io = fdstats.call
+ assert_equal before[:reserved], before_io[:reserved],
+ 'no reserved FDs created before I/O attempted'
+
+ f1 = Fiber.start { r.read(1) }
+ after_io = fdstats.call
+ assert_equal before_io[:good], after_io[:good],
+ 'no change in unreserved FDs during I/O wait'
+
+ w.write('!')
+ assert_equal '!', f1.value, 'auto-Fiber IO#read works'
+ assert_operator before_io[:reserved], :<=, after_io[:reserved],
+ 'a reserved FD may be created on some implementations'
+
+ # ensure we do not share epoll/kqueue FDs across fork
+ if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2)
+ pid = fork do
+ after_fork = fdstats.call
+ w.write(after_fork.inspect == before_io.inspect ? '1' : '0')
+
+ # ensure auto-Fiber works properly after forking
+ IO.pipe do |a, b|
+ a.nonblock = b.nonblock = true
+ f1 = Fiber.start { a.read(1) }
+ f2 = Fiber.start { b.write('!') }
+ assert_equal 1, f2.value
+ w.write(f1.value == '!' ? '!' : '?')
+ end
+
+ exit!(0)
+ end
+ assert_equal '1', r.read(1), 'reserved FD cleared after fork'
+ assert_equal '!', r.read(1), 'auto-fiber works after forking'
+ _, status = Process.waitpid2(pid)
+ assert_predicate status, :success?, 'forked child exited properly'
+ end
+ end
+ end;
+ end
+
+ # As usual, cannot resume/run fibers across threads, but the
+ # scheduler works across threads
+ def test_cross_thread_schedule
+ IO.pipe do |r, w|
+ r.nonblock = w.nonblock = true
+ t = Thread.new { Fiber.start { r.read(1) }.value }
+ assert_nil t.join(0.1)
+ f = Fiber.start { w.write('a') }
+ assert_equal 'a', t.value
+ assert_equal 1, f.value
+ end
+ end
+end
diff --git a/thread.c b/thread.c
index 76fb1360c9..ca02bd0504 100644
--- a/thread.c
+++ b/thread.c
@@ -70,6 +70,10 @@
#include "ruby/thread.h"
#include "ruby/thread_native.h"
#include "internal.h"
+#include "iom.h"
+
+/* cont.c */
+int rb_fiber_auto_sched_p(const rb_thread_t *th);
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
@@ -3484,6 +3488,18 @@ rb_thread_priority_set(VALUE thread, VALUE prio)
/* for IO */
+static int
+rb_iom_waitfd_tv(rb_thread_t *th, int *fdp, int events, struct timeval *tv)
+{
+ double *to = NULL;
+ double tout;
+ if (tv) {
+ tout = (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
+ to = &tout;
+ }
+ return rb_iom_waitfd(th, fdp, events, to);
+}
+
#if defined(NFDBITS) && defined(HAVE_RB_FD_INIT)
/*
@@ -3919,6 +3935,10 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
struct timespec *timeout = NULL;
rb_thread_t *th = GET_THREAD();
+ if (rb_fiber_auto_sched_p(th)) {
+ return rb_iom_waitfd_tv(th, &fd, events, tv);
+ }
+
#define poll_update() \
(update_timespec(timeout, limit), \
TRUE)
@@ -4027,7 +4047,11 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
struct select_args args;
int r;
VALUE ptr = (VALUE)&args;
+ rb_thread_t *th = GET_THREAD();
+ if (rb_fiber_auto_sched_p(th)) {
+ return rb_iom_waitfd_tv(th, &fd, events, tv);
+ }
args.as.fd = fd;
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
@@ -4174,10 +4198,13 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
}
}
+static void rb_iom_atfork_child(rb_thread_t *);
+
void
rb_thread_atfork(void)
{
rb_thread_t *th = GET_THREAD();
+ rb_iom_atfork_child(th);
rb_thread_atfork_internal(th, terminate_atfork_i);
th->join_list = NULL;
@@ -5070,6 +5097,12 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
}
void
+rb_thread_ubf_select(rb_thread_t *th)
+{
+ ubf_select(th);
+}
+
+void
ruby_kill(rb_pid_t pid, int sig)
{
int err;
@@ -5091,3 +5124,10 @@ ruby_kill(rb_pid_t pid, int sig)
rb_sys_fail(0);
}
}
+
+#if defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \
+ defined(HAVE_EPOLL_CTL) && defined(HAVE_EPOLL_WAIT)
+# include "iom_epoll.h"
+#else
+# include "iom_select.h"
+#endif
diff --git a/thread_pthread.c b/thread_pthread.c
index 43644e7c06..a544994c49 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -1735,9 +1735,13 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
}
#endif
+static int rb_iom_reserved_fd(int);
int
rb_reserved_fd_p(int fd)
{
+ if (rb_iom_reserved_fd(fd)) {
+ return 1;
+ }
#if USE_SLEEPY_TIMER_THREAD
if ((fd == timer_thread_pipe.normal[0] ||
fd == timer_thread_pipe.normal[1] ||
diff --git a/vm.c b/vm.c
index 4810321d6f..656b50ca19 100644
--- a/vm.c
+++ b/vm.c
@@ -11,6 +11,7 @@
#include "internal.h"
#include "ruby/vm.h"
#include "ruby/st.h"
+#include "iom.h"
#include "gc.h"
#include "vm_core.h"
@@ -2179,6 +2180,7 @@ ruby_vm_destruct(rb_vm_t *vm)
rb_fiber_reset_root_local_storage(th->self);
thread_free(th);
}
+ rb_iom_destroy(vm);
rb_vm_living_threads_init(vm);
ruby_vm_run_at_exit_hooks(vm);
if (vm->loading_table) {
@@ -2345,6 +2347,7 @@ rb_thread_recycle_stack_release(VALUE *stack)
}
void rb_fiber_mark_self(rb_fiber_t *fib);
+void rb_fiber_auto_schedule_mark(const rb_thread_t *th);
void
rb_thread_mark(void *ptr)
@@ -2387,6 +2390,9 @@ rb_thread_mark(void *ptr)
RUBY_MARK_UNLESS_NULL(th->top_wrapper);
rb_fiber_mark_self(th->fiber);
rb_fiber_mark_self(th->root_fiber);
+ if (th->afhead.n.next && !list_empty(&th->afhead)) {
+ rb_fiber_auto_schedule_mark(th);
+ }
RUBY_MARK_UNLESS_NULL(th->stat_insn_usage);
RUBY_MARK_UNLESS_NULL(th->last_status);
@@ -2513,6 +2519,8 @@ th_init(rb_thread_t *th, VALUE self)
th->ec.cfp = (void *)(th->ec.stack + th->ec.stack_size);
+ list_head_init(&th->afhead);
+
vm_push_frame(th, 0 /* dummy iseq */, VM_FRAME_MAGIC_DUMMY | VM_ENV_FLAG_LOCAL | VM_FRAME_FLAG_FINISH | VM_FRAME_FLAG_CFRAME /* dummy frame */,
Qnil /* dummy self */, VM_BLOCK_HANDLER_NONE /* dummy block ptr */,
0 /* dummy cref/me */,
@@ -3102,7 +3110,6 @@ Init_BareVM(void)
}
MEMZERO(th, rb_thread_t, 1);
rb_thread_set_current_raw(th);
-
vm_init2(vm);
vm->objspace = rb_objspace_alloc();
ruby_current_vm = vm;
diff --git a/vm_core.h b/vm_core.h
index 022243fb1d..3c92306815 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -481,6 +481,8 @@ typedef struct rb_hook_list_struct {
int need_clean;
} rb_hook_list_t;
+struct rb_iom_struct;
+
typedef struct rb_vm_struct {
VALUE self;
@@ -489,6 +491,7 @@ typedef struct rb_vm_struct {
struct rb_thread_struct *main_thread;
struct rb_thread_struct *running_thread;
+ struct rb_iom_struct *iom;
struct list_head waiting_fds; /* <=> struct waiting_fd */
struct list_head living_threads;
@@ -808,6 +811,7 @@ typedef struct rb_thread_struct {
rb_fiber_t *fiber;
rb_fiber_t *root_fiber;
rb_jmpbuf_t root_jmpbuf;
+ struct list_head afhead; /* -rb_fiber_t.afnode */
/* ensure & callcc */
rb_ensure_list_t *ensure_list;
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH 06/10] iom: implement waitpid
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
` (3 preceding siblings ...)
2017-05-24 9:07 ` [PATCH 05/10] auto fiber scheduling and friends (VERY LIGHTLY TESTED) Eric Wong
@ 2017-05-24 9:07 ` Eric Wong
2017-05-24 9:07 ` [PATCH 07/10] avoid redefinition Eric Wong
` (3 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
This requires us to constantly hijack the SIGCHLD handler,
so special values like nil, 'SYSTEM_DEFAULT', 'SIG_IGN',
'IGNORE', are always ignored in favor of Ruby's internal
signal handling
---
common.mk | 1 +
iom.h | 7 ++--
iom_common.h | 2 ++
iom_epoll.h | 2 ++
iom_select.h | 28 ++--------------
iom_waitpid.h | 79 ++++++++++++++++++++++++++++++++++++++++++++
process.c | 14 +++++---
signal.c | 40 ++++++++++++++++++----
test/ruby/test_fiber_auto.rb | 60 +++++++++++++++++++++++++++++++++
thread.c | 3 --
10 files changed, 195 insertions(+), 41 deletions(-)
create mode 100644 iom_waitpid.h
diff --git a/common.mk b/common.mk
index 78ed9d09f6..e79aae966c 100644
--- a/common.mk
+++ b/common.mk
@@ -2602,6 +2602,7 @@ thread.$(OBJEXT): {$(VPATH)}iom.h
thread.$(OBJEXT): {$(VPATH)}iom_common.h
thread.$(OBJEXT): {$(VPATH)}iom_epoll.h
thread.$(OBJEXT): {$(VPATH)}iom_select.h
+thread.$(OBJEXT): {$(VPATH)}iom_waitpid.h
thread.$(OBJEXT): {$(VPATH)}method.h
thread.$(OBJEXT): {$(VPATH)}missing.h
thread.$(OBJEXT): {$(VPATH)}node.h
diff --git a/iom.h b/iom.h
index b14723895c..106f766c59 100644
--- a/iom.h
+++ b/iom.h
@@ -70,8 +70,8 @@ rb_pid_t rb_iom_waitpid(rb_thread_t *,
*/
void rb_iom_sleep(rb_thread_t *, double *timeout);
-/* callback for SIGCHLD, needed to implemented rb_iom_waitpid */
-void rb_iom_sigchld(rb_iom_t *);
+/* callback for SIGCHLD, needed to implement rb_iom_waitpid */
+void rb_iom_sigchld(rb_vm_t *);
/*
* there is no public create function, creation is lazy to avoid incurring
@@ -85,4 +85,7 @@ void rb_iom_destroy(rb_vm_t *);
*/
void rb_iom_schedule(rb_thread_t *th, double *timeout);
+/* cont.c */
+int rb_fiber_auto_sched_p(const rb_thread_t *);
+
#endif /* RUBY_IOM_H */
diff --git a/iom_common.h b/iom_common.h
index d4b99ba12a..9a48a985b5 100644
--- a/iom_common.h
+++ b/iom_common.h
@@ -35,9 +35,11 @@ struct rb_iom_fd_waiter {
struct rb_iom_pid_waiter {
struct rb_iom_waiter w;
+ rb_thread_t *th;
rb_pid_t pid;
int status;
int options;
+ int errnum;
};
static VALUE
diff --git a/iom_epoll.h b/iom_epoll.h
index 76347e3f77..6a0886ca7e 100644
--- a/iom_epoll.h
+++ b/iom_epoll.h
@@ -425,3 +425,5 @@ rb_iom_reserved_fd(int fd)
return iom && fd == iom->epoll_fd;
}
+
+#include "iom_waitpid.h"
diff --git a/iom_select.h b/iom_select.h
index 06d3f8e13a..b3f3be2439 100644
--- a/iom_select.h
+++ b/iom_select.h
@@ -361,32 +361,6 @@ rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
return (int)fdw.revents; /* may be zero if timed out */
}
-rb_pid_t
-rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
- double *timeout)
-{
- rb_iom_t *iom = rb_iom_get(th);
- struct rb_iom_pid_waiter pw;
-
- VM_ASSERT((options & WNOHANG) == 0 &&
- "WNOHANG should be handled in rb_waitpid");
-
- pw.pid = pid;
- pw.options = options;
- rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT);
-
- /* LIFO, to match Linux wait4() blocking behavior */
- list_add(&iom->pids, &pw.w.wnode);
-
- /* do not touch select_gen, this has nothing to do with select() */
-
- /* TODO
- * rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw);
- */
-
- return -1;
-}
-
void
rb_iom_destroy(rb_vm_t *vm)
{
@@ -409,3 +383,5 @@ rb_iom_reserved_fd(int fd)
{
return 0;
}
+
+#include "iom_waitpid.h"
diff --git a/iom_waitpid.h b/iom_waitpid.h
new file mode 100644
index 0000000000..ef2df86f49
--- /dev/null
+++ b/iom_waitpid.h
@@ -0,0 +1,79 @@
+/* included by iom_(epoll|select|kqueue).h */
+#if defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4))
+
+static VALUE
+pid_waiter_cleanup(VALUE ptr)
+{
+ struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr;
+
+ list_del(&pw->w.wnode);
+ list_del(&pw->w.timer.tnode);
+ return Qfalse;
+}
+
+static VALUE
+iom_schedule_pid(VALUE ptr)
+{
+ struct rb_iom_pid_waiter *pw = (struct rb_iom_pid_waiter *)ptr;
+ rb_thread_t *th = pw->th;
+
+ rb_fiber_auto_do_yield_p(th);
+ return rb_fiber_yield(0, 0);
+}
+
+static rb_iom_t *rb_iom_get(rb_thread_t *);
+
+rb_pid_t
+rb_iom_waitpid(rb_thread_t *th, rb_pid_t pid, int *status, int options,
+ double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct rb_iom_pid_waiter pw;
+
+ VM_ASSERT((options & WNOHANG) == 0 &&
+ "WNOHANG should be handled in rb_waitpid");
+ pw.th = th;
+ pw.pid = pid;
+ pw.options = options;
+ pw.errnum = 0;
+ rb_iom_timer_add(&iom->timers, &pw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+
+ /* LIFO, to match Linux wait4() blocking behavior */
+ list_add(&iom->pids, &pw.w.wnode);
+ rb_ensure(iom_schedule_pid, (VALUE)&pw, pid_waiter_cleanup, (VALUE)&pw);
+
+ *status = pw.status;
+ rb_last_status_set(pw.status, pw.pid);
+ return pw.pid;
+}
+
+void
+rb_iom_sigchld(rb_vm_t *vm)
+{
+ rb_iom_t *iom = vm->iom;
+ if (iom) {
+ struct rb_iom_pid_waiter *pw = 0, *next = 0;
+
+ list_for_each_safe(&iom->pids, pw, next, w.wnode) {
+ pid_t r = rb_waitpid(pw->pid, &pw->status, pw->options | WNOHANG);
+ VALUE fibval;
+
+ if (r < 0) {
+ pw->errnum = errno;
+ if (pw->errnum == ECHILD) {
+ continue;
+ }
+ }
+ list_del_init(&pw->w.wnode);
+ list_del_init(&pw->w.timer.tnode);
+ fibval = rb_iom_timer_fibval(&pw->w.timer);
+ pw->w.timer._fibval = Qfalse;
+ if (fibval != Qfalse) {
+ pw->pid = r;
+ rb_fiber_auto_enqueue(fibval);
+ }
+ }
+ }
+}
+
+#endif /* defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4)) */
diff --git a/process.c b/process.c
index c8964dff69..d8b156c625 100644
--- a/process.c
+++ b/process.c
@@ -16,6 +16,7 @@
#include "ruby/thread.h"
#include "ruby/util.h"
#include "vm_core.h"
+#include "iom.h"
#include <stdio.h>
#include <errno.h>
@@ -907,10 +908,15 @@ rb_waitpid(rb_pid_t pid, int *st, int flags)
result = do_waitpid(pid, st, flags);
}
else {
- while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 &&
- (errno == EINTR)) {
- rb_thread_t *th = GET_THREAD();
- RUBY_VM_CHECK_INTS(th);
+ rb_thread_t *th = GET_THREAD();
+ if (rb_fiber_auto_sched_p(th)) {
+ return rb_iom_waitpid(th, pid, st, flags, 0);
+ }
+ else {
+ while ((result = do_waitpid_nonblocking(pid, st, flags)) < 0 &&
+ (errno == EINTR)) {
+ RUBY_VM_CHECK_INTS(th);
+ }
}
}
if (result > 0) {
diff --git a/signal.c b/signal.c
index 8ee0963b8a..8f5dee375f 100644
--- a/signal.c
+++ b/signal.c
@@ -13,6 +13,7 @@
#include "internal.h"
#include "vm_core.h"
+#include "iom.h"
#include <signal.h>
#include <stdio.h>
#include <errno.h>
@@ -1028,6 +1029,17 @@ rb_trap_exit(void)
}
}
+static int
+sig_is_chld(int sig)
+{
+#if defined(SIGCLD)
+ return (sig == SIGCLD);
+#elif defined(SIGCHLD)
+ return (sig == SIGCHLD);
+#endif
+ return 0;
+}
+
void
rb_signal_exec(rb_thread_t *th, int sig)
{
@@ -1035,6 +1047,9 @@ rb_signal_exec(rb_thread_t *th, int sig)
VALUE cmd = vm->trap_list[sig].cmd;
int safe = vm->trap_list[sig].safe;
+ if (sig_is_chld(sig)) {
+ rb_iom_sigchld(vm);
+ }
if (cmd == 0) {
switch (sig) {
case SIGINT:
@@ -1094,6 +1109,11 @@ default_handler(int sig)
#ifdef SIGUSR2
case SIGUSR2:
#endif
+#ifdef SIGCLD
+ case SIGCLD:
+#elif defined(SIGCHLD)
+ case SIGCHLD:
+#endif
func = sighandler;
break;
#ifdef SIGBUS
@@ -1131,6 +1151,9 @@ trap_handler(VALUE *cmd, int sig)
VALUE command;
if (NIL_P(*cmd)) {
+ if (sig_is_chld(sig)) {
+ goto sig_dfl;
+ }
func = SIG_IGN;
}
else {
@@ -1151,6 +1174,9 @@ trap_handler(VALUE *cmd, int sig)
break;
case 14:
if (memcmp(cptr, "SYSTEM_DEFAULT", 14) == 0) {
+ if (sig_is_chld(sig)) {
+ goto sig_dfl;
+ }
func = SIG_DFL;
*cmd = 0;
}
@@ -1158,6 +1184,9 @@ trap_handler(VALUE *cmd, int sig)
case 7:
if (memcmp(cptr, "SIG_IGN", 7) == 0) {
sig_ign:
+ if (sig_is_chld(sig)) {
+ goto sig_dfl;
+ }
func = SIG_IGN;
*cmd = Qtrue;
}
@@ -1406,15 +1435,14 @@ static int
init_sigchld(int sig)
{
sighandler_t oldfunc;
+ sighandler_t func = sighandler;
oldfunc = ruby_signal(sig, SIG_DFL);
if (oldfunc == SIG_ERR) return -1;
- if (oldfunc != SIG_DFL && oldfunc != SIG_IGN) {
- ruby_signal(sig, oldfunc);
- }
- else {
- GET_VM()->trap_list[sig].cmd = 0;
- }
+
+ ruby_signal(sig, func);
+ GET_VM()->trap_list[sig].cmd = 0;
+
return 0;
}
# ifndef __native_client__
diff --git a/test/ruby/test_fiber_auto.rb b/test/ruby/test_fiber_auto.rb
index 0dd798644c..67a9fe87f3 100644
--- a/test/ruby/test_fiber_auto.rb
+++ b/test/ruby/test_fiber_auto.rb
@@ -111,6 +111,66 @@ def test_fd_creation_and_fork
end;
end
+ def test_waitpid
+ assert_separately(%w(-r io/nonblock), <<~'end;') # do
+ t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ pid = fork do
+ sleep 0.1
+ exit!(0)
+ end
+ f = Fiber.start { Process.waitpid2(pid) }
+ wpid, st = f.value
+ diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
+ assert_operator diff, :>=, 0.1, 'child did not return immediately'
+ assert_equal pid, wpid, 'waited pid matches'
+ assert_predicate st, :success?, 'process exited successfully'
+
+ IO.pipe do |r, w|
+ r.nonblock = w.nonblock = true
+
+ # "blocking" Process.waitpid2 takes precedence over trap(:CHLD)
+ waited = []
+ chld_called = false
+ trap(:CHLD) { chld_called = true; waited.concat(Process.waitall) }
+ pid = fork do
+ r.read(1)
+ exit!(0)
+ end
+ assert_nil Process.waitpid2(pid, Process::WNOHANG), 'child is waiting'
+ assert_nil(Fiber.start do
+ Process.waitpid2(pid, Process::WNOHANG)
+ end.value, 'WNOHANG works normally in fiber')
+ f = Fiber.start { Process.waitpid2(pid) }
+ assert_equal 1, w.write('.'), 'woke child'
+ wpid, st = f.value
+ assert_equal pid, wpid, 'waited pid matches'
+ assert_predicate st, :success?, 'child exited successfully'
+ assert_empty waited, 'waitpid had predence'
+ assert chld_called, 'CHLD handler got called anyways'
+
+ chld_called = false
+ [ 'DEFAULT', 'SIG_DFL', 'IGNORE', 'SYSTEM_DEFAULT', nil
+ ].each do |handler|
+ trap(:CHLD, handler)
+ pid = fork do
+ r.read(1)
+ exit!(0)
+ end
+ t = "trap(:CHLD, #{handler.inspect})"
+ f = Fiber.start { Process.waitpid2(pid) }
+ assert_equal 1, w.write('.'), "woke child for #{t}"
+ wpid, st = f.value
+ assert_predicate st, :success?, "child exited successfully for #{t}"
+ assert_equal wpid, pid, "return value correct for #{t}"
+ assert_equal false, chld_called,
+ "old CHLD handler did not fire for #{t}"
+ end
+ end
+ end;
+ end if Process.respond_to?(:fork) && Process.respond_to?(:waitpid2) &&
+ Process.const_defined?(:WNOHANG)
+
+
# As usual, cannot resume/run fibers across threads, but the
# scheduler works across threads
def test_cross_thread_schedule
diff --git a/thread.c b/thread.c
index ca02bd0504..cd33b377f4 100644
--- a/thread.c
+++ b/thread.c
@@ -72,9 +72,6 @@
#include "internal.h"
#include "iom.h"
-/* cont.c */
-int rb_fiber_auto_sched_p(const rb_thread_t *th);
-
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
#define RUBY_THREAD_PRIORITY_MAX 3
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH 07/10] avoid redefinition
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
` (4 preceding siblings ...)
2017-05-24 9:07 ` [PATCH 06/10] iom: implement waitpid Eric Wong
@ 2017-05-24 9:07 ` Eric Wong
2017-05-24 9:07 ` [PATCH 08/10] fix waitpid on freebsd Eric Wong
` (2 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
---
iom.h | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/iom.h b/iom.h
index 106f766c59..d412885baa 100644
--- a/iom.h
+++ b/iom.h
@@ -10,11 +10,11 @@
#ifndef RUBY_IOM_H
#define RUBY_IOM_H
#include "ruby.h"
+#include "ruby/io.h"
#include "ruby/intern.h"
#include "vm_core.h"
typedef struct rb_iom_struct rb_iom_t;
-typedef struct rb_io_t rb_io_t;
/* WARNING: unstable API, only for Ruby internal use */
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH 08/10] fix waitpid on freebsd
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
` (5 preceding siblings ...)
2017-05-24 9:07 ` [PATCH 07/10] avoid redefinition Eric Wong
@ 2017-05-24 9:07 ` Eric Wong
2017-05-24 9:07 ` [PATCH 09/10] wip Eric Wong
2017-05-24 9:07 ` [PATCH 10/10] kq works Eric Wong
8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
---
iom_waitpid.h | 2 ++
1 file changed, 2 insertions(+)
diff --git a/iom_waitpid.h b/iom_waitpid.h
index ef2df86f49..e086d0403f 100644
--- a/iom_waitpid.h
+++ b/iom_waitpid.h
@@ -1,4 +1,6 @@
/* included by iom_(epoll|select|kqueue).h */
+#include <sys/types.h>
+#include <sys/wait.h>
#if defined(WNOHANG) && (defined(HAVE_WAITPID) || defined(HAVE_WAIT4))
static VALUE
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH 09/10] wip
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
` (6 preceding siblings ...)
2017-05-24 9:07 ` [PATCH 08/10] fix waitpid on freebsd Eric Wong
@ 2017-05-24 9:07 ` Eric Wong
2017-05-24 9:07 ` [PATCH 10/10] kq works Eric Wong
8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
---
common.mk | 1 +
configure.in | 3 +
iom_kqueue.h | 447 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
thread.c | 5 +-
4 files changed, 455 insertions(+), 1 deletion(-)
create mode 100644 iom_kqueue.h
diff --git a/common.mk b/common.mk
index e79aae966c..ff1a1ec625 100644
--- a/common.mk
+++ b/common.mk
@@ -2601,6 +2601,7 @@ thread.$(OBJEXT): {$(VPATH)}io.h
thread.$(OBJEXT): {$(VPATH)}iom.h
thread.$(OBJEXT): {$(VPATH)}iom_common.h
thread.$(OBJEXT): {$(VPATH)}iom_epoll.h
+thread.$(OBJEXT): {$(VPATH)}iom_kqueue.h
thread.$(OBJEXT): {$(VPATH)}iom_select.h
thread.$(OBJEXT): {$(VPATH)}iom_waitpid.h
thread.$(OBJEXT): {$(VPATH)}method.h
diff --git a/configure.in b/configure.in
index 48b4817f63..b2272b2173 100644
--- a/configure.in
+++ b/configure.in
@@ -1390,6 +1390,7 @@ AC_CHECK_HEADERS(pwd.h)
AC_CHECK_HEADERS(setjmpex.h)
AC_CHECK_HEADERS(sys/attr.h)
AC_CHECK_HEADERS(sys/epoll.h)
+AC_CHECK_HEADERS(sys/event.h)
AC_CHECK_HEADERS(sys/fcntl.h)
AC_CHECK_HEADERS(sys/file.h)
AC_CHECK_HEADERS(sys/id.h)
@@ -2443,7 +2444,9 @@ AC_CHECK_FUNCS(initgroups)
AC_CHECK_FUNCS(ioctl)
AC_CHECK_FUNCS(isfinite)
AC_CHECK_FUNCS(issetugid)
+AC_CHECK_FUNCS(kevent)
AC_CHECK_FUNCS(killpg)
+AC_CHECK_FUNCS(kqueue)
AC_CHECK_FUNCS(lchmod)
AC_CHECK_FUNCS(lchown)
AC_CHECK_FUNCS(link)
diff --git a/iom_kqueue.h b/iom_kqueue.h
new file mode 100644
index 0000000000..f4be8dd872
--- /dev/null
+++ b/iom_kqueue.h
@@ -0,0 +1,447 @@
+/*
+ * kqueue-based implementation of I/O Manager for RubyVM on *BSD
+ */
+#include "iom_common.h"
+#include "internal.h"
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+
+/* allocated on heap (rb_vm_t.iom) */
+struct rb_iom_struct {
+ /*
+ * Everything here is protected by GVL at this time,
+ * URCU lists (LGPL-2.1+) may be used in the future
+ */
+ struct list_head fds; /* -kev.fdw.w.wnode, order agnostic */
+ struct list_head timers; /* -rb_iom_timer.tnode, sort by expire_at */
+ struct list_head pids; /* -rb_iom_pid_waiter.w.wnode, LIFO order */
+
+ int kqueue_fd;
+ int nevents; /* auto-increases */
+
+ /*
+ * Any number of threads may use kevent concurrently on
+ * the same kqueue_fd, but we only allow one thread to use
+ * a non-zero timeout; since it is possible for a long sleeper
+ * to miss fibers queued by other threads for the sleeping thread.
+ */
+ rb_thread_t *waiter;
+};
+
+struct kev {
+ struct rb_iom_fd_waiter fdw;
+ rb_thread_t *th;
+};
+
+static const struct timespec zero;
+
+static void
+increase_nevents(rb_iom_t *iom, int retries)
+{
+ /* 1024 is the RUBY_ALLOCV_LIMIT on such systems */
+ const int max_alloca = 1024 / sizeof(struct kevent);
+ const int max = max_alloca * 2;
+
+ if (retries) {
+ iom->nevents *= retries;
+ if (iom->nevents > max || iom->nevents <= 0) {
+ iom->nevents = max;
+ }
+ }
+}
+
+static void
+rb_iom_init(rb_iom_t *iom)
+{
+ int err;
+
+ list_head_init(&iom->timers);
+ list_head_init(&iom->fds);
+ list_head_init(&iom->pids);
+ iom->waiter = 0;
+ iom->nevents = 8;
+ iom->kqueue_fd = kqueue();
+ if (iom->kqueue_fd < 0) {
+ if (rb_gc_for_fd(errno)) {
+ iom->kqueue_fd = kqueue();
+ }
+ if (iom->kqueue_fd < 0) {
+ rb_sys_fail("kqueue");
+ }
+ }
+ rb_fd_fix_cloexec(iom->kqueue_fd);
+}
+
+/* we lazily create this, small scripts may never need iom */
+static rb_iom_t *
+rb_iom_create(rb_thread_t *th)
+{
+ rb_iom_t *iom = ALLOC(rb_iom_t);
+
+ rb_iom_init(iom);
+ return iom;
+}
+
+static rb_iom_t *
+rb_iom_get(rb_thread_t *th)
+{
+ VM_ASSERT(th && th->vm);
+ if (!th->vm->iom) {
+ th->vm->iom = rb_iom_create(th);
+ }
+ return th->vm->iom;
+}
+
+static struct timespec *
+next_timeout(rb_iom_t *iom, struct timespec *ts)
+{
+ struct rb_iom_timer *t;
+ t = list_top(&iom->timers, struct rb_iom_timer, tnode);
+
+ if (t) {
+ double diff = t->expires_at - timeofday();
+
+ ts->tv_sec = (long)diff;
+ ts->tv_nsec = (long)((diff - (double)ts->tv_sec) * 1e9);
+ if (ts->tv_sec < 0) {
+ ts->tv_sec = 0;
+ }
+ if (ts->tv_nsec < 0) {
+ ts->tv_nsec = 0;
+ }
+ return ts;
+ }
+ return NULL; /* forever */
+}
+
+/*
+ * kevent does not have an EVFILT_* flag for (rarely-used) urgent data,
+ * at least not on FreeBSD 11.0, so we call select() separately after
+ * EVFILT_READ returns to set the RB_WAITFD_PRI bit:
+ */
+static void
+check_pri(rb_thread_t *th, int fd, struct kev *kev)
+{
+ rb_fdset_t efds;
+ int r;
+ struct timeval tv = { 0 };
+
+ rb_fd_init(&efds);
+ rb_fd_set(fd, &efds);
+ r = native_fd_select(fd + 1, 0, 0, &efds, &tv, th);
+ rb_fd_term(&efds);
+
+ if (r > 0) {
+ kev->fdw.revents |= RB_WAITFD_PRI;
+ }
+}
+
+static int
+check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
+{
+ int err = 0;
+ if (nr >= 0) {
+ int i;
+ struct kevent *changelist = eventlist;
+ int nchanges = 0;
+
+ for (i = 0; i < nr; i++) {
+ struct kevent *ev = &eventlist[i];
+ struct kev *kev = ev->udata;
+ int fd = *kev->fdw.fdp;
+ VALUE fibval = rb_iom_timer_fibval(&kev->fdw.w.timer);
+
+ kev->fdw.w.timer._fibval = Qfalse;
+ if (ev->filter == EVFILT_READ) {
+ int rd = kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI);
+ if (rd == RB_WAITFD_IN) { /* common */
+ kev->fdw.revents |= rd;
+ }
+ else if (rd & RB_WAITFD_PRI) { /* ugh... */
+ if (fd >= 0) {
+ check_pri(th, fd, kev);
+ }
+ }
+ assert(rd && "got EVFILT_READ unexpectedly");
+
+ /* disable the other filter if we waited on both read+write */
+ if (fd >= 0 && (kev->fdw.events & RB_WAITFD_OUT) && kev->th) {
+ struct kevent *chg = &changelist[nchanges++];
+
+ kev->th = 0;
+ EV_SET(chg, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
+ }
+ }
+ else if (ev->filter == EVFILT_WRITE) {
+ assert(kev->fdw.events & RB_WAITFD_OUT &&
+ "unexpected EVFILT_WRITE");
+ kev->fdw.revents |= RB_WAITFD_OUT;
+
+ /* disable the other filter if we waited on both read+write */
+ if (fd >= 0 &&
+ kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI) &&
+ kev->th) {
+ struct kevent *chg = &changelist[nchanges++];
+
+ kev->th = 0;
+ EV_SET(chg, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
+ }
+ }
+ else {
+ rb_bug("unexpected filter: %d", ev->filter);
+ }
+
+ list_del_init(&kev->fdw.w.timer.tnode);
+ list_del_init(&kev->fdw.w.wnode);
+ if (fibval != Qfalse) {
+ rb_fiber_auto_enqueue(fibval);
+ }
+ }
+
+ if (nchanges && kevent(th->vm->iom->kqueue_fd, changelist, nchanges,
+ 0, 0, &zero) < 0) {
+ err = errno;
+ }
+
+ /* epoll_wait will not hit EINTR if msec is 0 */
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+
+ /* notify the waiter thread in case we enqueued fibers for them */
+ if (nr > 0 && th->vm->iom->waiter) {
+ ubf_select(th->vm->iom->waiter);
+ rb_thread_schedule();
+ }
+ }
+ else {
+ err = errno;
+ }
+ if (err) {
+ if (err == EINTR) {
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
+ }
+ else {
+ rb_syserr_fail(err, "kevent");
+ }
+ }
+ rb_iom_timer_check(&th->vm->iom->timers);
+}
+
+/* perform a non-blocking epoll_wait while holding GVL */
+static void
+ping_events(rb_thread_t *th)
+{
+ rb_iom_t *iom = th->vm->iom;
+ VALUE v;
+ int nr;
+ struct kevent *eventlist = ALLOCV_N(struct kevent, v, iom->nevents);
+ int retries = 0;
+
+ do {
+ nr = kevent(iom->kqueue_fd, 0, 0, eventlist, iom->nevents, &zero);
+ check_kevent(th, nr, eventlist);
+ } while (nr == iom->nevents && ++retries);
+ if (v) {
+ ALLOCV_END(v);
+ }
+ increase_nevents(iom, retries);
+}
+
+static VALUE
+iom_schedule_any(VALUE ptr)
+{
+ rb_thread_t *th = (rb_thread_t *)ptr;
+ rb_iom_t *iom = th->vm->iom;
+
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ if (iom) {
+ int nevents = iom->nevents;
+ int nr = nevents;
+ struct timespec ts;
+ VALUE v;
+ const struct timespec *tsp;
+ struct kevent *eventlist = ALLOCV_N(struct kevent, v, nevents);
+
+ if (!iom->waiter) {
+ tsp = &zero;
+ }
+ else {
+ tsp = next_timeout(iom, &ts);
+ if (!tsp && list_empty(&iom->fds)) {
+ tsp = &zero;
+ }
+ iom->waiter = th;
+ BLOCKING_REGION({
+ nr = kevent(iom->kqueue_fd, 0, 0, eventlist, nevents, tsp);
+ }, ubf_select, th, FALSE);
+ iom->waiter = 0;
+ check_kevent(th, nr, eventlist);
+ }
+ if (v) {
+ ALLOCV_END(v);
+ }
+ if (nr == nevents) { /* msec == 0 */
+ ping_events(th);
+ }
+ }
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ return Qfalse;
+}
+
+static VALUE
+cancel_timer(VALUE ptr)
+{
+ struct rb_iom_timer *t = (struct rb_iom_timer *)ptr;
+ list_del(&t->tnode);
+ return Qfalse;
+}
+
+void
+rb_iom_schedule(rb_thread_t *th, double *timeout)
+{
+ if (rb_fiber_auto_sched_p(th)) {
+ rb_iom_t *iom = th->vm->iom;
+
+ if (iom) {
+ rb_iom_timer_check(&iom->timers);
+ ping_events(th);
+ }
+ if (rb_fiber_auto_do_yield_p(th)) {
+ rb_fiber_yield(0, 0);
+ }
+ }
+ else if (timeout) {
+ double t0 = timeofday();
+ struct rb_iom_timer t;
+ rb_iom_t *iom = rb_iom_get(th);
+
+ rb_iom_timer_add(&iom->timers, &t, timeout, 0);
+ rb_ensure(iom_schedule_any, (VALUE)th, cancel_timer, (VALUE)&t);
+ *timeout -= timeofday() - t0;
+ }
+ else {
+ rb_iom_t *iom = th->vm->iom;
+
+ if (iom) {
+ rb_iom_timer_check(&iom->timers);
+ }
+ iom_schedule_any((VALUE)th);
+ }
+}
+
+static VALUE
+iom_kevxchg(VALUE ptr)
+{
+ struct kev *kev = (struct kev *)ptr;
+ rb_thread_t *th = kev->th;
+ rb_iom_t *iom = rb_iom_get(th);
+ int nchanges = 0;
+ int nr;
+ VALUE v;
+ struct kevent *eventlist = ALLOCV_N(struct kevent, v, iom->nevents);
+ struct kevent *changelist = eventlist; /* kevent manpage allows it */
+
+ VM_ASSERT(iom->nevents > 2);
+
+ /* EVFILT_READ handles urgent data (POLLPRI) */
+ if (kev->fdw.events & (RB_WAITFD_IN|RB_WAITFD_PRI)) {
+ struct kevent *chg = &changelist[nchanges++];
+ EV_SET(chg, *kev->fdw.fdp, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, kev);
+ }
+
+ if (kev->fdw.events & RB_WAITFD_OUT) {
+ struct kevent *chg = &changelist[nchanges++];
+ EV_SET(chg, *kev->fdw.fdp, EVFILT_WRITE, EV_ADD|EV_ONESHOT, 0, 0, kev);
+ }
+
+ nr = kevent(iom->kqueue_fd, changelist, nchanges,
+ eventlist, iom->nevents, &zero);
+ check_kevent(th, nr, eventlist);
+ if (v) {
+ ALLOCV_END(v);
+ }
+ (void)rb_fiber_auto_do_yield_p(th);
+ return rb_fiber_yield(0, 0);
+}
+
+static VALUE
+cancel_waitfd(VALUE ptr)
+{
+ struct kev *kev = (struct kev*)ptr;
+
+ list_del(&kev->fdw.w.wnode);
+ list_del(&kev->fdw.w.timer.tnode);
+
+ return Qfalse;
+}
+
+static int
+iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+ rb_iom_t *iom = rb_iom_get(th);
+ struct kev kev;
+
+ kev.th = th;
+ kev.fdw.fdp = fdp;
+ kev.fdw.events = (short)events;
+ kev.fdw.revents = 0;
+ list_add(&iom->fds, &kev.fdw.w.wnode);
+ rb_iom_timer_add(&iom->timers, &kev.fdw.w.timer, timeout, IOM_FIB|IOM_WAIT);
+ rb_ensure(iom_kevxchg, (VALUE)&kev, cancel_waitfd, (VALUE)&kev);
+
+ return kev.fdw.revents; /* may be zero if timed out */
+}
+
+/*
+ * XXX we may use fptr->mode to mark FDs which kqueue cannot handle,
+ * different BSDs may have different bugs which prevent certain
+ * FD types from being handled by kqueue...
+ */
+int
+rb_iom_waitio(rb_thread_t *th, rb_io_t *fptr, int events, double *timeout)
+{
+ return iom_waitfd(th, &fptr->fd, events, timeout);
+}
+
+int
+rb_iom_waitfd(rb_thread_t *th, int *fdp, int events, double *timeout)
+{
+ return iom_waitfd(th, fdp, events, timeout);
+}
+
+void
+rb_iom_destroy(rb_vm_t *vm)
+{
+ rb_iom_t *iom = vm->iom;
+ vm->iom = 0;
+ if (iom) {
+ if (iom->kqueue_fd >= 0) {
+ close(iom->kqueue_fd);
+ }
+ xfree(iom);
+ }
+}
+
+/* used by thread.c::rb_thread_atfork */
+static void
+rb_iom_atfork_child(rb_thread_t *th)
+{
+ /* kqueue is close-on-fork, so we do not close ourselves */
+ xfree(th->vm->iom);
+ th->vm->iom = 0;
+}
+
+/* used by thread_pthread.c */
+static int
+rb_iom_reserved_fd(int fd)
+{
+ rb_iom_t *iom = GET_VM()->iom;
+
+ return iom && fd == iom->kqueue_fd;
+}
+
+#include "iom_waitpid.h"
diff --git a/thread.c b/thread.c
index cd33b377f4..77c6994db8 100644
--- a/thread.c
+++ b/thread.c
@@ -5122,7 +5122,10 @@ ruby_kill(rb_pid_t pid, int sig)
}
}
-#if defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \
+#if defined(HAVE_SYS_EVENT_H) && defined(HAVE_KQUEUE) && \
+ defined(HAVE_KEVENT)
+# include "iom_kqueue.h"
+#elif defined(HAVE_SYS_EPOLL_H) && defined(HAVE_EPOLL_CREATE) && \
defined(HAVE_EPOLL_CTL) && defined(HAVE_EPOLL_WAIT)
# include "iom_epoll.h"
#else
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
* [PATCH 10/10] kq works
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
` (7 preceding siblings ...)
2017-05-24 9:07 ` [PATCH 09/10] wip Eric Wong
@ 2017-05-24 9:07 ` Eric Wong
8 siblings, 0 replies; 10+ messages in thread
From: Eric Wong @ 2017-05-24 9:07 UTC (permalink / raw)
To: spew
---
iom_kqueue.h | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/iom_kqueue.h b/iom_kqueue.h
index f4be8dd872..df277f9d23 100644
--- a/iom_kqueue.h
+++ b/iom_kqueue.h
@@ -54,8 +54,6 @@ increase_nevents(rb_iom_t *iom, int retries)
static void
rb_iom_init(rb_iom_t *iom)
{
- int err;
-
list_head_init(&iom->timers);
list_head_init(&iom->fds);
list_head_init(&iom->pids);
@@ -137,7 +135,7 @@ check_pri(rb_thread_t *th, int fd, struct kev *kev)
}
}
-static int
+static void
check_kevent(rb_thread_t *th, int nr, struct kevent *eventlist)
{
int err = 0;
--
EW
^ permalink raw reply related [flat|nested] 10+ messages in thread
end of thread, other threads:[~2017-05-24 9:07 UTC | newest]
Thread overview: 10+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2017-05-24 9:07 [PATCH 01/10] string.c (rb_str_crypt): fix excessive stack use with crypt_r Eric Wong
2017-05-24 9:07 ` [PATCH 02/10] Add debug counters Eric Wong
2017-05-24 9:07 ` [PATCH 03/10] string.c: for small crypt_data Eric Wong
2017-05-24 9:07 ` [PATCH 04/10] Improve performance in where push the element into non shared Array object Eric Wong
2017-05-24 9:07 ` [PATCH 05/10] auto fiber scheduling and friends (VERY LIGHTLY TESTED) Eric Wong
2017-05-24 9:07 ` [PATCH 06/10] iom: implement waitpid Eric Wong
2017-05-24 9:07 ` [PATCH 07/10] avoid redefinition Eric Wong
2017-05-24 9:07 ` [PATCH 08/10] fix waitpid on freebsd Eric Wong
2017-05-24 9:07 ` [PATCH 09/10] wip Eric Wong
2017-05-24 9:07 ` [PATCH 10/10] kq works Eric Wong
This is a public inbox; see mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).