From: Eric Wong
To: spew@80x24.org
Subject: [WIP] timeout-intrusive
Date: Fri, 22 Jun 2018 21:57:45 +0000
Message-Id: <20180622215745.20698-1-e@80x24.org>

Move the Timeout.timeout timer list from rb_vm_t to rb_thread_t so each
thread expires its own timers around blocking operations (sleep,
Thread#join, select/poll, blocking IO regions, Mutex#lock).  Threads
with armed timers register on vm->timeout_blockers, and the timer
thread kicks any blocked thread whose deadline has passed instead of
only interrupting the main thread.
---
 internal.h           |   6 +-
 test/test_timeout.rb |   1 +
 thread.c             | 187 ++++++++++++++++++++++++++++++-------------
 thread_sync.c        |  14 +++-
 timeout.c            | 129 ++++++++++++++++++++++-------
 vm.c                 |   2 +-
 vm_core.h            |   7 +-
 7 files changed, 251 insertions(+), 95 deletions(-)

diff --git a/internal.h b/internal.h
index 3c46bcf3fe7..f1de0a8c3d0 100644
--- a/internal.h
+++ b/internal.h
@@ -1839,10 +1839,10 @@ VALUE rb_struct_s_keyword_init(VALUE klass);
 struct timeval rb_time_timeval(VALUE);
 
 /* timeout.c */
-typedef struct rb_vm_struct rb_vm_t;
 typedef struct rb_execution_context_struct rb_execution_context_t;
-struct timespec *rb_timeout_sleep_interval(rb_vm_t *, struct timespec *);
-void rb_timeout_expire(const rb_execution_context_t *);
+struct timespec *rb_timeout_sleep_interval(const rb_execution_context_t *, struct timespec *);
+void rb_timeout_expire(rb_execution_context_t *);
+void rb_timeout_arm_timer(const rb_execution_context_t *);
 
 /* thread.c */
 #define COVERAGE_INDEX_LINES 0
diff --git a/test/test_timeout.rb b/test/test_timeout.rb
index e7378d98dac..7cbc8fd26e5 100644
--- a/test/test_timeout.rb
+++ b/test/test_timeout.rb
@@ -132,6 +132,7 @@ def test_io
   def test_thread_join
     th = Thread.new { sleep }
     assert_raise(Timeout::Error) { Timeout.timeout(0.001) { th.join } }
+    assert_raise(Timeout::Error) { Timeout.timeout(0.001) { th.value } }
   ensure
     th.kill
     th.join
diff --git a/thread.c b/thread.c
index 0067c44bf61..ec95c9d390e 100644
--- a/thread.c
+++ b/thread.c
@@ -242,19 +242,44 @@ timeval_for(struct timeval *tv, const struct timespec *ts)
     return 0;
 }
 
-static void
-timeout_prepare(struct timespec **tsp,
-                struct timespec *ts, struct timespec *end,
-                const struct timeval *timeout)
+static int timespec_cmp(const struct timespec *a, const struct timespec *b);
+
+/*
+ * Adjust for timeouts set by Timeout.timeout. Returns TRUE if adjusted.
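+ * If both a caller-supplied timeout and a Timeout.timeout timer are
+ * pending, the earlier deadline wins, so a blocking call never
+ * oversleeps an armed timer.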
+ */
+static int
+vm_timeout_prepare(rb_thread_t *th, struct timespec **tsp, struct timespec *ts)
 {
-    if (timeout) {
+    struct timespec vm_tout;
+
+    if (rb_timeout_sleep_interval(th->ec, &vm_tout)) {
+        if (!*tsp || timespec_cmp(&vm_tout, *tsp) < 0) {
+            *ts = vm_tout;
+            *tsp = ts;
+            return TRUE;
+        }
+    }
+    return FALSE;
+}
+
+static struct timespec *
+timeout_prepare(rb_thread_t *th, struct timespec **tsp,
+                struct timespec *ts, struct timespec *end,
+                const struct timeval *caller_tout)
+{
+    struct timespec *ret = 0;
+
+    if (caller_tout) {
         getclockofday(end);
-        timespec_add(end, timespec_for(ts, timeout));
-        *tsp = ts;
+        *tsp = timespec_for(ts, caller_tout);
+        timespec_add(end, *tsp);
+        ret = end;
     }
     else {
-	*tsp = 0;
+        *tsp = 0;
     }
+    vm_timeout_prepare(th, tsp, ts);
+    return ret;
 }
 
 #if THREAD_DEBUG
@@ -473,7 +498,7 @@ unblock_function_clear(rb_thread_t *th)
     rb_native_mutex_unlock(&th->interrupt_lock);
 }
 
-static void
+void
 rb_threadptr_interrupt_set(rb_thread_t *th, rb_atomic_t flag)
 {
     rb_native_mutex_lock(&th->interrupt_lock);
@@ -563,6 +588,7 @@ rb_thread_terminate_all(void)
          */
         sleeping = 1;
         native_sleep(th, &ts);
+        rb_timeout_expire(th->ec);
         RUBY_VM_CHECK_INTS_BLOCKING(ec);
         sleeping = 0;
     }
@@ -940,15 +966,17 @@ thread_join_sleep(VALUE arg)
             th->vm->sleeper--;
         }
         else {
-            if (timespec_update_expire(p->limit, &end)) {
-                thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
-                             thread_id_str(target_th));
-                return Qfalse;
-            }
             th->status = THREAD_STOPPED;
             native_sleep(th, p->limit);
         }
+        rb_timeout_expire(th->ec);
         RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+        if (p->limit && timespec_update_expire(p->limit, &end)) {
+            thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
+                         thread_id_str(target_th));
+            return Qfalse;
+        }
+
         th->status = THREAD_RUNNABLE;
         thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n",
                      thread_id_str(target_th), thread_status_name(target_th, TRUE));
@@ -961,6 +989,7 @@ thread_join(rb_thread_t *target_th, struct timespec *ts)
 {
     rb_thread_t *th = GET_THREAD();
     struct join_arg arg;
+    struct timespec to;
 
     if (th == target_th) {
         rb_raise(rb_eThreadError, "Target thread must not be current thread");
@@ -969,6 +998,7 @@ thread_join(rb_thread_t *target_th, struct timespec *ts)
         rb_raise(rb_eThreadError, "Target thread must not be main thread");
     }
 
+    vm_timeout_prepare(th, &ts, &to);
     arg.target = target_th;
     arg.waiting = th;
     arg.limit = ts;
@@ -1063,6 +1093,7 @@ thread_join_m(int argc, VALUE *argv, VALUE self)
     VALUE limit;
     struct timespec timespec;
     struct timespec *ts = 0;
+    rb_thread_t *th = rb_thread_ptr(self);
 
     rb_scan_args(argc, argv, "01", &limit);
 
@@ -1083,7 +1114,7 @@ thread_join_m(int argc, VALUE *argv, VALUE self)
         ts = double2timespec(&timespec, rb_num2dbl(limit));
     }
 
-    return thread_join(rb_thread_ptr(self), ts);
+    return thread_join(th, ts);
 }
 
 /*
@@ -1150,23 +1181,48 @@ static void
 sleep_forever(rb_thread_t *th, unsigned int fl)
 {
     enum rb_thread_status prev_status = th->status;
-    enum rb_thread_status status;
+    struct timespec ts;
+    struct timespec *tsp = 0;
+    int spurious_check = fl & SLEEP_SPURIOUS_CHECK;
 
-    status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
-    th->status = status;
-    RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
-    while (th->status == status) {
-        if (fl & SLEEP_DEADLOCKABLE) {
-            th->vm->sleeper++;
-            rb_check_deadlock(th->vm);
-        }
-        native_sleep(th, 0);
-        if (fl & SLEEP_DEADLOCKABLE) {
-            th->vm->sleeper--;
-        }
-        RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
-        if (!(fl & SLEEP_SPURIOUS_CHECK))
-            break;
+    if (vm_timeout_prepare(th, &tsp, &ts)) {
+        struct timespec to_end;
+
+        getclockofday(&to_end);
+        timespec_add(&to_end, &ts);
+
+        th->status = THREAD_STOPPED;
+        RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+        while (th->status == THREAD_STOPPED) {
+            native_sleep(th, tsp);
+            rb_timeout_expire(th->ec);
+            RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+            if (!spurious_check)
+                break;
+            if (timespec_update_expire(&ts, &to_end))
+                break;
+        }
+    }
+    else {
+        int deadlockable = fl & SLEEP_DEADLOCKABLE;
+        enum rb_thread_status status;
+
+        status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
+        th->status = status;
+        RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+        while (th->status == status) {
+            if (deadlockable) {
+                th->vm->sleeper++;
+                rb_check_deadlock(th->vm);
+            }
+            native_sleep(th, 0);
+            if (deadlockable) {
+                th->vm->sleeper--;
+            }
+            RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+            if (!spurious_check)
+                break;
+        }
     }
     th->status = prev_status;
 }
@@ -1261,18 +1317,21 @@ sleep_timespec(rb_thread_t *th, struct timespec ts, unsigned int fl)
 {
     struct timespec end;
     enum rb_thread_status prev_status = th->status;
+    struct timespec *tsp = &ts;
 
+    vm_timeout_prepare(th, &tsp, &ts);
     getclockofday(&end);
     timespec_add(&end, &ts);
     th->status = THREAD_STOPPED;
     RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
     while (th->status == THREAD_STOPPED) {
-        native_sleep(th, &ts);
-        RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
-        if (!(fl & SLEEP_SPURIOUS_CHECK))
-            break;
-        if (timespec_update_expire(&ts, &end))
-            break;
+        native_sleep(th, tsp);
+        rb_timeout_expire(th->ec);
+        RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+        if (!(fl & SLEEP_SPURIOUS_CHECK))
+            break;
+        if (timespec_update_expire(&ts, &end))
+            break;
     }
     th->status = prev_status;
 }
@@ -1415,7 +1474,7 @@ call_without_gvl(void *(*func)(void *), void *data1,
         ubf = ubf_select;
         data2 = th;
     }
-
+    rb_timeout_expire(ec);
     BLOCKING_REGION(th, {
         val = func(data1);
         saved_errno = errno;
@@ -1540,6 +1599,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 
     wfd.fd = fd;
     wfd.th = rb_ec_thread_ptr(ec);
+    rb_timeout_expire(ec);
     list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
 
     EC_PUSH_TAG(ec);
@@ -1561,6 +1621,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
         EC_JUMP_TAG(ec, state);
     }
     /* TODO: check func() */
+    rb_timeout_expire(ec);
     RUBY_VM_CHECK_INTS_BLOCKING(ec);
 
     errno = saved_errno;
@@ -2190,12 +2251,14 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
     }
 
     if (timeout_interrupt) {
-	rb_timeout_expire(th->ec);
-    }
+        rb_timeout_expire(th->ec);
+    }
 
     if (timer_interrupt) {
         uint32_t limits_us = TIME_QUANTUM_USEC;
 
+        rb_timeout_expire(th->ec);
+
         if (th->priority > 0)
             limits_us <<= th->priority;
         else
@@ -3809,8 +3872,9 @@ rb_fd_set(int fd, rb_fdset_t *set)
 #endif
 
 static int
-wait_retryable(int *result, int errnum, struct timespec *timeout,
-               const struct timespec *end)
+wait_retryable(rb_thread_t *th, int *result, int errnum,
+               struct timespec **tsp, struct timespec *ts,
+               const struct timespec *end)
 {
     if (*result < 0) {
         switch (errnum) {
           case EINTR:
 #ifdef ERESTART
           case ERESTART:
 #endif
             *result = 0;
-            if (timeout && timespec_update_expire(timeout, end)) {
-                timeout->tv_sec = 0;
-                timeout->tv_nsec = 0;
+            if (end && timespec_update_expire(ts, end)) {
+                ts->tv_sec = 0;
+                ts->tv_nsec = 0;
             }
+            vm_timeout_prepare(th, tsp, ts);
             return TRUE;
         }
         return FALSE;
     }
     else if (*result == 0) {
         /* check for spurious wakeup */
-        if (timeout) {
-            return !timespec_update_expire(timeout, end);
+        if (end && timespec_update_expire(ts, end)) {
+            return FALSE;
         }
+        vm_timeout_prepare(th, tsp, ts);
         return TRUE;
     }
     return FALSE;
@@ -3851,8 +3917,13 @@ do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds,
     rb_fdset_t MAYBE_UNUSED(orig_except);
     struct timespec ts, end, *tsp;
     rb_thread_t *th = GET_THREAD();
+    struct timespec *endp = timeout_prepare(th, &tsp, &ts, &end, timeout);
+
+    /* VM timeout exists, but user never passed us a timeval */
+    if (!timeout && tsp) {
+        timeout = alloca(sizeof(struct timeval));
+    }
 
-    timeout_prepare(&tsp, &ts, &end, timeout);
 #define do_select_update() \
     (restore_fdset(readfds, &orig_read), \
      restore_fdset(writefds, &orig_write), \
@@ -3875,8 +3946,10 @@ do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds,
             if (result < 0) lerrno = errno;
         }, ubf_select, th, FALSE);
 
+        rb_timeout_expire(th->ec);
         RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
-    } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update());
+    } while (wait_retryable(th, &result, lerrno, &tsp, &ts, endp) &&
+             do_select_update());
 
 #define fd_term(f) if (f##fds) rb_fd_term(&orig_##f)
     fd_term(read);
@@ -3998,8 +4071,8 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     int result = 0, lerrno;
     struct timespec ts, end, *tsp;
     rb_thread_t *th = GET_THREAD();
+    struct timespec *endp = timeout_prepare(th, &tsp, &ts, &end, timeout);
 
-    timeout_prepare(&tsp, &ts, &end, timeout);
     fds.fd = fd;
     fds.events = (short)events;
 
@@ -4011,8 +4084,9 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
             if (result < 0) lerrno = errno;
         }, ubf_select, th, FALSE);
 
+        rb_timeout_expire(th->ec);
         RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
-    } while (wait_retryable(&result, lerrno, tsp, &end));
+    } while (wait_retryable(th, &result, lerrno, &tsp, &ts, endp));
 
     if (result < 0) {
         errno = lerrno;
         return -1;
@@ -4149,6 +4223,8 @@ rb_threadptr_check_signal(rb_thread_t *mth)
     }
 }
 
+void rb_timeout_kick_blockers(rb_vm_t *vm);
+
 static void
 timer_thread_function(void *arg)
 {
@@ -4164,11 +4240,11 @@ timer_thread_function(void *arg)
     if (vm->running_thread) {
         RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread->ec);
     }
+    if (!list_empty(&vm->timeout_blockers)) {
+        rb_timeout_kick_blockers(vm);
+    }
     rb_native_mutex_unlock(&vm->thread_destruct_lock);
 
-    if (vm->timer_thread_timeout >= 0) {
-        rb_threadptr_interrupt_set(vm->main_thread, TIMEOUT_INTERRUPT_MASK);
-    }
-
     /* check signal */
     rb_threadptr_check_signal(vm->main_thread);
@@ -5071,7 +5147,6 @@ rb_check_deadlock(rb_vm_t *vm)
     if (vm_living_thread_num(vm) > vm->sleeper) return;
     if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
     if (patrol_thread && patrol_thread != GET_THREAD()) return;
-    if (rb_timeout_sleep_interval(vm, 0)) return;
 
     list_for_each(&vm->living_threads, th, vmlt_node) {
         if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
@@ -5084,6 +5159,8 @@ rb_check_deadlock(rb_vm_t *vm)
             if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
                 found = 1;
             }
+        } else if (!list_empty(&th->timers)) {
+            found = 1;
         }
 
         if (found) break;
diff --git a/thread_sync.c b/thread_sync.c
index b2b7d050883..67782486d90 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -248,6 +248,7 @@ rb_mutex_lock(VALUE self)
             rb_raise(rb_eThreadError, "deadlock; recursive locking");
         }
 
+        rb_timeout_expire(th->ec);
         w.th = th;
 
         while (mutex->th != th) {
@@ -268,22 +269,27 @@ rb_mutex_lock(VALUE self)
                 timeout = &ts;
                 patrol_thread = th;
             }
+            else {
+                timeout = rb_timeout_sleep_interval(th->ec, &ts);
+            }
 
             list_add_tail(&mutex->waitq, &w.node);
             native_sleep(th, timeout); /* release GVL */
             list_del(&w.node);
 
-            if (patrol_thread == th)
-                patrol_thread = NULL;
-
             th->locking_mutex = Qfalse;
-            if (timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
-                rb_check_deadlock(th->vm);
+            if (patrol_thread) {
+                patrol_thread = NULL;
+                if (!RUBY_VM_INTERRUPTED(th->ec)) {
+                    rb_check_deadlock(th->vm);
+                }
             }
             if (th->status == THREAD_STOPPED_FOREVER) {
                 th->status = prev_status;
             }
             th->vm->sleeper--;
+            rb_timeout_expire(th->ec);
             RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
             if (!mutex->th) {
                 mutex->th = th;
diff --git a/timeout.c b/timeout.c
index 954353179a0..c4bd76716ff 100644
--- a/timeout.c
+++ b/timeout.c
@@ -17,10 +17,16 @@ struct timeout {
 static VALUE eTimeoutError, mTimeout, eUncaughtThrow;
 static ID id_thread;
 
+void rb_native_mutex_lock(rb_nativethread_lock_t *);
+void rb_native_mutex_unlock(rb_nativethread_lock_t *);
+
 static uint64_t
 timespec2usec(const struct timespec *ts)
 {
-    return (uint64_t)ts->tv_sec * 1000000 + (uint64_t)ts->tv_nsec / 1000;
+    uint64_t usec = (uint64_t)ts->tv_sec * 1000000;
+
+    /* round up to avoid busy waiting: */
+    return usec + ((uint64_t)ts->tv_nsec + 999) / 1000;
 }
 
 static void
@@ -98,29 +104,52 @@ timeout_run(VALUE x)
     return x;
 }
 
+static void
+timeout_blockers_cleanup(rb_thread_t *th)
+{
+    if (list_empty(&th->timers)) {
+        rb_native_mutex_lock(&th->vm->thread_destruct_lock);
+        list_del_init(&th->blocker);
+        rb_native_mutex_unlock(&th->vm->thread_destruct_lock);
+    }
+}
+
 static VALUE
 timeout_ensure(VALUE x)
 {
     struct timeout *a = (struct timeout *)x;
 
     list_del_init(&a->t.list); /* inlined timer_del */
+    timeout_blockers_cleanup(rb_ec_thread_ptr(a->ec));
     return Qfalse;
 }
 
 static struct timeout *
-rb_timers_expire_one(rb_vm_t *vm, uint64_t now_usec)
+rb_timers_expire_one(rb_thread_t *th, uint64_t now_usec)
 {
-    struct timer *t = timers_ll_expire(&vm->timers, now_usec);
+    struct timer *t = timers_ll_expire(&th->timers, now_usec);
 
-    return t ? container_of(t, struct timeout, t) : 0;
+    if (t) {
+        return container_of(t, struct timeout, t);
+    }
+    return 0;
+}
+
+static int
+usec2msec(uint64_t usec)
+{
+    uint64_t msec = (usec + 999) / 1000; /* round up to avoid busy waiting */
+
+    return msec > INT_MAX ? INT_MAX : (int)msec;
 }
 
 static void
-arm_timer(rb_vm_t *vm, uint64_t rel_usec)
+arm_timer(rb_thread_t *th, struct timer *t, uint64_t now_usec)
 {
-    int msec = rel_usec / 1000;
+    uint64_t rel_usec = t->time > now_usec ? t->time - now_usec : 0;
+    int msec = usec2msec(rel_usec);
 
-    ATOMIC_EXCHANGE(vm->next_timeout, (rb_atomic_t)msec);
+    ATOMIC_EXCHANGE(th->vm->next_timeout, (rb_atomic_t)msec);
 
     /* _low makes a difference in benchmark/bm_timeout_mt_nested.rb */
     rb_thread_wakeup_timer_thread_low();
@@ -136,10 +165,9 @@ static VALUE
 do_expire(VALUE x)
 {
     struct expire_args *ea = (struct expire_args *)x;
-    rb_vm_t *vm = ea->current_th->vm;
     struct timeout *a;
 
-    while ((a = rb_timers_expire_one(vm, ea->now_usec))) {
+    while ((a = rb_timers_expire_one(ea->current_th, ea->now_usec))) {
         rb_thread_t *target_th = rb_ec_thread_ptr(a->ec);
         VALUE exc;
 
@@ -151,14 +179,10 @@ do_expire(VALUE x)
             RBASIC_SET_CLASS_RAW(exc, eTimeoutError); /* reveal */
             /* for Timeout::Error#exception to call `throw' */
             rb_ivar_set(exc, id_thread, target_th->self);
+        }
 
-        if (ea->current_th == target_th) {
-            rb_threadptr_pending_interrupt_enque(target_th, exc);
-            rb_threadptr_interrupt(target_th);
-        }
-        else {
-            rb_funcall(target_th->self, rb_intern("raise"), 1, exc);
-        }
+        rb_threadptr_pending_interrupt_enque(target_th, exc);
+        rb_threadptr_interrupt(target_th);
     }
     return Qfalse;
 }
@@ -167,17 +191,17 @@ static VALUE
 expire_ensure(VALUE p)
 {
     struct expire_args *ea = (struct expire_args *)p;
-    rb_vm_t *vm = ea->current_th->vm;
-    struct timer *t = timers_ll_earliest(&vm->timers);
+    rb_thread_t *th = ea->current_th;
+    struct timer *t = timers_ll_earliest(&th->timers);
 
     if (t) {
-        arm_timer(vm, t->time > ea->now_usec ? t->time - ea->now_usec : 0);
+        arm_timer(th, t, ea->now_usec);
     }
-    ea->current_th->status = ea->prev_status;
+    th->status = ea->prev_status;
     return Qfalse;
 }
 
 void
-rb_timeout_expire(const rb_execution_context_t *ec)
+rb_timeout_expire(rb_execution_context_t *ec)
 {
     struct expire_args ea;
     struct timespec ts;
@@ -191,15 +215,14 @@ rb_timeout_expire(const rb_execution_context_t *ec)
 }
 
 struct timespec *
-rb_timeout_sleep_interval(rb_vm_t *vm, struct timespec *ts)
+rb_timeout_sleep_interval(const rb_execution_context_t *ec, struct timespec *ts)
 {
-    struct timer *t = timers_ll_earliest(&vm->timers);
+    const rb_thread_t *th = rb_ec_thread_ptr(ec);
+    struct timer *t = timers_ll_earliest(&th->timers);
 
-    if (t && !ts) {
-        return (struct timespec *)-1;
-    }
     if (t) {
         uint64_t now_usec;
+
         rb_getclockofday(ts);
         now_usec = timespec2usec(ts);
         if (t->time >= now_usec) {
@@ -220,8 +243,8 @@ rb_timeout_sleep_interval(rb_vm_t *vm, struct timespec *ts)
 static void
 timeout_add(struct timeout *a)
 {
-    rb_vm_t *vm = rb_ec_vm_ptr(a->ec);
-    struct timer *cur = timers_ll_earliest(&vm->timers);
+    rb_thread_t *th = rb_ec_thread_ptr(a->ec);
+    struct timer *cur = timers_ll_earliest(&th->timers);
     uint64_t now_usec, rel_usec;
     struct timeval tv = rb_time_interval(a->sec);
     struct timespec ts;
@@ -231,9 +254,14 @@ timeout_add(struct timeout *a)
     rel_usec = timespec2usec(&ts);
     rb_getclockofday(&ts);
     now_usec = timespec2usec(&ts);
-    timers_ll_add(&vm->timers, &a->t, rel_usec, now_usec);
-    if (!cur || timers_ll_earliest(&vm->timers) == &a->t) {
-        arm_timer(vm, rel_usec);
+    timers_ll_add(&th->timers, &a->t, rel_usec, now_usec);
+    if (!cur) {
+        rb_native_mutex_lock(&th->vm->thread_destruct_lock);
+        list_add(&th->vm->timeout_blockers, &th->blocker);
+        rb_native_mutex_unlock(&th->vm->thread_destruct_lock);
+    }
+    if (!cur || timers_ll_earliest(&th->timers) == &a->t) {
+        arm_timer(th, &a->t, now_usec);
     }
 }
@@ -316,3 +344,42 @@ Init_timeout(void)
 
     rb_provide("timeout.rb");
 }
+
+/* called by timer-thread under thread_destruct_lock w/o GVL */
+void
+rb_timeout_kick_blockers(rb_vm_t *vm)
+{
+    rb_thread_t *th;
+    uint64_t now_usec;
+    uint64_t min_usec = UINT64_MAX;
+    struct timespec ts;
+
+    rb_getclockofday(&ts);
+    now_usec = timespec2usec(&ts);
+    list_for_each(&vm->timeout_blockers, th, blocker) {
+
+        /*
+         * accessing th->timers is ONLY safe w/o GVL from timer-thread
+         * since we know th is in a BLOCKING_REGION with th->unblock.func
+         */
+        rb_native_mutex_lock(&th->interrupt_lock);
+        if (th->unblock.func) {
+            struct timer *t = timers_ll_earliest(&th->timers);
+
+            if (t) {
+                if (now_usec >= t->time) {
+                    ATOMIC_OR(th->ec->interrupt_flag, TIMEOUT_INTERRUPT_MASK);
+                    (th->unblock.func)(th->unblock.arg);
+                }
+                else if (min_usec > t->time) {
+                    min_usec = t->time;
+                }
+            }
+        }
+        rb_native_mutex_unlock(&th->interrupt_lock);
+    }
+    if (min_usec != UINT64_MAX) {
+        int msec = usec2msec(min_usec - now_usec);
+        ATOMIC_CAS(vm->next_timeout, (rb_atomic_t)-1, (rb_atomic_t)msec);
+    }
+}
diff --git a/vm.c b/vm.c
index a07ee14b1fe..919c15c9dc6 100644
--- a/vm.c
+++ b/vm.c
@@ -2323,7 +2323,6 @@ vm_init2(rb_vm_t *vm)
 {
     MEMZERO(vm, rb_vm_t, 1);
     rb_vm_living_threads_init(vm);
-    list_head_init(&vm->timers);
     vm->thread_report_on_exception = 1;
     vm->src_encoding_index = -1;
     vm->next_timeout = (rb_atomic_t)-1;
@@ -2529,6 +2528,7 @@ static void
 th_init(rb_thread_t *th, VALUE self)
 {
     th->self = self;
+    list_head_init(&th->timers);
     rb_threadptr_root_fiber_setup(th);
 
     {
diff --git a/vm_core.h b/vm_core.h
index 829ac9c1572..eccd488cd0e 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -545,10 +545,12 @@ typedef struct rb_vm_struct {
     rb_global_vm_lock_t gvl;
     rb_nativethread_lock_t thread_destruct_lock;
 
-    struct list_head timers; /* TODO: consider moving to rb_thread_t */
     rb_atomic_t next_timeout;
     int timer_thread_timeout;
 
+    /* protected by thread_destruct_lock */
+    struct list_head timeout_blockers;
+
     struct rb_thread_struct *main_thread;
     struct rb_thread_struct *running_thread;
 #ifdef USE_SIGALTSTACK
@@ -837,6 +839,8 @@ void ec_set_vm_stack(rb_execution_context_t *ec, VALUE *stack, size_t size);
 
 typedef struct rb_thread_struct {
     struct list_node vmlt_node;
+    struct list_head timers;
+    struct list_node blocker;
     VALUE self;
     rb_vm_t *vm;
 
@@ -1565,6 +1569,7 @@ static inline void
 rb_vm_living_threads_init(rb_vm_t *vm)
 {
     list_head_init(&vm->waiting_fds);
+    list_head_init(&vm->timeout_blockers);
    list_head_init(&vm->living_threads);
     vm->living_thread_num = 0;
 }
-- 
EW