1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
require 'io/wait'
require 'io/nonblock'
require 'io/splice'
require_relative '../../dtas'
require_relative '../pipe'
module DTAS::Buffer::Splice # :nodoc:
# Upper bound on the bytes handed to one tee/splice round in broadcast_inf,
# so a single target cannot pin too much data.
MAX_AT_ONCE = 4096 # page size in Linux
# Integer value of /proc/sys/fs/pipe-max-size, read once when this file is
# loaded (File.read raises if that procfs entry does not exist).
# NOTE(review): not referenced by any method visible in this module --
# presumably consumed by code that mixes it in; confirm.
MAX_SIZE = File.read("/proc/sys/fs/pipe-max-size").to_i
# Shared read-write handle on /dev/null; discard splices unwanted data into it.
DEVNULL = File.open("/dev/null", "r+")
# Short aliases for the IO::Splice flags passed to the splice/tee calls below.
F_MOVE = IO::Splice::F_MOVE
WAITALL = IO::Splice::WAITALL
# Returns whatever @to_io currently reports as its pipe_size.
# NOTE(review): presumably the pipe capacity in bytes, mirroring the
# argument of buffer_size= -- confirm against the pipe class.
def buffer_size
  pipe = @to_io
  pipe.pipe_size
end
# Asks the pipe behind @to_io to resize to +bytes+ and records the request
# in @buffer_size.
# nil is OK: it is recorded as well, but an existing pipe is not reset.
def buffer_size=(bytes)
  if bytes
    @to_io.pipe_size = bytes
  end
  @buffer_size = bytes
end
# Throws away +bytes+ of pending data by splicing it from @to_io into
# DEVNULL; the result of IO.splice is returned.
# Only pass nil for +bytes+ once every writer to @wr is done.
def discard(bytes)
  source = @to_io
  IO.splice(source, nil, DEVNULL, nil, bytes)
end
# Moves up to +bytes+ from @to_io into the only sink, targets[0].
#
# Returns +targets+ when the splice came back as a Symbol (the sink would
# block), :wait_readable after a transfer (which is added to @bytes_xfer),
# or nil when Errno::EPIPE/IOError was raised; in that case the failure is
# reported through __dst_error and +targets+ is emptied in place.
def broadcast_one(targets, bytes)
  # single output is always non-blocking
  sink = targets[0]
  moved = IO.trysplice(@to_io, nil, sink, nil, bytes, F_MOVE)
  case moved
  when Symbol
    targets # the lone sink blocked on write, let the caller watch it
  else
    @bytes_xfer += moved
    :wait_readable # come back once @to_io has more to read
  end
rescue Errno::EPIPE, IOError => err
  __dst_error(targets[0], err)
  targets.clear
  nil # the error was already reported, do not hand it back
end
# Tees up to +chunk_size+ bytes from @to_io into every sink in +targets+.
# Sinks answering true to nonblock? go through IO.trytee, the others
# through IO.tee with WAITALL.
#
# A sink whose tee did not yield an Integer is appended to +blocked+; a sink
# raising IOError or Errno::EPIPE is reported via __dst_error and removed
# from +targets+ in place.
#
# returns the largest value we teed
def __broadcast_tee(blocked, targets, chunk_size)
  largest = 0
  targets.delete_if do |sink|
    begin
      copied =
        if sink.nonblock?
          IO.trytee(@to_io, sink, chunk_size)
        else
          IO.tee(@to_io, sink, chunk_size, WAITALL)
        end
      case copied
      when Integer
        largest = copied if copied > largest
      else
        blocked << sink
      end
      false # still a usable target, keep it
    rescue IOError, Errno::EPIPE => err
      __dst_error(sink, err)
      true # broken target, drop it
    end
  end
  largest
end
# Fans data from @to_io out to several sinks: the final entry of +targets+
# is spliced to, every other entry is tee'd to via __broadcast_tee.
# At most MAX_AT_ONCE bytes are attempted per call.
#
# Returns an Array of sinks to watch for writability when nothing was
# written, :wait_readable when the caller should return once @to_io has
# more data, or nil when the final sink failed and no target remains.
# A final sink raising IOError/Errno::EPIPE is reported through __dst_error
# and removed from +targets+.
def broadcast_inf(targets, bytes)
  blocked = []
  unless targets.any? { |sink| sink.nonblock? }
    # every target is blocking: do not start until all of them are writable
    ready = IO.select(nil, targets, nil, 0)
    return targets unless ready
    blocked = targets - ready[1]
    # tell DTAS::UNIXServer#run_once to wait on the blocked targets
    return blocked if blocked[0]
    # everything is writable, carry on
  end

  # don't pin too much on one target
  bytes = MAX_AT_ONCE if bytes > MAX_AT_ONCE

  final = targets.pop # spliced to; whatever is left in targets is tee'd to
  teed = __broadcast_tee(blocked, targets, bytes)

  # never splice more than the largest amount that was successfully tee'd
  bytes = teed if teed > 0

  begin
    targets << final
    if final.nonblock?
      moved = IO.trysplice(@to_io, nil, final, nil, bytes, F_MOVE)
      if Symbol === moved
        blocked << final
        if teed == 0
          # nothing was accomplished: when _all_ writers are blocked the
          # data is kept and the caller waits for writability
          return blocked
        elsif teed > 0
          # the tee'd targets win, drop the data intended for final
          discard(teed)
          @bytes_xfer += teed
          # final is non-blocking, so its writability is not watched
          return :wait_readable
        end
      end
    else
      # the blocking case is simple
      moved = IO.splice(@to_io, nil, final, nil, bytes, WAITALL | F_MOVE)
    end
    @bytes_xfer += moved
    # a short splice leaves data behind; discard it so the tee'd targets
    # do not receive the same bytes again
    discard(bytes - moved) if moved < bytes && teed > 0
    :wait_readable
  rescue IOError, Errno::EPIPE => err # final failed, drop it
    __dst_error(final, err)
    targets.pop # final is no longer a valid target
    if teed != 0
      # some progress was made, discard the data that could not be spliced
      @bytes_xfer += teed
      discard(teed)
    elsif blocked[0]
      # nothing accomplished, have the caller watch the blocked targets
      return blocked
    end
    # stop decoding once every target has errored out:
    # returning nil will trigger close
    :wait_readable if targets[0]
  end
end
end
|