dtas.git  about / heads / tags
duct tape audio suite for *nix
blob ed96a1e8ebbce1d76ec519f5509e3df86eb4c3ee 3204 bytes (raw)
$ git show v0.0.0:lib/dtas/unix_server.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
 
# -*- encoding: binary -*-
# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
require 'socket'
require_relative '../dtas'
require_relative 'unix_accepted'

# This uses SOCK_SEQPACKET, unlike ::UNIXServer in Ruby stdlib

# The programming model for the event loop here aims to be compatible
# with EPOLLONESHOT use with epoll, since that fits my brain far better
# than existing evented APIs/frameworks.
# If we cared about scalability to thousands of clients, we'd really use epoll,
# but IO.select can be just as fast (or faster) with few descriptors and
# is obviously more portable.

class DTAS::UNIXServer # :nodoc:
  # The listening SOCK_SEQPACKET socket; exposing it as +to_io+ lets this
  # object be passed straight to IO.select.
  attr_reader :to_io

  # Removes the socket path from the filesystem and closes the listener.
  # The listener is closed even when the unlink fails (e.g. the path was
  # already removed), so the descriptor is never leaked; the unlink error
  # still propagates to the caller.
  def close
    File.unlink(@path)
    nil
  ensure
    @to_io.close
  end

  # Binds and listens on a SOCK_SEQPACKET Unix socket at +path+.
  #
  # A leftover socket file which refuses connections is unlinked and
  # replaced.  Raises RuntimeError if another process is still accepting
  # connections on +path+.  On any failure the half-initialized listener
  # is closed before the error is re-raised.
  def initialize(path)
    @path = path
    # lock down access by default, arbitrary commands may run as the
    # same user dtas-player runs as:
    old_umask = File.umask(0077)
    @to_io = Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0)
    begin
      bind_reclaiming_stale(Socket.pack_sockaddr_un(path))
      @to_io.listen(1024)
    rescue
      # do not leak the descriptor if we cannot finish setting up
      @to_io.close
      raise
    end
    # the server itself is the only thing watched initially
    @readers = { self => true }
    @writers = {}
  ensure
    # always restore the process-wide umask, even on failure
    File.umask(old_umask) if old_umask
  end

  # Reports a failed write to +client+ (with the exception +e+) on stderr
  # and closes the client.  The client is dropped from the watch lists
  # first, since IO.select raises IOError when handed a closed stream.
  def write_failed(client, e)
    warn "failed to write to #{client}: #{e.message} (#{e.class})"
    unwatch(client)
    client.close
  end

  # Accepts every pending connection and registers each one for
  # readability.  Returns :wait_readable once the accept queue is drained.
  def readable_iter
    # we do not do anything with the block passed to us
    loop do
      begin
        sock, _ = @to_io.accept_nonblock
        @readers[DTAS::UNIXAccepted.new(sock)] = true
      rescue Errno::ECONNABORTED
        # ignore this, it happens; must be listed before IO::WaitReadable
        # because accept_nonblock tags this error with that module, too
      rescue IO::WaitReadable
        # covers EAGAIN and EWOULDBLOCK (they differ on some platforms)
        return :wait_readable
      end
    end
  end

  # Applies the result +err+ of a readable_iter/writable_iter call on +io+
  # to the watch lists.
  #
  # Raises RuntimeError for any value of +err+ not handled below.
  def wait_ctl(io, err)
    case err
    when :wait_readable
      @readers[io] = true
    when :wait_writable
      @writers[io] = true
    when :delete
      unwatch(io)
    when :ignore
      # There are 2 cases for :ignore
      # - DTAS::Buffer was readable before, but all destinations (e.g. sinks)
      #   were blocked, so we stop caring for producer (buffer) readability.
      # - a consumer (e.g. DTAS::Sink) just became writable, but the
      #   corresponding DTAS::Buffer was already readable in a previous
      #   call.
    when nil, StandardError
      # finished or failed: make sure the closed io is not left behind in
      # the other watch list, or the next IO.select would raise IOError
      unwatch(io)
      io.close
    else
      raise "BUG: wait_ctl invalid: #{io} #{err.inspect}"
    end
  end

  # Runs one iteration of the event loop: waits until at least one watched
  # object is ready, then services writers before readers.  Messages are
  # passed to the caller's block as (io, msg) by whatever readable_iter
  # implementation yields them.
  def run_once
    # give IO.select one-shot behavior, snapshot and replace the watchlist
    ready = IO.select(@readers.keys, @writers.keys)
    return unless ready

    readable, writable = ready
    writable.each do |io|
      @writers.delete(io)
      wait_ctl(io, io.writable_iter)
    end
    readable.each do |io|
      @readers.delete(io)
      wait_ctl(io, io.readable_iter { |src, msg| yield(src, msg) })
    end
  end

  private

  # Stops watching +io+ for both readability and writability.
  def unwatch(io)
    @readers.delete(io)
    @writers.delete(io)
  end

  # Binds the listener to +addr+, replacing a stale socket file if the
  # address is in use but nothing is accepting connections on it.
  def bind_reclaiming_stale(addr)
    @to_io.bind(addr)
  rescue Errno::EADDRINUSE
    # maybe we have an old path leftover from a killed process
    probe = Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0)
    begin
      probe.connect(addr)
      # empty backtrace: this is an operational error, not a bug
      raise RuntimeError, "socket `#{@path}' is in use", []
    rescue Errno::ECONNREFUSED
      # ok, leftover socket, unlink and rebind anyways
      File.unlink(@path)
      @to_io.bind(addr)
    ensure
      probe.close
    end
  end
end

git clone git://80x24.org/dtas.git
git clone https://80x24.org/dtas.git