From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: 
X-Spam-Checker-Version: SpamAssassin 3.4.1 (2015-04-28) on dcvr.yhbt.net
X-Spam-Level: 
X-Spam-ASN: 
X-Spam-Status: No, score=-3.6 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00
	shortcircuit=no autolearn=ham autolearn_force=no version=3.4.1
Received: from localhost (dcvr.yhbt.net [127.0.0.1])
	by dcvr.yhbt.net (Postfix) with ESMTP id 270941F46C
	for ; Thu, 9 Aug 2018 03:46:19 +0000 (UTC)
From: Eric Wong 
To: spew@80x24.org
Subject: [PATCH 2/2] thread_pthread: use POSIX timer or thread to get rid of races
Date: Thu, 9 Aug 2018 03:46:18 +0000
Message-Id: <20180809034618.20082-2-e@80x24.org>
In-Reply-To: <20180809034618.20082-1-e@80x24.org>
References: <20180809034618.20082-1-e@80x24.org>
List-Id: 

This closes a race condition where the GVL is uncontended and a
thread receives a signal immediately before calling the blocking
function while releasing the GVL:

  1) check interrupts
  2) release GVL
  3) call blocking function

If a signal fires after 1) but before 3), that thread may never
wake up if the GVL remains uncontended.

We also need to wake up the ubf_list unconditionally on gvl_yield,
because two threads can end up yielding to each other: one thread
waiting in IO#close on other threads blocked in IO#read or IO#gets.
---
 configure.ac     |   8 ++
 thread_pthread.c | 326 ++++++++++++++++++++++++++++++++++++++++-------
 2 files changed, 290 insertions(+), 44 deletions(-)

diff --git a/configure.ac b/configure.ac
index 074a3fd394..b52886caf4 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1998,6 +1998,14 @@ AS_IF([test x"$ac_cv_func_clock_gettime" != xyes], [
     ])
 ])
 AC_CHECK_FUNCS(clock_getres) # clock_getres should be tested after clock_gettime test including librt test.
+AC_CHECK_LIB([rt], [timer_create])
+AC_CHECK_LIB([rt], [timer_settime])
+AS_IF([test x"$ac_cv_lib_rt_timer_create" = xyes], [
+    AC_DEFINE(HAVE_TIMER_CREATE, 1)
+])
+AS_IF([test x"$ac_cv_lib_rt_timer_settime" = xyes], [
+    AC_DEFINE(HAVE_TIMER_SETTIME, 1)
+])
 AC_CACHE_CHECK(for unsetenv returns a value, rb_cv_unsetenv_return_value,
   [AC_TRY_COMPILE([
diff --git a/thread_pthread.c b/thread_pthread.c
index 545cc2fa3b..16235b04e4 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -34,6 +34,56 @@
 #if defined(__HAIKU__)
 #include 
 #endif
+#include 
+#include 
+
+#if defined(SIGVTALRM) && !defined(__CYGWIN__)
+# define USE_UBF_LIST 1
+#endif
+
+/*
+ * UBF_TIMER and ubf_list both use SIGVTALRM.
+ * UBF_TIMER is to close the TOCTTOU signal race on programs
+ * without GVL contention that block on read/write to sockets.
+ *
+ * ubf_list wakeups may be triggered periodically by UBF_TIMER on
+ * gvl_yield.
+ */
+#define UBF_TIMER_NONE 0
+#define UBF_TIMER_POSIX 1
+#define UBF_TIMER_PTHREAD 2
+
+#ifndef UBF_TIMER
+# if defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_CREATE) && \
+     defined(CLOCK_MONOTONIC) && defined(USE_UBF_LIST)
+   /* preferred */
+#  define UBF_TIMER UBF_TIMER_POSIX
+# elif defined(USE_UBF_LIST)
+   /* safe, but inefficient */
+#  define UBF_TIMER UBF_TIMER_PTHREAD
+# else
+   /* we'll be racy without SIGVTALRM for ubf_list */
+#  define UBF_TIMER UBF_TIMER_NONE
+# endif
+#endif
+
+#if UBF_TIMER == UBF_TIMER_POSIX
+static struct {
+    timer_t timerid;
+    rb_atomic_t armed; /* 0: disarmed, 1: arming, 2: armed */
+    rb_pid_t owner;
+} timer_posix;
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+static void *timer_pthread_fn(void *);
+static struct {
+    int low[2];
+    rb_atomic_t armed; /* boolean */
+    rb_pid_t owner;
+    pthread_t thid;
+} timer_pthread = {
+    { -1, -1 },
+};
+#endif
 
 void rb_native_mutex_lock(rb_nativethread_lock_t *lock);
 void rb_native_mutex_unlock(rb_nativethread_lock_t *lock);
@@ -53,6 +103,7 @@ static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
 static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd,
                                               const struct timespec *,
                                               int *drained_p);
+static void rb_timer_disarm(void);
 
 #define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid())
@@ -79,11 +130,30 @@ static const void *const condattr_monotonic = NULL;
 
 /* 100ms.  10ms is too small for user level thread scheduling
  * on recent Linux (tested on 2.6.35)
  */
-#define TIME_QUANTUM_USEC (100 * 1000)
+#define TIME_QUANTUM_MSEC (100)
+#define TIME_QUANTUM_USEC (TIME_QUANTUM_MSEC * 1000)
+#define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000)
 
 static struct timespec native_cond_timeout(rb_nativethread_cond_t *,
                                            struct timespec rel);
 
+/*
+ * Designate the next gvl.timer thread, favor the last thread in
+ * the waitq since it will be in waitq longest
+ */
+static int
+designate_timer_thread(rb_vm_t *vm)
+{
+    native_thread_data_t *last;
+
+    last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+    if (last) {
+        rb_native_cond_signal(&last->cond.gvlq);
+        return TRUE;
+    }
+    return FALSE;
+}
+
 static void
 gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
 {
@@ -98,13 +168,16 @@ gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
             static struct timespec ts;
             static int err = ETIMEDOUT;
 
+            /* take over timing from timer */
+            rb_timer_disarm();
+
             /*
              * become designated timer thread to kick vm->gvl.acquired
             * periodically.  Continue on old timeout if it expired:
             */
            if (err == ETIMEDOUT) {
                ts.tv_sec = 0;
-               ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
+               ts.tv_nsec = TIME_QUANTUM_NSEC;
                ts = native_cond_timeout(&nd->cond.gvlq, ts);
            }
            vm->gvl.timer = th;
@@ -132,18 +205,8 @@ gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
         }
     }
     vm->gvl.acquired = th;
-    /*
-     * Designate the next gvl.timer thread, favor the last thread in
-     * the waitq since it will be in waitq longest
-     */
     if (!vm->gvl.timer) {
-        native_thread_data_t *last;
-
-        last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
-        if (last) {
-            rb_native_cond_signal(&last->cond.gvlq);
-        }
-        else if (!ubf_threads_empty()) {
+        if (!designate_timer_thread(vm) && !ubf_threads_empty()) {
             rb_thread_wakeup_timer_thread(0);
         }
     }
@@ -181,6 +244,7 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th)
 {
     native_thread_data_t *next;
 
+    ubf_wakeup_all_threads();
     rb_native_mutex_lock(&vm->gvl.lock);
     next = gvl_release_common(vm);
@@ -200,12 +264,6 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th)
     }
     else {
         rb_native_mutex_unlock(&vm->gvl.lock);
-        /*
-         * GVL was not contended when we released, so we have no potential
-         * contenders for reacquisition.  Perhaps they are stuck in blocking
-         * region w/o GVL, too, so we kick them:
-         */
-        ubf_wakeup_all_threads();
         native_thread_yield();
         rb_native_mutex_lock(&vm->gvl.lock);
         rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
@@ -427,8 +485,7 @@ native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel)
 #define native_cleanup_push pthread_cleanup_push
 #define native_cleanup_pop pthread_cleanup_pop
 
-#if defined(SIGVTALRM) && !defined(__CYGWIN__)
-#define USE_UBF_LIST 1
+#if defined(USE_UBF_LIST)
 static rb_nativethread_lock_t ubf_list_lock;
 #endif
@@ -1189,7 +1246,10 @@ unregister_ubf_list(rb_thread_t *th)
 
     if (!list_empty((struct list_head*)node)) {
         rb_native_mutex_lock(&ubf_list_lock);
-        list_del_init(node);
+        list_del_init(node);
+        if (list_empty(&ubf_list_head) && !rb_signal_buff_size()) {
+            rb_timer_disarm();
+        }
         rb_native_mutex_unlock(&ubf_list_lock);
     }
 }
@@ -1222,13 +1282,7 @@ ubf_select(void *ptr)
      */
     rb_native_mutex_lock(&vm->gvl.lock);
     if (!vm->gvl.timer) {
-        native_thread_data_t *last;
-
-        last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
-        if (last) {
-            rb_native_cond_signal(&last->cond.gvlq);
-        }
-        else {
+        if (!designate_timer_thread(vm)) {
             rb_thread_wakeup_timer_thread(0);
         }
     }
@@ -1310,18 +1364,54 @@ rb_thread_wakeup_timer_thread_fd(int fd)
     }
 }
 
+static void
+rb_timer_arm(rb_pid_t current) /* async signal safe */
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+    if (timer_posix.owner == current && !ATOMIC_CAS(timer_posix.armed, 0, 1)) {
+        struct itimerspec it;
+
+        it.it_interval.tv_sec = it.it_value.tv_sec = 0;
+        it.it_interval.tv_nsec = it.it_value.tv_nsec = TIME_QUANTUM_NSEC;
+
+        if (timer_settime(timer_posix.timerid, 0, &it, 0))
+            rb_async_bug_errno("timer_settime (arm)", errno);
+
+        switch (ATOMIC_CAS(timer_posix.armed, 1, 2)) {
+          case 0:
+            /* somebody requested a disarm while we were arming */
+            it.it_interval.tv_nsec = it.it_value.tv_nsec = 0;
+            if (timer_settime(timer_posix.timerid, 0, &it, 0))
+                rb_async_bug_errno("timer_settime (disarm)", errno);
+            /* FALLTHROUGH */
+          case 1: return; /* success */
+          case 2:
+            rb_async_bug_errno("UBF_TIMER_POSIX state 2 unexpected", EINVAL);
+          default:
+            rb_async_bug_errno("UBF_TIMER_POSIX unknown state", ERANGE);
+        }
+    }
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+    if (current == timer_pthread.owner) {
+        if (ATOMIC_EXCHANGE(timer_pthread.armed, 1) == 0)
+            rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]);
+    }
+#endif
+}
+
 void
 rb_thread_wakeup_timer_thread(int sig)
 {
     /* must be safe inside sighandler, so no mutex */
-    if (timer_thread_pipe.owner_process == getpid()) {
+    rb_pid_t current = getpid();
+    if (timer_thread_pipe.owner_process == current) {
         rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
 
         /*
          * system_working check is required because vm and main_thread are
         * freed during shutdown
         */
-        if (sig && system_working) {
+        if (sig && system_working > 0) {
             volatile rb_execution_context_t *ec;
             rb_vm_t *vm = GET_VM();
             rb_thread_t *mth;
@@ -1332,18 +1422,24 @@ rb_thread_wakeup_timer_thread(int sig)
              */
             if (!vm) return;
             mth = vm->main_thread;
-            if (!mth || !system_working) return;
+            if (!mth || system_working <= 0) return;
 
             /* this relies on GC for grace period before cont_free */
             ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec);
-            if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec);
+            if (ec) {
+                RUBY_VM_SET_TRAP_INTERRUPT(ec);
+                rb_timer_arm(current);
+            }
         }
+        else if (sig == 0 && system_working > 0) {
+            rb_timer_arm(current);
+        }
     }
 }
 
 #define CLOSE_INVALIDATE(expr) \
-    close_invalidate(&timer_thread_pipe.expr,"close_invalidate: "#expr)
+    close_invalidate(&expr,"close_invalidate: "#expr)
 static void
 close_invalidate(int *fdp, const char *msg)
 {
@@ -1445,6 +1541,52 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name)
     return name;
 }
 
+static void
+rb_timer_invalidate(void)
+{
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+    CLOSE_INVALIDATE(timer_pthread.low[0]);
+    CLOSE_INVALIDATE(timer_pthread.low[1]);
+#endif
+}
+
+static void
+rb_timer_pthread_create(rb_pid_t current)
+{
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+    int err;
+
+    if (timer_pthread.owner == current)
+        return;
+
+    if (setup_communication_pipe_internal(timer_pthread.low) < 0)
+        return;
+
+    err = pthread_create(&timer_pthread.thid, 0, timer_pthread_fn, GET_VM());
+    if (!err)
+        timer_pthread.owner = current;
+    else
+        rb_warn("pthread_create failed for timer: %s, signals racy",
+                strerror(err));
+#endif
+}
+
+static void
+rb_timer_create(rb_pid_t current)
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+    struct sigevent sev;
+
+    sev.sigev_notify = SIGEV_SIGNAL;
+    sev.sigev_signo = SIGVTALRM;
+    sev.sigev_value.sival_ptr = &timer_posix;
+    if (!timer_create(CLOCK_MONOTONIC, &sev, &timer_posix.timerid))
+        timer_posix.owner = current;
+    else
+        rb_warn("timer_create failed: %s, signals racy", strerror(errno));
+#endif
+    rb_timer_pthread_create(current);
+}
+
 static void
 rb_thread_create_timer_thread(void)
 {
@@ -1453,17 +1595,65 @@ rb_thread_create_timer_thread(void)
     rb_pid_t owner = timer_thread_pipe.owner_process;
 
     if (owner && owner != current) {
-        CLOSE_INVALIDATE(normal[0]);
-        CLOSE_INVALIDATE(normal[1]);
+        CLOSE_INVALIDATE(timer_thread_pipe.normal[0]);
+        CLOSE_INVALIDATE(timer_thread_pipe.normal[1]);
+        rb_timer_invalidate();
     }
 
     if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return;
 
     if (owner != current) {
         /* validate pipe on this process */
+        rb_timer_create(current);
         sigwait_th = THREAD_INVALID;
         timer_thread_pipe.owner_process = current;
     }
+    else {
+        /* UBF_TIMER_PTHREAD needs to recreate after fork */
+        rb_timer_pthread_create(current);
+    }
+}
+
+static void
+rb_timer_disarm(void)
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+    static const struct itimerspec zero;
+    rb_atomic_t armed = ATOMIC_EXCHANGE(timer_posix.armed, 0);
+
+    if (LIKELY(armed == 0)) return;
+    switch (armed) {
+      case 1: return; /* rb_timer_arm was arming and will disarm itself */
+      case 2:
+        if (timer_settime(timer_posix.timerid, 0, &zero, 0))
+            rb_bug_errno("timer_settime (disarm)", errno);
+        return;
+      default:
+        rb_bug("UBF_TIMER_POSIX bad state: %u\n", (unsigned)armed);
+    }
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+    ATOMIC_SET(timer_pthread.armed, 0);
+#endif
+}
+
+static void
+rb_timer_destroy(void)
+{
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+    rb_pid_t current = getpid();
+
+    if (timer_pthread.owner == current) {
+        int err;
+
+        timer_pthread.owner = 0;
+        rb_timer_disarm();
+        rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]);
+        err = pthread_join(timer_pthread.thid, 0);
+        if (err) {
+            rb_raise(rb_eThreadError, "pthread_join() failed (%d)", err);
+        }
+    }
+#endif
+/* no need to destroy real POSIX timers */
 }
 
 static int
@@ -1471,6 +1661,8 @@ native_stop_timer_thread(void)
 {
     int stopped;
     stopped = --system_working <= 0;
+    if (stopped)
+        rb_timer_destroy();
 
     if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
     return stopped;
@@ -1529,14 +1721,18 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
 int
 rb_reserved_fd_p(int fd)
 {
-    if ((fd == timer_thread_pipe.normal[0] ||
-         fd == timer_thread_pipe.normal[1]) &&
-        timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+    if (fd == timer_pthread.low[0] || fd == timer_pthread.low[1])
+        goto check_pid;
+#endif
+    if (fd == timer_thread_pipe.normal[0] || fd == timer_thread_pipe.normal[1])
+        goto check_pid;
+
+    return 0;
+check_pid:
+    if (timer_thread_pipe.owner_process == getpid()) /* async-signal-safe */
         return 1;
-    }
-    else {
-        return 0;
-    }
+    return 0;
 }
 
 rb_nativethread_id_t
@@ -1600,8 +1796,17 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond)
 int
 rb_sigwait_fd_get(const rb_thread_t *th)
 {
-    if (timer_thread_pipe.owner_process == getpid() &&
+    rb_pid_t current = getpid();
+
+    if (timer_thread_pipe.owner_process == current &&
         timer_thread_pipe.normal[0] >= 0) {
+
+        /*
+         * no need to keep firing the timer if any thread is sleeping
+         * on the signal self-pipe
+         */
+        rb_timer_disarm();
+
         if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) {
             return timer_thread_pipe.normal[0];
         }
@@ -1719,4 +1924,37 @@ native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
         native_cond_sleep(th, timeout_rel);
     }
 }
+
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+static void *
+timer_pthread_fn(void *p)
+{
+    rb_vm_t *vm = p;
+    pthread_t main_thread_id = vm->main_thread->thread_id;
+    struct pollfd pfd;
+    int timeout = -1;
+
+    pfd.fd = timer_pthread.low[0];
+    pfd.events = POLLIN;
+
+    while (system_working > 0) {
+        (void)poll(&pfd, 1, timeout);
+        (void)consume_communication_pipe(pfd.fd);
+
+        if (system_working > 0 && ATOMIC_EXCHANGE(timer_pthread.armed, 0)) {
+            pthread_kill(main_thread_id, SIGVTALRM);
+
+            if (rb_signal_buff_size() || !ubf_threads_empty()) {
+                ATOMIC_SET(timer_pthread.armed, 1);
+                timeout = TIME_QUANTUM_MSEC;
+            }
+            else {
+                timeout = -1;
+            }
+        }
+    }
+
+    return 0;
+}
+#endif /* UBF_TIMER_PTHREAD */
 #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */
-- 
EW
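
For readers unfamiliar with the POSIX timer API used above, here is a
minimal standalone sketch (illustrative only, not part of the patch; the
file and handler names are made up) of the mechanism the UBF_TIMER_POSIX
path relies on: timer_create() with CLOCK_MONOTONIC delivers SIGVTALRM on
every expiration once armed by timer_settime(), so a thread that lost the
1)-3) race described above is still interrupted on the next 100ms tick:

/* sketch_posix_timer.c -- build with: cc sketch_posix_timer.c -lrt
 * (-lrt is only needed on older glibc) */
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

static volatile sig_atomic_t ticks;

static void
on_vtalrm(int sig)
{
    (void)sig;
    ticks++; /* async-signal-safe: just record the tick */
}

int
main(void)
{
    timer_t timerid;
    struct sigevent sev;
    struct itimerspec it;
    struct sigaction sa;

    memset(&sa, 0, sizeof(sa));
    sigemptyset(&sa.sa_mask);
    sa.sa_handler = on_vtalrm;
    sigaction(SIGVTALRM, &sa, NULL);

    memset(&sev, 0, sizeof(sev));
    sev.sigev_notify = SIGEV_SIGNAL; /* deliver a signal per expiration */
    sev.sigev_signo = SIGVTALRM;
    if (timer_create(CLOCK_MONOTONIC, &sev, &timerid)) {
        perror("timer_create");
        return 1;
    }

    /* arm: first expiry in 100ms, then every 100ms (cf. TIME_QUANTUM_NSEC) */
    it.it_interval.tv_sec = it.it_value.tv_sec = 0;
    it.it_interval.tv_nsec = it.it_value.tv_nsec = 100 * 1000 * 1000;
    if (timer_settime(timerid, 0, &it, NULL)) {
        perror("timer_settime");
        return 1;
    }

    while (ticks < 5)
        pause(); /* each SIGVTALRM interrupts pause() with EINTR */
    printf("got %d ticks\n", (int)ticks);

    /* disarm with a zeroed itimerspec (as rb_timer_disarm does), then delete */
    memset(&it, 0, sizeof(it));
    timer_settime(timerid, 0, &it, NULL);
    timer_delete(timerid);
    return 0;
}

The it_interval is the point: a one-shot expiry (it_interval zeroed) would
reintroduce the lost-wakeup shape, since a tick delivered between the
interrupt check and the blocking call would never recur.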
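
Likewise, a standalone sketch (again illustrative, not from the patch) of
the self-pipe wakeup the UBF_TIMER_PTHREAD fallback builds on: a helper
thread sleeps in poll() on the read end of a pipe, and any thread or
signal handler can wake ("arm") it with a single async-signal-safe
write() to the other end:

/* sketch_self_pipe.c -- build with: cc sketch_self_pipe.c -lpthread */
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int low[2]; /* low[0]: read end (helper), low[1]: write end (wakers) */

static void *
timer_fn(void *unused)
{
    struct pollfd pfd;
    char buf[8];

    (void)unused;
    pfd.fd = low[0];
    pfd.events = POLLIN;

    /* block until armed; a real loop would then kick the main thread */
    if (poll(&pfd, 1, -1) > 0) {
        while (read(pfd.fd, buf, sizeof(buf)) > 0)
            ; /* drain wakeup bytes; O_NONBLOCK keeps this from hanging */
    }
    puts("timer thread woken");
    return NULL;
}

int
main(void)
{
    pthread_t thid;

    if (pipe(low)) return 1;
    fcntl(low[0], F_SETFL, O_NONBLOCK); /* fresh pipe, so no flags to preserve */
    if (pthread_create(&thid, NULL, timer_fn, NULL)) return 1;

    if (write(low[1], "!", 1) != 1) /* async-signal-safe one-byte "arm" */
        return 1;
    pthread_join(thid, NULL);
    return 0;
}

This mirrors the tradeoff noted in the UBF_TIMER comments: the POSIX timer
path needs no extra thread or file descriptors, while the pthread fallback
is "safe, but inefficient", costing a thread and a pipe.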