about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2015-12-13 12:19:00 +0000
committerEric Wong <e@80x24.org>2015-12-13 12:29:43 +0000
commitd3cf61b05d9507e7b6ea5a1a1192e107a8612049 (patch)
tree7e150cbc3177d58941aa3f26017627ac006b13aa /lib
parente751a396e43f07f15d9b639a850c0c8cbebf9539 (diff)
downloaddtas-d3cf61b05d9507e7b6ea5a1a1192e107a8612049.tar.gz
Ruby 2.3 will have `exception: false' support in socket-related
classes.  Additionally, 2.3 will implement the existing
IO#*_nonblock methods more efficiently than before by avoiding
the hash allocation necessary for keywords.

For users on older Rubies, we'll continue supporting them with
compatibility wrappers; even Ruby 1.9.3 users (for now).
Diffstat (limited to 'lib')
-rw-r--r--lib/dtas/buffer/read_write.rb40
-rw-r--r--lib/dtas/nonblock.rb24
-rw-r--r--lib/dtas/pipe.rb3
-rw-r--r--lib/dtas/process.rb17
-rw-r--r--lib/dtas/sigevent/pipe.rb11
-rw-r--r--lib/dtas/unix_accepted.rb77
-rw-r--r--lib/dtas/unix_server.rb26
7 files changed, 126 insertions, 72 deletions
diff --git a/lib/dtas/buffer/read_write.rb b/lib/dtas/buffer/read_write.rb
index 64ad297..76c60b0 100644
--- a/lib/dtas/buffer/read_write.rb
+++ b/lib/dtas/buffer/read_write.rb
@@ -3,6 +3,7 @@
 require 'io/nonblock'
 require_relative '../../dtas'
 require_relative '../pipe'
+require_relative '../nonblock'
 
 # compatibility code for systems lacking "splice" support via the
 # "io-splice" RubyGem.  Used only by -player
@@ -28,14 +29,12 @@ module DTAS::Buffer::ReadWrite # :nodoc:
   # always block when we have a single target
   def broadcast_one(targets, limit = nil)
     buf = _rbuf
-    @to_io.read_nonblock(limit || MAX_AT_ONCE, buf)
+    case rv = @to_io.read_nonblock(limit || MAX_AT_ONCE, buf, exception: false)
+    when nil, :wait_readable then return rv
+    end
     n = targets[0].write(buf) # IO#write has write-in-full behavior
     @bytes_xfer += n
     :wait_readable
-  rescue EOFError
-    nil
-  rescue Errno::EAGAIN
-    :wait_readable
   rescue Errno::EPIPE, IOError => e
     __dst_error(targets[0], e)
     targets.clear
@@ -71,15 +70,16 @@ module DTAS::Buffer::ReadWrite # :nodoc:
     targets.delete_if do |dst|
       begin
         if dst.nonblock?
-          w = dst.write_nonblock(buf)
-          again[dst] = buf.byteslice(w, n) if w < n
+          case w = dst.write_nonblock(buf, exception: false)
+          when :wait_writable
+            blocked << dst
+          else
+            again[dst] = buf.byteslice(w, n) if w < n
+          end
         else
           dst.write(buf)
         end
         false
-      rescue Errno::EAGAIN
-        blocked << dst
-        false
       rescue IOError, Errno::EPIPE => e
         again.delete(dst)
         __dst_error(dst, e)
@@ -90,17 +90,19 @@ module DTAS::Buffer::ReadWrite # :nodoc:
     # try to write as much as possible
     again.delete_if do |dst, sbuf|
       begin
-        w = dst.write_nonblock(sbuf)
-        n = sbuf.bytesize
-        if w < n
-          again[dst] = sbuf.byteslice(w, n)
-          false
-        else
+        case w = dst.write_nonblock(sbuf, exception: false)
+        when :wait_writable
+          blocked << dst
           true
+        else
+          n = sbuf.bytesize
+          if w < n
+            again[dst] = sbuf.byteslice(w, n)
+            false
+          else
+            true
+          end
         end
-      rescue Errno::EAGAIN
-        blocked << dst
-        true
       rescue IOError, Errno::EPIPE => e
         __dst_error(dst, e)
         true
diff --git a/lib/dtas/nonblock.rb b/lib/dtas/nonblock.rb
new file mode 100644
index 0000000..c8beecd
--- /dev/null
+++ b/lib/dtas/nonblock.rb
@@ -0,0 +1,24 @@
+# Copyright (C) 2015 all contributors <dtas-all@nongnu.org>
+# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
+
+class DTAS::Nonblock < IO
+  if RUBY_VERSION.to_f <= 2.0
+    EX = {}.freeze
+    def read_nonblock(len, buf = nil, opts = EX)
+      super(len, buf)
+    rescue IO::WaitReadable
+      raise if opts[:exception]
+      :wait_readable
+    rescue EOFError
+      raise if opts[:exception]
+      nil
+    end
+
+    def write_nonblock(buf, opts = EX)
+      super(buf)
+    rescue IO::WaitWritable
+      raise if opts[:exception]
+      :wait_writable
+    end
+  end
+end
diff --git a/lib/dtas/pipe.rb b/lib/dtas/pipe.rb
index 22ab85e..cc1c3b5 100644
--- a/lib/dtas/pipe.rb
+++ b/lib/dtas/pipe.rb
@@ -6,9 +6,10 @@ rescue LoadError
 end
 require_relative '../dtas'
 require_relative 'writable_iter'
+require_relative 'nonblock'
 
 # pipe wrapper for -player sinks
-class DTAS::Pipe < IO # :nodoc:
+class DTAS::Pipe < DTAS::Nonblock # :nodoc:
   include DTAS::WritableIter
   attr_accessor :sink
 
diff --git a/lib/dtas/process.rb b/lib/dtas/process.rb
index f5f9a9e..8c46d9d 100644
--- a/lib/dtas/process.rb
+++ b/lib/dtas/process.rb
@@ -4,6 +4,7 @@ require 'io/wait'
 require 'shellwords'
 require_relative '../dtas'
 require_relative 'xs'
+require_relative 'nonblock'
 
 # process management helpers
 module DTAS::Process # :nodoc:
@@ -86,12 +87,13 @@ module DTAS::Process # :nodoc:
       cmd, opts = env, cmd
       env = {}
     end
-    r, w = IO.pipe
+    buf = ''
+    r, w = DTAS::Nonblock.pipe
     opts = opts.merge(out: w)
     r.binmode
     no_raise = opts.delete(:no_raise)
     if err_str = opts.delete(:err_str)
-      re, we = IO.pipe
+      re, we = DTAS::Nonblock.pipe
       re.binmode
       opts[:err] = we
     end
@@ -105,12 +107,11 @@ module DTAS::Process # :nodoc:
       begin
         readable = IO.select(want.keys) or next
         readable[0].each do |io|
-          begin
-            want[io] << io.read_nonblock(2000)
-          rescue Errno::EAGAIN
-            # spurious wakeup, bytes may be zero
-          rescue EOFError
-            want.delete(io)
+          case rv = io.read_nonblock(2000, buf, exception: false)
+          when :wait_readable # spurious wakeup, bytes may be zero
+          when nil then want.delete(io)
+          else
+            want[io] << rv
           end
         end
       end until want.empty?
diff --git a/lib/dtas/sigevent/pipe.rb b/lib/dtas/sigevent/pipe.rb
index 4f42909..5dd01a6 100644
--- a/lib/dtas/sigevent/pipe.rb
+++ b/lib/dtas/sigevent/pipe.rb
@@ -3,11 +3,13 @@
 
 # used in various places for safe wakeups from IO.select via signals
 # A fallback for non-Linux systems lacking the "sleepy_penguin" RubyGem
+require_relative 'nonblock'
 class DTAS::Sigevent # :nodoc:
   attr_reader :to_io
 
   def initialize
-    @to_io, @wr = IO.pipe
+    @to_io, @wr = DTAS::Nonblock.pipe
+    @rbuf = ''
   end
 
   def signal
@@ -15,11 +17,10 @@ class DTAS::Sigevent # :nodoc:
   end
 
   def readable_iter
-    begin
-      @to_io.read_nonblock(11)
+    case @to_io.read_nonblock(11, @rbuf, exception: false)
+    when :wait_readable then return :wait_readable
+    else
       yield self, nil # calls DTAS::Process.reaper
-    rescue Errno::EAGAIN
-      return :wait_readable
     end while true
   end
 
diff --git a/lib/dtas/unix_accepted.rb b/lib/dtas/unix_accepted.rb
index a0000cf..7db3ed5 100644
--- a/lib/dtas/unix_accepted.rb
+++ b/lib/dtas/unix_accepted.rb
@@ -17,55 +17,48 @@ class DTAS::UNIXAccepted # :nodoc:
   def emit(msg)
     buffered = @send_buf.size
     if buffered == 0
-      begin
-        @to_io.sendmsg_nonblock(msg, Socket::MSG_EOR)
-        return :wait_readable
-      rescue Errno::EAGAIN
-        @send_buf << msg
-        return :wait_writable
-      rescue => e
-        return e
+      case rv = sendmsg_nonblock(msg)
+      when :wait_writable then @send_buf << msg
       end
+      rv
     elsif buffered > 100
-      return RuntimeError.new("too many messages buffered")
+      RuntimeError.new("too many messages buffered")
     else # buffered > 0
       @send_buf << msg
-      return :wait_writable
+      :wait_writable
     end
+  rescue => e
+    e
   end
 
   # flushes pending data if it got buffered
   def writable_iter
-    begin
-      msg = @send_buf.shift or return :wait_readable
-      @to_io.send_nonblock(msg, Socket::MSG_EOR)
-    rescue Errno::EAGAIN
-      @send_buf.unshift(msg)
-      return :wait_writable
-    rescue => e
-      return e
-    end while true
+    case sendmsg_nonblock(@send_buf[0])
+    when :wait_writable then return :wait_writable
+    else
+      @send_buf.shift
+    end until @send_buf.empty?
+    :wait_readable
+  rescue => e
+    e
   end
 
   def readable_iter
-    io = @to_io
-    nread = io.nread
+    nread = @to_io.nread
 
     # EOF, assume no spurious wakeups for SOCK_SEQPACKET
     return nil if nread == 0
 
-    begin
-      begin
-        msg = io.recv_nonblock(nread)
-      rescue Errno::EAGAIN
-        return :wait_readable
-      rescue EOFError, SystemCallError
-        return nil
-      end
+    case msg = recv_nonblock(nread)
+    when :wait_readable then return msg
+    when '', nil then return nil # EOF
+    else
       yield(self, msg) # DTAS::Player deals with this
-      nread = io.nread
+      nread = @to_io.nread
     end while nread > 0
     :wait_readable
+  rescue SystemCallError
+    nil
   end
 
   def close
@@ -75,4 +68,28 @@ class DTAS::UNIXAccepted # :nodoc:
   def closed?
     @to_io.closed?
   end
+
+  if RUBY_VERSION.to_f >= 2.3
+    def sendmsg_nonblock(msg)
+      @to_io.sendmsg_nonblock(msg, Socket::MSG_EOR, exception: false)
+    end
+
+    def recv_nonblock(len)
+      @to_io.recv_nonblock(len, exception: false)
+    end
+  else
+    def sendmsg_nonblock(msg)
+      @to_io.sendmsg_nonblock(msg, Socket::MSG_EOR)
+    rescue IO::WaitWritable
+      :wait_writable
+    end
+
+    def recv_nonblock(len)
+      @to_io.recv_nonblock(len)
+    rescue IO::WaitReadable
+      :wait_readable
+    rescue EOFError
+      nil
+    end
+  end
 end
diff --git a/lib/dtas/unix_server.rb b/lib/dtas/unix_server.rb
index e21a096..e11b0d7 100644
--- a/lib/dtas/unix_server.rb
+++ b/lib/dtas/unix_server.rb
@@ -58,12 +58,10 @@ class DTAS::UNIXServer # :nodoc:
 
   def readable_iter
     # we do not do anything with the block passed to us
-    begin
-      sock, _ = @to_io.accept_nonblock
-      @readers[DTAS::UNIXAccepted.new(sock)] = true
-    rescue Errno::ECONNABORTED # ignore this, it happens
-    rescue Errno::EAGAIN
-      return :wait_readable
+    case rv = accept_nonblock
+    when :wait_readable then return rv
+    else
+      @readers[DTAS::UNIXAccepted.new(rv[0])] = true
     end while true
   end
 
@@ -88,9 +86,7 @@ class DTAS::UNIXServer # :nodoc:
       # - a consumer (e.g. DTAS::Sink) just became writable, but the
       #   corresponding DTAS::Buffer was already readable in a previous
       #   call.
-    when nil
-      io.close
-    when StandardError
+    when nil, StandardError
       io.close
     else
       raise "BUG: wait_ctl invalid: #{io} #{err.inspect}"
@@ -117,4 +113,16 @@ class DTAS::UNIXServer # :nodoc:
       wait_ctl(io, io.readable_iter { |_io, msg| yield(_io, msg) })
     end
   end
+
+  if RUBY_VERSION.to_f >= 2.3
+    def accept_nonblock
+      @to_io.accept_nonblock(exception: false)
+    end
+  else
+    def accept_nonblock
+      @to_io.accept_nonblock
+    rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO
+      :wait_readable
+    end
+  end
 end