about summary refs log tree commit homepage
path: root/lib/dtas/unix_server.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-08-24 09:54:45 +0000
committerEric Wong <normalperson@yhbt.net>2013-08-24 09:54:45 +0000
commit3e09ac0c10c95bb24a08af62393b4f761e2743d0 (patch)
tree778dffa2ba8798503fc047db0feef6d65426ea22 /lib/dtas/unix_server.rb
downloaddtas-3e09ac0c10c95bb24a08af62393b4f761e2743d0.tar.gz
Diffstat (limited to 'lib/dtas/unix_server.rb')
-rw-r--r--lib/dtas/unix_server.rb111
1 files changed, 111 insertions, 0 deletions
diff --git a/lib/dtas/unix_server.rb b/lib/dtas/unix_server.rb
new file mode 100644
index 0000000..90f8479
--- /dev/null
+++ b/lib/dtas/unix_server.rb
@@ -0,0 +1,111 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'socket'
+require_relative '../dtas'
+require_relative 'unix_accepted'
+
+# This uses SOCK_SEQPACKET, unlike ::UNIXServer in Ruby stdlib
+
+# The programming model for the event loop here aims to be compatible
+# with EPOLLONESHOT use with epoll, since that fits my brain far better
+# than existing evented APIs/frameworks.
+# If we cared about scalability to thousands of clients, we'd really use epoll,
+# but IO.select can be just as fast (or faster) with few descriptors and
+# is obviously more portable.
+
+class DTAS::UNIXServer
+  attr_reader :to_io
+
+  def close
+    File.unlink(@path)
+    @to_io.close
+  end
+
+  def initialize(path)
+    @path = path
+    # lock down access by default, arbitrary commands may run as the
+    # same user dtas-player runs as:
+    old_umask = File.umask(0077)
+    @to_io = Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0)
+    addr = Socket.pack_sockaddr_un(path)
+    begin
+      @to_io.bind(addr)
+    rescue Errno::EADDRINUSE
+      # maybe we have an old path leftover from a killed process
+      tmp = Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0)
+      begin
+        tmp.connect(addr)
+        raise RuntimeError, "socket `#{path}' is in use", []
+      rescue Errno::ECONNREFUSED
+        # ok, leftover socket, unlink and rebind anyways
+        File.unlink(path)
+        @to_io.bind(addr)
+      ensure
+        tmp.close
+      end
+    end
+    @to_io.listen(1024)
+    @readers = { self => true }
+    @writers = {}
+  ensure
+    File.umask(old_umask)
+  end
+
+  def write_failed(client, e)
+    warn "failed to write to #{client}: #{e.message} (#{e.class})"
+    client.close
+  end
+
+  def readable_iter
+    # we do not do anything with the block passed to us
+    begin
+      sock, _ = @to_io.accept_nonblock
+      @readers[DTAS::UNIXAccepted.new(sock)] = true
+    rescue Errno::ECONNABORTED # ignore this, it happens
+    rescue Errno::EAGAIN
+      return :wait_readable
+    end while true
+  end
+
+  def wait_ctl(io, err)
+    case err
+    when :wait_readable
+      @readers[io] = true
+    when :wait_writable
+      @writers[io] = true
+    when :delete
+      @readers.delete(io)
+      @writers.delete(io)
+    when :ignore
+      # There are 2 cases for :ignore
+      # - DTAS::Buffer was readable before, but all destinations (e.g. sinks)
+      #   were blocked, so we stop caring for producer (buffer) readability.
+      # - a consumer (e.g. DTAS::Sink) just became writable, but the
+      #   corresponding DTAS::Buffer was already readable in a previous
+      #   call.
+    when nil
+      io.close
+    when StandardError
+      io.close
+    else
+      raise "BUG: wait_ctl invalid: #{io} #{err.inspect}"
+    end
+  end
+
+  def run_once
+    begin
+      # give IO.select one-shot behavior, snapshot and replace the watchlist
+      r = IO.select(@readers.keys, @writers.keys) or return
+      r[1].each do |io|
+        @writers.delete(io)
+        wait_ctl(io, io.writable_iter)
+      end
+      r[0].each do |io|
+        @readers.delete(io)
+        wait_ctl(io, io.readable_iter { |_io, msg| yield(_io, msg) })
+      end
+    end
+  end
+end