1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# Copyright (C) 2013-2016 all contributors <dtas-all@nongnu.org>
# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
# frozen_string_literal: true
require 'socket'
require_relative '../dtas'
require_relative 'unix_accepted'
# This uses SOCK_SEQPACKET, unlike ::UNIXServer in Ruby stdlib
# The programming model for the event loop here aims to be compatible
# with EPOLLONESHOT use with epoll, since that fits my brain far better
# than existing evented APIs/frameworks.
# If we cared about scalability to thousands of clients, we'd really use epoll,
# but IO.select can be just as fast (or faster) with few descriptors and
# is obviously more portable.
class DTAS::UNIXServer # :nodoc:
attr_reader :to_io
def close
File.unlink(@path)
@to_io.close
end
def initialize(path)
@path = path
# lock down access by default, arbitrary commands may run as the
# same user dtas-player runs as:
old_umask = File.umask(0077)
@to_io = Socket.new(:UNIX, :SEQPACKET, 0)
addr = Socket.pack_sockaddr_un(path)
begin
@to_io.bind(addr)
rescue Errno::EADDRINUSE
# maybe we have an old path leftover from a killed process
tmp = Socket.new(:UNIX, :SEQPACKET, 0)
begin
tmp.connect(addr)
raise RuntimeError, "socket `#{path}' is in use", []
rescue Errno::ECONNREFUSED
# ok, leftover socket, unlink and rebind anyways
File.unlink(path)
@to_io.bind(addr)
ensure
tmp.close
end
end
@to_io.listen(1024)
@readers = { self => true }
@writers = {}
ensure
File.umask(old_umask)
end
def write_failed(client, e)
warn "failed to write to #{client}: #{e.message} (#{e.class})"
client.close
end
def readable_iter
# we do not do anything with the block passed to us
case rv = accept_nonblock
when :wait_readable then return rv
else
@readers[DTAS::UNIXAccepted.new(rv[0])] = true
end while true
end
def wait_ctl(io, err)
case err
when :hot_read
# this is only safe when we're iterating through ready writers
# the linear search for Array#include? is not expensive since
# we usually don't have a lot of sinks.
@hot_read << io unless @hot_read.include?(io)
when :wait_readable
@readers[io] = true
when :wait_writable
@writers[io] = true
when :delete
@readers.delete(io)
@writers.delete(io)
when :ignore
# There are 2 cases for :ignore
# - DTAS::Buffer was readable before, but all destinations (e.g. sinks)
# were blocked, so we stop caring for producer (buffer) readability.
# - a consumer (e.g. DTAS::Sink) just became writable, but the
# corresponding DTAS::Buffer was already readable in a previous
# call.
when nil, StandardError
io.close
else
raise "BUG: wait_ctl invalid: #{io} #{err.inspect}"
end
end
def run_once
# give IO.select one-shot behavior, snapshot and replace the watchlist
begin
r = IO.select(@readers.keys, @writers.keys) or return
rescue IOError
# this only happens when sinks error out
@writers.delete_if { |io| io.to_io.closed? }
retry
end
@hot_read = r[0]
r[1].each do |io|
@writers.delete(io)
wait_ctl(io, io.writable_iter)
end
@hot_read = nil
r[0].each do |io|
@readers.delete(io)
wait_ctl(io, io.readable_iter { |_io, msg| yield(_io, msg) })
end
end
if RUBY_VERSION.to_f >= 2.3
def accept_nonblock
@to_io.accept_nonblock(exception: false)
end
else
def accept_nonblock
@to_io.accept_nonblock
rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO
:wait_readable
end
end
end
|