From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net
X-Spam-Level:
X-Spam-ASN: AS16276 144.217.0.0/16
X-Spam-Status: No, score=-0.2 required=3.0 tests=AWL,BAYES_00,
 RCVD_IN_BL_SPAMCOP_NET,RCVD_IN_MSPIKE_BL,RCVD_IN_MSPIKE_ZBI,RCVD_IN_XBL,
 RDNS_NONE,SPF_FAIL,SPF_HELO_FAIL,TO_EQ_FM_DOM_SPF_FAIL shortcircuit=no
 autolearn=no autolearn_force=no version=3.4.0
Received: from 80x24.org (unknown [144.217.245.243])
 by dcvr.yhbt.net (Postfix) with ESMTP id 9B21E1F404
 for ; Sat, 21 Apr 2018 02:15:04 +0000 (UTC)
From: Eric Wong
To: spew@80x24.org
Subject: [PATCH] rb_thread_io_blocking_region: discard "stream closed" if func succeeds
Date: Sat, 21 Apr 2018 02:15:02 +0000
Message-Id: <20180421021502.31552-1-e@80x24.org>
List-Id:

It's possible for a "stream closed" exception to hit despite
func() succeeding.

th-1                                 | th-2
-------------------------------------+---------------------------------
                                     |
                                     | io.syswrite
                                     |   rb_thread_io_blocking_region
                                     |     add wfd to waiting_fds list
                                     |     release GVL
acquire GVL                          |     write(2) succeeds, sets `val'
io.close # io_close_fptr             |
  rb_notify_fd_close                 |
    scan waiting_fds list            |
    enqueue "stream closed" for th-2 |
  fptr_finalize_flush                |
    close(2)                         |
  rb_thread_schedule                 |
    release GVL                      |
                                     | acquire GVL
                                     | sees enqueued "stream closed"
                                     | RAISE!
                                     | successful `val' never reported
---
 test/ruby/test_io.rb |  2 +-
 thread.c             | 24 +++++++++++++++++++++---
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/test/ruby/test_io.rb b/test/ruby/test_io.rb
index 43c3ed7566c..f20ca4c8df1 100644
--- a/test/ruby/test_io.rb
+++ b/test/ruby/test_io.rb
@@ -3593,7 +3593,7 @@ def test_race_closed_stream
         thread = Thread.new do
           begin
             q << true
-            assert_raise_with_message(IOError, /stream closed/) do
+            assert_raise_with_message(IOError, /stream closed|closed stream/) do
               while r.gets
               end
             end
diff --git a/thread.c b/thread.c
index 2fb02409626..c77b6da47a3 100644
--- a/thread.c
+++ b/thread.c
@@ -1510,6 +1510,21 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
     return call_without_gvl(func, data1, ubf, data2, FALSE);
 }
 
+static void
+discard_stream_closed_error(const rb_thread_t *th)
+{
+    long i;
+
+    for (i = 0; i < RARRAY_LEN(th->pending_interrupt_queue); i++) {
+        VALUE err = RARRAY_AREF(th->pending_interrupt_queue, i);
+        if (err == th->vm->special_exceptions[ruby_error_stream_closed]) {
+            rb_ary_delete_at(th->pending_interrupt_queue, i);
+        }
+    }
+}
+
+static int threadptr_pending_interrupt_active_p(const rb_thread_t *th);
+
 VALUE
 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 {
@@ -1518,9 +1533,10 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
     volatile int saved_errno = 0;
     enum ruby_tag_type state;
     struct waiting_fd wfd;
+    rb_thread_t *th = rb_ec_thread_ptr(ec);
 
     wfd.fd = fd;
-    wfd.th = rb_ec_thread_ptr(ec);
+    wfd.th = th;
     list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node);
 
     EC_PUSH_TAG(ec);
@@ -1538,7 +1554,9 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
     if (state) {
         EC_JUMP_TAG(ec, state);
     }
-    /* TODO: check func() */
+    if (threadptr_pending_interrupt_active_p(th) && val != Qundef) {
+        discard_stream_closed_error(th);
+    }
     RUBY_VM_CHECK_INTS_BLOCKING(ec);
 
     errno = saved_errno;
@@ -1790,7 +1808,7 @@ rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timi
 }
 
 static int
-threadptr_pending_interrupt_active_p(rb_thread_t *th)
+threadptr_pending_interrupt_active_p(const rb_thread_t *th)
 {
     /*
      * For optimization, we don't check async errinfo queue
-- 
EW
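
For reference, the scan-and-delete step in discard_stream_closed_error
can be sketched outside Ruby's internals. The following is a minimal
standalone C sketch, not the patch's code: a plain int array stands in
for th->pending_interrupt_queue, and the names discard_matching, queue,
len, and target are hypothetical. It advances the index only when
nothing was removed, so adjacent matching entries are all dropped; a
loop that deletes at index i and then still increments i would not
examine the entry that shifts into slot i.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper (not part of thread.c): remove every element equal
 * to `target' from queue[0..len) in place, preserving order.
 * Returns the new length.
 */
static size_t
discard_matching(int *queue, size_t len, int target)
{
    size_t i = 0;

    assert(queue != NULL || len == 0);
    while (i < len) {
        if (queue[i] == target) {
            size_t j;

            /* shift the tail left by one, like deleting at index i */
            for (j = i + 1; j < len; j++) {
                queue[j - 1] = queue[j];
            }
            len--;
            /* do not advance i: a new element now occupies slot i */
        }
        else {
            i++;
        }
    }
    return len;
}
```

For example, calling it on {1, 7, 7, 2, 7} with target 7 returns 2 and
leaves 1, 2 at the front of the array.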