dtas.git  about / heads / tags
duct tape audio suite for *nix
blob ad007eb23f83cff55d8f4e02a6fc83eddfcde1bb 6082 bytes (raw)
$ git show v0.18.0:lib/dtas/buffer/fiddle_splice.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
 
# Copyright (C) 2013-2020 all contributors <dtas-all@nongnu.org>
# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
# frozen_string_literal: true
require 'io/nonblock'
require 'fiddle' # require_relative caller should expect LoadError
require_relative '../../dtas'
require_relative '../pipe'

# Used by -player on Linux systems with the "splice" syscall
module DTAS::Buffer::FiddleSplice # :nodoc:
  MAX_AT_ONCE = 4096 # page size in Linux
  MAX_AT_ONCE_1 = 65536
  F_MOVE = 1
  F_NONBLOCK = 2

  Splice = Fiddle::Function.new(DTAS.libc['splice'], [
      Fiddle::TYPE_INT, # int fd_in,
      Fiddle::TYPE_VOIDP, # loff_t *off_in
      Fiddle::TYPE_INT, # int fd_out
      Fiddle::TYPE_VOIDP, # loff_t *off_out
      Fiddle::TYPE_SIZE_T, # size_t len
      Fiddle::TYPE_INT, # unsigned int flags
    ],
    Fiddle::TYPE_SSIZE_T) # ssize_t

  Tee = Fiddle::Function.new(DTAS.libc['tee'], [
      Fiddle::TYPE_INT, # int fd_in,
      Fiddle::TYPE_INT, # int fd_out
      Fiddle::TYPE_SIZE_T, # size_t len
      Fiddle::TYPE_INT, # unsigned int flags
    ],
    Fiddle::TYPE_SSIZE_T) # ssize_t

  def _syserr(s, func)
    raise "BUG: we should not encounter EOF on #{func}" if s == 0
    case errno = Fiddle.last_error
    when Errno::EAGAIN::Errno
      return :EAGAIN
    when Errno::EPIPE::Errno
      raise Errno::EPIPE.exception
    when Errno::EINTR::Errno
      return nil
    else
      raise SystemCallError, "#{func} error: #{errno}"
    end
  end

  def splice(src, dst, len, flags)
    begin
      s = Splice.call(src.fileno, nil, dst.fileno, nil, len, flags)
      return s if s > 0
      sym = _syserr(s, 'splice') and return sym
    end while true
  end

  def tee(src, dst, len, flags = 0)
    begin
      s = Tee.call(src.fileno, dst.fileno, len, flags)
      return s if s > 0
      sym = _syserr(s, 'tee') and return sym
    end while true
  end

  def buffer_size
    @to_io.pipe_size
  end

  # nil is OK, won't reset existing pipe, either...
  def buffer_size=(bytes)
    @to_io.pipe_size = bytes if bytes
    @buffer_size = bytes
  end

  # be sure to only call this with nil when all writers to @wr are done
  def discard(bytes)
    splice(@to_io, DTAS.null, bytes, 0)
  end

  def broadcast_one(targets, limit = nil)
    # single output is always non-blocking
    limit ||= MAX_AT_ONCE_1
    s = splice(@to_io, targets[0], limit, F_MOVE|F_NONBLOCK)
    if Symbol === s
      targets # our one and only target blocked on write
    else
      @bytes_xfer += s
      :wait_readable # we want to read more from @to_io soon
    end
  rescue Errno::EPIPE, IOError => e
    __dst_error(targets[0], e)
    targets.clear
    nil # do not return error here, we already spewed an error message
  end

  def __tee_in_full(src, dst, bytes)
    rv = 0
    while bytes > 0
      s = tee(src, dst, bytes)
      bytes -= s
      rv += s
    end
    rv
  end

  def __splice_in_full(src, dst, bytes, flags)
    rv = 0
    while bytes > 0
      s = splice(src, dst, bytes, flags)
      rv += s
      bytes -= s
    end
    rv
  end

  # returns the largest value we teed
  def __broadcast_tee(blocked, targets, chunk_size)
    most_teed = 0
    targets.delete_if do |dst|
      begin
        t = (dst.nonblock? || most_teed == 0) ?
              tee(@to_io, dst, chunk_size, F_NONBLOCK) :
              __tee_in_full(@to_io, dst, chunk_size)
        if Integer === t
          if t > most_teed
            chunk_size = t if most_teed == 0
            most_teed = t
          end
        else
          blocked << dst
        end
        false
      rescue IOError, Errno::EPIPE => e
        __dst_error(dst, e)
        true
      end
    end
    most_teed
  end

  def broadcast_inf(targets, limit = nil)
    if targets.all?(&:ready_write_optimized?)
      blocked = []
    elsif targets.none?(&:nonblock?)
      # if all targets are blocking, don't start until they're all writable
      r = IO.select(nil, targets, nil, 0) or return targets
      blocked = targets - r[1]

      # tell DTAS::UNIXServer#run_once to wait on the blocked targets
      return blocked if blocked[0]

      # all writable, yay!
    else
      blocked = []
    end

    # don't pin too much on one target
    bytes = limit || MAX_AT_ONCE
    last = targets.pop # we splice to the last one, tee to the rest

    # this may return zero if all targets were non-blocking
    most_teed = __broadcast_tee(blocked, targets, bytes)

    # don't splice more than the largest amount we successfully teed
    bytes = most_teed if most_teed > 0

    begin
      targets << last
      if last.nonblock? || most_teed == 0
        s = splice(@to_io, last, bytes, F_MOVE|F_NONBLOCK)
        if Symbol === s
          blocked << last

          # we accomplished nothing!
          # If _all_ writers are blocked, do not discard data,
          # stay blocked on :wait_writable
          return blocked if most_teed == 0

          # the tees targets win, drop data intended for last
          if most_teed > 0
            discard(most_teed)
            @bytes_xfer += most_teed
            # do not watch for writability of last, last is non-blocking
            return :wait_readable
          end
        end
      else
        # the blocking case is simple
        s = __splice_in_full(@to_io, last, bytes, F_MOVE)
      end
      @bytes_xfer += s

      # if we can't splice everything
      # discard it so the early targets do not get repeated data
      if s < bytes && most_teed > 0
        discard(bytes - s)
      end
      :wait_readable
    rescue IOError, Errno::EPIPE => e # last failed, drop it
      __dst_error(last, e)
      targets.pop # we're no longer a valid target

      if most_teed == 0
        # nothing accomplished, watch any targets
        return blocked if blocked[0]
      else
        # some progress, discard the data we could not splice
        @bytes_xfer += most_teed
        discard(most_teed)
      end

      # stop decoding if we're completely errored out
      # returning nil will trigger close
      return targets[0] ? :wait_readable : nil
    end
  end
end

git clone git://80x24.org/dtas.git
git clone https://80x24.org/dtas.git