From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.1 (2015-04-28) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS198093 171.25.193.0/24 X-Spam-Status: No, score=-1.4 required=3.0 tests=AWL,BAYES_00, RCVD_IN_MSPIKE_BL,RCVD_IN_MSPIKE_ZBI,RCVD_IN_XBL,SPF_FAIL,SPF_HELO_FAIL, TO_EQ_FM_DOM_SPF_FAIL shortcircuit=no autolearn=no autolearn_force=no version=3.4.1 Received: from 80x24.org (tor-exit0-readme.dfri.se [171.25.193.20]) by dcvr.yhbt.net (Postfix) with ESMTP id 9B4651F403 for ; Sat, 16 Jun 2018 12:05:47 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [PATCH] implement Timeout.timeout using ccan/timer via bundled gem Date: Sat, 16 Jun 2018 12:05:44 +0000 Message-Id: <20180616120544.28171-1-e@80x24.org> List-Id: gem install timeout_ext git clone https://80x24.org/timeout_ext.git --- common.mk | 10 +++ include/ruby/intern.h | 11 +++ internal.h | 6 ++ test/test_timeout.rb | 29 +++++++ thread.c | 165 ++++++++++++++++++++++++++++--------- thread_pthread.c | 30 ++++--- timeout.c | 187 ++++++++++++++++++++++++++++++++++++++++++ vm.c | 4 + vm_core.h | 3 + 9 files changed, 393 insertions(+), 52 deletions(-) create mode 100644 timeout.c diff --git a/common.mk b/common.mk index da9cab3c11..3c7c1bfaa7 100644 --- a/common.mk +++ b/common.mk @@ -130,6 +130,7 @@ COMMONOBJS = array.$(OBJEXT) \ symbol.$(OBJEXT) \ thread.$(OBJEXT) \ time.$(OBJEXT) \ + timeout.$(OBJEXT) \ transcode.$(OBJEXT) \ util.$(OBJEXT) \ variable.$(OBJEXT) \ @@ -2812,6 +2813,15 @@ time.$(OBJEXT): {$(VPATH)}st.h time.$(OBJEXT): {$(VPATH)}subst.h time.$(OBJEXT): {$(VPATH)}time.c time.$(OBJEXT): {$(VPATH)}timev.h +timeout.$(OBJEXT): $(hdrdir)/ruby/ruby.h +timeout.$(OBJEXT): $(top_srcdir)/include/ruby.h +timeout.$(OBJEXT): {$(VPATH)}config.h +timeout.$(OBJEXT): {$(VPATH)}id.h +timeout.$(OBJEXT): {$(VPATH)}intern.h +timeout.$(OBJEXT): {$(VPATH)}internal.h +timeout.$(OBJEXT): {$(VPATH)}missing.h +timeout.$(OBJEXT): {$(VPATH)}subst.h +timeout.$(OBJEXT): {$(VPATH)}vm_core.h 
transcode.$(OBJEXT): $(hdrdir)/ruby/ruby.h transcode.$(OBJEXT): $(top_srcdir)/include/ruby.h transcode.$(OBJEXT): {$(VPATH)}config.h diff --git a/include/ruby/intern.h b/include/ruby/intern.h index f071486458..19cf4cd352 100644 --- a/include/ruby/intern.h +++ b/include/ruby/intern.h @@ -958,6 +958,17 @@ VALUE rb_time_succ(VALUE); VALUE rb_make_backtrace(void); VALUE rb_make_exception(int, const VALUE*); +struct rb_timers_type0 { + void *(*create)(void); + void (*add)(ANYARGS/* struct timers *, struct timer *, struct timespec * */); + void *(*expire)(ANYARGS/* struct timers * */); + void *(*earliest)(ANYARGS /* struct timers * */); + void (*destroy)(ANYARGS/* struct timers * */); +}; + +extern struct rb_timers_type0 *rb_timers; +VALUE rb_s_timeout(int, VALUE *, VALUE); + RUBY_SYMBOL_EXPORT_END #if defined(__cplusplus) diff --git a/internal.h b/internal.h index 7944f87043..ca9925ee08 100644 --- a/internal.h +++ b/internal.h @@ -1837,6 +1837,12 @@ VALUE rb_struct_s_keyword_init(VALUE klass); /* time.c */ struct timeval rb_time_timeval(VALUE); +/* timeout.c */ +typedef struct rb_vm_struct rb_vm_t; +typedef struct rb_execution_context_struct rb_execution_context_t; +struct timespec *rb_timeout_sleep_interval(rb_vm_t *, struct timespec *); +int rb_timeout_expire(rb_execution_context_t *); + /* thread.c */ #define COVERAGE_INDEX_LINES 0 #define COVERAGE_INDEX_BRANCHES 1 diff --git a/test/test_timeout.rb b/test/test_timeout.rb index c57d90c063..bcb1c20add 100644 --- a/test/test_timeout.rb +++ b/test/test_timeout.rb @@ -1,6 +1,10 @@ # frozen_string_literal: false require 'test/unit' require 'timeout' +begin + require 'io/wait' +rescue LoadError +end class TestTimeout < Test::Unit::TestCase def test_queue @@ -107,4 +111,29 @@ def test_handle_interrupt } assert(ok, bug11344) end + + def test_io + t = 0.001 + IO.pipe do |r, w| + assert_raise(Timeout::Error) { Timeout.timeout(t) { r.read } } + if r.respond_to?(:wait) + assert_raise(Timeout::Error) { Timeout.timeout(t) { r.wait 
} } + assert_raise(Timeout::Error) { Timeout.timeout(t) { r.wait(9) } } + end + rset = [r, r.dup] + assert_raise(Timeout::Error) do + Timeout.timeout(t) { IO.select(rset, nil, nil, 9) } + end + assert_raise(Timeout::Error) { Timeout.timeout(t) { IO.select(rset) } } + rset.each(&:close) + end + end + + def test_thread_join + th = Thread.new { sleep } + assert_raise(Timeout::Error) { Timeout.timeout(0.001) { th.join } } + ensure + th.kill + th.join + end end diff --git a/thread.c b/thread.c index db7f28c8fb..0703078024 100644 --- a/thread.c +++ b/thread.c @@ -242,19 +242,39 @@ timeval_for(struct timeval *tv, const struct timespec *ts) return 0; } +static int timespec_cmp(const struct timespec *a, const struct timespec *b); + static void -timeout_prepare(struct timespec **tsp, - struct timespec *ts, struct timespec *end, - const struct timeval *timeout) +vm_timeout_prepare(rb_thread_t *th, struct timespec **tsp, struct timespec *ts) +{ + struct timespec vm_tout; + + if (rb_timeout_sleep_interval(th->vm, &vm_tout)) { + if (!*tsp || timespec_cmp(&vm_tout, *tsp) < 0) { + *ts = vm_tout; + *tsp = ts; + } + } +} + +static struct timespec * +timeout_prepare(rb_thread_t *th, struct timespec **tsp, + struct timespec *ts, struct timespec *end, + const struct timeval *caller_tout) { - if (timeout) { + struct timespec *ret = 0; + + if (caller_tout) { getclockofday(end); - timespec_add(end, timespec_for(ts, timeout)); - *tsp = ts; + *tsp = timespec_for(ts, caller_tout); + timespec_add(end, *tsp); + ret = end; } else { - *tsp = 0; + *tsp = 0; } + vm_timeout_prepare(th, tsp, ts); + return ret; } #if THREAD_DEBUG @@ -1159,22 +1179,46 @@ sleep_forever(rb_thread_t *th, unsigned int fl) { enum rb_thread_status prev_status = th->status; enum rb_thread_status status; + struct timespec to; + int spurious_check = fl & SLEEP_SPURIOUS_CHECK; - status = fl & SLEEP_DEADLOCKABLE ? 
THREAD_STOPPED_FOREVER : THREAD_STOPPED; - th->status = status; - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - while (th->status == status) { - if (fl & SLEEP_DEADLOCKABLE) { - th->vm->sleeper++; - rb_check_deadlock(th->vm); - } - native_sleep(th, 0); - if (fl & SLEEP_DEADLOCKABLE) { - th->vm->sleeper--; - } - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - if (!(fl & SLEEP_SPURIOUS_CHECK)) - break; + check_timeout: + if (rb_timeout_sleep_interval(th->vm, &to)) { + th->status = THREAD_STOPPED; + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + while (th->status == THREAD_STOPPED) { + struct timespec to_end; + getclockofday(&to_end); + timespec_add(&to_end, &to); + native_sleep(th, &to); + rb_timeout_expire(th->ec); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + if (!spurious_check && !timespec_update_expire(&to, &to_end)) + break; /* woken early: leave via the th->status restore below */ + if (!rb_timeout_sleep_interval(th->vm, &to)) + goto no_timeout; + } + } + else { + no_timeout: + status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED; + th->status = status; + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + while (th->status == status) { + if (fl & SLEEP_DEADLOCKABLE) { + th->vm->sleeper++; + rb_check_deadlock(th->vm); + } + native_sleep(th, 0); + if (fl & SLEEP_DEADLOCKABLE) { + th->vm->sleeper--; + } + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + if (!spurious_check) + break; + if (th->status == status) + goto check_timeout; + } } th->status = prev_status; } @@ -1261,20 +1305,46 @@ timespec_update_expire(struct timespec *ts, const struct timespec *end) static void sleep_timespec(rb_thread_t *th, struct timespec ts, unsigned int fl) { - struct timespec end; + struct timespec end, to; enum rb_thread_status prev_status = th->status; + int spurious_check = fl & SLEEP_SPURIOUS_CHECK; getclockofday(&end); timespec_add(&end, &ts); th->status = THREAD_STOPPED; RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - while (th->status == THREAD_STOPPED) { - native_sleep(th, &ts); - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - if (timespec_update_expire(&ts, &end)) - break; 
- if (!(fl & SLEEP_SPURIOUS_CHECK)) - break; + if (rb_timeout_sleep_interval(th->vm, &to) && timespec_cmp(&to, &ts) < 0) { + check_timeout: + while (th->status == THREAD_STOPPED) { + struct timespec to_end; + getclockofday(&to_end); + native_sleep(th, &to); + rb_timeout_expire(th->ec); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + if (!spurious_check && !timespec_update_expire(&to, &to_end)) + break; /* leave via the th->status restore below */ + if (timespec_update_expire(&ts, &end)) + break; /* caller's interval elapsed; still restore th->status */ + if (!rb_timeout_sleep_interval(th->vm, &to) || + timespec_cmp(&to, &ts) > 0) { + goto no_timeout; + } + } + } + else { + no_timeout: + while (th->status == THREAD_STOPPED) { + native_sleep(th, &ts); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + if (timespec_update_expire(&ts, &end)) + break; + if (!spurious_check) + break; + if (rb_timeout_sleep_interval(th->vm, &to) && + timespec_cmp(&to, &ts) < 0) { + goto check_timeout; + } + } } th->status = prev_status; } @@ -2192,6 +2262,8 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) if (timer_interrupt) { uint32_t limits_us = TIME_QUANTUM_USEC; + rb_timeout_expire(th->ec); + if (th->priority > 0) limits_us <<= th->priority; else @@ -3805,8 +3877,9 @@ rb_fd_set(int fd, rb_fdset_t *set) #endif static int -wait_retryable(int *result, int errnum, struct timespec *timeout, - const struct timespec *end) +wait_retryable(rb_thread_t *th, int *result, int errnum, + struct timespec **tsp, struct timespec *ts, + const struct timespec *end) { if (*result < 0) { switch (errnum) { @@ -3815,19 +3888,21 @@ wait_retryable(int *result, int errnum, struct timespec *timeout, case ERESTART: #endif *result = 0; - if (timeout && timespec_update_expire(timeout, end)) { - timeout->tv_sec = 0; - timeout->tv_nsec = 0; + if (end && timespec_update_expire(ts, end)) { + ts->tv_sec = 0; + ts->tv_nsec = 0; } + vm_timeout_prepare(th, tsp, ts); return TRUE; } return FALSE; } else if (*result == 0) { /* check for spurious wakeup */ - if (timeout) { - return !timespec_update_expire(timeout, 
end); + if (end && timespec_update_expire(ts, end)) { + return FALSE; } + vm_timeout_prepare(th, tsp, ts); return TRUE; } return FALSE; @@ -3847,8 +3922,13 @@ do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds, rb_fdset_t MAYBE_UNUSED(orig_except); struct timespec ts, end, *tsp; rb_thread_t *th = GET_THREAD(); + struct timespec *endp = timeout_prepare(th, &tsp, &ts, &end, timeout); + + /* VM timeout exists, but user never passed us a timeval */ + if (!timeout && tsp) { + timeout = alloca(sizeof(struct timeval)); + } - timeout_prepare(&tsp, &ts, &end, timeout); #define do_select_update() \ (restore_fdset(readfds, &orig_read), \ restore_fdset(writefds, &orig_write), \ @@ -3872,7 +3952,8 @@ do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds, }, ubf_select, th, FALSE); RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update()); + } while (wait_retryable(th, &result, lerrno, &tsp, &ts, endp) && + do_select_update()); #define fd_term(f) if (f##fds) rb_fd_term(&orig_##f) fd_term(read); @@ -3994,8 +4075,8 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) int result = 0, lerrno; struct timespec ts, end, *tsp; rb_thread_t *th = GET_THREAD(); + struct timespec *endp = timeout_prepare(th, &tsp, &ts, &end, timeout); - timeout_prepare(&tsp, &ts, &end, timeout); fds.fd = fd; fds.events = (short)events; @@ -4008,7 +4089,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) }, ubf_select, th, FALSE); RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - } while (wait_retryable(&result, lerrno, tsp, &end)); + } while (wait_retryable(th, &result, lerrno, &tsp, &ts, endp)); if (result < 0) { errno = lerrno; return -1; @@ -5059,10 +5140,12 @@ rb_check_deadlock(rb_vm_t *vm) { int found = 0; rb_thread_t *th = 0; + struct timespec to; if (vm_living_thread_num(vm) > vm->sleeper) return; if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than 
vm_living_thread_num(vm)"); if (patrol_thread && patrol_thread != GET_THREAD()) return; + if (rb_timeout_sleep_interval(vm, &to)) return; list_for_each(&vm->living_threads, th, vmlt_node) { if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) { diff --git a/thread_pthread.c b/thread_pthread.c index 069c50ed7a..7a6fe729ea 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -1366,12 +1366,13 @@ setup_communication_pipe(void) * The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running. * @pre the calling context is in the timer thread. */ -static inline void -timer_thread_sleep(rb_global_vm_lock_t* gvl) +static inline int +timer_thread_sleep(rb_vm_t *vm) { int result; int need_polling; struct pollfd pollfds[2]; + int timeout = -1; pollfds[0].fd = timer_thread_pipe.normal[0]; pollfds[0].events = POLLIN; @@ -1380,13 +1381,14 @@ timer_thread_sleep(rb_global_vm_lock_t* gvl) need_polling = !ubf_threads_empty(); - if (gvl->waiting > 0 || need_polling) { + if (vm->gvl.waiting > 0 || need_polling) { /* polling (TIME_QUANTUM_USEC usec) */ result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000); } else { - /* wait (infinite) */ - result = poll(pollfds, numberof(pollfds), -1); + /* wait (infinite, or whatever timeout.c sets) */ + timeout = (int)ATOMIC_EXCHANGE(vm->timer_thread_timeout, -1); + result = poll(pollfds, numberof(pollfds), timeout); } if (result == 0) { @@ -1408,6 +1410,7 @@ timer_thread_sleep(rb_global_vm_lock_t* gvl) /* ignore */; } } + return timeout; } #else /* USE_SLEEPY_TIMER_THREAD */ @@ -1418,8 +1421,8 @@ static void rb_thread_wakeup_timer_thread_low(void) {} static rb_nativethread_lock_t timer_thread_lock; static rb_nativethread_cond_t timer_thread_cond; -static inline void -timer_thread_sleep(rb_global_vm_lock_t* unused) +static inline int +timer_thread_sleep(rb_vm_t *vm) { struct timespec ts; ts.tv_sec = 0; @@ -1427,6 +1430,7 @@ timer_thread_sleep(rb_global_vm_lock_t* unused) ts = 
native_cond_timeout(&timer_thread_cond, ts); native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts); + return (int)ATOMIC_EXCHANGE(vm->timer_thread_timeout, -1); } #endif /* USE_SLEEPY_TIMER_THREAD */ @@ -1483,7 +1487,8 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name) static void * thread_timer(void *p) { - rb_global_vm_lock_t *gvl = (rb_global_vm_lock_t *)p; + rb_vm_t *vm = p; + int timeout = -1; if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n"); @@ -1500,12 +1505,15 @@ thread_timer(void *p) /* timer function */ ubf_wakeup_all_threads(); + if (timeout >= 0 && vm->main_thread) { + pthread_kill(vm->main_thread->thread_id, SIGVTALRM); + } timer_thread_function(0); if (TT_DEBUG) WRITE_CONST(2, "tick\n"); /* wait */ - timer_thread_sleep(gvl); + timeout = timer_thread_sleep(vm); } #if USE_SLEEPY_TIMER_THREAD CLOSE_INVALIDATE(normal[0]); @@ -1577,7 +1585,7 @@ rb_thread_create_timer_thread(void) if (timer_thread.created) { rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n"); } - err = pthread_create(&timer_thread.id, &attr, thread_timer, &vm->gvl); + err = pthread_create(&timer_thread.id, &attr, thread_timer, vm); pthread_attr_destroy(&attr); if (err == EINVAL) { @@ -1588,7 +1596,7 @@ rb_thread_create_timer_thread(void) * default stack size is enough for them: */ stack_size = 0; - err = pthread_create(&timer_thread.id, NULL, thread_timer, &vm->gvl); + err = pthread_create(&timer_thread.id, NULL, thread_timer, vm); } if (err != 0) { rb_warn("pthread_create failed for timer: %s, scheduling broken", diff --git a/timeout.c b/timeout.c new file mode 100644 index 0000000000..49f51ca517 --- /dev/null +++ b/timeout.c @@ -0,0 +1,187 @@ +#include "internal.h" +#include "vm_core.h" +#include "ccan/container_of/container_of.h" + +/* from ccan/timer/timer.h */ +struct timer { + struct list_node list; + uint64_t time; +}; + +void rb_native_mutex_lock(rb_nativethread_lock_t *); +void 
rb_native_mutex_unlock(rb_nativethread_lock_t *); +struct rb_timers_type0 *rb_timers; + +struct timeout_args { + rb_execution_context_t *ec; + VALUE sec; + VALUE klass; + VALUE message; + struct timer t; + struct timespec ts; +}; +static VALUE cTimeoutError; + +static int +timespec2msec(const struct timespec *ts) +{ + const long max_msec = INT_MAX; + long msec; + + if (!ts) return -1; /* infinite */ + if (ts->tv_sec < 0) return 0; /* bug of caller */ + + /* clamp first: tv_sec * 1000 (+ up to 1000 below) must not overflow, signed overflow is UB */ + if (ts->tv_sec >= max_msec / 1000) return (int)max_msec; + msec = (long)ts->tv_sec * 1000; + + /* + * round up to avoid busy waiting, the rest of the code uses + * nanosecond resolution and we risk wasting cycles looping + * on poll(..., timeout=0) calls w/o rounding up, here: + */ + msec += (ts->tv_nsec + 999999L) / 1000000L; + + return msec > max_msec ? max_msec : (int)msec; +} + +static void +timer_thread_timeout_update(rb_vm_t *vm, struct timespec *ts) +{ + if (rb_timers->earliest(vm->timers, ts)) { + int timeout = timespec2msec(ts); + ATOMIC_SET(vm->timer_thread_timeout, (rb_atomic_t)timeout); + rb_thread_wakeup_timer_thread(); + } +} + +static VALUE +timeout_run(VALUE x) +{ + struct timeout_args *a = (struct timeout_args *)x; + + if (RTEST(a->klass)) return rb_yield(a->sec); + + /* for Timeout::Error#exception to throw */ + a->message = rb_exc_new_str(cTimeoutError, a->message); + RBASIC_CLEAR_CLASS(a->message); + a->sec = rb_catch_obj(a->message, rb_yield, a->message); + if (a->sec == a->message) { + rb_exc_raise(a->message); + } + assert(RBASIC_CLASS(a->message) == 0 && RB_TYPE_P(a->message, T_OBJECT)); + if (FL_TEST(a->message, FL_EXIVAR)) { + rb_free_generic_ivar(a->message); + FL_UNSET(a->message, FL_EXIVAR); + } + rb_gc_force_recycle(a->message); + return a->sec; +} + +static VALUE +timeout_ensure(VALUE x) +{ + struct timeout_args *a = (struct timeout_args *)x; + rb_vm_t *vm = rb_ec_vm_ptr(a->ec); + if (!rb_timers) { + rb_fatal("rb_timers not defined by timeout_ext"); + } + list_del_init(&a->t.list); /* 
inlined timer_del */ + timer_thread_timeout_update(vm, &a->ts); + + return Qfalse; +} + +static struct timeout_args * +rb_timers_expire_one(rb_vm_t *vm) +{ + struct timer *t = rb_timers->expire(vm->timers); + + return t ? container_of(t, struct timeout_args, t) : 0; +} + +int +rb_timeout_expire(rb_execution_context_t *ec) +{ + rb_vm_t *vm = rb_ec_vm_ptr(ec); + int n = 0; + struct timeout_args *a; + rb_thread_t *current_th; + + if (!vm->timers || !rb_timers) return n; + + current_th = rb_ec_thread_ptr(ec); + while ((a = rb_timers_expire_one(vm))) { + rb_thread_t *th = rb_ec_thread_ptr(a->ec); + VALUE ex; + + if (RTEST(a->klass)) { + ex = rb_exc_new_str(a->klass, a->message); + } + else { /* default, pre-made Timeout::Error */ + ex = a->message; + RBASIC_SET_CLASS_RAW(ex, cTimeoutError); /* reveal */ + /* for Timeout::Error#exception to call `throw' */ + rb_ivar_set(ex, rb_intern("@thread"), th->self); + } + if (current_th == th) { + rb_threadptr_pending_interrupt_enque(th, ex); + rb_threadptr_interrupt(th); + } + else { + rb_funcall(th->self, rb_intern("raise"), 1, ex); + } + n++; + } + if (!n) { + struct timespec ts; + timer_thread_timeout_update(vm, &ts); + } + + return n; +} + +struct timespec * +rb_timeout_sleep_interval(rb_vm_t *vm, struct timespec *ts) +{ + return (vm->timers && rb_timers) ? 
rb_timers->earliest(vm->timers, ts) : 0; +} + +/* timeout_ext defines this as Timeout.timeout */ +VALUE +rb_s_timeout(int argc, VALUE *argv, VALUE mTimeout) +{ + rb_vm_t *vm; + struct timeout_args a; + + if (!rb_timers) { + rb_fatal("rb_timers not defined by timeout_ext"); + } + if (!cTimeoutError) { +#undef rb_intern + cTimeoutError = rb_const_get(mTimeout, rb_intern("Error")); + } + + rb_scan_args(argc, argv, "12", &a.sec, &a.klass, &a.message); + if (NIL_P(a.sec) || rb_equal(a.sec, INT2FIX(0))) { + return rb_yield(a.sec); + } + if (!RTEST(a.message)) { + a.message = rb_fstring_cstr("execution expired"); + } + a.ec = GET_EC(); + vm = rb_ec_vm_ptr(a.ec); + { + struct timeval tv = rb_time_interval(a.sec); + a.ts.tv_sec = tv.tv_sec; + a.ts.tv_nsec = tv.tv_usec * 1000; + } + + if (!vm->timers) { + vm->timers = rb_timers->create(); + } + rb_timers->add(vm->timers, &a.t, &a.ts); + timer_thread_timeout_update(vm, &a.ts); + + return rb_ensure(timeout_run, (VALUE)&a, timeout_ensure, (VALUE)&a); +} diff --git a/vm.c b/vm.c index 7a40e79d6c..1d48b3c842 100644 --- a/vm.c +++ b/vm.c @@ -2211,6 +2211,9 @@ ruby_vm_destruct(rb_vm_t *vm) vm->frozen_strings = 0; } rb_vm_gvl_destroy(vm); + if (rb_timers && vm->timers) { + rb_timers->destroy(vm->timers); + } RB_ALTSTACK_FREE(vm->main_altstack); if (objspace) { rb_objspace_free(objspace); @@ -2325,6 +2328,7 @@ vm_init2(rb_vm_t *vm) rb_vm_living_threads_init(vm); vm->thread_report_on_exception = 1; vm->src_encoding_index = -1; + vm->timer_thread_timeout = (rb_atomic_t)-1; vm_default_params_setup(vm); } diff --git a/vm_core.h b/vm_core.h index 0a185a6ceb..444baa36f4 100644 --- a/vm_core.h +++ b/vm_core.h @@ -540,11 +540,14 @@ typedef struct rb_hook_list_struct { int need_clean; } rb_hook_list_t; +struct timers; typedef struct rb_vm_struct { VALUE self; rb_global_vm_lock_t gvl; rb_nativethread_lock_t thread_destruct_lock; + struct timers *timers; + rb_atomic_t timer_thread_timeout; struct rb_thread_struct *main_thread; struct 
rb_thread_struct *running_thread; -- EW