From d3cf61b05d9507e7b6ea5a1a1192e107a8612049 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 13 Dec 2015 12:19:00 +0000 Subject: switch to exception-free non-blocking I/O Ruby 2.3 will have `exception: false' support in socket-related classes. Additionally, 2.3 will implement the existing IO#*_nonblock methods more efficiently than before by avoiding the hash allocation necessary for keywords. For users on older Rubies, we'll continue supporting them with compatibility wrappers; even Ruby 1.9.3 users (for now). --- lib/dtas/buffer/read_write.rb | 40 +++++++++++----------- lib/dtas/nonblock.rb | 24 ++++++++++++++ lib/dtas/pipe.rb | 3 +- lib/dtas/process.rb | 17 +++++----- lib/dtas/sigevent/pipe.rb | 11 ++++--- lib/dtas/unix_accepted.rb | 77 ++++++++++++++++++++++++++----------------- lib/dtas/unix_server.rb | 26 ++++++++++----- 7 files changed, 126 insertions(+), 72 deletions(-) create mode 100644 lib/dtas/nonblock.rb (limited to 'lib/dtas') diff --git a/lib/dtas/buffer/read_write.rb b/lib/dtas/buffer/read_write.rb index 64ad297..76c60b0 100644 --- a/lib/dtas/buffer/read_write.rb +++ b/lib/dtas/buffer/read_write.rb @@ -3,6 +3,7 @@ require 'io/nonblock' require_relative '../../dtas' require_relative '../pipe' +require_relative '../nonblock' # compatibility code for systems lacking "splice" support via the # "io-splice" RubyGem. Used only by -player @@ -28,14 +29,12 @@ module DTAS::Buffer::ReadWrite # :nodoc: # always block when we have a single target def broadcast_one(targets, limit = nil) buf = _rbuf - @to_io.read_nonblock(limit || MAX_AT_ONCE, buf) + case rv = @to_io.read_nonblock(limit || MAX_AT_ONCE, buf, exception: false) + when nil, :wait_readable then return rv + end n = targets[0].write(buf) # IO#write has write-in-full behavior @bytes_xfer += n :wait_readable - rescue EOFError - nil - rescue Errno::EAGAIN - :wait_readable rescue Errno::EPIPE, IOError => e __dst_error(targets[0], e) targets.clear @@ -71,15 +70,16 @@ module DTAS::Buffer::ReadWrite # :nodoc: targets.delete_if do |dst| begin if dst.nonblock? - w = dst.write_nonblock(buf) - again[dst] = buf.byteslice(w, n) if w < n + case w = dst.write_nonblock(buf, exception: false) + when :wait_writable + blocked << dst + else + again[dst] = buf.byteslice(w, n) if w < n + end else dst.write(buf) end false - rescue Errno::EAGAIN - blocked << dst - false rescue IOError, Errno::EPIPE => e again.delete(dst) __dst_error(dst, e) @@ -90,17 +90,19 @@ module DTAS::Buffer::ReadWrite # :nodoc: # try to write as much as possible again.delete_if do |dst, sbuf| begin - w = dst.write_nonblock(sbuf) - n = sbuf.bytesize - if w < n - again[dst] = sbuf.byteslice(w, n) - false - else + case w = dst.write_nonblock(sbuf, exception: false) + when :wait_writable + blocked << dst true + else + n = sbuf.bytesize + if w < n + again[dst] = sbuf.byteslice(w, n) + false + else + true + end end - rescue Errno::EAGAIN - blocked << dst - true rescue IOError, Errno::EPIPE => e __dst_error(dst, e) true diff --git a/lib/dtas/nonblock.rb b/lib/dtas/nonblock.rb new file mode 100644 index 0000000..c8beecd --- /dev/null +++ b/lib/dtas/nonblock.rb @@ -0,0 +1,24 @@ +# Copyright (C) 2015 all contributors +# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) + +class DTAS::Nonblock < IO + if RUBY_VERSION.to_f <= 2.0 + EX = {}.freeze + def read_nonblock(len, buf = nil, opts = EX) + super(len, buf) + rescue IO::WaitReadable + raise if opts[:exception] + :wait_readable + rescue EOFError + raise if opts[:exception] + nil + end + + def write_nonblock(buf, opts = EX) + super(buf) + rescue IO::WaitWritable + raise if opts[:exception] + :wait_writable + end + end +end diff --git a/lib/dtas/pipe.rb b/lib/dtas/pipe.rb index 22ab85e..cc1c3b5 100644 --- a/lib/dtas/pipe.rb +++ b/lib/dtas/pipe.rb @@ -6,9 +6,10 @@ rescue LoadError end require_relative '../dtas' require_relative 'writable_iter' +require_relative 'nonblock' # pipe wrapper for -player sinks -class DTAS::Pipe < IO # :nodoc: +class DTAS::Pipe < DTAS::Nonblock # :nodoc: include DTAS::WritableIter attr_accessor :sink diff --git a/lib/dtas/process.rb b/lib/dtas/process.rb index f5f9a9e..8c46d9d 100644 --- a/lib/dtas/process.rb +++ b/lib/dtas/process.rb @@ -4,6 +4,7 @@ require 'io/wait' require 'shellwords' require_relative '../dtas' require_relative 'xs' +require_relative 'nonblock' # process management helpers module DTAS::Process # :nodoc: @@ -86,12 +87,13 @@ module DTAS::Process # :nodoc: cmd, opts = env, cmd env = {} end - r, w = IO.pipe + buf = '' + r, w = DTAS::Nonblock.pipe opts = opts.merge(out: w) r.binmode no_raise = opts.delete(:no_raise) if err_str = opts.delete(:err_str) - re, we = IO.pipe + re, we = DTAS::Nonblock.pipe re.binmode opts[:err] = we end @@ -105,12 +107,11 @@ module DTAS::Process # :nodoc: begin readable = IO.select(want.keys) or next readable[0].each do |io| - begin - want[io] << io.read_nonblock(2000) - rescue Errno::EAGAIN - # spurious wakeup, bytes may be zero - rescue EOFError - want.delete(io) + case rv = io.read_nonblock(2000, buf, exception: false) + when :wait_readable # spurious wakeup, bytes may be zero + when nil then want.delete(io) + else + want[io] << rv end end end until want.empty? diff --git a/lib/dtas/sigevent/pipe.rb b/lib/dtas/sigevent/pipe.rb index 4f42909..5dd01a6 100644 --- a/lib/dtas/sigevent/pipe.rb +++ b/lib/dtas/sigevent/pipe.rb @@ -3,11 +3,13 @@ # used in various places for safe wakeups from IO.select via signals # A fallback for non-Linux systems lacking the "sleepy_penguin" RubyGem +require_relative 'nonblock' class DTAS::Sigevent # :nodoc: attr_reader :to_io def initialize - @to_io, @wr = IO.pipe + @to_io, @wr = DTAS::Nonblock.pipe + @rbuf = '' end def signal @@ -15,11 +17,10 @@ class DTAS::Sigevent # :nodoc: end def readable_iter - begin - @to_io.read_nonblock(11) + case @to_io.read_nonblock(11, @rbuf, exception: false) + when :wait_readable then return :wait_readable + else yield self, nil # calls DTAS::Process.reaper - rescue Errno::EAGAIN - return :wait_readable end while true end diff --git a/lib/dtas/unix_accepted.rb b/lib/dtas/unix_accepted.rb index a0000cf..7db3ed5 100644 --- a/lib/dtas/unix_accepted.rb +++ b/lib/dtas/unix_accepted.rb @@ -17,55 +17,48 @@ class DTAS::UNIXAccepted # :nodoc: def emit(msg) buffered = @send_buf.size if buffered == 0 - begin - @to_io.sendmsg_nonblock(msg, Socket::MSG_EOR) - return :wait_readable - rescue Errno::EAGAIN - @send_buf << msg - return :wait_writable - rescue => e - return e + case rv = sendmsg_nonblock(msg) + when :wait_writable then @send_buf << msg end + rv elsif buffered > 100 - return RuntimeError.new("too many messages buffered") + RuntimeError.new("too many messages buffered") else # buffered > 0 @send_buf << msg - return :wait_writable + :wait_writable end + rescue => e + e end # flushes pending data if it got buffered def writable_iter - begin - msg = @send_buf.shift or return :wait_readable - @to_io.send_nonblock(msg, Socket::MSG_EOR) - rescue Errno::EAGAIN - @send_buf.unshift(msg) - return :wait_writable - rescue => e - return e - end while true + case sendmsg_nonblock(@send_buf[0]) + when :wait_writable then return :wait_writable + else + @send_buf.shift + end until @send_buf.empty? + :wait_readable + rescue => e + e end def readable_iter - io = @to_io - nread = io.nread + nread = @to_io.nread # EOF, assume no spurious wakeups for SOCK_SEQPACKET return nil if nread == 0 - begin - begin - msg = io.recv_nonblock(nread) - rescue Errno::EAGAIN - return :wait_readable - rescue EOFError, SystemCallError - return nil - end + case msg = recv_nonblock(nread) + when :wait_readable then return msg + when '', nil then return nil # EOF + else yield(self, msg) # DTAS::Player deals with this - nread = io.nread + nread = @to_io.nread end while nread > 0 :wait_readable + rescue SystemCallError + nil end def close @@ -75,4 +68,28 @@ class DTAS::UNIXAccepted # :nodoc: def closed? @to_io.closed? end + + if RUBY_VERSION.to_f >= 2.3 + def sendmsg_nonblock(msg) + @to_io.sendmsg_nonblock(msg, Socket::MSG_EOR, exception: false) + end + + def recv_nonblock(len) + @to_io.recv_nonblock(len, exception: false) + end + else + def sendmsg_nonblock(msg) + @to_io.sendmsg_nonblock(msg, Socket::MSG_EOR) + rescue IO::WaitWritable + :wait_writable + end + + def recv_nonblock(len) + @to_io.recv_nonblock(len) + rescue IO::WaitReadable + :wait_readable + rescue EOFError + nil + end + end end diff --git a/lib/dtas/unix_server.rb b/lib/dtas/unix_server.rb index e21a096..e11b0d7 100644 --- a/lib/dtas/unix_server.rb +++ b/lib/dtas/unix_server.rb @@ -58,12 +58,10 @@ class DTAS::UNIXServer # :nodoc: def readable_iter # we do not do anything with the block passed to us - begin - sock, _ = @to_io.accept_nonblock - @readers[DTAS::UNIXAccepted.new(sock)] = true - rescue Errno::ECONNABORTED # ignore this, it happens - rescue Errno::EAGAIN - return :wait_readable + case rv = accept_nonblock + when :wait_readable then return rv + else + @readers[DTAS::UNIXAccepted.new(rv[0])] = true end while true end @@ -88,9 +86,7 @@ class DTAS::UNIXServer # :nodoc: # - a consumer (e.g. DTAS::Sink) just became writable, but the # corresponding DTAS::Buffer was already readable in a previous # call. - when nil - io.close - when StandardError + when nil, StandardError io.close else raise "BUG: wait_ctl invalid: #{io} #{err.inspect}" @@ -117,4 +113,16 @@ class DTAS::UNIXServer # :nodoc: wait_ctl(io, io.readable_iter { |_io, msg| yield(_io, msg) }) end end + + if RUBY_VERSION.to_f >= 2.3 + def accept_nonblock + @to_io.accept_nonblock(exception: false) + end + else + def accept_nonblock + @to_io.accept_nonblock + rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO + :wait_readable + end + end end -- cgit v1.2.3-24-ge0c7