From 3e09ac0c10c95bb24a08af62393b4f761e2743d0 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 24 Aug 2013 09:54:45 +0000 Subject: initial commit --- lib/dtas/buffer.rb | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 lib/dtas/buffer.rb (limited to 'lib/dtas/buffer.rb') diff --git a/lib/dtas/buffer.rb b/lib/dtas/buffer.rb new file mode 100644 index 0000000..d02e8a6 --- /dev/null +++ b/lib/dtas/buffer.rb @@ -0,0 +1,91 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative '../dtas' + +class DTAS::Buffer + begin + raise LoadError, "no splice with _DTAS_POSIX" if ENV["_DTAS_POSIX"] + require 'io/splice' # splice is only in Linux for now... + require_relative 'buffer/splice' + include DTAS::Buffer::Splice + rescue LoadError + require_relative 'buffer/read_write' + include DTAS::Buffer::ReadWrite + end + + attr_reader :to_io # call nread on this + attr_reader :wr # processes (sources) should redirect to this + attr_accessor :bytes_xfer + + def initialize + @bytes_xfer = 0 + @buffer_size = nil + @to_io, @wr = DTAS::Pipe.new + end + + def self.load(hash) + buf = new + if hash + bs = hash["buffer_size"] and buf.buffer_size = bs + end + buf + end + + def to_hsh + @buffer_size ? { "buffer_size" => @buffer_size } : {} + end + + def __dst_error(dst, e) + warn "dropping #{dst.inspect} due to error: #{e.message} (#{e.class})" + dst.close unless dst.closed? + end + + # This will modify targets + # returns one of: + # - :wait_readable + # - subset of targets array for :wait_writable + # - some type of StandardError + # - nil + def broadcast(targets) + bytes = inflight + return :wait_readable if 0 == bytes # spurious wakeup + + case targets.size + when 0 + :ignore # this will pause decoders + when 1 + broadcast_one(targets, bytes) + else # infinity + broadcast_inf(targets, bytes) + end + end + + def readable_iter + # this calls DTAS::Buffer#broadcast from DTAS::Player + yield(self, nil) + end + + def inflight + @to_io.nread + end + + # don't really close the pipes under normal circumstances, just clear data + def close + bytes = inflight + discard(bytes) if bytes > 0 + end + + def buf_reset + close! + @bytes_xfer = 0 + @to_io, @wr = DTAS::Pipe.new + @wr.pipe_size = @buffer_size if @buffer_size + end + + def close! + @to_io.close + @wr.close + end +end -- cgit v1.2.3-24-ge0c7