about summary refs log tree commit homepage
path: root/lib/dtas/unix_server.rb
blob: cad3fc409934cd1713e8f1751f3102f288310008 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Copyright (C) 2013-2020 all contributors <dtas-all@nongnu.org>
# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
# frozen_string_literal: true
require 'socket'
require_relative '../dtas'
require_relative 'unix_accepted'

# This uses SOCK_SEQPACKET, unlike ::UNIXServer in Ruby stdlib

# The programming model for the event loop here aims to be compatible
# with EPOLLONESHOT use with epoll, since that fits my brain far better
# than existing evented APIs/frameworks.
# If we cared about scalability to thousands of clients, we'd really use epoll,
# but IO.select can be just as fast (or faster) with few descriptors and
# is obviously more portable.

class DTAS::UNIXServer # :nodoc:
  attr_reader :to_io

  # Removes the socket path from the filesystem and closes the listener.
  #
  # The listener is closed even when the unlink fails (for example because
  # the path has already been removed); the unlink error is still raised
  # to the caller afterwards.
  def close
    begin
      File.unlink(@path)
    ensure
      # always release the descriptor, a failed unlink must not leave
      # the listening socket open
      @to_io.close
    end
    nil
  end

  # Creates a SOCK_SEQPACKET UNIX listener bound to +path+.
  #
  # A socket file left behind at +path+ is replaced when nothing accepts
  # connections on it anymore.  Raises RuntimeError when another process is
  # still listening on +path+; other bind/listen failures are propagated
  # as-is.  The process umask is restored and, on failure, the half-built
  # socket is closed before the error reaches the caller.
  def initialize(path)
    @path = path
    ready = false
    # lock down access by default, arbitrary commands may run as the
    # same user dtas-player runs as:
    old_umask = File.umask(0077)
    @to_io = Socket.new(:UNIX, :SEQPACKET, 0)
    addr = Socket.pack_sockaddr_un(path)
    begin
      @to_io.bind(addr)
    rescue Errno::EADDRINUSE
      # maybe we have an old path leftover from a killed process,
      # probe it with a throwaway client socket
      tmp = Socket.new(:UNIX, :SEQPACKET, 0)
      begin
        tmp.connect(addr)
        raise RuntimeError, "socket `#{path}' is in use", []
      rescue Errno::ECONNREFUSED
        # ok, leftover socket, unlink and rebind anyways
        File.unlink(path)
        @to_io.bind(addr)
      ensure
        tmp.close
      end
    end
    @to_io.listen(1024)
    @readers = { self => true }
    @writers = {}
    ready = true
  ensure
    File.umask(old_umask) if old_umask
    # nobody can reach @to_io when the constructor fails, so close it here
    # instead of leaving the descriptor open
    @to_io.close if @to_io && !ready && !@to_io.closed?
  end

  # Reports a failed write to +client+ on stderr, stops watching the
  # client and closes it.
  #
  # +e+ is the error describing the failure; its message and class are
  # included in the warning.
  def write_failed(client, e)
    warn "failed to write to #{client}: #{e.message} (#{e.class})"
    # a closed object must not stay in a watchlist: IO.select raises
    # IOError as soon as it is handed a closed IO
    @readers.delete(client)
    @writers.delete(client)
    client.close
  end

  # Accepts every connection which is currently pending on the listener.
  # Each accepted socket is wrapped in a DTAS::UNIXAccepted and registered
  # as a reader.  Returns :wait_readable once nothing is left to accept.
  #
  # A block may be given for interface compatibility with the other
  # readers, it is never called here.
  def readable_iter
    until (accepted = accept_nonblock) == :wait_readable
      client = DTAS::UNIXAccepted.new(accepted[0])
      @readers[client] = true
    end
    accepted
  end

  # Updates the watchlists for +io+ according to +err+, which is normally
  # the value returned by one of the *_iter methods:
  #
  # :wait_readable / :wait_writable:: watch +io+ for that event
  # :hot_read:: queue +io+ for the read pass of the current run_once
  # :delete:: stop watching +io+ entirely
  # :ignore:: leave everything as it is
  # nil or a StandardError:: close +io+
  #
  # Anything else is a bug and raises RuntimeError.
  def wait_ctl(io, err)
    case err
    when :wait_readable then @readers[io] = true
    when :wait_writable then @writers[io] = true
    when :hot_read
      # only safe while run_once iterates through the ready writers.
      # Array#include? is a linear search, but that is cheap since
      # there usually are not many sinks.
      @hot_read << io unless @hot_read.include?(io)
    when :ignore
      # Nothing to do.  There are 2 cases for :ignore
      # - DTAS::Buffer was readable before, but all destinations
      #   (e.g. sinks) were blocked, so we stop caring about the
      #   readability of the producer (buffer).
      # - a consumer (e.g. DTAS::Sink) just became writable, but the
      #   corresponding DTAS::Buffer was already readable in a previous
      #   call.
    when :delete
      @readers.delete(io)
      @writers.delete(io)
    when StandardError, nil
      io.close
    else
      raise "BUG: wait_ctl invalid: #{io} #{err.inspect}"
    end
  end

  # Waits until at least one watched object is ready and services it.
  #
  # Every ready object is removed from its watchlist before it is serviced
  # (one-shot behavior); the value returned by its writable_iter or
  # readable_iter is handed to wait_ctl, which decides how the object is
  # watched from then on.  Writers are serviced before readers.
  #
  # The block given to run_once receives each (io, msg) pair yielded by
  # the readable_iter of a ready reader.
  #
  # Raises IOError when IO.select fails although no watched object is
  # closed.
  def run_once
    # give IO.select one-shot behavior, snapshot and replace the watchlist
    begin
      ready = IO.select(@readers.keys, @writers.keys) or return
    rescue IOError
      # IO.select rejects closed IOs.  Drop every closed object, readers
      # as well as writers (e.g. sinks which errored out), then wait again.
      watched = @readers.size + @writers.size
      [ @readers, @writers ].each do |watchlist|
        watchlist.delete_if { |io, _| io.to_io.closed? }
      end
      # nothing could be dropped: retrying would fail the same way forever
      raise if watched == @readers.size + @writers.size
      retry
    end
    readable, writable = ready

    # writers may queue additional readers via wait_ctl(io, :hot_read)
    @hot_read = readable
    writable.each do |io|
      @writers.delete(io)
      wait_ctl(io, io.writable_iter)
    end
    @hot_read = nil

    readable.each do |io|
      @readers.delete(io)
      wait_ctl(io, io.readable_iter { |_io, msg| yield(_io, msg) })
    end
  end

  # accept_nonblock returns :wait_readable when no connection is pending,
  # otherwise whatever Socket#accept_nonblock returns.  The implementation
  # is picked once, when this file is loaded: interpreters older than 2.3
  # get a variant which rescues the relevant errors, newer ones pass
  # `exception: false' so no exception is raised in the first place.
  if RUBY_VERSION.to_f < 2.3
    def accept_nonblock
      begin
        @to_io.accept_nonblock
      rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO
        :wait_readable
      end
    end
  else
    def accept_nonblock
      @to_io.accept_nonblock(exception: false)
    end
  end
end