dtas.git  about / heads / tags
duct tape audio suite for *nix
blob cc52f0dcfe67e381dd001d77bcce06fddb985bb8 4173 bytes (raw)
$ git show v0.0.0:lib/dtas/buffer/splice.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
 
# -*- encoding: binary -*-
# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
require 'io/wait'
require 'io/nonblock'
require 'io/splice'
require_relative '../../dtas'
require_relative '../pipe'

module DTAS::Buffer::Splice # :nodoc:
  MAX_AT_ONCE = 4096 # page size in Linux
  MAX_SIZE = File.read("/proc/sys/fs/pipe-max-size").to_i
  DEVNULL = File.open("/dev/null", "r+")
  F_MOVE = IO::Splice::F_MOVE
  WAITALL = IO::Splice::WAITALL

  def buffer_size
    @to_io.pipe_size
  end

  # nil is OK, won't reset existing pipe, either...
  def buffer_size=(bytes)
    @to_io.pipe_size = bytes if bytes
    @buffer_size = bytes
  end

  # be sure to only call this with nil when all writers to @wr are done
  def discard(bytes)
    IO.splice(@to_io, nil, DEVNULL, nil, bytes)
  end

  def broadcast_one(targets, bytes)
    # single output is always non-blocking
    s = IO.trysplice(@to_io, nil, targets[0], nil, bytes, F_MOVE)
    if Symbol === s
      targets # our one and only target blocked on write
    else
      @bytes_xfer += s
      :wait_readable # we want to read more from @to_io soon
    end
  rescue Errno::EPIPE, IOError => e
    __dst_error(targets[0], e)
    targets.clear
    nil # do not return error here, we already spewed an error message
  end

  # returns the largest value we teed
  def __broadcast_tee(blocked, targets, chunk_size)
    most_teed = 0
    targets.delete_if do |dst|
      begin
        t = dst.nonblock? ?
            IO.trytee(@to_io, dst, chunk_size) :
            IO.tee(@to_io, dst, chunk_size, WAITALL)
        if Integer === t
          most_teed = t if t > most_teed
        else
          blocked << dst
        end
        false
      rescue IOError, Errno::EPIPE => e
        __dst_error(dst, e)
        true
      end
    end
    most_teed
  end

  def broadcast_inf(targets, bytes)
    if targets.none? { |sink| sink.nonblock? }
      # if all targets are blocking, don't start until they're all writable
      r = IO.select(nil, targets, nil, 0) or return targets
      blocked = targets - r[1]

      # tell DTAS::UNIXServer#run_once to wait on the blocked targets
      return blocked if blocked[0]

      # all writable, yay!
    else
      blocked = []
    end

    # don't pin too much on one target
    bytes = bytes > MAX_AT_ONCE ? MAX_AT_ONCE : bytes

    last = targets.pop # we splice to the last one, tee to the rest
    most_teed = __broadcast_tee(blocked, targets, bytes)

    # don't splice more than the largest amount we successfully teed
    bytes = most_teed if most_teed > 0

    begin
      targets << last
      if last.nonblock?
        s = IO.trysplice(@to_io, nil, last, nil, bytes, F_MOVE)
        if Symbol === s
          blocked << last

          # we accomplished nothing!
          # If _all_ writers are blocked, do not discard data,
          # stay blocked on :wait_writable
          return blocked if most_teed == 0

          # the tees targets win, drop data intended for last
          if most_teed > 0
            discard(most_teed)
            @bytes_xfer += most_teed
            # do not watch for writability of last, last is non-blocking
            return :wait_readable
          end
        end
      else
        # the blocking case is simple
        s = IO.splice(@to_io, nil, last, nil, bytes, WAITALL|F_MOVE)
      end
      @bytes_xfer += s

      # if we can't splice everything
      # discard it so the early targets do not get repeated data
      if s < bytes && most_teed > 0
        discard(bytes - s)
      end
      :wait_readable
    rescue IOError, Errno::EPIPE => e # last failed, drop it
      __dst_error(last, e)
      targets.pop # we're no longer a valid target

      if most_teed == 0
        # nothing accomplished, watch any targets
        return blocked if blocked[0]
      else
        # some progress, discard the data we could not splice
        @bytes_xfer += most_teed
        discard(most_teed)
      end

      # stop decoding if we're completely errored out
      # returning nil will trigger close
      return targets[0] ? :wait_readable : nil
    end
  end
end

git clone git://80x24.org/dtas.git
git clone https://80x24.org/dtas.git