From: Eric Wong <e@80x24.org>
To: dtas-all@nongnu.org
Subject: [PATCH v2 3/5] buffer: replace sleepy_penguin with fiddle
Date: Fri, 20 Dec 2019 01:39:15 +0000
Message-ID: <20191220013917.17212-4-e@80x24.org>
In-Reply-To: <20191220013917.17212-1-e@80x24.org>

Fiddle ships with every Ruby 1.9.2+ installation and seems
to work fine.  Since splice is a Linux-only API with a stable
kernel ABI, we don't need to worry about the values of its
constants changing (and they're architecture-independent).
---
 lib/dtas/buffer.rb               |  15 ++-
 lib/dtas/buffer/fiddle_splice.rb | 216 +++++++++++++++++++++++++++++++
 lib/dtas/buffer/read_write.rb    |   4 +-
 lib/dtas/buffer/splice.rb        |   2 +
 test/test_buffer.rb              |  10 +-
 5 files changed, 234 insertions(+), 13 deletions(-)
 create mode 100644 lib/dtas/buffer/fiddle_splice.rb
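
For readers new to Fiddle, the binding-plus-errno pattern that
fiddle_splice.rb relies on can be sketched with a more portable
syscall.  This is only an illustration, not code from this series:
it binds write(2) instead of splice(2), looks the symbol up through
Fiddle::Handle::DEFAULT rather than DTAS.libc, and the Write
constant and try_write helper are hypothetical names.

```ruby
require 'fiddle'
require 'io/nonblock'

# Assumption: libc symbols are visible through the default handle
# (RTLD_DEFAULT); the patch itself uses DTAS.libc instead.
Write = Fiddle::Function.new(Fiddle::Handle::DEFAULT['write'], [
    Fiddle::TYPE_INT,    # int fd
    Fiddle::TYPE_VOIDP,  # const void *buf
    Fiddle::TYPE_SIZE_T, # size_t count
  ],
  Fiddle::TYPE_SSIZE_T)  # ssize_t

# Hypothetical helper: returns the number of bytes written, or :EAGAIN
# if the descriptor is non-blocking and cannot take the data right now.
def try_write(io, str)
  loop do
    n = Write.call(io.fileno, str, str.bytesize)
    return n if n >= 0
    # Fiddle saves errno from the most recent foreign call
    case errno = Fiddle.last_error
    when Errno::EAGAIN::Errno then return :EAGAIN
    when Errno::EINTR::Errno then next # interrupted, retry
    else raise SystemCallError.new('write', errno)
    end
  end
end

r, w = IO.pipe
w.nonblock = true
try_write(w, 'HELLO') # bypasses Ruby's IO buffering entirely
r.read(5)
```

As in _syserr in the patch, EAGAIN is reported as a symbol so the
caller can decide which descriptors to wait on, while EINTR simply
retries the call.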
diff --git a/lib/dtas/buffer.rb b/lib/dtas/buffer.rb
index 39070d7..e0919d4 100644
--- a/lib/dtas/buffer.rb
+++ b/lib/dtas/buffer.rb
@@ -8,12 +8,15 @@
 class DTAS::Buffer # :nodoc:
   begin
     raise LoadError, "no splice with _DTAS_POSIX" if ENV["_DTAS_POSIX"]
-    require 'sleepy_penguin' # splice is only in Linux for now...
-    SleepyPenguin.respond_to?(:splice) or
-      raise LoadError, 'sleepy_penguin 3.5+ required for splice', []
-    require_relative 'buffer/splice'
-    include DTAS::Buffer::Splice
-  rescue LoadError
+    # splice is only in Linux for now
+    begin
+      require_relative 'buffer/splice'
+      include DTAS::Buffer::Splice
+    rescue LoadError
+      require_relative 'buffer/fiddle_splice'
+      include DTAS::Buffer::FiddleSplice
+    end
+  rescue LoadError, StandardError
     require_relative 'buffer/read_write'
     include DTAS::Buffer::ReadWrite
   end
diff --git a/lib/dtas/buffer/fiddle_splice.rb b/lib/dtas/buffer/fiddle_splice.rb
new file mode 100644
index 0000000..543b3e0
--- /dev/null
+++ b/lib/dtas/buffer/fiddle_splice.rb
@@ -0,0 +1,216 @@
+# Copyright (C) 2013-2019 all contributors <dtas-all@nongnu.org>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require 'io/nonblock'
+require 'fiddle' # require_relative caller should expect LoadError
+require_relative '../../dtas'
+require_relative '../pipe'
+
+# Used by -player on Linux systems with the "splice" syscall
+module DTAS::Buffer::FiddleSplice # :nodoc:
+  MAX_AT_ONCE = 4096 # page size in Linux
+  MAX_AT_ONCE_1 = 65536
+  F_MOVE = 1
+  F_NONBLOCK = 2
+
+  Splice = Fiddle::Function.new(DTAS.libc['splice'], [
+      Fiddle::TYPE_INT, # int fd_in,
+      Fiddle::TYPE_VOIDP, # loff_t *off_in
+      Fiddle::TYPE_INT, # int fd_out
+      Fiddle::TYPE_VOIDP, # loff_t *off_out
+      Fiddle::TYPE_SIZE_T, # size_t len
+      Fiddle::TYPE_INT, # unsigned int flags
+    ],
+    Fiddle::TYPE_SSIZE_T) # ssize_t
+
+  Tee = Fiddle::Function.new(DTAS.libc['tee'], [
+      Fiddle::TYPE_INT, # int fd_in,
+      Fiddle::TYPE_INT, # int fd_out
+      Fiddle::TYPE_SIZE_T, # size_t len
+      Fiddle::TYPE_INT, # unsigned int flags
+    ],
+    Fiddle::TYPE_SSIZE_T) # ssize_t
+
+  def _syserr(s, func)
+    raise "BUG: we should not encounter EOF on #{func}" if s == 0
+    case errno = Fiddle.last_error
+    when Errno::EAGAIN::Errno
+      return :EAGAIN
+    when Errno::EPIPE::Errno
+      raise Errno::EPIPE.exception
+    when Errno::EINTR::Errno
+      return nil
+    else
+      raise SystemCallError, "#{func} error: #{errno}"
+    end
+  end
+
+  def splice(src, dst, len, flags)
+    begin
+      s = Splice.call(src.fileno, nil, dst.fileno, nil, len, flags)
+      return s if s > 0
+      sym = _syserr(s, 'splice') and return sym
+    end while true
+  end
+
+  def tee(src, dst, len, flags = 0)
+    begin
+      s = Tee.call(src.fileno, dst.fileno, len, flags)
+      return s if s > 0
+      sym = _syserr(s, 'tee') and return sym
+    end while true
+  end
+
+  def buffer_size
+    @to_io.pipe_size
+  end
+
+  # nil is OK, won't reset existing pipe, either...
+  def buffer_size=(bytes)
+    @to_io.pipe_size = bytes if bytes
+    @buffer_size = bytes
+  end
+
+  # be sure to only call this with nil when all writers to @wr are done
+  def discard(bytes)
+    splice(@to_io, DTAS.null, bytes, 0)
+  end
+
+  def broadcast_one(targets, limit = nil)
+    # single output is always non-blocking
+    limit ||= MAX_AT_ONCE_1
+    s = splice(@to_io, targets[0], limit, F_MOVE|F_NONBLOCK)
+    if Symbol === s
+      targets # our one and only target blocked on write
+    else
+      @bytes_xfer += s
+      :wait_readable # we want to read more from @to_io soon
+    end
+  rescue Errno::EPIPE, IOError => e
+    __dst_error(targets[0], e)
+    targets.clear
+    nil # do not return error here, we already spewed an error message
+  end
+
+  def __tee_in_full(src, dst, bytes)
+    rv = 0
+    while bytes > 0
+      s = tee(src, dst, bytes)
+      bytes -= s
+      rv += s
+    end
+    rv
+  end
+
+  def __splice_in_full(src, dst, bytes, flags)
+    rv = 0
+    while bytes > 0
+      s = splice(src, dst, bytes, flags)
+      rv += s
+      bytes -= s
+    end
+    rv
+  end
+
+  # returns the largest value we teed
+  def __broadcast_tee(blocked, targets, chunk_size)
+    most_teed = 0
+    targets.delete_if do |dst|
+      begin
+        t = (dst.nonblock? || most_teed == 0) ?
+            tee(@to_io, dst, chunk_size, F_NONBLOCK) :
+            __tee_in_full(@to_io, dst, chunk_size)
+        if Integer === t
+          if t > most_teed
+            chunk_size = t if most_teed == 0
+            most_teed = t
+          end
+        else
+          blocked << dst
+        end
+        false
+      rescue IOError, Errno::EPIPE => e
+        __dst_error(dst, e)
+        true
+      end
+    end
+    most_teed
+  end
+
+  def broadcast_inf(targets, limit = nil)
+    if targets.all?(&:ready_write_optimized?)
+      blocked = []
+    elsif targets.none?(&:nonblock?)
+      # if all targets are blocking, don't start until they're all writable
+      r = IO.select(nil, targets, nil, 0) or return targets
+      blocked = targets - r[1]
+
+      # tell DTAS::UNIXServer#run_once to wait on the blocked targets
+      return blocked if blocked[0]
+
+      # all writable, yay!
+    else
+      blocked = []
+    end
+
+    # don't pin too much on one target
+    bytes = limit || MAX_AT_ONCE
+    last = targets.pop # we splice to the last one, tee to the rest
+
+    # this may return zero if all targets were non-blocking
+    most_teed = __broadcast_tee(blocked, targets, bytes)
+
+    # don't splice more than the largest amount we successfully teed
+    bytes = most_teed if most_teed > 0
+
+    begin
+      targets << last
+      if last.nonblock? || most_teed == 0
+        s = splice(@to_io, last, bytes, F_MOVE|F_NONBLOCK)
+        if Symbol === s
+          blocked << last
+
+          # we accomplished nothing!
+          # If _all_ writers are blocked, do not discard data,
+          # stay blocked on :wait_writable
+          return blocked if most_teed == 0
+
+          # the tees targets win, drop data intended for last
+          if most_teed > 0
+            discard(most_teed)
+            @bytes_xfer += most_teed
+            # do not watch for writability of last, last is non-blocking
+            return :wait_readable
+          end
+        end
+      else
+        # the blocking case is simple
+        s = __splice_in_full(@to_io, last, bytes, F_MOVE)
+      end
+      @bytes_xfer += s
+
+      # if we can't splice everything
+      # discard it so the early targets do not get repeated data
+      if s < bytes && most_teed > 0
+        discard(bytes - s)
+      end
+      :wait_readable
+    rescue IOError, Errno::EPIPE => e # last failed, drop it
+      __dst_error(last, e)
+      targets.pop # we're no longer a valid target
+
+      if most_teed == 0
+        # nothing accomplished, watch any targets
+        return blocked if blocked[0]
+      else
+        # some progress, discard the data we could not splice
+        @bytes_xfer += most_teed
+        discard(most_teed)
+      end
+
+      # stop decoding if we're completely errored out
+      # returning nil will trigger close
+      return targets[0] ? :wait_readable : nil
+    end
+  end
+end
diff --git a/lib/dtas/buffer/read_write.rb b/lib/dtas/buffer/read_write.rb
index 04856c7..e2001b6 100644
--- a/lib/dtas/buffer/read_write.rb
+++ b/lib/dtas/buffer/read_write.rb
@@ -6,8 +6,8 @@
 require_relative '../pipe'
 require_relative '../nonblock'
 
-# compatibility code for systems lacking "splice" support via the
-# "sleepy_penguin" 3.5+ RubyGem.  Used only by -player
+# compatibility code for non-Linux systems lacking "splice" support.
+# Used only by -player
 module DTAS::Buffer::ReadWrite # :nodoc:
   MAX_AT_ONCE = 512 # min PIPE_BUF value in POSIX
   attr_accessor :buffer_size
diff --git a/lib/dtas/buffer/splice.rb b/lib/dtas/buffer/splice.rb
index 2e86d0a..b234a57 100644
--- a/lib/dtas/buffer/splice.rb
+++ b/lib/dtas/buffer/splice.rb
@@ -5,6 +5,8 @@
 require 'sleepy_penguin'
 require_relative '../../dtas'
 require_relative '../pipe'
+SleepyPenguin.respond_to?(:splice) or
+  raise LoadError, 'sleepy_penguin 3.5+ required for splice', []
 
 # Used by -player on Linux systems with the "sleepy_penguin" RubyGem installed
 module DTAS::Buffer::Splice # :nodoc:
diff --git a/test/test_buffer.rb b/test/test_buffer.rb
index 8f5d8b5..1773ca3 100644
--- a/test/test_buffer.rb
+++ b/test/test_buffer.rb
@@ -49,14 +49,14 @@ def test_set_buffer_size
     buf = new_buffer
     buf.buffer_size = @@max_size
     assert_equal @@max_size, buf.buffer_size
-  end if defined?(SleepyPenguin::F_GETPIPE_SZ)
+  end if defined?(DTAS::Pipe::F_GETPIPE_SZ)
 
   def test_buffer_size
     buf = new_buffer
     assert_operator buf.buffer_size, :>, 128
     buf.buffer_size = @@max_size
     assert_equal @@max_size, buf.buffer_size
-  end if defined?(SleepyPenguin::F_GETPIPE_SZ)
+  end if defined?(DTAS::Pipe::F_GETPIPE_SZ)
 
   def test_broadcast_1
     buf = new_buffer
@@ -108,7 +108,7 @@ def test_broadcast
     assert_equal "HELLO", a[0].read(5)
     assert_equal "HELLO", b[0].read(5)
 
-    return unless defined?(SleepyPenguin::F_GETPIPE_SZ)
+    return unless defined?(DTAS::Pipe::F_GETPIPE_SZ)
 
     b[1].nonblock = true
     b[1].write('*' * pipe_size(b[1]))
@@ -167,7 +167,7 @@ def test_broadcast_all_full
     buf.wr.write "HELLO"
     assert_equal tmp, buf.broadcast(tmp)
     assert_equal [a[1], b[1]], tmp
-  end if defined?(SleepyPenguin::F_GETPIPE_SZ)
+  end if defined?(DTAS::Pipe::F_GETPIPE_SZ)
 
   def test_serialize
     buf = new_buffer
@@ -206,6 +206,6 @@ def test_load_size
   end
 
   def pipe_size(io)
-    io.fcntl(SleepyPenguin::F_GETPIPE_SZ)
+    io.fcntl(DTAS::Pipe::F_GETPIPE_SZ)
   end
 end
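
The load order that the lib/dtas/buffer.rb hunk sets up (the
sleepy_penguin splice first, then the Fiddle splice, then plain
read/write, with _DTAS_POSIX forcing the last) can be summarized
in a small standalone sketch.  The pick_backend helper, its
force_portable keyword and the :portable return value are
hypothetical names used only for illustration; the real code
includes a module instead of returning a name, and nests its
rescue clauses slightly differently.

```ruby
# Hypothetical helper: require each feature in order and return the
# first one that loads, or :portable when none do (or when forced).
def pick_backend(features, force_portable: !ENV['_DTAS_POSIX'].nil?)
  return :portable if force_portable
  features.each do |feature|
    begin
      require feature
      return feature
    rescue LoadError, StandardError
      # LoadError is a ScriptError, not a StandardError,
      # so both have to be named to catch either kind of failure
      next
    end
  end
  :portable
end

# Prefer the C extension, then Fiddle, then the portable fallback
pick_backend(%w(sleepy_penguin fiddle))
```

Naming StandardError alongside LoadError matters for the Fiddle
path, where a failure may surface as something other than a
LoadError (for example, a symbol lookup error).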

Thread overview: 6+ messages
2019-12-20 1:39 [PATCH v2 0/5] support fiddle for Linux-only syscalls Eric Wong
2019-12-20 1:39 ` [PATCH v2 1/5] pipe: avoid loading sleepy_penguin Eric Wong
2019-12-20 1:39 ` [PATCH v2 2/5] provide fiddle-based eventfd implementation Eric Wong
2019-12-20 1:39 ` Eric Wong [this message]
2019-12-20 1:39 ` [PATCH v2 4/5] watchable: use fiddle for inotify support Eric Wong
2019-12-20 1:39 ` [PATCH v2 5/5] doc: remove most recommendations for sleepy_penguin Eric Wong