From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net
X-Spam-Level:
X-Spam-ASN: AS6939 216.218.128.0/17
X-Spam-Status: No, score=-1.8 required=3.0 tests=BAYES_00,RCVD_IN_MSPIKE_BL,
	RCVD_IN_MSPIKE_ZBI,RCVD_IN_XBL,RDNS_NONE,SPF_FAIL,SPF_HELO_FAIL,
	TO_EQ_FM_DOM_SPF_FAIL shortcircuit=no autolearn=no autolearn_force=no
	version=3.4.0
Received: from 80x24.org (unknown [216.218.222.11])
	by dcvr.yhbt.net (Postfix) with ESMTP id B95801FF34
	for ; Tue, 9 May 2017 06:20:37 +0000 (UTC)
From: Eric Wong
To: spew@80x24.org
Subject: [PATCH] thread_futex: initial cut to replace pthread mutex and condvar types
Date: Tue, 9 May 2017 06:20:22 +0000
Message-Id: <20170509062022.4413-1-e@80x24.org>
List-Id:

We only use basic functionality of pthreads mutexes and condition
variables, so we can avoid the complexity and overhead by using
bare futexes and atomic instructions.

thread_pthread: extract gvl_waiting_p function

We will support a futex-based GVL implementation without wrapping
pthreads in the next commit.

futex based GVL: another take...

Performance is finally stable between multi-core and single-core
on Thread.pass benchmarks.  However, there is still a regression
for single-core systems.

Speedup ratio: compare with the result of `before' (greater is better)

1 core:
  vm_thread_pass          0.820
  vm_thread_pass_flood    1.065

8 cores:
  vm_thread_pass          6.717
  vm_thread_pass_flood    1.023

futex_cmp_requeue allows specifying number to wake up

native_cond_broadcast: force contended state after requeue

futex: disable FUTEX_CMP_REQUEUE support

GVL: add FUTEX_REQUEUE (not FUTEX_CMP_REQUEUE) support

But keep it disabled.

comments

waiting
---
 common.mk                    |   1 +
 configure.in                 |   1 +
 include/ruby/thread_native.h |  11 +
 thread_futex.h               | 518 +++++++++++++++++++++++++++++++++++++++++++
 thread_pthread.c             |  19 +-
 thread_pthread.h             |  19 +-
 6 files changed, 566 insertions(+), 3 deletions(-)
 create mode 100644 thread_futex.h
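
[Editor's note, not part of the original mail: the sketch below is an
illustration added for readers; it is not code from this patch.  It shows
the same 0/1/2-state futex mutex idea that thread_futex.h implements with
ATOMIC_CAS/ATOMIC_EXCHANGE, but as a standalone Linux C program using the
GCC/Clang __atomic builtins.  It assumes FUTEX_*_PRIVATE (Linux 2.6.22+)
and skips the runtime fallback the patch performs in native_thread_sysinit.
The names demo_lock_t, demo_lock, demo_unlock and demo_futex are made up
for the example; build with `cc -pthread'.]

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>

/* 0 = unlocked, 1 = locked (no waiters), 2 = locked (maybe waiters) */
typedef struct { int ftx; } demo_lock_t;

static long
demo_futex(int *uaddr, int op, int val)
{
    return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
}

static void
demo_lock(demo_lock_t *l)
{
    int unlocked = 0;

    /* fast path: 0 -> 1 with a single atomic op, no syscall */
    if (__atomic_compare_exchange_n(&l->ftx, &unlocked, 1, 0,
                                    __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
        return;

    /* slow path: advertise contention (2) and sleep until woken */
    while (__atomic_exchange_n(&l->ftx, 2, __ATOMIC_ACQUIRE) != 0)
        demo_futex(&l->ftx, FUTEX_WAIT_PRIVATE, 2);
}

static void
demo_unlock(demo_lock_t *l)
{
    /* only pay for the FUTEX_WAKE syscall if somebody marked us contended */
    if (__atomic_exchange_n(&l->ftx, 0, __ATOMIC_RELEASE) == 2)
        demo_futex(&l->ftx, FUTEX_WAKE_PRIVATE, 1);
}

static demo_lock_t lock;
static long counter;

static void *
worker(void *arg)
{
    int i;

    (void)arg;
    for (i = 0; i < 100000; i++) {
        demo_lock(&lock);
        counter++;
        demo_unlock(&lock);
    }
    return NULL;
}

int
main(void)
{
    pthread_t t[4];
    int i;

    for (i = 0; i < 4; i++) pthread_create(&t[i], NULL, worker, NULL);
    for (i = 0; i < 4; i++) pthread_join(t[i], NULL);
    printf("counter=%ld (expect 400000)\n", counter);
    return 0;
}

[The 1-vs-2 distinction is what keeps the uncontended paths down to one
atomic instruction each: the unlocker only enters the kernel when a waiter
has already downgraded the lock word to the contended state.]
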
diff --git a/common.mk b/common.mk
index 6ea4812da4..b833b48747 100644
--- a/common.mk
+++ b/common.mk
@@ -2595,6 +2595,7 @@ thread.$(OBJEXT): {$(VPATH)}thread.c
 thread.$(OBJEXT): {$(VPATH)}thread.h
 thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c
 thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
+thread.$(OBJEXT): {$(VPATH)}thread_futex.h
 thread.$(OBJEXT): {$(VPATH)}thread_native.h
 thread.$(OBJEXT): {$(VPATH)}thread_sync.c
 thread.$(OBJEXT): {$(VPATH)}timev.h
diff --git a/configure.in b/configure.in
index 1ca8d142ac..3318ffe2eb 100644
--- a/configure.in
+++ b/configure.in
@@ -1378,6 +1378,7 @@ AC_CHECK_HEADERS(ieeefp.h)
 AC_CHECK_HEADERS(intrinsics.h)
 AC_CHECK_HEADERS(langinfo.h)
 AC_CHECK_HEADERS(limits.h)
+AC_CHECK_HEADERS(linux/futex.h)
 AC_CHECK_HEADERS(locale.h)
 AC_CHECK_HEADERS(malloc.h)
 AC_CHECK_HEADERS(malloc/malloc.h)
diff --git a/include/ruby/thread_native.h b/include/ruby/thread_native.h
index 8e500c5a13..bd27208aec 100644
--- a/include/ruby/thread_native.h
+++ b/include/ruby/thread_native.h
@@ -34,8 +34,19 @@ typedef union rb_thread_lock_union {
 #elif defined(HAVE_PTHREAD_H)
 #include <pthread.h>
+# if defined(HAVE_LINUX_FUTEX_H) && defined(HAVE_SYS_SYSCALL_H)
+# define RB_PTHREAD_USE_FUTEX (1)
+# else
+# define RB_PTHREAD_USE_FUTEX (0)
+# endif
 typedef pthread_t rb_nativethread_id_t;
+# if RB_PTHREAD_USE_FUTEX
+typedef struct rb_futex_lock_struct {
+    int ftx;
+} rb_nativethread_lock_t;
+# else
 typedef pthread_mutex_t rb_nativethread_lock_t;
+# endif
 
 #else
 #error "unsupported thread type"
 
diff --git a/thread_futex.h b/thread_futex.h
new file mode 100644
index 0000000000..81bb801a67
--- /dev/null
+++ b/thread_futex.h
@@ -0,0 +1,518 @@
+/*
+ * futex-based locks and condvars for Linux 2.6 and later
+ */
+#ifndef RB_THREAD_FUTEX_H
+#define RB_THREAD_FUTEX_H
+/* #include "ruby_atomic.h" */
+#include <linux/futex.h>
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <limits.h>
+
+/*
+ * errnos are positive in Linux, this allows us to use negative
+ * errnos (e.g. -EAGAIN) as return values.
+ */
+STATIC_ASSERT(eagain_positive, EAGAIN > 0);
+STATIC_ASSERT(etimedout_positive, ETIMEDOUT > 0);
+STATIC_ASSERT(eintr_positive, EINTR > 0);
+
+/*
+ * optimization for Linux 2.6.7+, disabled for now since I'm not
+ * 100% sure I'm using it correctly, and also it does not seem to
+ * improve performance.  Also see the comment around FUTEX_REQUEUE
+ * usage in gvl_yield, below; I'm not 100% sure I am using this right
+ * even though "make exam" succeeds with FUTEX_CMP_REQUEUE
+ */
+#if 0 && defined(FUTEX_CMP_REQUEUE)
+static int ftx_cmp_requeue = FUTEX_CMP_REQUEUE;
+#else /* no FUTEX_CMP_REQUEUE in headers, assume kernel is old, too */
+static int ftx_cmp_requeue;
+#endif
+
+/* optimization for Linux 2.6.22+ */
+#if defined(FUTEX_PRIVATE_FLAG)
+static int futex_private_flag = FUTEX_PRIVATE_FLAG;
+#else /* no FUTEX_PRIVATE_FLAG in headers, assume kernel is old, too */
+static int futex_private_flag;
+#endif
+
+/* prevent the compiler from reordering access */
+#define RB_ACCESS_ONCE(type,x) (*((volatile type *)&(x)))
+
+static int
+rb_futex(int *uaddr, int futex_op, int val,
+         const struct timespec *timeout, /* or uint32_t val2 */
+         int *uaddr2, int val3)
+{
+    return syscall(__NR_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
+}
+
+static void
+native_thread_sysinit(void)
+{
+    /*
+     * Linux <2.6.22 does not support the FUTEX_PRIVATE_FLAG,
+     * however Ruby may be compiled with newer glibc headers or
+     * newer system entirely and then run on an old kernel,
+     * so we must detect at startup
+     */
+    int op = FUTEX_WAKE | futex_private_flag;
+    int val = 1;
+    int rc = rb_futex(&val, op, 1, 0, 0, 0);
+
+    if (rc < 0)
+        futex_private_flag = 0;
+
+    /*
+     * FUTEX_CMP_REQUEUE is a race-free, thundering-herd free version
+     * of FUTEX_WAKE introduced in Linux 2.6.7+
+     */
+    if (ftx_cmp_requeue) {
+        static char const msg[] =
+            "FUTEX_CMP_REQUEUE missing, upgrade to Linux 2.6.7+\n";
+        int src = 0;
+
+        op = ftx_cmp_requeue | futex_private_flag;
+        rc = rb_futex(&src, op, INT_MAX, 0, &val, 0);
+        if (rc < 0) {
+            ftx_cmp_requeue = 0;
+            write(2, msg, sizeof(msg)-1);
+        }
+    }
+}
+
+static int
+futex_wait(int *addr, int val, const struct timespec *ts)
+{
+    int op = FUTEX_WAIT | futex_private_flag;
+    int rc = rb_futex(addr, op, val, ts, 0, 0);
+
+    if (rc == 0) return 0; /* successfully woken */
+
+    rc = errno;
+    if (rc != EAGAIN && rc != ETIMEDOUT && rc != EINTR)
+        rb_bug_errno("FUTEX_WAIT", rc);
+    return -rc;
+}
+
+/* returns number of threads woken */
+static int
+futex_wake(int *addr, int nwake)
+{
+    int op = FUTEX_WAKE | futex_private_flag;
+    int rc = rb_futex(addr, op, nwake, 0, 0, 0);
+
+    if (rc >= 0) return rc;
+    rb_bug_errno("FUTEX_WAKE", errno);
+}
+
+/*
+ * returns total number of threads woken or requeued
+ * returns -EAGAIN if a futex_wake is necessary
+ */
+static int
+futex_cmp_requeue(int *src, int *dst, int nwake, int expect)
+{
+    int op = ftx_cmp_requeue;
+    if (op) {
+        /* wake up one, requeue the rest (val2=INT_MAX) at dst */
+        const struct timespec *val2 = (const struct timespec *)INT_MAX;
+        int rc;
+
+        rc = rb_futex(src, op | futex_private_flag, nwake, val2, dst, expect);
+
+        if (rc >= 0) return rc;
+        rc = errno;
+        if (rc == EAGAIN) return -EAGAIN;
+        rb_bug_errno("FUTEX_WAKE", rc);
+    }
+    return -EAGAIN;
+}
+
+static void
+native_mutex_lock_contended(rb_nativethread_lock_t *lock)
+{
+    /* tell waiters we are contended */
+    while (ATOMIC_EXCHANGE(lock->ftx, 2)) {
+        futex_wait(&lock->ftx, 2, 0);
+    }
+}
+
+static void
+native_mutex_lock(rb_nativethread_lock_t *lock)
+{
+    int old = ATOMIC_CAS(lock->ftx, 0, 1);
+
+    if (old == 0) return; /* uncontended fast path */
+
+    native_mutex_lock_contended(lock);
+}
+
+static void
+native_mutex_unlock(rb_nativethread_lock_t *lock)
+{
+    int old = ATOMIC_EXCHANGE(lock->ftx, 0);
+
+    if (old == 1) return; /* uncontended fast path */
+    if (old == 0) rb_bug_errno("unlocking unlocked mutex", EINVAL);
+    futex_wake(&lock->ftx, 1);
+}
+
+static int
+native_mutex_trylock(rb_nativethread_lock_t *lock)
+{
+    int old = ATOMIC_CAS(lock->ftx, 0, 1);
+
+    return old ? EBUSY : 0;
+}
+
+static void
+native_mutex_initialize(rb_nativethread_lock_t *lock)
+{
+    lock->ftx = 0;
+}
+
+static void
+native_mutex_destroy(rb_nativethread_lock_t *lock)
+{
+    if (lock->ftx) rb_bug_errno("mutex destroy busy", EBUSY);
+    native_mutex_initialize(lock);
+}
+
+static void
+native_cond_initialize(rb_nativethread_cond_t *cond, int flags)
+{
+    cond->lock = 0;
+    cond->seq = 0;
+    cond->clockid = CLOCK_REALTIME;
+    if (flags & RB_CONDATTR_CLOCK_MONOTONIC) {
+        cond->clockid = CLOCK_MONOTONIC;
+    }
+}
+
+static void
+native_cond_destroy(rb_nativethread_cond_t *cond)
+{
+    native_cond_initialize(cond, 0);
+}
+
+/*
+ * n.b.: native_cond_signal/native_cond_broadcast do not check for the
+ * existence of current waiters.  For the Ruby mutex part, we rely on
+ * the rb_mutex_t->cond_waiting counter to do that.
+ */
+static void
+native_cond_signal(rb_nativethread_cond_t *cond)
+{
+    ATOMIC_INC(cond->seq);
+    futex_wake(&cond->seq, 1);
+}
+
+#if 0
+static void
+native_cond_broadcast(rb_nativethread_cond_t *cond)
+{
+    if (cond->lock) {
+        int expect = ATOMIC_INC(cond->seq) + 1;
+        int rc = futex_cmp_requeue(&cond->seq, &cond->lock->ftx, 1, expect);
+        if (rc > 0) {
+            /* ensure we tell native_mutex_unlock to wake someone up */
+            rc = ATOMIC_EXCHANGE(cond->lock->ftx, 2);
+            assert(rc > 0 && "native_cond_broadcast without having lock");
+        }
+        if (rc == -EAGAIN) {
+            futex_wake(&cond->seq, INT_MAX);
+        }
+    }
+}
+#endif
+
+static void
+native_cond_gettime(rb_nativethread_cond_t *cond, struct timespec *ts)
+{
+    int rc = clock_gettime(cond->clockid, ts);
+    if (rc != 0)
+        rb_bug_errno("clock_gettime()", errno);
+}
+
+/*
+ * returns 0 if `end' is in the future
+ * returns 1 if end is past, only triggered on EINTR
+ */
+static int
+ts_expired_p(rb_nativethread_cond_t *cond, struct timespec *end,
+             struct timespec *rel)
+{
+    struct timespec now;
+
+    native_cond_gettime(cond, &now);
+
+    rel->tv_sec = end->tv_sec - now.tv_sec;
+    rel->tv_nsec = end->tv_nsec - now.tv_nsec;
+
+    if (rel->tv_nsec < 0) {
+        rel->tv_nsec += 1000000000;
+        rel->tv_sec--;
+    }
+
+    if (rel->tv_sec < 0)
+        return 1;
+
+    return 0;
+}
+
+static int
+native_cond_timedwait(rb_nativethread_cond_t *cond,
+                      rb_nativethread_lock_t *lock, const struct timespec *ts)
+{
+    int val;
+    int rc;
+    struct timespec *end;
+    struct timespec tend;
+    struct timespec rel;
+
+    /*
+     * native_cond_timedwait is normally a wrapper around
+     * pthread_cond_timedwait (which takes an absolute timestamp).
+     * FUTEX_WAIT takes a relative timestamp, so we need to convert
+     * from absolute to relative.
+     */
+    if (ts) { /* in case of EINTR */
+        native_cond_gettime(cond, &tend);
+        tend.tv_nsec += ts->tv_nsec;
+        tend.tv_sec += ts->tv_sec;
+        if (tend.tv_nsec >= 1000000000) {
+            tend.tv_nsec -= 1000000000;
+            tend.tv_sec++;
+        }
+        end = &tend;
+        rel = *ts;
+        ts = &rel;
+    }
+
+    if (cond->lock != lock) {
+        if (cond->lock) {
+            rb_bug_errno("futex native_cond_timedwait owner mismatch", EINVAL);
+        }
+        if (ATOMIC_PTR_EXCHANGE(cond->lock, lock) != 0) {
+            /* condvar is broken at this point */
+            rb_bug_errno("futex native_cond_timedwait owner race", EINVAL);
+        }
+    }
+
+    val = cond->seq;
+    native_mutex_unlock(lock);
+    do {
+        rc = futex_wait(&cond->seq, val, ts);
+    } while ((cond->seq == val) && (!ts || !ts_expired_p(cond, end, &rel)));
+
+    /*
+     * lock the mutex assuming contention, since the cond signal/broadcast
+     * thread currently has the futex
+     */
+    native_mutex_lock_contended(lock);
+
+    /* we can return ETIMEDOUT like pthread_cond_timedwait does */
+    return rc == -ETIMEDOUT ? ETIMEDOUT : 0;
+}
+
+static void
+native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *lock)
+{
+    (void)native_cond_timedwait(cond, lock, 0);
+}
+
+static struct timespec
+native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel)
+{
+    /* XXX: this is dirty and hides bugs in our callers */
+    if (timeout_rel.tv_nsec >= 1000000000) {
+        timeout_rel.tv_nsec -= 1000000000;
+        timeout_rel.tv_sec++;
+    }
+
+    return timeout_rel; /* Futexes use relative timeouts internally */
+}
+
+static int
+gvl_waiting_p(const rb_global_vm_lock_t *gvl)
+{
+    int nr = RB_ACCESS_ONCE(int, gvl->ftx);
+
+    return nr > 1;
+}
+
+static void
+gvl_acquire_contended(rb_vm_t *vm, rb_thread_t *th)
+{
+    while (ATOMIC_EXCHANGE(vm->gvl.ftx, 2)) {
+        /*
+         * reduce wakeups by only signalling the timer thread
+         * for the first waiter we get
+         */
+        if (!ATOMIC_INC(vm->gvl.waiting)) {
+            rb_thread_wakeup_timer_thread_low();
+        }
+        futex_wait(&vm->gvl.ftx, 2, NULL);
+        ATOMIC_DEC(vm->gvl.waiting);
+    }
+
+    if (vm->gvl.need_yield) {
+        vm->gvl.need_yield = 0;
+        futex_wake(&vm->gvl.need_yield, 1);
+    }
+}
+
+static void
+gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
+{
+    if (ATOMIC_CAS(vm->gvl.ftx, 0, 1)) {
+        gvl_acquire_contended(vm, th);
+    }
+}
+
+/* returns 1 if another thread is woken */
+static int
+gvl_release(rb_vm_t *vm)
+{
+    int n = ATOMIC_EXCHANGE(vm->gvl.ftx, 0);
+
+    if (n == 1) return 0; /* fast path, no waiters */
+    assert(n != 0);
+
+    n = futex_wake(&vm->gvl.ftx, 1);
+    /* if (n) { */
+    /*     sched_yield(); */
+    /* } */
+    return n;
+}
+
+static void
+gvl_yield(rb_vm_t *vm, rb_thread_t *th)
+{
+    int yielding = ATOMIC_EXCHANGE(vm->gvl.wait_yield, 1);
+
+    if (UNLIKELY(yielding)) { /* another thread is inside gvl_yield */
+        if (1) {
+            if (gvl_release(vm)) {
+                sched_yield();
+                gvl_acquire_contended(vm, th);
+            }
+            /* spurious wakeup should be harmless, here */
+            else if (futex_wait(&vm->gvl.wait_yield, 1, NULL) == -EAGAIN) {
+                sched_yield();
+                gvl_acquire_contended(vm, th);
+            }
+            else {
+                gvl_acquire(vm, th);
+            }
+        }
+        else {
+            gvl_release(vm);
+            sched_yield();
+            gvl_acquire(vm, th);
+        }
+    }
+    else {
+        int maybe_more = 0;
+        /*
+         * no need for atomic while holding GVL, gvl_acquire_contended
+         * cannot see need_yield change until we call gvl_release()
+         */
+        vm->gvl.need_yield = 1;
+
+        if (gvl_release(vm)) {
+            /* we have active waiters, wait for them to kick us */
+            do {
+                futex_wait(&vm->gvl.need_yield, 1, NULL);
+            } while (vm->gvl.need_yield);
+            ATOMIC_EXCHANGE(vm->gvl.wait_yield, 0);
+            /*
+             * we've waited, and have no idea who else is inside
+             * this function, so we must assume they exist and
+             * only wake up the rest AFTER we have the GVL
+             */
+            maybe_more = 1;
+        }
+        else {
+            /* we have no GVL, and no idea how to be fair, here... */
+            vm->gvl.need_yield = 0;
+            ATOMIC_EXCHANGE(vm->gvl.wait_yield, 0);
+
+            /* first, try to avoid a thundering herd w/o GVL */
+            if (futex_wake(&vm->gvl.wait_yield, 1) > 0) {
+                maybe_more = 1;
+            }
+
+            /* either way, hope other threads get some work done... */
+            sched_yield();
+        }
+        if (maybe_more) {
+            gvl_acquire_contended(vm, th);
+            futex_wake(&vm->gvl.wait_yield, INT_MAX);
+        }
+        else {
+            gvl_acquire(vm, th);
+            return;
+        }
+
+/*
+ * FIXME: the FUTEX_REQUEUE path does not seem to work right;
+ * "make test" or test-all gets stuck or fails; and so does using
+ * FUTEX_CMP_REQUEUE here (however FUTEX_CMP_REQUEUE seems non-problematic
+ * with native_cond_broadcast)
+ */
+#define REQUEUE (0)
+
+        /*
+         * now that we took the GVL, again, migrate any concurrent
+         * gvl_yield callers which started after us into the main
+         * vm->gvl.ftx queue
+         */
+        if (REQUEUE) {
+            /*
+             * No waking anybody up since we have the GVL at this point,
+             * just move them over to the vm->gvl.ftx queue.  No need
+             * to worry about the race which FUTEX_CMP_REQUEUE is designed
+             * for, either; just migrate everyone sitting in wait_yield.
+             */
+            int nwake = 0;
+            const struct timespec *nmove = (const struct timespec *)INT_MAX;
+            int rc = rb_futex(&vm->gvl.wait_yield,
+                              FUTEX_REQUEUE | futex_private_flag,
+                              nwake, nmove, &vm->gvl.ftx, 0);
+            if (rc < 0) { /* does this happen? */
+                futex_wake(&vm->gvl.wait_yield, INT_MAX);
+            }
+            else if (rc > 0) {
+                /*
+                 * OK, we've requeued some other threads,
+                 * be sure we wake up vm->gvl.ftx upon future gvl_release:
+                 */
+                rc = ATOMIC_EXCHANGE(vm->gvl.ftx, 2);
+                assert(rc > 0 && "GVL not locked");
+
+                /* and ensure the timer thread can schedule us away */
+                if (rc == 1)
+                    rb_thread_wakeup_timer_thread_low();
+            }
+        }
+        else { /* !REQUEUE, simply move them over as a thundering herd */
+            /*
+             * wake up the thundering herd, and force them into
+             * vm->gvl.ftx via gvl_acquire
+             */
+            futex_wake(&vm->gvl.wait_yield, INT_MAX);
+        }
+    }
+}
+
+static void
+gvl_init(rb_vm_t *vm)
+{
+    memset(&vm->gvl, 0, sizeof(rb_global_vm_lock_t));
+}
+
+#define gvl_destroy(vm) gvl_init((vm))
+
+#endif /* RB_THREAD_FUTEX_H */
diff --git a/thread_pthread.c b/thread_pthread.c
index 43644e7c06..6fe3e6077b 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -71,6 +71,17 @@ static struct {
 # define USE_SLEEPY_TIMER_THREAD 0
 #endif
 
+#if RB_PTHREAD_USE_FUTEX
+# include "thread_futex.h"
+#else /* RB_PTHREAD_USE_FUTEX == 0 */
+# if USE_SLEEPY_TIMER_THREAD
+static int
+gvl_waiting_p(const rb_global_vm_lock_t *gvl)
+{
+    return gvl->waiting > 0;
+}
+# endif
+
 static void
 gvl_acquire_common(rb_vm_t *vm)
 {
@@ -181,6 +192,8 @@ gvl_destroy(rb_vm_t *vm)
     native_mutex_destroy(&vm->gvl.lock);
 }
 
+#endif /* !RB_PTHREAD_USE_FUTEX */
+
 #if defined(HAVE_WORKING_FORK)
 static void
 gvl_atfork(rb_vm_t *vm)
@@ -192,6 +205,8 @@
 
 #define NATIVE_MUTEX_LOCK_DEBUG 0
 
+#if !RB_PTHREAD_USE_FUTEX
+static inline void native_thread_sysinit(void) { /* no-op */ }
 static void
 mutex_debug(const char *msg, void *lock)
 {
@@ -412,6 +427,7 @@ native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel)
 
     return timeout;
 }
+#endif /* !RB_PTHREAD_USE_FUTEX */
 
 #define native_cleanup_push pthread_cleanup_push
 #define native_cleanup_pop pthread_cleanup_pop
@@ -453,6 +469,7 @@ Init_native_thread(void)
 {
     rb_thread_t *th = GET_THREAD();
 
+    native_thread_sysinit();
     pthread_key_create(&ruby_native_thread_key, NULL);
     th->thread_id = pthread_self();
     fill_thread_id_str(th);
@@ -1451,7 +1468,7 @@ timer_thread_sleep(rb_global_vm_lock_t* gvl)
 
     need_polling = !ubf_threads_empty();
 
-    if (gvl->waiting > 0 || need_polling) {
+    if (gvl_waiting_p(gvl) || need_polling) {
         /* polling (TIME_QUANTUM_USEC usec) */
         result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
     }
diff --git a/thread_pthread.h b/thread_pthread.h
index 1c3782caf4..438458b26b 100644
--- a/thread_pthread.h
+++ b/thread_pthread.h
@@ -15,11 +15,21 @@
 #include <pthread_np.h>
 #endif
 
-#define RB_NATIVETHREAD_LOCK_INIT PTHREAD_MUTEX_INITIALIZER
-#define RB_NATIVETHREAD_COND_INIT { PTHREAD_COND_INITIALIZER, }
+#if RB_PTHREAD_USE_FUTEX /* defined in include/ruby/thread_native.h */
+# define RB_NATIVETHREAD_LOCK_INIT { 0 }
+# define RB_NATIVETHREAD_COND_INIT { 0, 0, }
+#else /* really using pthreads */
+# define RB_NATIVETHREAD_LOCK_INIT PTHREAD_MUTEX_INITIALIZER
+# define RB_NATIVETHREAD_COND_INIT { PTHREAD_COND_INITIALIZER, }
+#endif
 
 typedef struct rb_thread_cond_struct {
+#if RB_PTHREAD_USE_FUTEX
+    rb_nativethread_lock_t *lock;
+    int seq;
+#else
     pthread_cond_t cond;
+#endif
 #ifdef HAVE_CLOCKID_T
     clockid_t clockid;
 #endif
@@ -36,6 +46,10 @@ typedef struct native_thread_data_struct {
 #undef finally
 
 typedef struct rb_global_vm_lock_struct {
+#if RB_PTHREAD_USE_FUTEX
+    int ftx;
+    int waiting;
+#else /* RB_PTHREAD_USE_FUTEX == 0 */
     /* fast path */
     unsigned long acquired;
     rb_nativethread_lock_t lock;
@@ -47,6 +61,7 @@ typedef struct rb_global_vm_lock_struct {
     /* yield */
     rb_nativethread_cond_t switch_cond;
    rb_nativethread_cond_t switch_wait_cond;
+#endif /* RB_PTHREAD_USE_FUTEX == 0 */
     int need_yield;
     int wait_yield;
 } rb_global_vm_lock_t;
-- 
EW