From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [WIP] reduce rb_mutex_t size from 160 to 80 bytes on 64-bit
Date: Thu, 27 Apr 2017 03:34:23 +0000
Message-Id: <20170427033423.19856-1-e@80x24.org>

Note: test_require_with_loaded_features_pop currently fails with a
timeout under this patch.  Everything else in "make check" seems to
pass...

Instead of relying on a native condvar and mutex for every Ruby
Mutex object, use a doubly-linked list to implement a waiter queue
in the Mutex itself.  The immediate benefit is a smaller footprint
for every Mutex object, which matters because some projects have
many objects requiring synchronization.

In the future, this technique of a linked list with on-stack list
nodes (struct mutex_waiter) should allow us to transition easily to
an M:N threading model, since it removes the native-thread dependency
from the Mutex implementation.

We already do something similar for autoload in variable.c, and this
was inspired by the Linux kernel wait queue (as ccan/list is inspired
by the Linux kernel linked list).
---
 thread.c      | 14 ++----
 thread_sync.c | 139 ++++++++++++++++++++++------------------------------------
 2 files changed, 56 insertions(+), 97 deletions(-)
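As an aside for readers new to intrusive lists, below is a minimal,
self-contained sketch of the idea (it borrows nothing from ccan/list
or the Ruby internals; the names wait_queue, waiter and tid are
invented for illustration).  Each blocked caller embeds the link in a
stack-allocated node, so enqueueing needs no heap allocation and the
shared structure carries only a list head:

#include <stddef.h>
#include <stdio.h>

/* doubly-linked circular list; a head pointing at itself is empty */
struct list_node { struct list_node *prev, *next; };

static void list_head_init(struct list_node *h) { h->prev = h->next = h; }

/* enqueue at the tail for FIFO wakeup order */
static void list_add_tail(struct list_node *h, struct list_node *n)
{
    n->prev = h->prev;
    n->next = h;
    h->prev->next = n;
    h->prev = n;
}

/* unlink and re-point at self so a second delete is harmless */
static void list_del_init(struct list_node *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
    n->prev = n->next = n;
}

/* on-stack waiter, playing the role of struct mutex_waiter;
 * "tid" stands in for the rb_thread_t pointer */
struct waiter {
    int tid;
    struct list_node node;
};

int main(void)
{
    struct list_node wait_queue;
    struct waiter a = { 1, { 0, 0 } }, b = { 2, { 0, 0 } };

    list_head_init(&wait_queue);

    /* each "blocked thread" enqueues its own stack node */
    list_add_tail(&wait_queue, &a.node);
    list_add_tail(&wait_queue, &b.node);

    /* "unlock": pop waiters in FIFO order and wake them */
    while (wait_queue.next != &wait_queue) {
        struct list_node *n = wait_queue.next;
        struct waiter *w = (struct waiter *)
                ((char *)n - offsetof(struct waiter, node));

        list_del_init(n);
        printf("wake thread %d\n", w->tid);
    }
    return 0;
}

The offsetof() arithmetic is the usual container_of trick for mapping
a node back to its enclosing struct; ccan/list wraps the same idea
behind list_for_each() and friends.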
diff --git a/thread.c b/thread.c
index 6ccfed0003..416b356b59 100644
--- a/thread.c
+++ b/thread.c
@@ -4940,15 +4940,9 @@ debug_deadlock_check(rb_vm_t *vm, VALUE msg)
 		     th->self, th, thread_id_str(th), th->interrupt_flag);
 	if (th->locking_mutex) {
 	    rb_mutex_t *mutex;
-	    struct rb_thread_struct volatile *mth;
-	    int waiting;
 	    GetMutexPtr(th->locking_mutex, mutex);
-
-	    native_mutex_lock(&mutex->lock);
-	    mth = mutex->th;
-	    waiting = mutex->cond_waiting;
-	    native_mutex_unlock(&mutex->lock);
-	    rb_str_catf(msg, " mutex:%p cond:%d", mth, waiting);
+	    rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
+			mutex->th, rb_mutex_num_waiting(mutex));
 	}
 	{
 	    rb_thread_list_t *list = th->join_list;
@@ -4981,11 +4975,9 @@ rb_check_deadlock(rb_vm_t *vm)
 	    rb_mutex_t *mutex;
 	    GetMutexPtr(th->locking_mutex, mutex);
 
-	    native_mutex_lock(&mutex->lock);
-	    if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) {
+	    if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
 		found = 1;
 	    }
-	    native_mutex_unlock(&mutex->lock);
 	}
 	if (found) break;
     }
diff --git a/thread_sync.c b/thread_sync.c
index 0a6203174e..ed119441c0 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -1,16 +1,21 @@
 /* included by thread.c */
+#include "ccan/list/list.h"
 
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;
 
 /* Mutex */
 
+/* mutex_waiter is always on-stack */
+struct mutex_waiter {
+    rb_thread_t *th;
+    struct list_node node;
+};
+
 typedef struct rb_mutex_struct {
-    rb_nativethread_lock_t lock;
-    rb_nativethread_cond_t cond;
     struct rb_thread_struct volatile *th;
     struct rb_mutex_struct *next_mutex;
-    int cond_waiting;
+    struct list_head waitq; /* protected by GVL */
     int allow_trap;
 } rb_mutex_t;
 
@@ -51,6 +56,19 @@ static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *t
 
 #define mutex_mark NULL
 
+static size_t
+rb_mutex_num_waiting(rb_mutex_t *mutex)
+{
+    struct mutex_waiter *w;
+    size_t n = 0;
+
+    list_for_each(&mutex->waitq, w, node) {
+	n++;
+    }
+
+    return n;
+}
+
 static void
 mutex_free(void *ptr)
 {
@@ -60,8 +78,6 @@ mutex_free(void *ptr)
 	const char *err = rb_mutex_unlock_th(mutex, mutex->th);
 	if (err) rb_bug("%s", err);
     }
-    native_mutex_destroy(&mutex->lock);
-    native_cond_destroy(&mutex->cond);
     ruby_xfree(ptr);
 }
 
@@ -95,8 +111,7 @@ mutex_alloc(VALUE klass)
     rb_mutex_t *mutex;
 
     obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
-    native_mutex_initialize(&mutex->lock);
-    native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC);
+    list_head_init(&mutex->waitq);
     return obj;
 }
 
@@ -158,7 +173,6 @@ rb_mutex_trylock(VALUE self)
     VALUE locked = Qfalse;
     GetMutexPtr(self, mutex);
 
-    native_mutex_lock(&mutex->lock);
     if (mutex->th == 0) {
 	rb_thread_t *th = GET_THREAD();
 	mutex->th = th;
@@ -166,61 +180,10 @@
 	locked = Qtrue;
 	mutex_locked(th, self);
     }
-    native_mutex_unlock(&mutex->lock);
 
     return locked;
 }
 
-static int
-lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms)
-{
-    int interrupted = 0;
-    int err = 0;
-
-    mutex->cond_waiting++;
-    for (;;) {
-	if (!mutex->th) {
-	    mutex->th = th;
-	    break;
-	}
-	if (RUBY_VM_INTERRUPTED(th)) {
-	    interrupted = 1;
-	    break;
-	}
-	if (err == ETIMEDOUT) {
-	    interrupted = 2;
-	    break;
-	}
-
-	if (timeout_ms) {
-	    struct timespec timeout_rel;
-	    struct timespec timeout;
-
-	    timeout_rel.tv_sec = 0;
-	    timeout_rel.tv_nsec = timeout_ms * 1000 * 1000;
-	    timeout = native_cond_timeout(&mutex->cond, timeout_rel);
-	    err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout);
-	}
-	else {
-	    native_cond_wait(&mutex->cond, &mutex->lock);
-	    err = 0;
-	}
-    }
-    mutex->cond_waiting--;
-
-    return interrupted;
-}
-
-static void
-lock_interrupt(void *ptr)
-{
-    rb_mutex_t *mutex = (rb_mutex_t *)ptr;
-    native_mutex_lock(&mutex->lock);
-    if (mutex->cond_waiting > 0)
-	native_cond_broadcast(&mutex->cond);
-    native_mutex_unlock(&mutex->lock);
-}
-
 /*
  * At maximum, only one thread can use cond_timedwait and watch deadlock
  * periodically. Multiple polling thread (i.e. concurrent deadlock check)
@@ -248,37 +211,39 @@ rb_mutex_lock(VALUE self)
     }
 
     if (rb_mutex_trylock(self) == Qfalse) {
+	struct mutex_waiter w;
+
 	if (mutex->th == th) {
 	    rb_raise(rb_eThreadError, "deadlock; recursive locking");
 	}
+	w.th = th;
 
 	while (mutex->th != th) {
-	    int interrupted;
 	    enum rb_thread_status prev_status = th->status;
-	    volatile int timeout_ms = 0;
+	    struct timeval *timeout = 0;
 	    struct rb_unblock_callback oldubf;
+	    struct timeval tv = { 0, 100000 };
 
-	    set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE);
+	    set_unblock_function(th, 0, 0, &oldubf, FALSE);
 	    th->status = THREAD_STOPPED_FOREVER;
 	    th->locking_mutex = self;
-
-	    native_mutex_lock(&mutex->lock);
 	    th->vm->sleeper++;
 	    /*
-	     * Carefully! while some contended threads are in lock_func(),
+	     * Carefully! while some contended threads are in native_sleep(),
	     * vm->sleeper is unstable value. we have to avoid both deadlock
	     * and busy loop.
 	     */
 	    if ((vm_living_thread_num(th->vm) == th->vm->sleeper) && !patrol_thread) {
-		timeout_ms = 100;
+		timeout = &tv;
 		patrol_thread = th;
 	    }
 
-	    GVL_UNLOCK_BEGIN();
-	    interrupted = lock_func(th, mutex, (int)timeout_ms);
-	    native_mutex_unlock(&mutex->lock);
-	    GVL_UNLOCK_END();
+	    rb_check_deadlock(th->vm);
+	    list_add_tail(&mutex->waitq, &w.node);
+	    native_sleep(th, timeout); /* release GVL */
+	    list_del_init(&w.node);
 
 	    if (patrol_thread == th)
 		patrol_thread = NULL;
@@ -286,7 +251,7 @@
 	    reset_unblock_function(th, &oldubf);
 
 	    th->locking_mutex = Qfalse;
-	    if (mutex->th && interrupted == 2) {
+	    if (mutex->th && timeout) {
 		rb_check_deadlock(th->vm);
 	    }
 	    if (th->status == THREAD_STOPPED_FOREVER) {
@@ -296,9 +261,7 @@
 	    if (mutex->th == th) mutex_locked(th, self);
 
-	    if (interrupted) {
-		RUBY_VM_CHECK_INTS_BLOCKING(th);
-	    }
+	    RUBY_VM_CHECK_INTS_BLOCKING(th);
 	}
     }
     return self;
 }
@@ -330,29 +293,32 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th)
 {
     const char *err = NULL;
 
-    native_mutex_lock(&mutex->lock);
-
     if (mutex->th == 0) {
 	err = "Attempt to unlock a mutex which is not locked";
     }
     else if (mutex->th != th) {
 	err = "Attempt to unlock a mutex which is locked by another thread";
-    }
-    else {
-	mutex->th = 0;
-	if (mutex->cond_waiting > 0)
-	    native_cond_signal(&mutex->cond);
-    }
-
-    native_mutex_unlock(&mutex->lock);
-
-    if (!err) {
+    } else {
+	struct mutex_waiter *cur = 0, *next = 0;
 	rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes;
+
 	while (*th_mutex != mutex) {
 	    th_mutex = &(*th_mutex)->next_mutex;
 	}
 	*th_mutex = mutex->next_mutex;
 	mutex->next_mutex = NULL;
+	mutex->th = 0;
+	list_for_each_safe(&mutex->waitq, cur, next, node) {
+	    list_del_init(&cur->node);
+	    switch (cur->th->state) {
+	      case THREAD_KILLED:
+		continue;
+	      case THREAD_STOPPED:
+	      case THREAD_RUNNABLE:
+	      case THREAD_STOPPED_FOREVER:
+		mutex->th = cur->th;
+		rb_threadptr_interrupt(cur->th);
+	    }
+	}
     }
 
     return err;
@@ -411,6 +377,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
 	mutexes = mutex->next_mutex;
 	mutex->th = 0;
 	mutex->next_mutex = 0;
+	list_head_init(&mutex->waitq);
     }
 }
 #endif
-- 
EW