From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: 
X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net
X-Spam-Level: 
X-Spam-ASN: AS12876 163.172.0.0/16
X-Spam-Status: No, score=0.6 required=3.0 tests=AWL,BAYES_00,RCVD_IN_MSPIKE_BL,
	RCVD_IN_MSPIKE_ZBI,RCVD_IN_XBL,SPF_FAIL,SPF_HELO_FAIL shortcircuit=no
	autolearn=no autolearn_force=no version=3.4.0
Received: from 80x24.org (tor-exit-readme.memcpy.io [163.172.67.180])
	by dcvr.yhbt.net (Postfix) with ESMTP id D56541FAED
	for ; Wed, 7 Jun 2017 19:59:13 +0000 (UTC)
From: Eric Wong 
To: spew@80x24.org
Subject: [PATCH 1/2] speed up IO#close with many threads
Date: Wed, 7 Jun 2017 19:59:00 +0000
Message-Id: <20170607195901.18958-2-e@80x24.org>
In-Reply-To: <20170607195901.18958-1-e@80x24.org>
References: <20170607195901.18958-1-e@80x24.org>
List-Id: 

From: normal

Today, this change increases IO#close performance with many threads:

  Execution time (sec)
  name              trunk   after
  vm_thread_close   4.276   3.018

  Speedup ratio: compare with the result of `trunk' (greater is better)
  name              after
  vm_thread_close   1.417

This speedup comes because rb_notify_fd_close only scans threads
inside rb_thread_io_blocking_region, not all threads in the VM.

In the future, this type of data structure may allow us to notify
waiters of multiple FDs on a single thread (when using Fibers).

* thread.c (struct waiting_fd): declare
  (rb_thread_io_blocking_region): use on-stack list waiter
  (rb_notify_fd_close): walk vm->waiting_fds instead
  (call_without_gvl): remove old field setting
  (th_init): ditto
* vm_core.h (typedef struct rb_vm_struct): add waiting_fds list
  (typedef struct rb_thread_struct): remove waiting_fd field
  (rb_vm_living_threads_init): initialize waiting_fds list

I am now kicking myself for not thinking about this 3 years ago
when I introduced ccan/list in [Feature #9632] to optimize this
same function :<

This backport of r58812 is necessary to ease backporting r59028,
which fixes a real bug.
---
 thread.c  | 23 ++++++++++++++++-------
 vm.c      |  1 -
 vm_core.h |  4 ++--
 3 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/thread.c b/thread.c
index 7391cf96d2..2a82d33c42 100644
--- a/thread.c
+++ b/thread.c
@@ -96,6 +96,11 @@ static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th);
 static volatile int system_working = 1;
 
 #define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream]
+struct waiting_fd {
+    struct list_node wfd_node; /* <=> vm.waiting_fds */
+    rb_thread_t *th;
+    int fd;
+};
 
 inline static void
 st_delete_wrap(st_table *table, st_data_t key)
@@ -1310,7 +1315,6 @@ call_without_gvl(void *(*func)(void *), void *data1,
     rb_thread_t *th = GET_THREAD();
     int saved_errno = 0;
 
-    th->waiting_fd = -1;
     if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
 	ubf = ubf_select;
 	data2 = th;
@@ -1433,11 +1437,15 @@ VALUE
 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 {
     volatile VALUE val = Qundef; /* shouldn't be used */
+    rb_vm_t *vm = GET_VM();
     rb_thread_t *th = GET_THREAD();
     volatile int saved_errno = 0;
     int state;
+    struct waiting_fd wfd;
 
-    th->waiting_fd = fd;
+    wfd.fd = fd;
+    wfd.th = th;
+    list_add(&vm->waiting_fds, &wfd.wfd_node);
     TH_PUSH_TAG(th);
     if ((state = EXEC_TAG()) == 0) {
@@ -1448,8 +1456,8 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
     }
     TH_POP_TAG();
 
-    /* clear waiting_fd anytime */
-    th->waiting_fd = -1;
+    /* must be deleted before jump */
+    list_del(&wfd.wfd_node);
 
     if (state) {
	TH_JUMP_TAG(th, state);
@@ -2195,12 +2203,13 @@ int
 rb_notify_fd_close(int fd)
 {
     rb_vm_t *vm = GET_THREAD()->vm;
-    rb_thread_t *th = 0;
+    struct waiting_fd *wfd = 0;
     int busy;
 
     busy = 0;
-    list_for_each(&vm->living_threads, th, vmlt_node) {
-	if (th->waiting_fd == fd) {
+    list_for_each(&vm->waiting_fds, wfd, wfd_node) {
+	if (wfd->fd == fd) {
+	    rb_thread_t *th = wfd->th;
	    VALUE err = th->vm->special_exceptions[ruby_error_closed_stream];
	    rb_threadptr_pending_interrupt_enque(th, err);
	    rb_threadptr_interrupt(th);
diff --git a/vm.c b/vm.c
index 0544bcc4e5..c8ed39ad1e 100644
--- a/vm.c
+++ b/vm.c
@@ -2477,7 +2477,6 @@ th_init(rb_thread_t *th, VALUE self)
     th->status = THREAD_RUNNABLE;
     th->errinfo = Qnil;
     th->last_status = Qnil;
-    th->waiting_fd = -1;
     th->root_svar = Qfalse;
     th->local_storage_recursive_hash = Qnil;
     th->local_storage_recursive_hash_for_trace = Qnil;
diff --git a/vm_core.h b/vm_core.h
index d0385d219f..22dbba9cbb 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -490,6 +490,7 @@ typedef struct rb_vm_struct {
     struct rb_thread_struct *main_thread;
     struct rb_thread_struct *running_thread;
 
+    struct list_head waiting_fds; /* <=> struct waiting_fd */
     struct list_head living_threads;
     size_t living_thread_num;
     VALUE thgroup_default;
@@ -712,8 +713,6 @@ typedef struct rb_thread_struct {
     /* passing state */
     int state;
 
-    int waiting_fd;
-
     /* for rb_iterate */
     VALUE passed_block_handler;
 
@@ -1445,6 +1444,7 @@ void rb_thread_wakeup_timer_thread(void);
 static inline void
 rb_vm_living_threads_init(rb_vm_t *vm)
 {
+    list_head_init(&vm->waiting_fds);
     list_head_init(&vm->living_threads);
     vm->living_thread_num = 0;
 }
-- 
EW