summary refs log tree commit
path: root/lib/dtas/buffer/splice.rb
blob: cd00bbb34c16c04dd46dc0761a9840ea41160441 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# Copyright (C) 2013-2016 all contributors <dtas-all@nongnu.org>
# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
# frozen_string_literal: true
require 'io/nonblock'
require 'sleepy_penguin'
require_relative '../../dtas'
require_relative '../pipe'

# Used by -player on Linux systems with the "sleepy_penguin" RubyGem installed
module DTAS::Buffer::Splice # :nodoc:
  MAX_AT_ONCE = 4096 # page size in Linux
  MAX_AT_ONCE_1 = 65536
  F_MOVE = SleepyPenguin::F_MOVE
  F_NONBLOCK = SleepyPenguin::F_NONBLOCK
  TRY = { exception: false }.freeze

  def buffer_size
    @to_io.pipe_size
  end

  # nil is OK, won't reset existing pipe, either...
  def buffer_size=(bytes)
    @to_io.pipe_size = bytes if bytes
    @buffer_size = bytes
  end

  # be sure to only call this with nil when all writers to @wr are done
  def discard(bytes)
    SleepyPenguin.splice(@to_io, DTAS.null, bytes)
  end

  def broadcast_one(targets, limit = nil)
    # single output is always non-blocking
    limit ||= MAX_AT_ONCE_1
    s = SleepyPenguin.splice(@to_io, targets[0], limit, F_MOVE, TRY)
    if Symbol === s
      targets # our one and only target blocked on write
    else
      @bytes_xfer += s
      :wait_readable # we want to read more from @to_io soon
    end
  rescue Errno::EPIPE, IOError => e
    __dst_error(targets[0], e)
    targets.clear
    nil # do not return error here, we already spewed an error message
  end

  def __tee_in_full(src, dst, bytes)
    rv = 0
    while bytes > 0
      s = SleepyPenguin.tee(src, dst, bytes)
      bytes -= s
      rv += s
    end
    rv
  end

  def __splice_in_full(src, dst, bytes, flags)
    rv = 0
    while bytes > 0
      s = SleepyPenguin.splice(src, dst, bytes, flags)
      rv += s
      bytes -= s
    end
    rv
  end

  # returns the largest value we teed
  def __broadcast_tee(blocked, targets, chunk_size)
    most_teed = 0
    targets.delete_if do |dst|
      begin
        t = (dst.nonblock? || most_teed == 0) ?
            SleepyPenguin.tee(@to_io, dst, chunk_size, F_NONBLOCK, TRY) :
            __tee_in_full(@to_io, dst, chunk_size)
        if Integer === t
          if t > most_teed
            chunk_size = t if most_teed == 0
            most_teed = t
          end
        else
          blocked << dst
        end
        false
      rescue IOError, Errno::EPIPE => e
        __dst_error(dst, e)
        true
      end
    end
    most_teed
  end

  def broadcast_inf(targets, limit = nil)
    if targets.all?(&:ready_write_optimized?)
      blocked = []
    elsif targets.none?(&:nonblock?)
      # if all targets are blocking, don't start until they're all writable
      r = IO.select(nil, targets, nil, 0) or return targets
      blocked = targets - r[1]

      # tell DTAS::UNIXServer#run_once to wait on the blocked targets
      return blocked if blocked[0]

      # all writable, yay!
    else
      blocked = []
    end

    # don't pin too much on one target
    bytes = limit || MAX_AT_ONCE
    last = targets.pop # we splice to the last one, tee to the rest

    # this may return zero if all targets were non-blocking
    most_teed = __broadcast_tee(blocked, targets, bytes)

    # don't splice more than the largest amount we successfully teed
    bytes = most_teed if most_teed > 0

    begin
      targets << last
      if last.nonblock? || most_teed == 0
        s = SleepyPenguin.splice(@to_io, last, bytes, F_MOVE|F_NONBLOCK, TRY)
        if Symbol === s
          blocked << last

          # we accomplished nothing!
          # If _all_ writers are blocked, do not discard data,
          # stay blocked on :wait_writable
          return blocked if most_teed == 0

          # the tees targets win, drop data intended for last
          if most_teed > 0
            discard(most_teed)
            @bytes_xfer += most_teed
            # do not watch for writability of last, last is non-blocking
            return :wait_readable
          end
        end
      else
        # the blocking case is simple
        s = __splice_in_full(@to_io, last, bytes, F_MOVE)
      end
      @bytes_xfer += s

      # if we can't splice everything
      # discard it so the early targets do not get repeated data
      if s < bytes && most_teed > 0
        discard(bytes - s)
      end
      :wait_readable
    rescue IOError, Errno::EPIPE => e # last failed, drop it
      __dst_error(last, e)
      targets.pop # we're no longer a valid target

      if most_teed == 0
        # nothing accomplished, watch any targets
        return blocked if blocked[0]
      else
        # some progress, discard the data we could not splice
        @bytes_xfer += most_teed
        discard(most_teed)
      end

      # stop decoding if we're completely errored out
      # returning nil will trigger close
      return targets[0] ? :wait_readable : nil
    end
  end
end