From 3e09ac0c10c95bb24a08af62393b4f761e2743d0 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 24 Aug 2013 09:54:45 +0000 Subject: initial commit --- lib/dtas/buffer/splice.rb | 143 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 lib/dtas/buffer/splice.rb (limited to 'lib/dtas/buffer/splice.rb') diff --git a/lib/dtas/buffer/splice.rb b/lib/dtas/buffer/splice.rb new file mode 100644 index 0000000..c2540bd --- /dev/null +++ b/lib/dtas/buffer/splice.rb @@ -0,0 +1,143 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'io/wait' +require 'io/nonblock' +require 'io/splice' +require_relative '../../dtas' +require_relative '../pipe' + +module DTAS::Buffer::Splice + MAX_AT_ONCE = 4096 # page size in Linux + MAX_SIZE = File.read("/proc/sys/fs/pipe-max-size").to_i + DEVNULL = File.open("/dev/null", "r+") + F_MOVE = IO::Splice::F_MOVE + WAITALL = IO::Splice::WAITALL + + def buffer_size + @to_io.pipe_size + end + + # nil is OK, won't reset existing pipe, either... + def buffer_size=(bytes) + @to_io.pipe_size = bytes if bytes + @buffer_size = bytes + end + + # be sure to only call this with nil when all writers to @wr are done + def discard(bytes) + IO.splice(@to_io, nil, DEVNULL, nil, bytes) + end + + def broadcast_one(targets, bytes) + # single output is always non-blocking + s = IO.trysplice(@to_io, nil, targets[0], nil, bytes, F_MOVE) + if Symbol === s + targets # our one and only target blocked on write + else + @bytes_xfer += s + :wait_readable # we want to read more from @to_io soon + end + rescue Errno::EPIPE, IOError => e + __dst_error(targets[0], e) + targets.clear + nil # do not return error here, we already spewed an error message + end + + # returns the largest value we teed + def __broadcast_tee(blocked, targets, chunk_size) + most_teed = 0 + targets.delete_if do |dst| + begin + t = dst.nonblock? ? + IO.trytee(@to_io, dst, chunk_size) : + IO.tee(@to_io, dst, chunk_size, WAITALL) + if Integer === t + most_teed = t if t > most_teed + else + blocked << dst + end + false + rescue IOError, Errno::EPIPE => e + __dst_error(dst, e) + true + end + end + most_teed + end + + def broadcast_inf(targets, bytes) + if targets.none? { |sink| sink.nonblock? } + # if all targets are blocking, don't start until they're all writable + r = IO.select(nil, targets, nil, 0) or return targets + blocked = targets - r[1] + + # tell DTAS::UNIXServer#run_once to wait on the blocked targets + return blocked if blocked[0] + + # all writable, yay! + else + blocked = [] + end + + # don't pin too much on one target + bytes = bytes > MAX_AT_ONCE ? MAX_AT_ONCE : bytes + + last = targets.pop # we splice to the last one, tee to the rest + most_teed = __broadcast_tee(blocked, targets, bytes) + + # don't splice more than the largest amount we successfully teed + bytes = most_teed if most_teed > 0 + + begin + targets << last + if last.nonblock? + s = IO.trysplice(@to_io, nil, last, nil, bytes, F_MOVE) + if Symbol === s + blocked << last + + # we accomplished nothing! + # If _all_ writers are blocked, do not discard data, + # stay blocked on :wait_writable + return blocked if most_teed == 0 + + # the tees targets win, drop data intended for last + if most_teed > 0 + discard(most_teed) + @bytes_xfer += most_teed + # do not watch for writability of last, last is non-blocking + return :wait_readable + end + end + else + # the blocking case is simple + s = IO.splice(@to_io, nil, last, nil, bytes, WAITALL|F_MOVE) + end + @bytes_xfer += s + + # if we can't splice everything + # discard it so the early targets do not get repeated data + if s < bytes && most_teed > 0 + discard(bytes - s) + end + :wait_readable + rescue IOError, Errno::EPIPE => e # last failed, drop it + __dst_error(last, e) + targets.pop # we're no longer a valid target + + if most_teed == 0 + # nothing accomplished, watch any targets + return blocked if blocked[0] + else + # some progress, discard the data we could not splice + @bytes_xfer += most_teed + discard(most_teed) + end + + # stop decoding if we're completely errored out + # returning nil will trigger close + return targets[0] ? :wait_readable : nil + end + end +end -- cgit v1.2.3-24-ge0c7