dtas.git  about / heads / tags
duct tape audio suite for *nix
blob ccfa662dc60efaf85f9f60aa3b56d3eec17b878e 3763 bytes (raw)
$ git show v0.16.1:lib/dtas/unix_server.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
 
# Copyright (C) 2013-2016 all contributors <dtas-all@nongnu.org>
# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
# frozen_string_literal: true
require 'socket'
require_relative '../dtas'
require_relative 'unix_accepted'

# This uses SOCK_SEQPACKET, unlike ::UNIXServer in Ruby stdlib

# The programming model for the event loop here aims to be compatible
# with EPOLLONESHOT use with epoll, since that fits my brain far better
# than existing evented APIs/frameworks.
# If we cared about scalability to thousands of clients, we'd really use epoll,
# but IO.select can be just as fast (or faster) with few descriptors and
# is obviously more portable.

class DTAS::UNIXServer # :nodoc:
  attr_reader :to_io

  def close
    File.unlink(@path)
    @to_io.close
  end

  def initialize(path)
    @path = path
    # lock down access by default, arbitrary commands may run as the
    # same user dtas-player runs as:
    old_umask = File.umask(0077)
    @to_io = Socket.new(:UNIX, :SEQPACKET, 0)
    addr = Socket.pack_sockaddr_un(path)
    begin
      @to_io.bind(addr)
    rescue Errno::EADDRINUSE
      # maybe we have an old path leftover from a killed process
      tmp = Socket.new(:UNIX, :SEQPACKET, 0)
      begin
        tmp.connect(addr)
        raise RuntimeError, "socket `#{path}' is in use", []
      rescue Errno::ECONNREFUSED
        # ok, leftover socket, unlink and rebind anyways
        File.unlink(path)
        @to_io.bind(addr)
      ensure
        tmp.close
      end
    end
    @to_io.listen(1024)
    @readers = { self => true }
    @writers = {}
  ensure
    File.umask(old_umask)
  end

  def write_failed(client, e)
    warn "failed to write to #{client}: #{e.message} (#{e.class})"
    client.close
  end

  def readable_iter
    # we do not do anything with the block passed to us
    case rv = accept_nonblock
    when :wait_readable then return rv
    else
      @readers[DTAS::UNIXAccepted.new(rv[0])] = true
    end while true
  end

  def wait_ctl(io, err)
    case err
    when :hot_read
      # this is only safe when we're iterating through ready writers
      # the linear search for Array#include? is not expensive since
      # we usually don't have a lot of sinks.
      @hot_read << io unless @hot_read.include?(io)
    when :wait_readable
      @readers[io] = true
    when :wait_writable
      @writers[io] = true
    when :delete
      @readers.delete(io)
      @writers.delete(io)
    when :ignore
      # There are 2 cases for :ignore
      # - DTAS::Buffer was readable before, but all destinations (e.g. sinks)
      #   were blocked, so we stop caring for producer (buffer) readability.
      # - a consumer (e.g. DTAS::Sink) just became writable, but the
      #   corresponding DTAS::Buffer was already readable in a previous
      #   call.
    when nil, StandardError
      io.close
    else
      raise "BUG: wait_ctl invalid: #{io} #{err.inspect}"
    end
  end

  def run_once
    # give IO.select one-shot behavior, snapshot and replace the watchlist
    begin
      r = IO.select(@readers.keys, @writers.keys) or return
    rescue IOError
      # this only happens when sinks error out
      @writers.delete_if { |io| io.to_io.closed? }
      retry
    end
    @hot_read = r[0]
    r[1].each do |io|
      @writers.delete(io)
      wait_ctl(io, io.writable_iter)
    end
    @hot_read = nil
    r[0].each do |io|
      @readers.delete(io)
      wait_ctl(io, io.readable_iter { |_io, msg| yield(_io, msg) })
    end
  end

  if RUBY_VERSION.to_f >= 2.3
    def accept_nonblock
      @to_io.accept_nonblock(exception: false)
    end
  else
    def accept_nonblock
      @to_io.accept_nonblock
    rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EPROTO
      :wait_readable
    end
  end
end

git clone git://80x24.org/dtas.git
git clone https://80x24.org/dtas.git