From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.1 (2015-04-28) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-3.5 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.1 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id CD2161F405 for ; Thu, 9 Aug 2018 03:46:18 +0000 (UTC) From: Eric Wong To: spew@80x24.org Subject: [PATCH 1/2] thread_pthread.c: eliminate timer thread by restructuring GVL Date: Thu, 9 Aug 2018 03:46:17 +0000 Message-Id: <20180809034618.20082-1-e@80x24.org> List-Id: This reverts commit 194a6a2c68e9c8a3536b24db18ceac87535a6051 (r64203). Race conditions which caused the original reversion will be fixed in the subsequent commit. --- internal.h | 3 + process.c | 140 +++++++- signal.c | 7 +- test/ruby/test_io.rb | 9 +- test/ruby/test_process.rb | 2 +- test/ruby/test_thread.rb | 5 +- thread.c | 397 +++++++++++++-------- thread_pthread.c | 710 +++++++++++++++++--------------------- thread_pthread.h | 20 +- thread_win32.c | 29 +- vm_core.h | 8 +- 11 files changed, 746 insertions(+), 584 deletions(-) diff --git a/internal.h b/internal.h index 81c556c0be..3b3575b42d 100644 --- a/internal.h +++ b/internal.h @@ -77,6 +77,9 @@ extern "C" { # define __has_extension __has_feature #endif +/* Prevent compiler from reordering access */ +#define ACCESS_ONCE(type,x) (*((volatile type *)&(x))) + #if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 201112L) # define STATIC_ASSERT(name, expr) _Static_assert(expr, #name ": " #expr) #elif GCC_VERSION_SINCE(4, 6, 0) || __has_extension(c_static_assert) diff --git a/process.c b/process.c index 0c534a8a36..43db055cbf 100644 --- a/process.c +++ b/process.c @@ -928,6 +928,7 @@ struct waitpid_state { int status; int options; int errnum; + int sigwait_fd; }; void rb_native_mutex_lock(rb_nativethread_lock_t *); @@ -936,13 +937,65 @@ void rb_native_cond_signal(rb_nativethread_cond_t *); void rb_native_cond_wait(rb_nativethread_cond_t *, rb_nativethread_lock_t *); rb_nativethread_cond_t *rb_sleep_cond_get(const rb_execution_context_t *); void rb_sleep_cond_put(rb_nativethread_cond_t *); +int rb_sigwait_fd_get(const rb_thread_t *); +void rb_sigwait_sleep(const rb_thread_t *, int fd, const struct timespec *); +void rb_sigwait_fd_put(const rb_thread_t *, int fd); + +static int +sigwait_fd_migrate_signaled_p(struct waitpid_state *w) +{ + int signaled = FALSE; + rb_thread_t *th = w->ec ? 
rb_ec_thread_ptr(w->ec) : 0; + + if (th) rb_native_mutex_lock(&th->interrupt_lock); + + if (w->cond) { + rb_native_cond_signal(w->cond); + signaled = TRUE; + } + + if (th) rb_native_mutex_unlock(&th->interrupt_lock); + + return signaled; +} + +/* + * When a thread is done using sigwait_fd and there are other threads + * sleeping on waitpid, we must kick one of the threads out of + * rb_native_cond_wait so it can switch to rb_sigwait_sleep + */ +static void +sigwait_fd_migrate_sleeper(rb_vm_t *vm) +{ + struct waitpid_state *w = 0; + + list_for_each(&vm->waiting_pids, w, wnode) { + if (sigwait_fd_migrate_signaled_p(w)) return; + } + list_for_each(&vm->waiting_grps, w, wnode) { + if (sigwait_fd_migrate_signaled_p(w)) return; + } +} + +void +rb_sigwait_fd_migrate(rb_vm_t *vm) +{ + rb_native_mutex_lock(&vm->waitpid_lock); + sigwait_fd_migrate_sleeper(vm); + rb_native_mutex_unlock(&vm->waitpid_lock); +} static void waitpid_notify(struct waitpid_state *w, rb_pid_t ret) { w->ret = ret; list_del_init(&w->wnode); - rb_native_cond_signal(w->cond); + if (w->cond) { + rb_native_cond_signal(w->cond); + } + else { + /* w is owned by this thread */ + } } #ifdef _WIN32 /* for spawnvp result from mjit.c */ @@ -954,7 +1007,7 @@ waitpid_notify(struct waitpid_state *w, rb_pid_t ret) #endif extern volatile unsigned int ruby_nocldwait; /* signal.c */ -/* called by timer thread */ +/* called by timer thread or thread which acquired sigwait_fd */ static void waitpid_each(struct list_head *head) { @@ -1008,6 +1061,17 @@ waitpid_state_init(struct waitpid_state *w, rb_pid_t pid, int options) w->options = options; } +static const struct timespec * +sigwait_sleep_time(void) +{ + if (SIGCHLD_LOSSY) { + static const struct timespec busy_wait = { 0, 100000000 }; + + return &busy_wait; + } + return 0; +} + /* * must be called with vm->waitpid_lock held, this is not interruptible */ @@ -1026,13 +1090,31 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options, if (w.ret == -1) w.errnum = errno; } else { - w.cond = cond; w.ec = 0; + w.sigwait_fd = -1; list_add(w.pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w.wnode); do { - rb_native_cond_wait(w.cond, &vm->waitpid_lock); + if (w.sigwait_fd < 0) + w.sigwait_fd = rb_sigwait_fd_get(0); + + if (w.sigwait_fd >= 0) { + w.cond = 0; + rb_native_mutex_unlock(&vm->waitpid_lock); + rb_sigwait_sleep(0, w.sigwait_fd, sigwait_sleep_time()); + rb_native_mutex_lock(&vm->waitpid_lock); + } + else { + w.cond = cond; + rb_native_cond_wait(w.cond, &vm->waitpid_lock); + } } while (!w.ret); list_del(&w.wnode); + + /* we're done, maybe other waitpid callers are not: */ + if (w.sigwait_fd >= 0) { + rb_sigwait_fd_put(0, w.sigwait_fd); + sigwait_fd_migrate_sleeper(vm); + } } if (status) { *status = w.status; @@ -1047,7 +1129,10 @@ waitpid_wake(void *x) struct waitpid_state *w = x; /* th->interrupt_lock is already held by rb_threadptr_interrupt_common */ - rb_native_cond_signal(w->cond); + if (w->cond) + rb_native_cond_signal(w->cond); + else + rb_thread_wakeup_timer_thread(0); /* kick sigwait_fd */ } static void * @@ -1062,11 +1147,40 @@ waitpid_nogvl(void *x) * by the time we enter this. And we may also be interrupted. 
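     * (hence the re-check of w->ret and pending interrupts just below)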
*/ if (!w->ret && !RUBY_VM_INTERRUPTED_ANY(w->ec)) { - if (SIGCHLD_LOSSY) { - rb_thread_wakeup_timer_thread(); + if (w->sigwait_fd < 0) + w->sigwait_fd = rb_sigwait_fd_get(th); + + if (w->sigwait_fd >= 0) { + rb_nativethread_cond_t *cond = w->cond; + + w->cond = 0; + rb_native_mutex_unlock(&th->interrupt_lock); + rb_sigwait_sleep(th, w->sigwait_fd, sigwait_sleep_time()); + rb_native_mutex_lock(&th->interrupt_lock); + w->cond = cond; } - rb_native_cond_wait(w->cond, &th->interrupt_lock); + else { + if (!w->cond) + w->cond = rb_sleep_cond_get(w->ec); + + /* another thread calling rb_sigwait_sleep will process + * signals for us */ + if (SIGCHLD_LOSSY) { + rb_thread_wakeup_timer_thread(0); + } + rb_native_cond_wait(w->cond, &th->interrupt_lock); + } + } + + /* + * we must release th->native_thread_data.sleep_cond when + * re-acquiring GVL: + */ + if (w->cond) { + rb_sleep_cond_put(w->cond); + w->cond = 0; } + rb_native_mutex_unlock(&th->interrupt_lock); return 0; @@ -1096,8 +1210,15 @@ waitpid_cleanup(VALUE x) list_del(&w->wnode); rb_native_mutex_unlock(&vm->waitpid_lock); } - rb_sleep_cond_put(w->cond); + /* we may have never released and re-acquired GVL */ + if (w->cond) + rb_sleep_cond_put(w->cond); + + if (w->sigwait_fd >= 0) { + rb_sigwait_fd_put(rb_ec_thread_ptr(w->ec), w->sigwait_fd); + rb_sigwait_fd_migrate(rb_ec_vm_ptr(w->ec)); + } return Qfalse; } @@ -1124,6 +1245,7 @@ waitpid_wait(struct waitpid_state *w) } else { w->cond = rb_sleep_cond_get(w->ec); + w->sigwait_fd = -1; /* order matters, favor specified PIDs rather than -1 or 0 */ list_add(w->pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w->wnode); } diff --git a/signal.c b/signal.c index 6393273adf..65b45cf6bd 100644 --- a/signal.c +++ b/signal.c @@ -709,9 +709,6 @@ signal_enque(int sig) static rb_atomic_t sigchld_hit; -/* Prevent compiler from reordering access */ -#define ACCESS_ONCE(type,x) (*((volatile type *)&(x))) - static RETSIGTYPE sighandler(int sig) { @@ -730,7 +727,7 @@ sighandler(int sig) else { signal_enque(sig); } - rb_thread_wakeup_timer_thread(); + rb_thread_wakeup_timer_thread(sig); #if !defined(BSD_SIGNAL) && !defined(POSIX_SIGNAL) ruby_signal(sig, sighandler); #endif @@ -764,7 +761,6 @@ rb_enable_interrupt(void) #ifdef HAVE_PTHREAD_SIGMASK sigset_t mask; sigemptyset(&mask); - sigaddset(&mask, RUBY_SIGCHLD); /* timer-thread handles this */ pthread_sigmask(SIG_SETMASK, &mask, NULL); #endif } @@ -1077,7 +1073,6 @@ rb_trap_exit(void) void ruby_waitpid_all(rb_vm_t *); /* process.c */ -/* only runs in the timer-thread */ void ruby_sigchld_handler(rb_vm_t *vm) { diff --git a/test/ruby/test_io.rb b/test/ruby/test_io.rb index a1462c53b2..d71efeb8a2 100644 --- a/test/ruby/test_io.rb +++ b/test/ruby/test_io.rb @@ -3556,7 +3556,8 @@ def test_open_flag_binary end if File::BINARY != 0 def test_race_gets_and_close - assert_separately([], "#{<<-"begin;"}\n#{<<-"end;"}") + opt = { signal: :ABRT, timeout: 200 } + assert_separately([], "#{<<-"begin;"}\n#{<<-"end;"}", opt) bug13076 = '[ruby-core:78845] [Bug #13076]' begin; 10.times do |i| @@ -3578,9 +3579,9 @@ def test_race_gets_and_close w.close r.close end - assert_nothing_raised(IOError, bug13076) { - t.each(&:join) - } + t.each do |th| + assert_same(th, th.join(2), bug13076) + end end end; end diff --git a/test/ruby/test_process.rb b/test/ruby/test_process.rb index 759230f834..a0b08dd110 100644 --- a/test/ruby/test_process.rb +++ b/test/ruby/test_process.rb @@ -1767,7 +1767,7 @@ def test_daemon_no_threads puts Dir.entries("/proc/self/task") - %W[. ..] 
end bug4920 = '[ruby-dev:43873]' - assert_equal(2, data.size, bug4920) + assert_include(1..2, data.size, bug4920) assert_not_include(data.map(&:to_i), pid) end else # darwin diff --git a/test/ruby/test_thread.rb b/test/ruby/test_thread.rb index 03fd1e3075..56a7bfb23b 100644 --- a/test/ruby/test_thread.rb +++ b/test/ruby/test_thread.rb @@ -952,15 +952,16 @@ def test_backtrace def test_thread_timer_and_interrupt bug5757 = '[ruby-dev:44985]' pid = nil - cmd = 'Signal.trap(:INT, "DEFAULT"); r,=IO.pipe; Thread.start {Thread.pass until Thread.main.stop?; puts; STDOUT.flush}; r.read' + cmd = 'Signal.trap(:INT, "DEFAULT"); pipe=IO.pipe; Thread.start {Thread.pass until Thread.main.stop?; puts; STDOUT.flush}; pipe[0].read' opt = {} opt[:new_pgroup] = true if /mswin|mingw/ =~ RUBY_PLATFORM s, t, _err = EnvUtil.invoke_ruby(['-e', cmd], "", true, true, opt) do |in_p, out_p, err_p, cpid| + assert IO.select([out_p], nil, nil, 10), 'subprocess not ready' out_p.gets pid = cpid t0 = Time.now.to_f Process.kill(:SIGINT, pid) - Process.wait(pid) + Timeout.timeout(10) { Process.wait(pid) } t1 = Time.now.to_f [$?, t1 - t0, err_p.read] end diff --git a/thread.c b/thread.c index a7d48a464f..9c7fcee05f 100644 --- a/thread.c +++ b/thread.c @@ -106,8 +106,13 @@ static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th); static const char *thread_status_name(rb_thread_t *th, int detail); static void timespec_add(struct timespec *, const struct timespec *); static void timespec_sub(struct timespec *, const struct timespec *); +static int timespec_cmp(const struct timespec *a, const struct timespec *b); static int timespec_update_expire(struct timespec *, const struct timespec *); static void getclockofday(struct timespec *); +NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); +static int consume_communication_pipe(int fd); +static int check_signals_nogvl(rb_thread_t *, int sigwait_fd); +void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */ #define eKillSignal INT2FIX(0) #define eTerminateSignal INT2FIX(1) @@ -348,7 +353,14 @@ rb_thread_s_debug_set(VALUE self, VALUE val) #endif NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start)); -static void timer_thread_function(void *); +static void timer_thread_function(void); +void ruby_sigchld_handler(rb_vm_t *); /* signal.c */ + +static void +ubf_sigwait(void *ignore) +{ + rb_thread_wakeup_timer_thread(0); +} #if defined(_WIN32) #include "thread_win32.c" @@ -373,6 +385,15 @@ static void timer_thread_function(void *); #error "unsupported thread type" #endif +/* + * TODO: somebody with win32 knowledge should be able to get rid of + * timer-thread by busy-waiting on signals. And it should be possible + * to make the GVL in thread_pthread.c be platform-independent. 
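+ * (for now, thread_win32.c keeps its own timer thread for this)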
+ */ +#ifndef BUSY_WAIT_SIGNALS +# define BUSY_WAIT_SIGNALS (0) +#endif + #if THREAD_DEBUG static int debug_mutex_initialized = 1; static rb_nativethread_lock_t debug_mutex; @@ -412,7 +433,6 @@ rb_vm_gvl_destroy(rb_vm_t *vm) { gvl_release(vm); gvl_destroy(vm); - rb_native_mutex_destroy(&vm->thread_destruct_lock); if (0) { /* may be held by running threads */ rb_native_mutex_destroy(&vm->waitpid_lock); @@ -773,10 +793,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s rb_fiber_close(th->ec->fiber_ptr); } - rb_native_mutex_lock(&th->vm->thread_destruct_lock); - /* make sure vm->running_thread never point me after this point.*/ - th->vm->running_thread = NULL; - rb_native_mutex_unlock(&th->vm->thread_destruct_lock); thread_cleanup_func(th, FALSE); gvl_release(th->vm); @@ -2163,6 +2179,14 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) /* signal handling */ if (trap_interrupt && (th == th->vm->main_thread)) { enum rb_thread_status prev_status = th->status; + int sigwait_fd = rb_sigwait_fd_get(th); + + if (sigwait_fd >= 0) { + (void)consume_communication_pipe(sigwait_fd); + ruby_sigchld_handler(th->vm); + rb_sigwait_fd_put(th, sigwait_fd); + rb_sigwait_fd_migrate(th->vm); + } th->status = THREAD_RUNNABLE; while ((sig = rb_get_next_signal()) != 0) { rb_signal_exec(th, sig); @@ -3840,86 +3864,95 @@ wait_retryable(int *result, int errnum, struct timespec *timeout, return FALSE; } -#define restore_fdset(fds1, fds2) \ - ((fds1) ? rb_fd_dup(fds1, fds2) : (void)0) - struct select_set { - rb_fdset_t read; - rb_fdset_t write; - rb_fdset_t except; + int max; + int sigwait_fd; + rb_thread_t *th; + rb_fdset_t *rset; + rb_fdset_t *wset; + rb_fdset_t *eset; + rb_fdset_t orig_rset; + rb_fdset_t orig_wset; + rb_fdset_t orig_eset; + struct timeval *timeout; }; -static size_t -select_set_memsize(const void *p) +static VALUE +select_set_free(VALUE p) { - return sizeof(struct select_set); + struct select_set *set = (struct select_set *)p; + + if (set->sigwait_fd >= 0) { + rb_sigwait_fd_put(set->th, set->sigwait_fd); + rb_sigwait_fd_migrate(set->th->vm); + } + + rb_fd_term(&set->orig_rset); + rb_fd_term(&set->orig_wset); + rb_fd_term(&set->orig_eset); + + return Qfalse; } -static void -select_set_free(void *p) +static const struct timespec * +sigwait_timeout(rb_thread_t *th, int sigwait_fd, const struct timespec *orig, + int *drained_p) { - struct select_set *orig = p; + static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 }; - rb_fd_term(&orig->read); - rb_fd_term(&orig->write); - rb_fd_term(&orig->except); - xfree(orig); -} + if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) { + *drained_p = check_signals_nogvl(th, sigwait_fd); + if (!orig || timespec_cmp(orig, &quantum) > 0) + return &quantum; + } -static const rb_data_type_t select_set_type = { - "select_set", - {NULL, select_set_free, select_set_memsize,}, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY -}; + return orig; +} -static int -do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds, - rb_fdset_t *const exceptfds, struct timeval *timeout) +static VALUE +do_select(VALUE p) { + struct select_set *set = (struct select_set *)p; int MAYBE_UNUSED(result); int lerrno; struct timespec ts, end, *tsp; - rb_thread_t *th = GET_THREAD(); - VALUE o; - struct select_set *orig; - - o = TypedData_Make_Struct(0, struct select_set, &select_set_type, orig); + const struct timespec *to; + struct timeval tv; - timeout_prepare(&tsp, &ts, &end, timeout); + timeout_prepare(&tsp, 
&ts, &end, set->timeout); +#define restore_fdset(dst, src) \ + ((dst) ? rb_fd_dup(dst, src) : (void)0) #define do_select_update() \ - (restore_fdset(readfds, &orig->read), \ - restore_fdset(writefds, &orig->write), \ - restore_fdset(exceptfds, &orig->except), \ + (restore_fdset(set->rset, &set->orig_rset), \ + restore_fdset(set->wset, &set->orig_wset), \ + restore_fdset(set->eset, &set->orig_eset), \ TRUE) -#define fd_init_copy(f) \ - (f##fds) ? rb_fd_init_copy(&orig->f, f##fds) : rb_fd_no_init(&orig->f) - fd_init_copy(read); - fd_init_copy(write); - fd_init_copy(except); -#undef fd_init_copy - do { + int drained; lerrno = 0; - BLOCKING_REGION(th, { - result = native_fd_select(n, readfds, writefds, exceptfds, - timeval_for(timeout, tsp), th); + BLOCKING_REGION(set->th, { + to = sigwait_timeout(set->th, set->sigwait_fd, tsp, &drained); + result = native_fd_select(set->max, set->rset, set->wset, set->eset, + timeval_for(&tv, to), set->th); if (result < 0) lerrno = errno; - }, ubf_select, th, FALSE); + }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE); - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may raise */ - } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update()); + if (set->sigwait_fd >= 0) { + if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset)) + result--; + (void)check_signals_nogvl(set->th, set->sigwait_fd); + } - /* didn't raise, perform cleanup ourselves */ - select_set_free(orig); - rb_gc_force_recycle(o); + RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */ + } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update()); if (result < 0) { errno = lerrno; } - return result; + return (VALUE)result; } static void @@ -3955,11 +3988,42 @@ rb_thread_fd_writable(int fd) return TRUE; } +static rb_fdset_t * +init_set_fd(int fd, rb_fdset_t *fds) +{ + if (fd < 0) { + return 0; + } + rb_fd_init(fds); + rb_fd_set(fd, fds); + + return fds; +} + int rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, struct timeval *timeout) { - if (!read && !write && !except) { + struct select_set set; + + set.th = GET_THREAD(); + set.max = max; + set.sigwait_fd = rb_sigwait_fd_get(set.th); + set.rset = read; + set.wset = write; + set.eset = except; + set.timeout = timeout; + + if (set.sigwait_fd >= 0) { + if (set.rset) + rb_fd_set(set.sigwait_fd, set.rset); + else + set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset); + if (set.sigwait_fd > set.max) { + set.max = set.sigwait_fd + 1; + } + } + if (!set.rset && !set.wset && !set.eset) { if (!timeout) { rb_thread_sleep_forever(); return 0; @@ -3968,16 +4032,23 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * return 0; } - if (read) { - rb_fd_resize(max - 1, read); - } - if (write) { - rb_fd_resize(max - 1, write); - } - if (except) { - rb_fd_resize(max - 1, except); - } - return do_select(max, read, write, except, timeout); +#define fd_init_copy(f) do { \ + if (set.f) { \ + rb_fd_resize(set.max - 1, set.f); \ + if (&set.orig_##f != set.f) { /* sigwait_fd */ \ + rb_fd_init_copy(&set.orig_##f, set.f); \ + } \ + } \ + else { \ + rb_fd_no_init(&set.orig_##f); \ + } \ + } while (0) + fd_init_copy(rset); + fd_init_copy(wset); + fd_init_copy(eset); +#undef fd_init_copy + + return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set); } #ifdef USE_POLL @@ -3991,68 +4062,64 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * # define POLLERR_SET (0) #endif -#ifndef HAVE_PPOLL -/* TODO: don't 
ignore sigmask */ -static int -ruby_ppoll(struct pollfd *fds, nfds_t nfds, - const struct timespec *ts, const sigset_t *sigmask) -{ - int timeout_ms; - - if (ts) { - int tmp, tmp2; - - if (ts->tv_sec > INT_MAX/1000) - timeout_ms = INT_MAX; - else { - tmp = (int)(ts->tv_sec * 1000); - /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */ - tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L)); - if (INT_MAX - tmp < tmp2) - timeout_ms = INT_MAX; - else - timeout_ms = (int)(tmp + tmp2); - } - } - else - timeout_ms = -1; - - return poll(fds, nfds, timeout_ms); -} -# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask)) -#endif - /* * returns a mask of events */ int rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) { - struct pollfd fds; + struct pollfd fds[2]; int result = 0, lerrno; struct timespec ts, end, *tsp; + const struct timespec *to; + int drained; rb_thread_t *th = GET_THREAD(); + nfds_t nfds; + rb_unblock_function_t *ubf; timeout_prepare(&tsp, &ts, &end, timeout); - fds.fd = fd; - fds.events = (short)events; - + fds[0].fd = fd; + fds[0].events = (short)events; do { - fds.revents = 0; + fds[0].revents = 0; + fds[1].fd = rb_sigwait_fd_get(th); + + if (fds[1].fd >= 0) { + fds[1].events = POLLIN; + fds[1].revents = 0; + nfds = 2; + ubf = ubf_sigwait; + } + else { + nfds = 1; + ubf = ubf_select; + } + lerrno = 0; BLOCKING_REGION(th, { - result = ppoll(&fds, 1, tsp, NULL); + to = sigwait_timeout(th, fds[1].fd, tsp, &drained); + result = ppoll(fds, nfds, to, NULL); if (result < 0) lerrno = errno; - }, ubf_select, th, FALSE); + }, ubf, th, FALSE); + if (fds[1].fd >= 0) { + if (result > 0 && fds[1].revents) { + result--; + fds[1].revents = 0; + } + (void)check_signals_nogvl(th, fds[1].fd); + rb_sigwait_fd_put(th, fds[1].fd); + rb_sigwait_fd_migrate(th->vm); + } RUBY_VM_CHECK_INTS_BLOCKING(th->ec); } while (wait_retryable(&result, lerrno, tsp, &end)); + if (result < 0) { errno = lerrno; return -1; } - if (fds.revents & POLLNVAL) { + if (fds[0].revents & POLLNVAL) { errno = EBADF; return -1; } @@ -4062,32 +4129,20 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) * Therefore we need to fix it up. */ result = 0; - if (fds.revents & POLLIN_SET) + if (fds[0].revents & POLLIN_SET) result |= RB_WAITFD_IN; - if (fds.revents & POLLOUT_SET) + if (fds[0].revents & POLLOUT_SET) result |= RB_WAITFD_OUT; - if (fds.revents & POLLEX_SET) + if (fds[0].revents & POLLEX_SET) result |= RB_WAITFD_PRI; /* all requested events are ready if there is an error */ - if (fds.revents & POLLERR_SET) + if (fds[0].revents & POLLERR_SET) result |= events; return result; } #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ -static rb_fdset_t * -init_set_fd(int fd, rb_fdset_t *fds) -{ - if (fd < 0) { - return 0; - } - rb_fd_init(fds); - rb_fd_set(fd, fds); - - return fds; -} - struct select_args { union { int fd; @@ -4168,10 +4223,6 @@ rb_gc_set_stack_end(VALUE **stack_end_p) } #endif - -/* signal.c */ -void ruby_sigchld_handler(rb_vm_t *); - /* * */ @@ -4187,36 +4238,81 @@ rb_threadptr_check_signal(rb_thread_t *mth) } static void -timer_thread_function(void *arg) +timer_thread_function(void) { - rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */ + volatile rb_execution_context_t *ec; - /* - * Tricky: thread_destruct_lock doesn't close a race against - * vm->running_thread switch. however it guarantees th->running_thread - * point to valid pointer or NULL. 
- */ - rb_native_mutex_lock(&vm->thread_destruct_lock); /* for time slice */ - if (vm->running_thread) { - RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread->ec); - } - rb_native_mutex_unlock(&vm->thread_destruct_lock); - - /* check signal */ - ruby_sigchld_handler(vm); - rb_threadptr_check_signal(vm->main_thread); + ec = ACCESS_ONCE(rb_execution_context_t *, + ruby_current_execution_context_ptr); + if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec); +} -#if 0 - /* prove profiler */ - if (vm->prove_profile.enable) { - rb_thread_t *th = vm->running_thread; +static void +async_bug_fd(const char *mesg, int errno_arg, int fd) +{ + char buff[64]; + size_t n = strlcpy(buff, mesg, sizeof(buff)); + if (n < sizeof(buff)-3) { + ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd); + } + rb_async_bug_errno(buff, errno_arg); +} - if (vm->during_gc) { - /* GC prove profiling */ +/* VM-dependent API is not available for this function */ +static int +consume_communication_pipe(int fd) +{ +#define CCP_READ_BUFF_SIZE 1024 + /* buffer can be shared because no one refers to them. */ + static char buff[CCP_READ_BUFF_SIZE]; + ssize_t result; + int ret = FALSE; /* for rb_sigwait_sleep */ + + while (1) { + result = read(fd, buff, sizeof(buff)); + if (result > 0) { + ret = TRUE; + if (result < (ssize_t)sizeof(buff)) { + return ret; + } } - } + else if (result == 0) { + return ret; + } + else if (result < 0) { + int e = errno; + switch (e) { + case EINTR: + continue; /* retry */ + case EAGAIN: +#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: #endif + return ret; + default: + async_bug_fd("consume_communication_pipe: read", e, fd); + } + } + } +} + +static int +check_signals_nogvl(rb_thread_t *th, int sigwait_fd) +{ + rb_vm_t *vm = GET_VM(); /* th may be 0 */ + int ret = consume_communication_pipe(sigwait_fd); + ubf_wakeup_all_threads(); + ruby_sigchld_handler(vm); + if (rb_signal_buff_size()) { + if (th == vm->main_thread) + /* no need to lock + wakeup if already in main thread */ + RUBY_VM_SET_TRAP_INTERRUPT(th->ec); + else + threadptr_trap_interrupt(vm->main_thread); + ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */ + } + return ret; } void @@ -5046,7 +5142,6 @@ Init_Thread(void) /* acquire global vm lock */ gvl_init(th->vm); gvl_acquire(th->vm, th); - rb_native_mutex_initialize(&th->vm->thread_destruct_lock); rb_native_mutex_initialize(&th->vm->waitpid_lock); rb_native_mutex_initialize(&th->interrupt_lock); diff --git a/thread_pthread.c b/thread_pthread.c index 29805ef2df..545cc2fa3b 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -45,27 +45,21 @@ void rb_native_cond_broadcast(rb_nativethread_cond_t *cond); void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex); void rb_native_cond_initialize(rb_nativethread_cond_t *cond); void rb_native_cond_destroy(rb_nativethread_cond_t *cond); -static void rb_thread_wakeup_timer_thread_low(void); static void clear_thread_cache_altstack(void); +static void ubf_wakeup_all_threads(void); +static int ubf_threads_empty(void); +static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *, + const struct timespec *); +static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd, + const struct timespec *, + int *drained_p); -#define TIMER_THREAD_MASK (1) -#define TIMER_THREAD_SLEEPY (2|TIMER_THREAD_MASK) -#define TIMER_THREAD_BUSY (4|TIMER_THREAD_MASK) +#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid()) -#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && 
\ - defined(F_SETFL) && defined(O_NONBLOCK) && \ - defined(F_GETFD) && defined(F_SETFD) && defined(FD_CLOEXEC) -/* The timer thread sleeps while only one Ruby thread is running. */ -# define TIMER_IMPL TIMER_THREAD_SLEEPY -#else -# define TIMER_IMPL TIMER_THREAD_BUSY -#endif - -static struct { - pthread_t id; - int created; -} timer_thread; -#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0) +/* for testing, and in case we come across a platform w/o pipes: */ +#define BUSY_WAIT_SIGNALS (0) +#define THREAD_INVALID ((const rb_thread_t *)-1) +static const rb_thread_t *sigwait_th; #ifdef HAVE_SCHED_YIELD #define native_thread_yield() (void)sched_yield() @@ -82,49 +76,96 @@ static pthread_condattr_t *condattr_monotonic = &condattr_mono; static const void *const condattr_monotonic = NULL; #endif +/* 100ms. 10ms is too small for user level thread scheduling + * on recent Linux (tested on 2.6.35) + */ +#define TIME_QUANTUM_USEC (100 * 1000) + +static struct timespec native_cond_timeout(rb_nativethread_cond_t *, + struct timespec rel); + static void -gvl_acquire_common(rb_vm_t *vm) +gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th) { if (vm->gvl.acquired) { - - if (!vm->gvl.waiting++) { - /* - * Wake up timer thread iff timer thread is slept. - * When timer thread is polling mode, we don't want to - * make confusing timer thread interval time. - */ - rb_thread_wakeup_timer_thread_low(); - } - - while (vm->gvl.acquired) { - rb_native_cond_wait(&vm->gvl.cond, &vm->gvl.lock); - } - - --vm->gvl.waiting; - - if (vm->gvl.need_yield) { - vm->gvl.need_yield = 0; + native_thread_data_t *nd = &th->native_thread_data; + + VM_ASSERT(th->unblock.func == 0 && "we reuse ubf_list for GVL waitq"); + + list_add_tail(&vm->gvl.waitq, &nd->ubf_list); + do { + if (!vm->gvl.timer) { + static struct timespec ts; + static int err = ETIMEDOUT; + + /* + * become designated timer thread to kick vm->gvl.acquired + * periodically. Continue on old timeout if it expired: + */ + if (err == ETIMEDOUT) { + ts.tv_sec = 0; + ts.tv_nsec = TIME_QUANTUM_USEC * 1000; + ts = native_cond_timeout(&nd->cond.gvlq, ts); + } + vm->gvl.timer = th; + err = native_cond_timedwait(&nd->cond.gvlq, &vm->gvl.lock, &ts); + vm->gvl.timer = 0; + ubf_wakeup_all_threads(); + + /* + * Timeslice. 
We can't touch thread_destruct_lock here, + * as the process may fork while this thread is contending + * for GVL: + */ + if (vm->gvl.acquired) timer_thread_function(); + } + else { + rb_native_cond_wait(&nd->cond.gvlq, &vm->gvl.lock); + } + } while (vm->gvl.acquired); + + list_del_init(&nd->ubf_list); + + if (vm->gvl.need_yield) { + vm->gvl.need_yield = 0; rb_native_cond_signal(&vm->gvl.switch_cond); - } + } } + vm->gvl.acquired = th; + /* + * Designate the next gvl.timer thread, favor the last thread in + * the waitq since it will be in waitq longest + */ + if (!vm->gvl.timer) { + native_thread_data_t *last; - vm->gvl.acquired = 1; + last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); + if (last) { + rb_native_cond_signal(&last->cond.gvlq); + } + else if (!ubf_threads_empty()) { + rb_thread_wakeup_timer_thread(0); + } + } } static void gvl_acquire(rb_vm_t *vm, rb_thread_t *th) { rb_native_mutex_lock(&vm->gvl.lock); - gvl_acquire_common(vm); + gvl_acquire_common(vm, th); rb_native_mutex_unlock(&vm->gvl.lock); } -static void +static native_thread_data_t * gvl_release_common(rb_vm_t *vm) { + native_thread_data_t *next; vm->gvl.acquired = 0; - if (vm->gvl.waiting > 0) - rb_native_cond_signal(&vm->gvl.cond); + next = list_top(&vm->gvl.waitq, native_thread_data_t, ubf_list); + if (next) rb_native_cond_signal(&next->cond.gvlq); + + return next; } static void @@ -138,34 +179,38 @@ gvl_release(rb_vm_t *vm) static void gvl_yield(rb_vm_t *vm, rb_thread_t *th) { - rb_native_mutex_lock(&vm->gvl.lock); + native_thread_data_t *next; - gvl_release_common(vm); + rb_native_mutex_lock(&vm->gvl.lock); + next = gvl_release_common(vm); /* An another thread is processing GVL yield. */ if (UNLIKELY(vm->gvl.wait_yield)) { - while (vm->gvl.wait_yield) + while (vm->gvl.wait_yield) rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock); - goto acquire; } - - if (vm->gvl.waiting > 0) { - /* Wait until another thread task take GVL. */ - vm->gvl.need_yield = 1; - vm->gvl.wait_yield = 1; - while (vm->gvl.need_yield) + else if (next) { + /* Wait until another thread task takes GVL. */ + vm->gvl.need_yield = 1; + vm->gvl.wait_yield = 1; + while (vm->gvl.need_yield) rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock); - vm->gvl.wait_yield = 0; + vm->gvl.wait_yield = 0; + rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); } else { - rb_native_mutex_unlock(&vm->gvl.lock); - sched_yield(); + rb_native_mutex_unlock(&vm->gvl.lock); + /* + * GVL was not contended when we released, so we have no potential + * contenders for reacquisition. 
Perhaps they are stuck in blocking + * region w/o GVL, too, so we kick them: + */ + ubf_wakeup_all_threads(); + native_thread_yield(); rb_native_mutex_lock(&vm->gvl.lock); + rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); } - - rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); - acquire: - gvl_acquire_common(vm); + gvl_acquire_common(vm, th); rb_native_mutex_unlock(&vm->gvl.lock); } @@ -173,11 +218,11 @@ static void gvl_init(rb_vm_t *vm) { rb_native_mutex_initialize(&vm->gvl.lock); - rb_native_cond_initialize(&vm->gvl.cond); rb_native_cond_initialize(&vm->gvl.switch_cond); rb_native_cond_initialize(&vm->gvl.switch_wait_cond); + list_head_init(&vm->gvl.waitq); vm->gvl.acquired = 0; - vm->gvl.waiting = 0; + vm->gvl.timer = 0; vm->gvl.need_yield = 0; vm->gvl.wait_yield = 0; } @@ -185,10 +230,16 @@ gvl_init(rb_vm_t *vm) static void gvl_destroy(rb_vm_t *vm) { - rb_native_cond_destroy(&vm->gvl.switch_wait_cond); - rb_native_cond_destroy(&vm->gvl.switch_cond); - rb_native_cond_destroy(&vm->gvl.cond); - rb_native_mutex_destroy(&vm->gvl.lock); + /* + * only called once at VM shutdown (not atfork), another thread + * may still grab vm->gvl.lock when calling gvl_release at + * the end of thread_start_func_2 + */ + if (0) { + rb_native_cond_destroy(&vm->gvl.switch_wait_cond); + rb_native_cond_destroy(&vm->gvl.switch_cond); + rb_native_mutex_destroy(&vm->gvl.lock); + } clear_thread_cache_altstack(); } @@ -433,7 +484,9 @@ native_thread_init(rb_thread_t *th) #ifdef USE_UBF_LIST list_node_init(&nd->ubf_list); #endif - rb_native_cond_initialize(&nd->sleep_cond); + rb_native_cond_initialize(&nd->cond.gvlq); + if (&nd->cond.gvlq != &nd->cond.intr) + rb_native_cond_initialize(&nd->cond.intr); ruby_thread_set_native(th); } @@ -444,7 +497,11 @@ native_thread_init(rb_thread_t *th) static void native_thread_destroy(rb_thread_t *th) { - rb_native_cond_destroy(&th->native_thread_data.sleep_cond); + native_thread_data_t *nd = &th->native_thread_data; + + rb_native_cond_destroy(&nd->cond.gvlq); + if (&nd->cond.gvlq != &nd->cond.intr) + rb_native_cond_destroy(&nd->cond.intr); /* * prevent false positive from ruby_thread_has_gvl_p if that @@ -1012,17 +1069,6 @@ native_thread_create(rb_thread_t *th) return err; } -#if (TIMER_IMPL & TIMER_THREAD_MASK) -static void -native_thread_join(pthread_t th) -{ - int err = pthread_join(th, 0); - if (err) { - rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err); - } -} -#endif /* TIMER_THREAD_MASK */ - #if USE_NATIVE_THREAD_PRIORITY static void @@ -1064,15 +1110,15 @@ ubf_pthread_cond_signal(void *ptr) { rb_thread_t *th = (rb_thread_t *)ptr; thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th); - rb_native_cond_signal(&th->native_thread_data.sleep_cond); + rb_native_cond_signal(&th->native_thread_data.cond.intr); } static void -native_sleep(rb_thread_t *th, struct timespec *timeout_rel) +native_cond_sleep(rb_thread_t *th, struct timespec *timeout_rel) { struct timespec timeout; rb_nativethread_lock_t *lock = &th->interrupt_lock; - rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond; + rb_nativethread_cond_t *cond = &th->native_thread_data.cond.intr; if (timeout_rel) { /* Solaris cond_timedwait() return EINVAL if an argument is greater than @@ -1164,17 +1210,30 @@ static void ubf_select(void *ptr) { rb_thread_t *th = (rb_thread_t *)ptr; + rb_vm_t *vm = th->vm; + register_ubf_list(th); /* * ubf_wakeup_thread() doesn't guarantee to wake up a target thread. 
* Therefore, we repeatedly call ubf_wakeup_thread() until a target thread - * exit from ubf function. - * In the other hands, we shouldn't call rb_thread_wakeup_timer_thread() - * if running on timer thread because it may make endless wakeups. + * exit from ubf function. We must designate a timer-thread to perform + * this operation. */ - if (!pthread_equal(pthread_self(), timer_thread.id)) - rb_thread_wakeup_timer_thread(); + rb_native_mutex_lock(&vm->gvl.lock); + if (!vm->gvl.timer) { + native_thread_data_t *last; + + last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); + if (last) { + rb_native_cond_signal(&last->cond.gvlq); + } + else { + rb_thread_wakeup_timer_thread(0); + } + } + rb_native_mutex_unlock(&vm->gvl.lock); + ubf_wakeup_thread(th); } @@ -1211,39 +1270,16 @@ static int ubf_threads_empty(void) { return 1; } #define TT_DEBUG 0 #define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0) -/* 100ms. 10ms is too small for user level thread scheduling - * on recent Linux (tested on 2.6.35) - */ -#define TIME_QUANTUM_USEC (100 * 1000) - -#if TIMER_IMPL == TIMER_THREAD_SLEEPY static struct { - /* - * Read end of each pipe is closed inside timer thread for shutdown - * Write ends are closed by a normal Ruby thread during shutdown - */ + /* pipes are closed in forked children when owner_process does not match */ int normal[2]; - int low[2]; /* volatile for signal handler use: */ volatile rb_pid_t owner_process; } timer_thread_pipe = { {-1, -1}, - {-1, -1}, /* low priority */ }; -NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); -static void -async_bug_fd(const char *mesg, int errno_arg, int fd) -{ - char buff[64]; - size_t n = strlcpy(buff, mesg, sizeof(buff)); - if (n < sizeof(buff)-3) { - ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd); - } - rb_async_bug_errno(buff, errno_arg); -} - /* only use signal-safe system calls here */ static void rb_thread_wakeup_timer_thread_fd(int fd) @@ -1275,49 +1311,33 @@ rb_thread_wakeup_timer_thread_fd(int fd) } void -rb_thread_wakeup_timer_thread(void) +rb_thread_wakeup_timer_thread(int sig) { /* must be safe inside sighandler, so no mutex */ if (timer_thread_pipe.owner_process == getpid()) { - rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]); - } -} - -static void -rb_thread_wakeup_timer_thread_low(void) -{ - if (timer_thread_pipe.owner_process == getpid()) { - rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.low[1]); - } -} - -/* VM-dependent API is not available for this function */ -static void -consume_communication_pipe(int fd) -{ -#define CCP_READ_BUFF_SIZE 1024 - /* buffer can be shared because no one refers to them. 
*/ - static char buff[CCP_READ_BUFF_SIZE]; - ssize_t result; - - while (1) { - result = read(fd, buff, sizeof(buff)); - if (result == 0) { - return; - } - else if (result < 0) { - int e = errno; - switch (e) { - case EINTR: - continue; /* retry */ - case EAGAIN: -#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif - return; - default: - async_bug_fd("consume_communication_pipe: read", e, fd); - } + rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]); + + /* + * system_working check is required because vm and main_thread are + * freed during shutdown + */ + if (sig && system_working) { + volatile rb_execution_context_t *ec; + rb_vm_t *vm = GET_VM(); + rb_thread_t *mth; + + /* + * FIXME: root VM and main_thread should be static and not + * on heap for maximum safety (and startup/shutdown speed) + */ + if (!vm) return; + mth = vm->main_thread; + if (!mth || !system_working) return; + + /* this relies on GC for grace period before cont_free */ + ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec); + + if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec); } } } @@ -1350,6 +1370,7 @@ set_nonblock(int fd) rb_sys_fail(0); } +/* communication pipe with timer thread and signal handler */ static int setup_communication_pipe_internal(int pipes[2]) { @@ -1374,108 +1395,6 @@ setup_communication_pipe_internal(int pipes[2]) return 0; } -/* communication pipe with timer thread and signal handler */ -static int -setup_communication_pipe(void) -{ - rb_pid_t owner = timer_thread_pipe.owner_process; - - if (owner && owner != getpid()) { - CLOSE_INVALIDATE(normal[0]); - CLOSE_INVALIDATE(normal[1]); - CLOSE_INVALIDATE(low[0]); - CLOSE_INVALIDATE(low[1]); - } - - if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) { - return errno; - } - if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) { - return errno; - } - - return 0; -} - -/** - * Let the timer thread sleep a while. - * - * The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running. - * @pre the calling context is in the timer thread. 
- */ -static inline void -timer_thread_sleep(rb_vm_t *vm) -{ - int result; - int need_polling; - struct pollfd pollfds[2]; - - pollfds[0].fd = timer_thread_pipe.normal[0]; - pollfds[0].events = POLLIN; - pollfds[1].fd = timer_thread_pipe.low[0]; - pollfds[1].events = POLLIN; - - need_polling = !ubf_threads_empty(); - - if (SIGCHLD_LOSSY && !need_polling) { - rb_native_mutex_lock(&vm->waitpid_lock); - if (!list_empty(&vm->waiting_pids) || !list_empty(&vm->waiting_grps)) { - need_polling = 1; - } - rb_native_mutex_unlock(&vm->waitpid_lock); - } - - if (vm->gvl.waiting > 0 || need_polling) { - /* polling (TIME_QUANTUM_USEC usec) */ - result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000); - } - else { - /* wait (infinite) */ - result = poll(pollfds, numberof(pollfds), -1); - } - - if (result == 0) { - /* maybe timeout */ - } - else if (result > 0) { - consume_communication_pipe(timer_thread_pipe.normal[0]); - consume_communication_pipe(timer_thread_pipe.low[0]); - } - else { /* result < 0 */ - int e = errno; - switch (e) { - case EBADF: - case EINVAL: - case ENOMEM: /* from Linux man */ - case EFAULT: /* from FreeBSD man */ - rb_async_bug_errno("thread_timer: select", e); - default: - /* ignore */; - } - } -} -#endif /* TIMER_THREAD_SLEEPY */ - -#if TIMER_IMPL == TIMER_THREAD_BUSY -# define PER_NANO 1000000000 -void rb_thread_wakeup_timer_thread(void) {} -static void rb_thread_wakeup_timer_thread_low(void) {} - -static rb_nativethread_lock_t timer_thread_lock; -static rb_nativethread_cond_t timer_thread_cond; - -static inline void -timer_thread_sleep(rb_vm_t *unused) -{ - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = TIME_QUANTUM_USEC * 1000; - ts = native_cond_timeout(&timer_thread_cond, ts); - - native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts); -} -#endif /* TIMER_IMPL == TIMER_THREAD_BUSY */ - #if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME) # define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name) #endif @@ -1526,137 +1445,26 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name) return name; } -static void * -thread_timer(void *p) -{ - rb_vm_t *vm = p; -#ifdef HAVE_PTHREAD_SIGMASK /* mainly to enable SIGCHLD */ - { - sigset_t mask; - sigemptyset(&mask); - pthread_sigmask(SIG_SETMASK, &mask, NULL); - } -#endif - - if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n"); - -#ifdef SET_CURRENT_THREAD_NAME - SET_CURRENT_THREAD_NAME("ruby-timer-thr"); -#endif - -#if TIMER_IMPL == TIMER_THREAD_BUSY - rb_native_mutex_initialize(&timer_thread_lock); - rb_native_cond_initialize(&timer_thread_cond); - rb_native_mutex_lock(&timer_thread_lock); -#endif - while (system_working > 0) { - - /* timer function */ - ubf_wakeup_all_threads(); - timer_thread_function(0); - - if (TT_DEBUG) WRITE_CONST(2, "tick\n"); - - /* wait */ - timer_thread_sleep(vm); - } -#if TIMER_IMPL == TIMER_THREAD_BUSY - rb_native_mutex_unlock(&timer_thread_lock); - rb_native_cond_destroy(&timer_thread_cond); - rb_native_mutex_destroy(&timer_thread_lock); -#endif - - if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n"); - return NULL; -} - -#if (TIMER_IMPL & TIMER_THREAD_MASK) static void rb_thread_create_timer_thread(void) { - if (!timer_thread.created) { - size_t stack_size = 0; - int err; - pthread_attr_t attr; - rb_vm_t *vm = GET_VM(); + /* we only create the pipe, and lazy-spawn */ + rb_pid_t current = getpid(); + rb_pid_t owner = timer_thread_pipe.owner_process; - err = pthread_attr_init(&attr); - if (err != 0) { - rb_warn("pthread_attr_init 
failed for timer: %s, scheduling broken", - strerror(err)); - return; - } -# ifdef PTHREAD_STACK_MIN - { - size_t stack_min = PTHREAD_STACK_MIN; /* may be dynamic, get only once */ - const size_t min_size = (4096 * 4); - /* Allocate the machine stack for the timer thread - * at least 16KB (4 pages). FreeBSD 8.2 AMD64 causes - * machine stack overflow only with PTHREAD_STACK_MIN. - */ - enum { - needs_more_stack = -#if defined HAVE_VALGRIND_MEMCHECK_H && defined __APPLE__ - 1 -#else - THREAD_DEBUG != 0 -#endif - }; - stack_size = stack_min; - if (stack_size < min_size) stack_size = min_size; - if (needs_more_stack) { - stack_size += +((BUFSIZ - 1) / stack_min + 1) * stack_min; - } - err = pthread_attr_setstacksize(&attr, stack_size); - if (err != 0) { - rb_bug("pthread_attr_setstacksize(.., %"PRIuSIZE") failed: %s", - stack_size, strerror(err)); - } - } -# endif + if (owner && owner != current) { + CLOSE_INVALIDATE(normal[0]); + CLOSE_INVALIDATE(normal[1]); + } -#if TIMER_IMPL == TIMER_THREAD_SLEEPY - err = setup_communication_pipe(); - if (err) return; -#endif /* TIMER_THREAD_SLEEPY */ + if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return; - /* create timer thread */ - if (timer_thread.created) { - rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n"); - } - err = pthread_create(&timer_thread.id, &attr, thread_timer, vm); - pthread_attr_destroy(&attr); - - if (err == EINVAL) { - /* - * Even if we are careful with our own stack use in thread_timer(), - * any third-party libraries (eg libkqueue) which rely on __thread - * storage can cause small stack sizes to fail. So lets hope the - * default stack size is enough for them: - */ - stack_size = 0; - err = pthread_create(&timer_thread.id, NULL, thread_timer, vm); - } - if (err != 0) { - rb_warn("pthread_create failed for timer: %s, scheduling broken", - strerror(err)); - if (stack_size) { - rb_warn("timer thread stack size: %"PRIuSIZE, stack_size); - } - else { - rb_warn("timer thread stack size: system default"); - } - VM_ASSERT(err == 0); - return; - } -#if TIMER_IMPL == TIMER_THREAD_SLEEPY - /* validate pipe on this process */ - timer_thread_pipe.owner_process = getpid(); -#endif /* TIMER_THREAD_SLEEPY */ - timer_thread.created = 1; + if (owner != current) { + /* validate pipe on this process */ + sigwait_th = THREAD_INVALID; + timer_thread_pipe.owner_process = current; } } -#endif /* TIMER_IMPL & TIMER_THREAD_MASK */ static int native_stop_timer_thread(void) @@ -1665,24 +1473,6 @@ native_stop_timer_thread(void) stopped = --system_working <= 0; if (TT_DEBUG) fprintf(stderr, "stop timer thread\n"); - if (stopped) { -#if TIMER_IMPL == TIMER_THREAD_SLEEPY - /* kick timer thread out of sleep */ - rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]); -#endif - - /* timer thread will stop looping when system_working <= 0: */ - native_thread_join(timer_thread.id); - - /* - * don't care if timer_thread_pipe may fill up at this point. 
- * If we restart timer thread, signals will be processed, if - * we don't, it's because we're in a different child - */ - - if (TT_DEBUG) fprintf(stderr, "joined timer thread\n"); - timer_thread.created = 0; - } return stopped; } @@ -1739,20 +1529,14 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) int rb_reserved_fd_p(int fd) { -#if TIMER_IMPL == TIMER_THREAD_SLEEPY if ((fd == timer_thread_pipe.normal[0] || - fd == timer_thread_pipe.normal[1] || - fd == timer_thread_pipe.low[0] || - fd == timer_thread_pipe.low[1]) && + fd == timer_thread_pipe.normal[1]) && timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */ return 1; } else { return 0; } -#else - return 0; -#endif } rb_nativethread_id_t @@ -1803,7 +1587,7 @@ rb_sleep_cond_get(const rb_execution_context_t *ec) { rb_thread_t *th = rb_ec_thread_ptr(ec); - return &th->native_thread_data.sleep_cond; + return &th->native_thread_data.cond.intr; } void @@ -1813,4 +1597,126 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond) } #endif /* USE_NATIVE_SLEEP_COND */ +int +rb_sigwait_fd_get(const rb_thread_t *th) +{ + if (timer_thread_pipe.owner_process == getpid() && + timer_thread_pipe.normal[0] >= 0) { + if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) { + return timer_thread_pipe.normal[0]; + } + } + return -1; /* avoid thundering herd */ +} + +void +rb_sigwait_fd_put(const rb_thread_t *th, int fd) +{ + const rb_thread_t *old; + + VM_ASSERT(timer_thread_pipe.normal[0] == fd); + old = ATOMIC_PTR_EXCHANGE(sigwait_th, THREAD_INVALID); + if (old != th) assert(old == th); +} + +#ifndef HAVE_PPOLL +/* TODO: don't ignore sigmask */ +static int +ruby_ppoll(struct pollfd *fds, nfds_t nfds, + const struct timespec *ts, const sigset_t *sigmask) +{ + int timeout_ms; + + if (ts) { + int tmp, tmp2; + + if (ts->tv_sec > INT_MAX/1000) + timeout_ms = INT_MAX; + else { + tmp = (int)(ts->tv_sec * 1000); + /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */ + tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L)); + if (INT_MAX - tmp < tmp2) + timeout_ms = INT_MAX; + else + timeout_ms = (int)(tmp + tmp2); + } + } + else + timeout_ms = -1; + + return poll(fds, nfds, timeout_ms); +} +# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask)) +#endif + +void +rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts) +{ + struct pollfd pfd; + + pfd.fd = sigwait_fd; + pfd.events = POLLIN; + + if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) { + (void)ppoll(&pfd, 1, ts, 0); + check_signals_nogvl(th, sigwait_fd); + } + else { + struct timespec end, diff; + const struct timespec *to; + int n = 0; + + if (ts) { + getclockofday(&end); + timespec_add(&end, ts); + diff = *ts; + ts = &diff; + } + /* + * tricky: this needs to return on spurious wakeup (no auto-retry). 
+ * But we also need to distinguish between periodic quantum + * wakeups, so we care about the result of consume_communication_pipe + */ + for (;;) { + to = sigwait_timeout(th, sigwait_fd, ts, &n); + if (n) return; + n = ppoll(&pfd, 1, to, 0); + if (check_signals_nogvl(th, sigwait_fd)) + return; + if (n || (th && RUBY_VM_INTERRUPTED(th->ec))) + return; + if (ts && timespec_update_expire(&diff, &end)) + return; + } + } +} + +static void +native_sleep(rb_thread_t *th, struct timespec *timeout_rel) +{ + int sigwait_fd = rb_sigwait_fd_get(th); + + if (sigwait_fd >= 0) { + rb_native_mutex_lock(&th->interrupt_lock); + th->unblock.func = ubf_sigwait; + rb_native_mutex_unlock(&th->interrupt_lock); + + GVL_UNLOCK_BEGIN(th); + + if (!RUBY_VM_INTERRUPTED(th->ec)) { + rb_sigwait_sleep(th, sigwait_fd, timeout_rel); + } + else { + check_signals_nogvl(th, sigwait_fd); + } + unblock_function_clear(th); + GVL_UNLOCK_END(th); + rb_sigwait_fd_put(th, sigwait_fd); + rb_sigwait_fd_migrate(th->vm); + } + else { + native_cond_sleep(th, timeout_rel); + } +} #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */ diff --git a/thread_pthread.h b/thread_pthread.h index 0566193eb5..60e0fe0ea3 100644 --- a/thread_pthread.h +++ b/thread_pthread.h @@ -22,7 +22,19 @@ typedef pthread_cond_t rb_nativethread_cond_t; typedef struct native_thread_data_struct { struct list_node ubf_list; - rb_nativethread_cond_t sleep_cond; +#if defined(__GLIBC__) || defined(__FreeBSD__) + union +#else + /* + * assume the platform condvars are badly implemented and have a + * "memory" of which mutex they're associated with + */ + struct +#endif + { + rb_nativethread_cond_t intr; /* th->interrupt_lock */ + rb_nativethread_cond_t gvlq; /* vm->gvl.lock */ + } cond; } native_thread_data_t; #undef except @@ -32,12 +44,12 @@ typedef struct native_thread_data_struct { typedef struct rb_global_vm_lock_struct { /* fast path */ - unsigned long acquired; + const struct rb_thread_struct *acquired; rb_nativethread_lock_t lock; /* slow path */ - volatile unsigned long waiting; - rb_nativethread_cond_t cond; + struct list_head waitq; + const struct rb_thread_struct *timer; /* yield */ rb_nativethread_cond_t switch_cond; diff --git a/thread_win32.c b/thread_win32.c index 2d5eac1ff4..6db1f25fa7 100644 --- a/thread_win32.c +++ b/thread_win32.c @@ -20,6 +20,8 @@ #define native_thread_yield() Sleep(0) #define unregister_ubf_list(th) +#define ubf_wakeup_all_threads() do {} while (0) +#define ubf_threads_empty() (1) static volatile DWORD ruby_native_thread_key = TLS_OUT_OF_INDEXES; @@ -680,18 +682,21 @@ static struct { static unsigned long __stdcall timer_thread_func(void *dummy) { + rb_vm_t *vm = GET_VM(); thread_debug("timer_thread\n"); rb_w32_set_thread_description(GetCurrentThread(), L"ruby-timer-thread"); while (WaitForSingleObject(timer_thread.lock, TIME_QUANTUM_USEC/1000) == WAIT_TIMEOUT) { - timer_thread_function(dummy); + timer_thread_function(); + ruby_sigchld_handler(vm); /* probably no-op */ + rb_threadptr_check_signal(vm->main_thread); } thread_debug("timer killed\n"); return 0; } void -rb_thread_wakeup_timer_thread(void) +rb_thread_wakeup_timer_thread(int sig) { /* do nothing */ } @@ -768,6 +773,26 @@ rb_reserved_fd_p(int fd) return 0; } +int +rb_sigwait_fd_get(rb_thread_t *th) +{ + return -1; /* TODO */ +} + +NORETURN(void rb_sigwait_fd_put(rb_thread_t *, int)); +void +rb_sigwait_fd_put(rb_thread_t *th, int fd) +{ + rb_bug("not implemented, should not be called"); +} + +NORETURN(void rb_sigwait_sleep(const rb_thread_t *, int, const struct timespec *)); 
+void +rb_sigwait_sleep(const rb_thread_t *th, int fd, const struct timespec *ts) +{ + rb_bug("not implemented, should not be called"); +} + rb_nativethread_id_t rb_nativethread_self(void) { diff --git a/vm_core.h b/vm_core.h index 01bb74ccde..e547e45f8b 100644 --- a/vm_core.h +++ b/vm_core.h @@ -564,10 +564,12 @@ typedef struct rb_vm_struct { VALUE self; rb_global_vm_lock_t gvl; - rb_nativethread_lock_t thread_destruct_lock; struct rb_thread_struct *main_thread; - struct rb_thread_struct *running_thread; + + /* persists across uncontended GVL release/acquire for time slice */ + const struct rb_thread_struct *running_thread; + #ifdef USE_SIGALTSTACK void *main_altstack; #endif @@ -1581,7 +1583,7 @@ void rb_vm_pop_frame(rb_execution_context_t *ec); void rb_thread_start_timer_thread(void); void rb_thread_stop_timer_thread(void); void rb_thread_reset_timer_thread(void); -void rb_thread_wakeup_timer_thread(void); +void rb_thread_wakeup_timer_thread(int); static inline void rb_vm_living_threads_init(rb_vm_t *vm) -- EW
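
The heart of this change is gvl_acquire_common() in thread_pthread.c:
with no dedicated timer thread, GVL waiters take turns acting as the
timer. Below is a minimal sketch of that pattern in plain pthreads,
for illustration only — the names (gvl_sketch_t, owner_timer_interrupt,
the 100ms constant) are simplifications, and the real code instead uses
per-thread condvars on a waitq, a monotonic-clock condattr, and
RUBY_VM_SET_TIMER_INTERRUPT on the running thread's execution context.

#include <pthread.h>
#include <stddef.h>
#include <time.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t cond;        /* one shared condvar; real code: per-thread */
    const void *acquired;       /* owning thread, NULL when GVL is free */
    const void *timer;          /* designated timer thread, NULL if none */
    volatile int *owner_timer_interrupt; /* stand-in for ec->interrupt_flag */
} gvl_sketch_t;

static void
gvl_sketch_acquire(gvl_sketch_t *gvl, const void *self)
{
    pthread_mutex_lock(&gvl->lock);
    while (gvl->acquired) {
        if (!gvl->timer) {
            /*
             * nobody is keeping time: designate ourselves and sleep
             * with a bounded quantum instead of waiting forever
             */
            struct timespec ts;
            clock_gettime(CLOCK_REALTIME, &ts);
            ts.tv_nsec += 100 * 1000 * 1000; /* 100ms, as TIME_QUANTUM_USEC */
            if (ts.tv_nsec >= 1000000000L) {
                ts.tv_sec++;
                ts.tv_nsec -= 1000000000L;
            }
            gvl->timer = self;
            (void)pthread_cond_timedwait(&gvl->cond, &gvl->lock, &ts);
            gvl->timer = NULL;
            if (gvl->acquired) /* quantum expired: ask the owner to yield */
                *gvl->owner_timer_interrupt = 1;
        }
        else {
            /* someone else keeps time; sleep until the releaser signals */
            pthread_cond_wait(&gvl->cond, &gvl->lock);
        }
    }
    gvl->acquired = self;
    pthread_mutex_unlock(&gvl->lock);
}

static void
gvl_sketch_release(gvl_sketch_t *gvl)
{
    pthread_mutex_lock(&gvl->lock);
    gvl->acquired = NULL;
    pthread_cond_signal(&gvl->cond); /* wake one contender (or the timer) */
    pthread_mutex_unlock(&gvl->lock);
}

Losing the dedicated thread also means signal wakeups must come from
whichever thread currently holds the self-pipe read end; in the patch,
rb_sigwait_fd_get()/rb_sigwait_fd_put() hand that ownership around with
a single atomic compare-and-swap on sigwait_th, so exactly one sleeper
polls the pipe at a time and migrates it onward when it is done.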