about summary refs log tree commit
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2019-12-20 01:39:15 +0000
committerEric Wong <e@80x24.org>2020-01-06 08:15:15 +0000
commit01fec98a75a6c879c96b3c757557559b9bd18a19 (patch)
treee43df1eef9730e2fb50be264550af9e104d22782
parent940c0b3cffa7f691620e7890ada15c7519817307 (diff)
downloaddtas-01fec98a75a6c879c96b3c757557559b9bd18a19.tar.gz
Fiddle exists on all Ruby 1.9.2+ installations and seems
alright.  Since splice is a Linux-only API, we don't need to
worry about the values of constants changing (and they're
architecture-independent).
-rw-r--r--lib/dtas/buffer.rb15
-rw-r--r--lib/dtas/buffer/fiddle_splice.rb216
-rw-r--r--lib/dtas/buffer/read_write.rb4
-rw-r--r--lib/dtas/buffer/splice.rb2
-rw-r--r--test/test_buffer.rb10
5 files changed, 234 insertions, 13 deletions
diff --git a/lib/dtas/buffer.rb b/lib/dtas/buffer.rb
index 39070d7..e0919d4 100644
--- a/lib/dtas/buffer.rb
+++ b/lib/dtas/buffer.rb
@@ -8,12 +8,15 @@ require_relative '../dtas'
 class DTAS::Buffer # :nodoc:
   begin
     raise LoadError, "no splice with _DTAS_POSIX" if ENV["_DTAS_POSIX"]
-    require 'sleepy_penguin' # splice is only in Linux for now...
-    SleepyPenguin.respond_to?(:splice) or
-      raise LoadError, 'sleepy_penguin 3.5+ required for splice', []
-    require_relative 'buffer/splice'
-    include DTAS::Buffer::Splice
-  rescue LoadError
+    # splice is only in Linux for now
+    begin
+      require_relative 'buffer/splice'
+      include DTAS::Buffer::Splice
+    rescue LoadError
+      require_relative 'buffer/fiddle_splice'
+      include DTAS::Buffer::FiddleSplice
+    end
+  rescue LoadError, StandardError
     require_relative 'buffer/read_write'
     include DTAS::Buffer::ReadWrite
   end
diff --git a/lib/dtas/buffer/fiddle_splice.rb b/lib/dtas/buffer/fiddle_splice.rb
new file mode 100644
index 0000000..543b3e0
--- /dev/null
+++ b/lib/dtas/buffer/fiddle_splice.rb
@@ -0,0 +1,216 @@
+# Copyright (C) 2013-2019 all contributors <dtas-all@nongnu.org>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require 'io/nonblock'
+require 'fiddle' # require_relative caller should expect LoadError
+require_relative '../../dtas'
+require_relative '../pipe'
+
+# Used by -player on Linux systems with the "splice" syscall
+module DTAS::Buffer::FiddleSplice # :nodoc:
+  MAX_AT_ONCE = 4096 # page size in Linux
+  MAX_AT_ONCE_1 = 65536
+  F_MOVE = 1
+  F_NONBLOCK = 2
+
+  Splice = Fiddle::Function.new(DTAS.libc['splice'], [
+      Fiddle::TYPE_INT, # int fd_in,
+      Fiddle::TYPE_VOIDP, # loff_t *off_in
+      Fiddle::TYPE_INT, # int fd_out
+      Fiddle::TYPE_VOIDP, # loff_t *off_out
+      Fiddle::TYPE_SIZE_T, # size_t len
+      Fiddle::TYPE_INT, # unsigned int flags
+    ],
+    Fiddle::TYPE_SSIZE_T) # ssize_t
+
+  Tee = Fiddle::Function.new(DTAS.libc['tee'], [
+      Fiddle::TYPE_INT, # int fd_in,
+      Fiddle::TYPE_INT, # int fd_out
+      Fiddle::TYPE_SIZE_T, # size_t len
+      Fiddle::TYPE_INT, # unsigned int flags
+    ],
+    Fiddle::TYPE_SSIZE_T) # ssize_t
+
+  def _syserr(s, func)
+    raise "BUG: we should not encounter EOF on #{func}" if s == 0
+    case errno = Fiddle.last_error
+    when Errno::EAGAIN::Errno
+      return :EAGAIN
+    when Errno::EPIPE::Errno
+      raise Errno::EPIPE.exception
+    when Errno::EINTR::Errno
+      return nil
+    else
+      raise SystemCallError, "#{func} error: #{errno}"
+    end
+  end
+
+  def splice(src, dst, len, flags)
+    begin
+      s = Splice.call(src.fileno, nil, dst.fileno, nil, len, flags)
+      return s if s > 0
+      sym = _syserr(s, 'splice') and return sym
+    end while true
+  end
+
+  def tee(src, dst, len, flags = 0)
+    begin
+      s = Tee.call(src.fileno, dst.fileno, len, flags)
+      return s if s > 0
+      sym = _syserr(s, 'tee') and return sym
+    end while true
+  end
+
+  def buffer_size
+    @to_io.pipe_size
+  end
+
+  # nil is OK, won't reset existing pipe, either...
+  def buffer_size=(bytes)
+    @to_io.pipe_size = bytes if bytes
+    @buffer_size = bytes
+  end
+
+  # be sure to only call this with nil when all writers to @wr are done
+  def discard(bytes)
+    splice(@to_io, DTAS.null, bytes, 0)
+  end
+
+  def broadcast_one(targets, limit = nil)
+    # single output is always non-blocking
+    limit ||= MAX_AT_ONCE_1
+    s = splice(@to_io, targets[0], limit, F_MOVE|F_NONBLOCK)
+    if Symbol === s
+      targets # our one and only target blocked on write
+    else
+      @bytes_xfer += s
+      :wait_readable # we want to read more from @to_io soon
+    end
+  rescue Errno::EPIPE, IOError => e
+    __dst_error(targets[0], e)
+    targets.clear
+    nil # do not return error here, we already spewed an error message
+  end
+
+  def __tee_in_full(src, dst, bytes)
+    rv = 0
+    while bytes > 0
+      s = tee(src, dst, bytes)
+      bytes -= s
+      rv += s
+    end
+    rv
+  end
+
+  def __splice_in_full(src, dst, bytes, flags)
+    rv = 0
+    while bytes > 0
+      s = splice(src, dst, bytes, flags)
+      rv += s
+      bytes -= s
+    end
+    rv
+  end
+
+  # returns the largest value we teed
+  def __broadcast_tee(blocked, targets, chunk_size)
+    most_teed = 0
+    targets.delete_if do |dst|
+      begin
+        t = (dst.nonblock? || most_teed == 0) ?
+              tee(@to_io, dst, chunk_size, F_NONBLOCK) :
+              __tee_in_full(@to_io, dst, chunk_size)
+        if Integer === t
+          if t > most_teed
+            chunk_size = t if most_teed == 0
+            most_teed = t
+          end
+        else
+          blocked << dst
+        end
+        false
+      rescue IOError, Errno::EPIPE => e
+        __dst_error(dst, e)
+        true
+      end
+    end
+    most_teed
+  end
+
+  def broadcast_inf(targets, limit = nil)
+    if targets.all?(&:ready_write_optimized?)
+      blocked = []
+    elsif targets.none?(&:nonblock?)
+      # if all targets are blocking, don't start until they're all writable
+      r = IO.select(nil, targets, nil, 0) or return targets
+      blocked = targets - r[1]
+
+      # tell DTAS::UNIXServer#run_once to wait on the blocked targets
+      return blocked if blocked[0]
+
+      # all writable, yay!
+    else
+      blocked = []
+    end
+
+    # don't pin too much on one target
+    bytes = limit || MAX_AT_ONCE
+    last = targets.pop # we splice to the last one, tee to the rest
+
+    # this may return zero if all targets were non-blocking
+    most_teed = __broadcast_tee(blocked, targets, bytes)
+
+    # don't splice more than the largest amount we successfully teed
+    bytes = most_teed if most_teed > 0
+
+    begin
+      targets << last
+      if last.nonblock? || most_teed == 0
+        s = splice(@to_io, last, bytes, F_MOVE|F_NONBLOCK)
+        if Symbol === s
+          blocked << last
+
+          # we accomplished nothing!
+          # If _all_ writers are blocked, do not discard data,
+          # stay blocked on :wait_writable
+          return blocked if most_teed == 0
+
+          # the tees targets win, drop data intended for last
+          if most_teed > 0
+            discard(most_teed)
+            @bytes_xfer += most_teed
+            # do not watch for writability of last, last is non-blocking
+            return :wait_readable
+          end
+        end
+      else
+        # the blocking case is simple
+        s = __splice_in_full(@to_io, last, bytes, F_MOVE)
+      end
+      @bytes_xfer += s
+
+      # if we can't splice everything
+      # discard it so the early targets do not get repeated data
+      if s < bytes && most_teed > 0
+        discard(bytes - s)
+      end
+      :wait_readable
+    rescue IOError, Errno::EPIPE => e # last failed, drop it
+      __dst_error(last, e)
+      targets.pop # we're no longer a valid target
+
+      if most_teed == 0
+        # nothing accomplished, watch any targets
+        return blocked if blocked[0]
+      else
+        # some progress, discard the data we could not splice
+        @bytes_xfer += most_teed
+        discard(most_teed)
+      end
+
+      # stop decoding if we're completely errored out
+      # returning nil will trigger close
+      return targets[0] ? :wait_readable : nil
+    end
+  end
+end
diff --git a/lib/dtas/buffer/read_write.rb b/lib/dtas/buffer/read_write.rb
index 04856c7..e2001b6 100644
--- a/lib/dtas/buffer/read_write.rb
+++ b/lib/dtas/buffer/read_write.rb
@@ -6,8 +6,8 @@ require_relative '../../dtas'
 require_relative '../pipe'
 require_relative '../nonblock'
 
-# compatibility code for systems lacking "splice" support via the
-# "sleepy_penguin" 3.5+ RubyGem.  Used only by -player
+# compatibility code for non-Linux systems lacking "splice" support.
+# Used only by -player
 module DTAS::Buffer::ReadWrite # :nodoc:
   MAX_AT_ONCE = 512 # min PIPE_BUF value in POSIX
   attr_accessor :buffer_size
diff --git a/lib/dtas/buffer/splice.rb b/lib/dtas/buffer/splice.rb
index 2e86d0a..b234a57 100644
--- a/lib/dtas/buffer/splice.rb
+++ b/lib/dtas/buffer/splice.rb
@@ -5,6 +5,8 @@ require 'io/nonblock'
 require 'sleepy_penguin'
 require_relative '../../dtas'
 require_relative '../pipe'
+SleepyPenguin.respond_to?(:splice) or
+  raise LoadError, 'sleepy_penguin 3.5+ required for splice', []
 
 # Used by -player on Linux systems with the "sleepy_penguin" RubyGem installed
 module DTAS::Buffer::Splice # :nodoc:
diff --git a/test/test_buffer.rb b/test/test_buffer.rb
index 8f5d8b5..1773ca3 100644
--- a/test/test_buffer.rb
+++ b/test/test_buffer.rb
@@ -49,14 +49,14 @@ class TestBuffer < Testcase
     buf = new_buffer
     buf.buffer_size = @@max_size
     assert_equal @@max_size, buf.buffer_size
-  end if defined?(SleepyPenguin::F_GETPIPE_SZ)
+  end if defined?(DTAS::Pipe::F_GETPIPE_SZ)
 
   def test_buffer_size
     buf = new_buffer
     assert_operator buf.buffer_size, :>, 128
     buf.buffer_size = @@max_size
     assert_equal @@max_size, buf.buffer_size
-  end if defined?(SleepyPenguin::F_GETPIPE_SZ)
+  end if defined?(DTAS::Pipe::F_GETPIPE_SZ)
 
   def test_broadcast_1
     buf = new_buffer
@@ -108,7 +108,7 @@ class TestBuffer < Testcase
     assert_equal "HELLO", a[0].read(5)
     assert_equal "HELLO", b[0].read(5)
 
-    return unless defined?(SleepyPenguin::F_GETPIPE_SZ)
+    return unless defined?(DTAS::Pipe::F_GETPIPE_SZ)
 
     b[1].nonblock = true
     b[1].write('*' * pipe_size(b[1]))
@@ -167,7 +167,7 @@ class TestBuffer < Testcase
     buf.wr.write "HELLO"
     assert_equal tmp, buf.broadcast(tmp)
     assert_equal [a[1], b[1]], tmp
-  end if defined?(SleepyPenguin::F_GETPIPE_SZ)
+  end if defined?(DTAS::Pipe::F_GETPIPE_SZ)
 
   def test_serialize
     buf = new_buffer
@@ -206,6 +206,6 @@ class TestBuffer < Testcase
   end
 
   def pipe_size(io)
-    io.fcntl(SleepyPenguin::F_GETPIPE_SZ)
+    io.fcntl(DTAS::Pipe::F_GETPIPE_SZ)
   end
 end