about summary refs log tree commit homepage
path: root/lib/dtas/buffer.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-08-24 09:54:45 +0000
committerEric Wong <normalperson@yhbt.net>2013-08-24 09:54:45 +0000
commit3e09ac0c10c95bb24a08af62393b4f761e2743d0 (patch)
tree778dffa2ba8798503fc047db0feef6d65426ea22 /lib/dtas/buffer.rb
downloaddtas-3e09ac0c10c95bb24a08af62393b4f761e2743d0.tar.gz
Diffstat (limited to 'lib/dtas/buffer.rb')
-rw-r--r--lib/dtas/buffer.rb91
1 files changed, 91 insertions, 0 deletions
diff --git a/lib/dtas/buffer.rb b/lib/dtas/buffer.rb
new file mode 100644
index 0000000..d02e8a6
--- /dev/null
+++ b/lib/dtas/buffer.rb
@@ -0,0 +1,91 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative '../dtas'
+
+class DTAS::Buffer
+  begin
+    raise LoadError, "no splice with _DTAS_POSIX" if ENV["_DTAS_POSIX"]
+    require 'io/splice' # splice is only in Linux for now...
+    require_relative 'buffer/splice'
+    include DTAS::Buffer::Splice
+  rescue LoadError
+    require_relative 'buffer/read_write'
+    include DTAS::Buffer::ReadWrite
+  end
+
+  attr_reader :to_io # call nread on this
+  attr_reader :wr # processes (sources) should redirect to this
+  attr_accessor :bytes_xfer
+
+  def initialize
+    @bytes_xfer = 0
+    @buffer_size = nil
+    @to_io, @wr = DTAS::Pipe.new
+  end
+
+  def self.load(hash)
+    buf = new
+    if hash
+      bs = hash["buffer_size"] and buf.buffer_size = bs
+    end
+    buf
+  end
+
+  def to_hsh
+    @buffer_size ? { "buffer_size" => @buffer_size } : {}
+  end
+
+  def __dst_error(dst, e)
+    warn "dropping #{dst.inspect} due to error: #{e.message} (#{e.class})"
+    dst.close unless dst.closed?
+  end
+
+  # This will modify targets
+  # returns one of:
+  # - :wait_readable
+  # - subset of targets array for :wait_writable
+  # - some type of StandardError
+  # - nil
+  def broadcast(targets)
+    bytes = inflight
+    return :wait_readable if 0 == bytes # spurious wakeup
+
+    case targets.size
+    when 0
+      :ignore # this will pause decoders
+    when 1
+      broadcast_one(targets, bytes)
+    else # infinity
+      broadcast_inf(targets, bytes)
+    end
+  end
+
+  def readable_iter
+    # this calls DTAS::Buffer#broadcast from DTAS::Player
+    yield(self, nil)
+  end
+
+  def inflight
+    @to_io.nread
+  end
+
+  # don't really close the pipes under normal circumstances, just clear data
+  def close
+    bytes = inflight
+    discard(bytes) if bytes > 0
+  end
+
+  def buf_reset
+    close!
+    @bytes_xfer = 0
+    @to_io, @wr = DTAS::Pipe.new
+    @wr.pipe_size = @buffer_size if @buffer_size
+  end
+
+  def close!
+    @to_io.close
+    @wr.close
+  end
+end