about summary refs log tree commit
path: root/lib/dtas/buffer.rb
blob: c3d8ee2ba2c4989be9532372ccefd579e4720031 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# Copyright (C) 2013-2016 all contributors <dtas-all@nongnu.org>
# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
# frozen_string_literal: true
require 'io/wait'
require_relative '../dtas'

# Pipe buffer management for -player.
#
# Owns a pipe pair obtained from DTAS::Pipe.new: sources write into #wr and
# pending bytes are counted on #to_io.  The transfer strategy is picked once,
# when this class body is evaluated, by mixing in one of two modules.
class DTAS::Buffer # :nodoc:
  # Try the splice-based strategy first; any LoadError raised along the way
  # (including the ones raised explicitly here) selects the read/write one.
  begin
    raise LoadError, "no splice with _DTAS_POSIX" if ENV["_DTAS_POSIX"]
    require 'sleepy_penguin' # splice is only in Linux for now...
    unless SleepyPenguin.respond_to?(:splice)
      # the third argument to raise is the backtrace; an empty one is passed
      raise LoadError, 'sleepy_penguin 3.5+ required for splice', []
    end
    require_relative 'buffer/splice'
    include DTAS::Buffer::Splice
  rescue LoadError
    require_relative 'buffer/read_write'
    include DTAS::Buffer::ReadWrite
  end

  # to_io: call nread on this
  # wr: processes (sources) should redirect to this
  attr_reader :to_io, :wr
  attr_accessor :bytes_xfer

  # Starts with a zeroed transfer counter, no configured buffer size and a
  # freshly created pipe pair.
  def initialize
    @bytes_xfer = 0
    @buffer_size = nil
    reader, writer = DTAS::Pipe.new
    @to_io = reader
    @wr = writer
  end

  # Builds a new buffer from +hash+ (may be nil/false).  A truthy
  # "buffer_size" entry is applied through the buffer_size= writer, which is
  # not defined in this file (presumably supplied by the mixed-in module).
  def self.load(hash)
    new.tap do |buf|
      size = hash && hash["buffer_size"]
      buf.buffer_size = size if size
    end
  end

  # Hash form of the configuration; empty when no buffer size is set.
  def to_hsh
    return {} unless @buffer_size

    { "buffer_size" => @buffer_size }
  end

  # Warns about a failed destination and closes it if it is still open.
  def __dst_error(dst, e)
    msg = "dropping #{dst.inspect} due to error: #{e.message} (#{e.class})"
    warn msg
    return if dst.closed?

    dst.close
  end

  # This will modify targets
  # returns one of:
  # - :ignore when targets is empty
  # - :wait_readable
  # - subset of targets array for :wait_writable
  # - some type of StandardError
  # - nil
  # Everything except :ignore comes from broadcast_one / broadcast_inf,
  # which are not defined in this file.
  def broadcast(targets, limit = nil)
    count = targets.size
    return :ignore if count == 0 # this will pause decoders
    return broadcast_one(targets, limit) if count == 1

    broadcast_inf(targets, limit) # infinity
  end

  # Yields this buffer together with nil to the caller's block.
  def readable_iter
    # this calls DTAS::Buffer#broadcast from DTAS::Player
    yield self, nil
  end

  # Number of bytes reported by nread on the read end of the pipe.
  def inflight
    @to_io.nread
  end

  # don't really close the pipes under normal circumstances, just clear data
  # (discard is not defined in this file)
  def close
    pending = inflight
    return unless pending > 0

    discard(pending)
  end

  # Really closes both pipe ends, then starts over with a new pipe pair and
  # a zeroed transfer counter, re-applying the configured size if any.
  def buf_reset
    close!
    @bytes_xfer = 0
    reader, writer = DTAS::Pipe.new
    @to_io = reader
    @wr = writer
    size = @buffer_size
    writer.pipe_size = size if size
  end

  # Closes the read end, then the write end.
  def close!
    @to_io.close
    @wr.close
  end
end