From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS6939 65.19.128.0/18 X-Spam-Status: No, score=-2.2 required=3.0 tests=AWL,BAYES_00, RCVD_IN_MSPIKE_BL,RCVD_IN_MSPIKE_ZBI,RCVD_IN_SORBS_WEB,RCVD_IN_XBL,RDNS_NONE, SPF_FAIL,SPF_HELO_FAIL,TO_EQ_FM_DOM_SPF_FAIL shortcircuit=no autolearn=no autolearn_force=no version=3.4.0 Received: from 80x24.org (unknown [65.19.167.132]) by dcvr.yhbt.net (Postfix) with ESMTP id C6D53202C1 for ; Thu, 16 Mar 2017 04:43:46 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [PATCH] WIP allow nestable TLS buffers within the same thread Date: Thu, 16 Mar 2017 04:43:41 +0000 Message-Id: <20170316044341.20877-1-e@80x24.org> List-Id: --- ext/sleepy_penguin/epoll.c | 7 ++-- ext/sleepy_penguin/init.c | 75 +++++++++++++++++++++++++++------ ext/sleepy_penguin/inotify.c | 82 ++++++++++++++++++++++--------------- ext/sleepy_penguin/kqueue.c | 24 +++++++---- ext/sleepy_penguin/sleepy_penguin.h | 1 + test/test_epoll.rb | 23 +++++++++++ 6 files changed, 155 insertions(+), 57 deletions(-) diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c index e655bf9..f0619c9 100644 --- a/ext/sleepy_penguin/epoll.c +++ b/ext/sleepy_penguin/epoll.c @@ -200,14 +200,15 @@ static VALUE epwait(int argc, VALUE *argv, VALUE self) { VALUE timeout, maxevents; struct ep_per_thread *ept; + int t; rb_need_block(); rb_scan_args(argc, argv, "02", &maxevents, &timeout); - + t = NIL_P(timeout) ? -1 : NUM2INT(timeout); ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents)); - ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout); + ept->timeout = t; - return real_epwait(ept); + return rb_ensure(real_epwait, (VALUE)ept, rb_sp_puttlsbuf, (VALUE)ept); } /* :nodoc: */ diff --git a/ext/sleepy_penguin/init.c b/ext/sleepy_penguin/init.c index 01bb52f..c514120 100644 --- a/ext/sleepy_penguin/init.c +++ b/ext/sleepy_penguin/init.c @@ -11,8 +11,15 @@ #define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */ size_t rb_sp_l1_cache_line_size; static pthread_key_t rb_sp_key; +enum rb_sp_tls_buf_type { + RB_SP_TLS_INUSE = -1, + RB_SP_TLS_READY = 0, + RB_SP_TLS_MALLOCED = 1 +}; + struct rb_sp_tlsbuf { - size_t capa; + uint32_t capa; + enum rb_sp_tls_buf_type buf_type; unsigned char ptr[FLEX_ARRAY]; }; @@ -89,12 +96,36 @@ static void sp_once(void) } } +static struct rb_sp_tlsbuf *alloc_tlsbuf(size_t size) +{ + size_t bytes = size + sizeof(struct rb_sp_tlsbuf); + struct rb_sp_tlsbuf *buf; + void *ptr; + int err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, bytes); + + if (err) { + errno = err; + rb_memerror(); /* fatal */ + } + + buf = ptr; + buf->capa = size; + + return buf; +} + void *rb_sp_gettlsbuf(size_t *size) { struct rb_sp_tlsbuf *buf = pthread_getspecific(rb_sp_key); - void *ptr; int err; - size_t bytes; + + assert(buf ? buf->buf_type != RB_SP_TLS_MALLOCED : 1); + + if (buf && buf->buf_type != RB_SP_TLS_READY) { + buf = alloc_tlsbuf(*size); + buf->buf_type = RB_SP_TLS_MALLOCED; + return buf->ptr; + } if (buf && buf->capa >= *size) { *size = buf->capa; @@ -102,24 +133,44 @@ void *rb_sp_gettlsbuf(size_t *size) } free(buf); - bytes = *size + sizeof(struct rb_sp_tlsbuf); - err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, bytes); - if (err) { - errno = err; - rb_memerror(); /* fatal */ - } - - buf = ptr; - buf->capa = *size; + buf = alloc_tlsbuf(*size); err = pthread_setspecific(rb_sp_key, buf); if (err != 0) { errno = err; rb_sys_fail("BUG: pthread_setspecific"); } out: + buf->buf_type = RB_SP_TLS_INUSE; return buf->ptr; } +#define container_of(ptr, type, member) \ + (type *)((uintptr_t)(ptr) - offsetof(type, member)) + +VALUE rb_sp_puttlsbuf(VALUE p) +{ + struct rb_sp_tlsbuf *tls = pthread_getspecific(rb_sp_key); + void *ptr = (void *)p; + struct rb_sp_tlsbuf *buf; + + if (!ptr) + return Qfalse; + + buf = container_of(ptr, struct rb_sp_tlsbuf, ptr); + + switch (buf->buf_type) { + case RB_SP_TLS_INUSE: + assert(tls == buf && "rb_sp_puttlsbuf mismatch"); + buf->buf_type = RB_SP_TLS_READY; + break; + case RB_SP_TLS_READY: + assert(0 && "rb_sp_gettlsbuf not called"); + case RB_SP_TLS_MALLOCED: + free(buf); + } + return Qfalse; +} + void Init_sleepy_penguin_ext(void) { VALUE mSleepyPenguin; diff --git a/ext/sleepy_penguin/inotify.c b/ext/sleepy_penguin/inotify.c index b5cd67b..56fcff2 100644 --- a/ext/sleepy_penguin/inotify.c +++ b/ext/sleepy_penguin/inotify.c @@ -134,8 +134,11 @@ static VALUE event_new(struct inotify_event *e) } struct inread_args { + VALUE self; int fd; + int nonblock_p; size_t size; + VALUE tmp; void *buf; }; @@ -158,6 +161,7 @@ static void resize_internal_buffer(struct inread_args *args) if (newlen > 0) { args->size = (size_t)newlen; + rb_sp_puttlsbuf((VALUE)args->buf); args->buf = rb_sp_gettlsbuf(&args->size); } @@ -169,56 +173,35 @@ static void resize_internal_buffer(struct inread_args *args) newlen); } -/* - * call-seq: - * ino.take([nonblock]) -> Inotify::Event or nil - * - * Returns the next Inotify::Event processed. May return +nil+ if +nonblock+ - * is +true+. - */ -static VALUE take(int argc, VALUE *argv, VALUE self) +static VALUE do_take(VALUE p) { - struct inread_args args; - VALUE tmp = rb_ivar_get(self, id_inotify_tmp); - struct inotify_event *e, *end; - ssize_t r; + struct inread_args *args = (struct inread_args *)p; VALUE rv = Qnil; - VALUE nonblock; - - if (RARRAY_LEN(tmp) > 0) - return rb_ary_shift(tmp); - - rb_scan_args(argc, argv, "01", &nonblock); - - args.fd = rb_sp_fileno(self); - args.size = 128; - args.buf = rb_sp_gettlsbuf(&args.size); + struct inotify_event *e, *end; - if (RTEST(nonblock)) - rb_sp_set_nonblock(args.fd); - else - blocking_io_prepare(args.fd); + args->buf = rb_sp_gettlsbuf(&args->size); do { - r = (ssize_t)rb_sp_fd_region(inread, &args, args.fd); + ssize_t r = (ssize_t)rb_sp_fd_region(inread, args, args->fd); if (r == 0 /* Linux < 2.6.21 */ || (r < 0 && errno == EINVAL) /* Linux >= 2.6.21 */ ) { - resize_internal_buffer(&args); + resize_internal_buffer(args); } else if (r < 0) { - if (errno == EAGAIN && RTEST(nonblock)) + if (errno == EAGAIN && args->nonblock_p) return Qnil; - if (!rb_sp_wait(rb_io_wait_readable, self, &args.fd)) + if (!rb_sp_wait(rb_io_wait_readable, args->self, + &args->fd)) rb_sys_fail("read(inotify)"); } else { /* buffer in userspace to minimize read() calls */ - end = (struct inotify_event *)((char *)args.buf + r); - for (e = args.buf; e < end; ) { + end = (struct inotify_event *)((char *)args->buf + r); + for (e = args->buf; e < end; ) { VALUE event = event_new(e); if (NIL_P(rv)) rv = event; else - rb_ary_push(tmp, event); + rb_ary_push(args->tmp, event); e = (struct inotify_event *) ((char *)e + event_len(e)); } @@ -230,6 +213,39 @@ static VALUE take(int argc, VALUE *argv, VALUE self) /* * call-seq: + * ino.take([nonblock]) -> Inotify::Event or nil + * + * Returns the next Inotify::Event processed. May return +nil+ if +nonblock+ + * is +true+. + */ +static VALUE take(int argc, VALUE *argv, VALUE self) +{ + struct inread_args args; + VALUE nonblock; + + args.tmp = rb_ivar_get(self, id_inotify_tmp); + if (RARRAY_LEN(args.tmp) > 0) + return rb_ary_shift(args.tmp); + + rb_scan_args(argc, argv, "01", &nonblock); + + args.self = self; + args.fd = rb_sp_fileno(self); + args.size = 128; + args.nonblock_p = RTEST(nonblock); + + if (args.nonblock_p) + rb_sp_set_nonblock(args.fd); + else + blocking_io_prepare(args.fd); + + args.buf = 0; + return rb_ensure(do_take, (VALUE)&args, + rb_sp_puttlsbuf, (VALUE)args.buf); +} + +/* + * call-seq: * inotify_event.events => [ :MOVED_TO, ... ] * * Returns an array of symbolic event names based on the contents of diff --git a/ext/sleepy_penguin/kqueue.c b/ext/sleepy_penguin/kqueue.c index 22e20f1..22a2c5d 100644 --- a/ext/sleepy_penguin/kqueue.c +++ b/ext/sleepy_penguin/kqueue.c @@ -43,6 +43,7 @@ static VALUE mEv, mEvFilt, mNote, mVQ; struct kq_per_thread { VALUE io; + VALUE changelist; int fd; int nchanges; int nevents; @@ -72,7 +73,7 @@ static int kq_fd_check(struct kq_per_thread *kpt) return 1; } -static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents) +static struct kq_per_thread *kpt_get(int nchanges, int nevents) { struct kq_per_thread *kpt; size_t size; @@ -89,8 +90,6 @@ static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents) kpt->capa = max; kpt->nchanges = nchanges; kpt->nevents = nevents; - kpt->io = self; - kpt->fd = rb_sp_fileno(kpt->io); return kpt; } @@ -203,11 +202,16 @@ static VALUE nogvl_kevent(void *args) return (VALUE)nevents; } +static void changelist_prepare(struct kevent *, VALUE); + static VALUE do_kevent(struct kq_per_thread *kpt) { long nevents; struct timespec expire_at; + if (kpt->nchanges) + changelist_prepare(kpt->events, kpt->changelist); + if (kpt->ts) { clock_gettime(CLOCK_MONOTONIC, &expire_at); @@ -333,7 +337,7 @@ static void changelist_prepare(struct kevent *events, VALUE changelist) */ static VALUE sp_kevent(int argc, VALUE *argv, VALUE self) { - struct timespec ts; + struct timespec ts, *t; VALUE changelist, events, timeout; struct kq_per_thread *kpt; int nchanges, nevents; @@ -362,12 +366,14 @@ static VALUE sp_kevent(int argc, VALUE *argv, VALUE self) nevents = 0; } - kpt = kpt_get(self, nchanges, nevents); - kpt->ts = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout); - if (nchanges) - changelist_prepare(kpt->events, changelist); + t = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout); + kpt = kpt_get(nchanges, nevents); + kpt->ts = t; + kpt->changelist = changelist; + kpt->io = self; + kpt->fd = rb_sp_fileno(kpt->io); - return do_kevent(kpt); + return rb_ensure(do_kevent, (VALUE)kpt, rb_sp_puttlsbuf, (VALUE)kpt); } /* initialize constants in the SleepyPenguin::Ev namespace */ diff --git a/ext/sleepy_penguin/sleepy_penguin.h b/ext/sleepy_penguin/sleepy_penguin.h index 99ad0b7..bd44e18 100644 --- a/ext/sleepy_penguin/sleepy_penguin.h +++ b/ext/sleepy_penguin/sleepy_penguin.h @@ -78,6 +78,7 @@ static inline VALUE fake_blocking_region(VALUE (*fn)(void *), void *data) typedef int rb_sp_waitfn(int fd); int rb_sp_wait(rb_sp_waitfn waiter, VALUE obj, int *fd); void *rb_sp_gettlsbuf(size_t *size); +VALUE rb_sp_puttlsbuf(VALUE); /* Flexible array elements are standard in C99 */ #if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) diff --git a/test/test_epoll.rb b/test/test_epoll.rb index d2b560c..2f803e3 100644 --- a/test/test_epoll.rb +++ b/test/test_epoll.rb @@ -534,4 +534,27 @@ class TestEpoll < Test::Unit::TestCase end @ep.wait(1) { |flags, io| assert_equal(first[0], io) } end + + def test_epoll_nest + ep2 = Epoll.new + r, w = IO.pipe + @ep.add @rd, :IN + @ep.add @wr, :OUT + ep2.add r, :IN + ep2.add w, :OUT + w.write '.' + @wr.write '.' + outer = [] + inner = [] + @ep.wait(2) do |_, io| + outer << io + ep2.wait(2) do |_, io2| + inner << io2 + end + end + assert_equal [ @rd, @wr ].sort_by(&:fileno), outer.sort_by(&:fileno) + assert_equal [ r, w ].sort_by(&:fileno), inner.sort_by(&:fileno) + ensure + [ r, w, ep2 ].compact.each(&:close) + end end if defined?(SleepyPenguin::Epoll) -- EW