about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-08-24 09:54:45 +0000
committerEric Wong <normalperson@yhbt.net>2013-08-24 09:54:45 +0000
commit3e09ac0c10c95bb24a08af62393b4f761e2743d0 (patch)
tree778dffa2ba8798503fc047db0feef6d65426ea22 /lib
downloaddtas-3e09ac0c10c95bb24a08af62393b4f761e2743d0.tar.gz
Diffstat (limited to 'lib')
-rw-r--r--lib/dtas.rb8
-rw-r--r--lib/dtas/buffer.rb91
-rw-r--r--lib/dtas/buffer/read_write.rb103
-rw-r--r--lib/dtas/buffer/splice.rb143
-rw-r--r--lib/dtas/command.rb44
-rw-r--r--lib/dtas/compat_onenine.rb19
-rw-r--r--lib/dtas/disclaimer.rb14
-rw-r--r--lib/dtas/format.rb152
-rw-r--r--lib/dtas/pipe.rb40
-rw-r--r--lib/dtas/player.rb386
-rw-r--r--lib/dtas/player/client_handler.rb452
-rw-r--r--lib/dtas/process.rb88
-rw-r--r--lib/dtas/replaygain.rb42
-rw-r--r--lib/dtas/rg_state.rb100
-rw-r--r--lib/dtas/serialize.rb10
-rw-r--r--lib/dtas/sigevent.rb11
-rw-r--r--lib/dtas/sigevent/efd.rb21
-rw-r--r--lib/dtas/sigevent/pipe.rb29
-rw-r--r--lib/dtas/sink.rb122
-rw-r--r--lib/dtas/sink_reader_play.rb70
-rw-r--r--lib/dtas/source.rb148
-rw-r--r--lib/dtas/source/command.rb41
-rw-r--r--lib/dtas/source/common.rb14
-rw-r--r--lib/dtas/source/mp3.rb38
-rw-r--r--lib/dtas/state_file.rb34
-rw-r--r--lib/dtas/unix_accepted.rb77
-rw-r--r--lib/dtas/unix_client.rb52
-rw-r--r--lib/dtas/unix_server.rb111
-rw-r--r--lib/dtas/util.rb16
-rw-r--r--lib/dtas/writable_iter.rb23
30 files changed, 2499 insertions, 0 deletions
diff --git a/lib/dtas.rb b/lib/dtas.rb
new file mode 100644
index 0000000..c7ac0af
--- /dev/null
+++ b/lib/dtas.rb
@@ -0,0 +1,8 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+module DTAS
+end
+
+require 'dtas/compat_onenine'
diff --git a/lib/dtas/buffer.rb b/lib/dtas/buffer.rb
new file mode 100644
index 0000000..d02e8a6
--- /dev/null
+++ b/lib/dtas/buffer.rb
@@ -0,0 +1,91 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative '../dtas'
+
+class DTAS::Buffer
+  begin
+    raise LoadError, "no splice with _DTAS_POSIX" if ENV["_DTAS_POSIX"]
+    require 'io/splice' # splice is only in Linux for now...
+    require_relative 'buffer/splice'
+    include DTAS::Buffer::Splice
+  rescue LoadError
+    require_relative 'buffer/read_write'
+    include DTAS::Buffer::ReadWrite
+  end
+
+  attr_reader :to_io # call nread on this
+  attr_reader :wr # processes (sources) should redirect to this
+  attr_accessor :bytes_xfer
+
+  def initialize
+    @bytes_xfer = 0
+    @buffer_size = nil
+    @to_io, @wr = DTAS::Pipe.new
+  end
+
+  def self.load(hash)
+    buf = new
+    if hash
+      bs = hash["buffer_size"] and buf.buffer_size = bs
+    end
+    buf
+  end
+
+  def to_hsh
+    @buffer_size ? { "buffer_size" => @buffer_size } : {}
+  end
+
+  def __dst_error(dst, e)
+    warn "dropping #{dst.inspect} due to error: #{e.message} (#{e.class})"
+    dst.close unless dst.closed?
+  end
+
+  # This will modify targets
+  # returns one of:
+  # - :wait_readable
+  # - subset of targets array for :wait_writable
+  # - some type of StandardError
+  # - nil
+  def broadcast(targets)
+    bytes = inflight
+    return :wait_readable if 0 == bytes # spurious wakeup
+
+    case targets.size
+    when 0
+      :ignore # this will pause decoders
+    when 1
+      broadcast_one(targets, bytes)
+    else # infinity
+      broadcast_inf(targets, bytes)
+    end
+  end
+
+  def readable_iter
+    # this calls DTAS::Buffer#broadcast from DTAS::Player
+    yield(self, nil)
+  end
+
+  def inflight
+    @to_io.nread
+  end
+
+  # don't really close the pipes under normal circumstances, just clear data
+  def close
+    bytes = inflight
+    discard(bytes) if bytes > 0
+  end
+
+  def buf_reset
+    close!
+    @bytes_xfer = 0
+    @to_io, @wr = DTAS::Pipe.new
+    @wr.pipe_size = @buffer_size if @buffer_size
+  end
+
+  def close!
+    @to_io.close
+    @wr.close
+  end
+end
diff --git a/lib/dtas/buffer/read_write.rb b/lib/dtas/buffer/read_write.rb
new file mode 100644
index 0000000..93380d1
--- /dev/null
+++ b/lib/dtas/buffer/read_write.rb
@@ -0,0 +1,103 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'io/wait'
+require 'io/nonblock'
+require_relative '../../dtas'
+require_relative '../pipe'
+
+module DTAS::Buffer::ReadWrite
+  MAX_AT_ONCE = 512 # min PIPE_BUF value in POSIX
+  attr_accessor :buffer_size
+
+  def _rbuf
+    Thread.current[:dtas_pbuf] ||= ""
+  end
+
+  # be sure to only call this with nil when all writers to @wr are done
+  def discard(bytes)
+    buf = _rbuf
+    begin
+      @to_io.read(bytes, buf) or break # EOF
+      bytes -= buf.bytesize
+    end until bytes == 0
+  end
+
+  # always block when we have a single target
+  def broadcast_one(targets, bytes)
+    buf = _rbuf
+    @to_io.read(bytes, buf)
+    n = targets[0].write(buf) # IO#write has write-in-full behavior
+    @bytes_xfer += n
+    :wait_readable
+  rescue Errno::EPIPE, IOError => e
+    __dst_error(targets[0], e)
+    targets.clear
+    nil # do not return error here, we already spewed an error message
+  end
+
+  def broadcast_inf(targets, bytes)
+    nr_nb = targets.count { |sink| sink.nonblock? }
+    if nr_nb == 0 || nr_nb == targets.size
+      # if all targets are full, don't start until they're all writable
+      r = IO.select(nil, targets, nil, 0) or return targets
+      blocked = targets - r[1]
+
+      # tell DTAS::UNIXServer#run_once to wait on the blocked targets
+      return blocked if blocked[0]
+
+      # all writable, yay!
+    else
+      blocked = []
+    end
+
+    again = {}
+
+    # don't pin too much on one target
+    bytes = bytes > MAX_AT_ONCE ? MAX_AT_ONCE : bytes
+    buf = _rbuf
+    @to_io.read(bytes, buf)
+    @bytes_xfer += buf.bytesize
+
+    targets.delete_if do |dst|
+      begin
+        if dst.nonblock?
+          w = dst.write_nonblock(buf)
+          again[dst] = buf.byteslice(w, n) if w < n
+        else
+          dst.write(buf)
+        end
+        false
+      rescue Errno::EAGAIN
+        blocked << dst
+        false
+      rescue IOError, Errno::EPIPE => e
+        again.delete(dst)
+        __dst_error(dst, e)
+        true
+      end
+    end
+
+    # try to write as much as possible
+    again.delete_if do |dst, sbuf|
+      begin
+        w = dst.write_nonblock(sbuf)
+        n = sbuf.bytesize
+        if w < n
+          again[dst] = sbuf.byteslice(w, n)
+          false
+        else
+          true
+        end
+      rescue Errno::EAGAIN
+        blocked << dst
+        true
+      rescue IOError, Errno::EPIPE => e
+        __dst_error(dst, e)
+        true
+      end
+    end until again.empty?
+    targets[0] ? :wait_readable : nil
+  end
+end
diff --git a/lib/dtas/buffer/splice.rb b/lib/dtas/buffer/splice.rb
new file mode 100644
index 0000000..c2540bd
--- /dev/null
+++ b/lib/dtas/buffer/splice.rb
@@ -0,0 +1,143 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'io/wait'
+require 'io/nonblock'
+require 'io/splice'
+require_relative '../../dtas'
+require_relative '../pipe'
+
+module DTAS::Buffer::Splice
+  MAX_AT_ONCE = 4096 # page size in Linux
+  MAX_SIZE = File.read("/proc/sys/fs/pipe-max-size").to_i
+  DEVNULL = File.open("/dev/null", "r+")
+  F_MOVE = IO::Splice::F_MOVE
+  WAITALL = IO::Splice::WAITALL
+
+  def buffer_size
+    @to_io.pipe_size
+  end
+
+  # nil is OK, won't reset existing pipe, either...
+  def buffer_size=(bytes)
+    @to_io.pipe_size = bytes if bytes
+    @buffer_size = bytes
+  end
+
+  # be sure to only call this with nil when all writers to @wr are done
+  def discard(bytes)
+    IO.splice(@to_io, nil, DEVNULL, nil, bytes)
+  end
+
+  def broadcast_one(targets, bytes)
+    # single output is always non-blocking
+    s = IO.trysplice(@to_io, nil, targets[0], nil, bytes, F_MOVE)
+    if Symbol === s
+      targets # our one and only target blocked on write
+    else
+      @bytes_xfer += s
+      :wait_readable # we want to read more from @to_io soon
+    end
+  rescue Errno::EPIPE, IOError => e
+    __dst_error(targets[0], e)
+    targets.clear
+    nil # do not return error here, we already spewed an error message
+  end
+
+  # returns the largest value we teed
+  def __broadcast_tee(blocked, targets, chunk_size)
+    most_teed = 0
+    targets.delete_if do |dst|
+      begin
+        t = dst.nonblock? ?
+            IO.trytee(@to_io, dst, chunk_size) :
+            IO.tee(@to_io, dst, chunk_size, WAITALL)
+        if Integer === t
+          most_teed = t if t > most_teed
+        else
+          blocked << dst
+        end
+        false
+      rescue IOError, Errno::EPIPE => e
+        __dst_error(dst, e)
+        true
+      end
+    end
+    most_teed
+  end
+
+  def broadcast_inf(targets, bytes)
+    if targets.none? { |sink| sink.nonblock? }
+      # if all targets are blocking, don't start until they're all writable
+      r = IO.select(nil, targets, nil, 0) or return targets
+      blocked = targets - r[1]
+
+      # tell DTAS::UNIXServer#run_once to wait on the blocked targets
+      return blocked if blocked[0]
+
+      # all writable, yay!
+    else
+      blocked = []
+    end
+
+    # don't pin too much on one target
+    bytes = bytes > MAX_AT_ONCE ? MAX_AT_ONCE : bytes
+
+    last = targets.pop # we splice to the last one, tee to the rest
+    most_teed = __broadcast_tee(blocked, targets, bytes)
+
+    # don't splice more than the largest amount we successfully teed
+    bytes = most_teed if most_teed > 0
+
+    begin
+      targets << last
+      if last.nonblock?
+        s = IO.trysplice(@to_io, nil, last, nil, bytes, F_MOVE)
+        if Symbol === s
+          blocked << last
+
+          # we accomplished nothing!
+          # If _all_ writers are blocked, do not discard data,
+          # stay blocked on :wait_writable
+          return blocked if most_teed == 0
+
+          # the tees targets win, drop data intended for last
+          if most_teed > 0
+            discard(most_teed)
+            @bytes_xfer += most_teed
+            # do not watch for writability of last, last is non-blocking
+            return :wait_readable
+          end
+        end
+      else
+        # the blocking case is simple
+        s = IO.splice(@to_io, nil, last, nil, bytes, WAITALL|F_MOVE)
+      end
+      @bytes_xfer += s
+
+      # if we can't splice everything
+      # discard it so the early targets do not get repeated data
+      if s < bytes && most_teed > 0
+        discard(bytes - s)
+      end
+      :wait_readable
+    rescue IOError, Errno::EPIPE => e # last failed, drop it
+      __dst_error(last, e)
+      targets.pop # we're no longer a valid target
+
+      if most_teed == 0
+        # nothing accomplished, watch any targets
+        return blocked if blocked[0]
+      else
+        # some progress, discard the data we could not splice
+        @bytes_xfer += most_teed
+        discard(most_teed)
+      end
+
+      # stop decoding if we're completely errored out
+      # returning nil will trigger close
+      return targets[0] ? :wait_readable : nil
+    end
+  end
+end
diff --git a/lib/dtas/command.rb b/lib/dtas/command.rb
new file mode 100644
index 0000000..b957567
--- /dev/null
+++ b/lib/dtas/command.rb
@@ -0,0 +1,44 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+# common code for wrapping SoX/ecasound/... commands
+require_relative 'serialize'
+require 'shellwords'
+
+module DTAS::Command
+  include DTAS::Serialize
+  attr_reader :pid
+  attr_reader :to_io
+  attr_accessor :command
+  attr_accessor :env
+  attr_accessor :spawn_at
+
+  COMMAND_DEFAULTS = {
+    "env" => {},
+    "command" => nil,
+  }
+
+  def command_init(defaults = {})
+    @pid = nil
+    @to_io = nil
+    @spawn_at = nil
+    COMMAND_DEFAULTS.merge(defaults).each do |k,v|
+      v = v.dup if Hash === v || Array === v
+      instance_variable_set("@#{k}", v)
+    end
+  end
+
+  def kill(sig = :TERM)
+    # always kill the pgroup since we run subcommands in their own shell
+    Process.kill(sig, -@pid)
+  end
+
+  def on_death(status)
+    @pid = nil
+  end
+
+  def command_string
+    @command
+  end
+end
diff --git a/lib/dtas/compat_onenine.rb b/lib/dtas/compat_onenine.rb
new file mode 100644
index 0000000..98be8c9
--- /dev/null
+++ b/lib/dtas/compat_onenine.rb
@@ -0,0 +1,19 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# Make Ruby 1.9.3 look like Ruby 2.0.0 to us
+# This exists for Debian wheezy users using the stock Ruby 1.9.3 install.
+# We'll drop this interface when Debian wheezy (7.0) becomes unsupported.
+class String
+  def b
+    dup.force_encoding(Encoding::BINARY)
+  end
+end unless String.method_defined?(:b)
+
+def IO
+  def self.pipe
+    super.map! { |io| io.close_on_exec = true; io }
+  end
+end if RUBY_VERSION.to_f <= 1.9
diff --git a/lib/dtas/disclaimer.rb b/lib/dtas/disclaimer.rb
new file mode 100644
index 0000000..c25ba77
--- /dev/null
+++ b/lib/dtas/disclaimer.rb
@@ -0,0 +1,14 @@
+DTAS_DISCLAIMER = <<EOF
+# WARNING!
+#
+# Ignorant or improper use of #$0 may lead to
+# data loss, hearing loss, and damage to audio equipment.
+#
+# Please read and understand the documentation of all commands you
+# attempt to configure.
+#
+# #$0 will never prevent you from doing stupid things.
+#
+# There is no warranty, the developers of #$0
+# are not responsible for your actions.
+EOF
diff --git a/lib/dtas/format.rb b/lib/dtas/format.rb
new file mode 100644
index 0000000..1ba5487
--- /dev/null
+++ b/lib/dtas/format.rb
@@ -0,0 +1,152 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+# class represents an audio format (type/bits/channels/sample rate/...)
+require_relative '../dtas'
+require_relative 'process'
+require_relative 'serialize'
+
+class DTAS::Format
+  include DTAS::Process
+  include DTAS::Serialize
+  NATIVE_ENDIAN = [1].pack("l") == [1].pack("l>") ? "big" : "little"
+
+  attr_accessor :type # s32, f32, f64 ... any point in others?
+  attr_accessor :channels # 1..666
+  attr_accessor :rate     # 44100, 48000, 88200, 96000, 176400, 192000 ...
+  attr_accessor :bits # only set for playback on 16-bit DACs
+  attr_accessor :endian
+
+  FORMAT_DEFAULTS = {
+    "type" => "s32",
+    "channels" => 2,
+    "rate" => 44100,
+    "bits" => nil,   # default: implied from type
+    "endian" => nil, # unspecified
+  }
+  SIVS = FORMAT_DEFAULTS.keys
+
+  def self.load(hash)
+    fmt = new
+    return fmt unless hash
+    (SIVS & hash.keys).each do |k|
+      fmt.instance_variable_set("@#{k}", hash[k])
+    end
+    fmt
+  end
+
+  def initialize
+    FORMAT_DEFAULTS.each do |k,v|
+      instance_variable_set("@#{k}", v)
+    end
+  end
+
+  def to_sox_arg
+   rv = %W(-t#@type -c#@channels -r#@rate)
+   rv.concat(%W(-b#@bits)) if @bits # needed for play(1) to 16-bit DACs
+   rv
+  end
+
+  # returns 'be' or 'le' depending on endianess
+  def endian2
+    case e = @endian || NATIVE_ENDIAN
+    when "big"
+      "be"
+    when "little"
+      "le"
+    else
+      raise"unsupported endian=#{e}"
+    end
+  end
+
+  def to_eca_arg
+    %W(-f #{@type}_#{endian2},#@channels,#@rate)
+  end
+
+  def inspect
+    "<#{self.class}(#{xs(to_sox_arg)})>"
+  end
+
+  def to_hsh
+    to_hash.delete_if { |k,v| v == FORMAT_DEFAULTS[k] }
+  end
+
+  def to_hash
+    ivars_to_hash(SIVS)
+  end
+
+  def from_file(path)
+    @channels = qx(%W(soxi -c #{path})).to_i
+    @type = qx(%W(soxi -t #{path})).strip
+    @rate = qx(%W(soxi -r #{path})).to_i
+    # we don't need to care for bits, do we?
+  end
+
+  # for the _decoded_ output
+  def bits_per_sample
+    return @bits if @bits
+    /\A[fst](8|16|24|32|64)\z/ =~ @type or
+      raise TypeError, "invalid type=#@type (must be s32/f32/f64)"
+    $1.to_i
+  end
+
+  def bytes_per_sample
+    bits_per_sample / 8
+  end
+
+  def to_env
+    rv = {
+      "SOX_FILETYPE" => @type,
+      "CHANNELS" => @channels.to_s,
+      "RATE" => @rate.to_s,
+      "ENDIAN" => @endian || NATIVE_ENDIAN,
+      "SOXFMT" => to_sox_arg.join(' '),
+      "ECAFMT" => to_eca_arg.join(' '),
+      "ENDIAN2" => endian2,
+    }
+    begin # don't set these if we can't get them, SOX_FILETYPE may be enough
+      rv["BITS_PER_SAMPLE"] = bits_per_sample.to_s
+    rescue TypeError
+    end
+    rv
+  end
+
+  def bytes_to_samples(bytes)
+    bytes / bytes_per_sample / @channels
+  end
+
+  def bytes_to_time(bytes)
+    Time.at(bytes_to_samples(bytes) / @rate.to_f)
+  end
+
+  def valid_type?(type)
+    !!(type =~ %r{\A[us](?:8|16|24|32)\z} || type =~ %r{\Af?:(32|64)})
+  end
+
+  def valid_endian?(endian)
+    !!(endian =~ %r{\A(?:big|little|swap)\z})
+  end
+
+  # HH:MM:SS.frac (don't bother with more complex times, too much code)
+  # part of me wants to drop this feature from playq, feels like bloat...
+  def hhmmss_to_samples(hhmmss)
+    time = hhmmss.dup
+    rv = 0
+    if time.sub!(/\.(\d+)\z/, "")
+      # convert fractional second to sample count:
+      rv = ("0.#$1".to_f * @rate).to_i
+    end
+
+    # deal with HH:MM:SS
+    t = time.split(/:/)
+    raise ArgumentError, "Bad time format: #{hhmmss}" if t.size > 3
+
+    mult = 1
+    while part = t.pop
+      rv += part.to_i * mult * @rate
+      mult *= 60
+    end
+    rv
+  end
+end
diff --git a/lib/dtas/pipe.rb b/lib/dtas/pipe.rb
new file mode 100644
index 0000000..891e9cd
--- /dev/null
+++ b/lib/dtas/pipe.rb
@@ -0,0 +1,40 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+begin
+  require 'io/splice'
+rescue LoadError
+end
+require_relative '../dtas'
+require_relative 'writable_iter'
+
+class DTAS::Pipe < IO
+  include DTAS::WritableIter
+  attr_accessor :sink
+
+  def self.new
+    _, w = rv = pipe
+    w.writable_iter_init
+    rv
+  end
+
+  # create no-op methods for non-Linux
+  unless method_defined?(:pipe_size=)
+    def pipe_size=(_)
+    end
+
+    def pipe_size
+    end
+  end
+end
+
+# for non-blocking sinks, this avoids extra fcntl(..., F_GETFL) syscalls
+# We don't need fcntl at all for splice/tee in Linux
+# For non-Linux, we write_nonblock/read_nonblock already call fcntl()
+# behind our backs, so there's no need to repeat it.
+class DTAS::PipeNB < DTAS::Pipe
+  def nonblock?
+    true
+  end
+end
diff --git a/lib/dtas/player.rb b/lib/dtas/player.rb
new file mode 100644
index 0000000..b26303c
--- /dev/null
+++ b/lib/dtas/player.rb
@@ -0,0 +1,386 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'yaml'
+require 'shellwords'
+require_relative '../dtas'
+require_relative 'source'
+require_relative 'source/command'
+require_relative 'sink'
+require_relative 'unix_server'
+require_relative 'buffer'
+require_relative 'sigevent'
+require_relative 'rg_state'
+require_relative 'state_file'
+
+class DTAS::Player
+  require_relative 'player/client_handler'
+  include DTAS::Player::ClientHandler
+  attr_accessor :state_file
+  attr_accessor :socket
+  attr_reader :sinks
+
+  def initialize
+    @state_file = nil
+    @socket = nil
+    @srv = nil
+    @queue = [] # sources
+    @paused = false
+    @format = DTAS::Format.new
+    @srccmd = nil
+    @srcenv = {}
+
+    @sinks = {} # { user-defined name => sink }
+    @targets = [] # order matters
+    @rg = DTAS::RGState.new
+
+    # sits in between shared effects (if any) and sinks
+    @sink_buf = DTAS::Buffer.new
+    @current = nil
+    @watchers = {}
+  end
+
+  def echo(msg)
+    msg = Shellwords.join(msg) if Array === msg
+    @watchers.delete_if do |io, _|
+      if io.closed?
+        true
+      else
+        case io.emit(msg)
+        when :wait_readable, :wait_writable
+          false
+        else
+          true
+        end
+      end
+    end
+    $stdout.write(msg << "\n")
+  end
+
+  def to_hsh
+    rv = {}
+    rv["socket"] = @socket
+    rv["paused"] = @paused if @paused
+    src = rv["source"] = {}
+    src["command"] = @srccmd if @srccmd
+    src["env"] = @srcenv if @srcenv.size > 0
+
+    # Arrays
+    rv["queue"] = @queue
+
+    %w(rg sink_buf format).each do |k|
+      rv[k] = instance_variable_get("@#{k}").to_hsh
+    end
+
+    # no empty hashes or arrays
+    rv.delete_if do |k,v|
+      case v
+      when Hash, Array
+        v.empty?
+      else
+        false
+      end
+    end
+
+    unless @sinks.empty?
+      sinks = rv["sinks"] = []
+      # sort sinks by name for human viewability
+      @sinks.keys.sort.each do |name|
+        sinks << @sinks[name].to_hsh
+      end
+    end
+
+    rv
+  end
+
+  def self.load(hash)
+    rv = new
+    rv.instance_eval do
+      @rg = DTAS::RGState.load(hash["rg"])
+      if v = hash["sink_buf"]
+        v = v["buffer_size"]
+        @sink_buf.buffer_size = v
+      end
+      %w(socket queue paused).each do |k|
+        v = hash[k] or next
+        instance_variable_set("@#{k}", v)
+      end
+      if v = hash["source"]
+        @srccmd = v["command"]
+        e = v["env"] and @srcenv = e
+      end
+
+      if sinks = hash["sinks"]
+        sinks.each do |sink_hsh|
+          sink = DTAS::Sink.load(sink_hsh)
+          @sinks[sink.name] = sink
+        end
+      end
+    end
+    rv
+  end
+
+  def enq_handler(io, msg)
+    # check @queue[0] in case we have no sinks
+    if @current || @queue[0] || @paused
+      @queue << msg
+    else
+      next_source(msg)
+    end
+    io.emit("OK")
+  end
+
+  def do_enq_head(io, msg)
+    # check @queue[0] in case we have no sinks
+    if @current || @queue[0] || @paused
+      @queue.unshift(msg)
+    else
+      next_source(msg)
+    end
+    io.emit("OK")
+  end
+
+  # yielded from readable_iter
+  def client_iter(io, msg)
+    msg = Shellwords.split(msg)
+    command = msg.shift
+    case command
+    when "enq"
+      enq_handler(io, msg[0])
+    when "enq-head"
+      do_enq_head(io, msg)
+    when "enq-cmd"
+      enq_handler(io, { "command" => msg[0]})
+    when "pause", "play", "play_pause"
+      play_pause_handler(io, command)
+    when "seek"
+      do_seek(io, msg[0])
+    when "clear"
+      @queue.clear
+      echo("clear")
+      io.emit("OK")
+    when "rg"
+      rg_handler(io, msg)
+    when "skip"
+      skip_handler(io, msg)
+    when "sink"
+      sink_handler(io, msg)
+    when "current"
+      current_handler(io, msg)
+    when "watch"
+      @watchers[io] = true
+      io.emit("OK")
+    when "format"
+      format_handler(io, msg)
+    when "env"
+      env_handler(io, msg)
+    when "restart"
+      restart_pipeline
+      io.emit("OK")
+    when "source"
+      source_handler(io, msg)
+    end
+  end
+
+  def event_loop_iter
+    @srv.run_once do |io, msg| # readability handler, request/response
+      case io
+      when @sink_buf
+        sink_iter
+      when DTAS::UNIXAccepted
+        client_iter(io, msg)
+      when DTAS::Sigevent # signal received
+        reap_iter
+      else
+        raise "BUG: unknown event: #{io.class} #{io.inspect} #{msg.inspect}"
+      end
+    end
+  end
+
+  def reap_iter
+    DTAS::Process.reaper do |status, obj|
+      warn [ :reap, obj, status ].inspect if $DEBUG
+      obj.on_death(status) if obj.respond_to?(:on_death)
+      case obj
+      when @current
+        next_source(@paused ? nil : @queue.shift)
+      when DTAS::Sink # on unexpected sink death
+        sink_death(obj, status)
+      end
+    end
+    :wait_readable
+  end
+
+  def sink_death(sink, status)
+    deleted = []
+    @targets.delete_if do |t|
+      if t.sink == sink
+        deleted << t
+      else
+        false
+      end
+    end
+
+    if deleted[0]
+      warn("#{sink.name} died unexpectedly: #{status.inspect}")
+      deleted.each { |t| drop_target(t) }
+      __current_drop unless @targets[0]
+    end
+
+    return unless sink.active
+
+    if @queue[0] && !@paused
+      # we get here if source/sinks are all killed in restart_pipeline
+      __sink_activate(sink)
+      next_source(@queue.shift)
+    elsif sink.respawn
+      __sink_activate(sink) if @current
+    end
+  ensure
+    sink.respawn = false
+  end
+
+  # returns a wait_ctl arg for self
+  def broadcast_iter(buf, targets)
+    case rv = buf.broadcast(targets)
+    when Array # array of blocked sinks
+      # have sinks wake up the this buffer when they're writable
+      trade_ctl = proc { @srv.wait_ctl(buf, :wait_readable) }
+      rv.each do |dst|
+        dst.on_writable = trade_ctl
+        @srv.wait_ctl(dst, :wait_writable)
+      end
+
+      # this @sink_buf hibernates until trade_ctl is called
+      # via DTAS::Sink#writable_iter
+      :ignore
+    else # :wait_readable or nil
+      rv
+    end
+  end
+
+  def bind
+    @srv = DTAS::UNIXServer.new(@socket)
+  end
+
+  # only used on new installations where no sink exists
+  def create_default_sink
+    return unless @sinks.empty?
+    s = DTAS::Sink.new
+    s.name = "default"
+    s.active = true
+    @sinks[s.name] = s
+  end
+
+  # called when the player is leaving idle state
+  def spawn_sinks(source_spec)
+    return true if @targets[0]
+    @sinks.each_value do |sink|
+      sink.active or next
+      next if sink.pid
+      @targets.concat(sink.spawn(@format))
+    end
+    if @targets[0]
+      @targets.sort_by! { |t| t.sink.prio }
+      true
+    else
+      # fail, no active sink
+      @queue.unshift(source_spec)
+      false
+    end
+  end
+
+  def next_source(source_spec)
+    @current = nil
+    if source_spec
+      # restart sinks iff we were idle
+      spawn_sinks(source_spec) or return
+
+      case source_spec
+      when String
+        @current = DTAS::Source.new(source_spec)
+        echo(%W(file #{@current.infile}))
+      when Array
+        @current = DTAS::Source.new(*source_spec)
+        echo(%W(file #{@current.infile} #{@current.offset_samples}s))
+      else
+        @current = DTAS::Source::Command.new(source_spec["command"])
+        echo(%W(command #{@current.command_string}))
+      end
+
+      if DTAS::Source === @current
+        @current.command = @srccmd if @srccmd
+        @current.env = @srcenv.dup unless @srcenv.empty?
+      end
+
+      dst = @sink_buf
+      @current.dst_assoc(dst)
+      @current.spawn(@format, @rg, out: dst.wr, in: "/dev/null")
+      @srv.wait_ctl(dst, :wait_readable)
+    else
+      stop_sinks if @sink_buf.inflight == 0
+      echo "idle"
+    end
+  end
+
+  def drop_target(target)
+    @srv.wait_ctl(target, :delete)
+    target.close
+  end
+
+  def stop_sinks
+    @targets.each { |t| drop_target(t) }.clear
+  end
+
+  # only call on unrecoverable errors (or "skip")
+  def __current_drop(src = @current)
+    __buf_reset(src.dst) if src && src.pid
+  end
+
+  # pull data from sink_buf into @targets, source feeds into sink_buf
+  def sink_iter
+    wait_iter = broadcast_iter(@sink_buf, @targets)
+    __current_drop if nil == wait_iter # sink error, stop source
+    return wait_iter if @current
+
+    # no source left to feed sink_buf, drain the remaining data
+    sink_bytes = @sink_buf.inflight
+    if sink_bytes > 0
+      return wait_iter if @targets[0] # play what is leftover
+
+      # discard the buffer if no sinks
+      @sink_buf.discard(sink_bytes)
+    end
+
+    # nothing left inflight, stop the sinks until we have a source
+    stop_sinks
+    :ignore
+  end
+
+  # the main loop
+  def run
+    sev = DTAS::Sigevent.new
+    @srv.wait_ctl(sev, :wait_readable)
+    old_chld = trap(:CHLD) { sev.signal }
+    create_default_sink
+    next_source(@paused ? nil : @queue.shift)
+    begin
+      event_loop_iter
+    rescue => e # just in case...
+      warn "E: #{e.message} (#{e.class})"
+      e.backtrace.each { |l| warn l }
+    end while true
+  ensure
+    __current_requeue
+    trap(:CHLD, old_chld)
+    sev.close if sev
+    # for state file
+  end
+
+  def close
+    @srv = @srv.close if @srv
+    @sink_buf.close!
+    @state_file.dump(self, true) if @state_file
+  end
+end
diff --git a/lib/dtas/player/client_handler.rb b/lib/dtas/player/client_handler.rb
new file mode 100644
index 0000000..aabd1ab
--- /dev/null
+++ b/lib/dtas/player/client_handler.rb
@@ -0,0 +1,452 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+module DTAS::Player::ClientHandler
+
+  # returns true on success, wait_ctl arg on error
+  def set_bool(io, kv, v)
+    case v
+    when "false" then yield(false)
+    when "true" then yield(true)
+    else
+      return io.emit("ERR #{kv} must be true or false")
+    end
+    true
+  end
+
+  def adjust_numeric(io, obj, k, v)
+    negate = !!v.sub!(/\A-/, '')
+    case v
+    when %r{\A\+?\d*\.\d+\z}
+      num = v.to_f
+    when %r{\A\+?\d+\z}
+      num = v.to_i
+    else
+      return io.emit("ERR #{k}=#{v} must be a float")
+    end
+    num = -num if negate
+
+    if k.sub!(/\+\z/, '') # increment existing
+      num += obj.__send__(k)
+    elsif k.sub!(/-\z/, '') # decrement existing
+      num = obj.__send__(k) - num
+    # else # normal assignment
+    end
+    obj.__send__("#{k}=", num)
+    true
+  end
+
+  # returns true on success, wait_ctl arg on error
+  def set_int(io, kv, v, null_ok)
+    case v
+    when %r{\A-?\d+\z}
+      yield(v.to_i)
+    when ""
+      null_ok or return io.emit("ERR #{kv} must be defined")
+      yield(nil)
+    else
+      return io.emit("ERR #{kv} must an integer")
+    end
+    true
+  end
+
+  # returns true on success, wait_ctl arg on error
+  def set_uint(io, kv, v, null_ok)
+    case v
+    when %r{\A\d+\z}
+      yield(v.to_i)
+    when %r{\A0x[0-9a-fA-F]+\z}i # hex
+      yield(v.to_i(16))
+    when ""
+      null_ok or return io.emit("ERR #{kv} must be defined")
+      yield(nil)
+    else
+      return io.emit("ERR #{kv} must an non-negative integer")
+    end
+    true
+  end
+
+  def __sink_activate(sink)
+    return if sink.pid
+    @targets.concat(sink.spawn(@format))
+    @targets.sort_by! { |t| t.sink.prio }
+  end
+
+  def drop_sink(sink)
+    @targets.delete_if do |t|
+      if t.sink == sink
+        drop_target(t)
+        true
+      else
+        false
+      end
+    end
+  end
+
+  # called to activate/deactivate a sink
+  def __sink_switch(sink)
+    if sink.active
+      if @current
+        # maybe it's still alive for now, but it's just being killed
+        # do not reactivate it until we've reaped it
+        if sink.pid
+          drop_sink(sink)
+          sink.respawn = true
+        else
+          __sink_activate(sink)
+        end
+      end
+    else
+      drop_sink(sink)
+    end
+    # if we change any sinks, make sure the event loop watches it for
+    # readability again, since we new sinks should be writable, and
+    # we've stopped waiting on killed sinks
+    @srv.wait_ctl(@sink_buf, :wait_readable)
+  end
+
+  # returns a wait_ctl arg
+  def sink_handler(io, msg)
+    name = msg[1]
+    case msg[0]
+    when "ls"
+      io.emit(Shellwords.join(@sinks.keys.sort))
+    when "rm"
+      sink = @sinks.delete(name) or return io.emit("ERR #{name} not found")
+      drop_sink(sink)
+      io.emit("OK")
+    when "ed"
+      sink = @sinks[name] || (new_sink = DTAS::Sink.new)
+
+      # allow things that look like audio device names ("hw:1,0" , "/dev/dsp")
+      # or variable names.
+      sink.valid_name?(name) or return io.emit("ERR sink name invalid")
+
+      sink.name = name
+      active_before = sink.active
+
+      # multiple changes may be made at once
+      msg[2..-1].each do |kv|
+        k, v = kv.split(/=/, 2)
+        case k
+        when %r{\Aenv\.([^=]+)\z}
+          sink.env[$1] = v
+        when %r{\Aenv#([^=]+)\z}
+          v == nil or return io.emit("ERR unset env has no value")
+          sink.env.delete($1)
+        when "prio"
+          rv = set_int(io, kv, v, false) { |i| sink.prio = i }
+          rv == true or return rv
+          @targets.sort_by! { |t| t.sink.prio } if sink.active
+        when "nonblock", "active"
+          rv = set_bool(io, kv, v) { |b| sink.__send__("#{k}=", b) }
+          rv == true or return rv
+        when "pipe_size"
+          rv = set_uint(io, kv, v, true) { |u| sink.__send__("#{k}=", u) }
+          rv == true or return rv
+        when "command" # nothing to validate, this could be "rm -rf /" :>
+          sink.command = v.empty? ? DTAS::Sink::SINK_DEFAULTS["command"] : v
+        end
+      end
+
+      @sinks[name] = new_sink if new_sink # no errors? it's a new sink!
+
+      # start or stop a sink if its active= flag changed.  Additionally,
+      # account for a crashed-but-marked-active sink.  The user may have
+      # fixed the command to not crash it.
+      if (active_before != sink.active) || (sink.active && !sink.pid)
+        __sink_switch(sink)
+      end
+      io.emit("OK")
+    when "cat"
+      sink = @sinks[name] or return io.emit("ERR #{name} not found")
+      io.emit(sink.to_hsh.to_yaml)
+    else
+      io.emit("ERR unknown sink op #{msg[0]}")
+    end
+  end
+
+  def bytes_decoded(src = @current)
+    bytes = src.dst.bytes_xfer - src.dst_zero_byte
+    bytes = bytes < 0 ? 0 : bytes # maybe negative in case of sink errors
+  end
+
+  def __seek_offset_adj(dir, offset)
+    if offset.sub!(/s\z/, '')
+      offset = offset.to_i
+    else # time
+      offset = @current.format.hhmmss_to_samples(offset)
+    end
+    n = __current_decoded_samples + (dir * offset)
+    n = 0 if n < 0
+    "#{n}s"
+  end
+
+  def __current_decoded_samples
+    initial = @current.offset_samples
+    decoded = @format.bytes_to_samples(bytes_decoded)
+    decoded = out_samples(decoded, @format, @current.format)
+    initial + decoded
+  end
+
+  def __current_requeue
+    return unless @current && @current.pid
+
+    # no need to requeue if we're already due to die
+    return if @current.requeued
+    @current.requeued = true
+
+    dst = @current.dst
+    # prepare to seek to the desired point based on the number of bytes which
+    # passed through dst buffer we want the offset for the @current file,
+    # which may have a different rate than our internal @format
+    if @current.respond_to?(:infile)
+      # this offset in the @current.format (not player @format)
+      @queue.unshift([ @current.infile, "#{__current_decoded_samples}s" ])
+    else
+      # DTAS::Source::Command (hash), just rerun it
+      @queue.unshift(@current.to_hsh)
+    end
+    # We also want to hard drop the buffer so we do not get repeated audio.
+    __buf_reset(dst)
+  end
+
+  def out_samples(in_samples, infmt, outfmt)
+    in_rate = infmt.rate
+    out_rate = outfmt.rate
+    return in_samples if in_rate == out_rate # easy!
+    (in_samples * out_rate / in_rate.to_f).round
+  end
+
+  # returns the number of samples we expect from the source
+  # this takes into account sample rate differences between the source
+  # and internal player format
+  def current_expect_samples(in_samples) # @current.samples
+    out_samples(in_samples, @current.format, @format)
+  end
+
+  def rg_handler(io, msg)
+    return io.emit(@rg.to_hsh.to_yaml) if msg.empty?
+    before = @rg.to_hsh
+    msg.each do |kv|
+      k, v = kv.split(/=/, 2)
+      case k
+      when "mode"
+        case v
+        when "off"
+          @rg.mode = nil
+        else
+          DTAS::RGState::RG_MODE.include?(v) or
+            return io.emit("ERR rg mode invalid")
+          @rg.mode = v
+        end
+      when "fallback_track"
+        rv = set_bool(io, kv, v) { |b| @rg.fallback_track = b }
+        rv == true or return rv
+      when %r{(?:gain_threshold|norm_threshold|
+              preamp|norm_level|fallback_gain)[+-]?\z}x
+        rv = adjust_numeric(io, @rg, k, v)
+        rv == true or return rv
+      end
+    end
+    after = @rg.to_hsh
+    __current_requeue if before != after
+    io.emit("OK")
+  end
+
+  def active_sinks
+    sinks = @targets.map { |t| t.sink }
+    sinks.uniq!
+    sinks
+  end
+
+  # show current info about what's playing
+  # returns non-blocking iterator retval
+  def current_handler(io, msg)
+    tmp = {}
+    if @current
+      tmp["current"] = s = @current.to_hsh
+      s["spawn_at"] = @current.spawn_at
+      s["pid"] = @current.pid
+
+      # this offset and samples in the player @format (not @current.format)
+      decoded = @format.bytes_to_samples(bytes_decoded)
+      if @current.respond_to?(:infile)
+        initial = tmp["current_initial"] = @current.offset_samples
+        initial = out_samples(initial, @current.format, @format)
+        tmp["current_expect"] = current_expect_samples(s["samples"])
+        s["format"] = @current.format.to_hash.delete_if { |_,v| v.nil? }
+      else
+        initial = 0
+        tmp["current_expect"] = nil
+        s["format"] = @format.to_hash.delete_if { |_,v| v.nil? }
+      end
+      tmp["current_offset"] = initial + decoded
+    end
+    tmp["current_inflight"] = @sink_buf.inflight
+    tmp["format"] = @format.to_hash.delete_if { |_,v| v.nil? }
+    tmp["paused"] = @paused
+    rg = @rg.to_hsh
+    tmp["rg"] = rg unless rg.empty?
+    if @targets[0]
+      sinks = active_sinks
+      tmp["sinks"] = sinks.map! do |sink|
+        h = sink.to_hsh
+        h["pid"] = sink.pid
+        h
+      end
+    end
+    io.emit(tmp.to_yaml)
+  end
+
+  def __buf_reset(buf)
+    @srv.wait_ctl(buf, :ignore)
+    buf.buf_reset
+    @srv.wait_ctl(buf, :wait_readable)
+  end
+
+  def skip_handler(io, msg)
+    __current_drop
+    echo("skip")
+    io.emit("OK")
+  end
+
+  def play_pause_handler(io, command)
+    prev = @paused
+    __send__("do_#{command}")
+    io.emit({
+      "paused" => {
+        "before" => prev,
+        "after" => @paused,
+      }
+    }.to_yaml)
+  end
+
+  def do_pause
+    return if @paused
+    echo("pause")
+    @paused = true
+    __current_requeue
+  end
+
+  def do_play
+    # no echo, next_source will echo on new track
+    @paused = false
+    return if @current && @current.pid
+    next_source(@queue.shift)
+  end
+
+  def do_play_pause
+    @paused ? do_play : do_pause
+  end
+
+  def do_seek(io, offset)
+    if @current && @current.pid
+      if @current.respond_to?(:infile)
+        begin
+          if offset.sub!(/\A\+/, '')
+            offset = __seek_offset_adj(1, offset)
+          elsif offset.sub!(/\A-/, '')
+            offset = __seek_offset_adj(-1, offset)
+          # else: pass to sox directly
+          end
+        rescue ArgumentError
+          return io.emit("ERR bad time format")
+        end
+        @queue.unshift([ @current.infile, offset ])
+        __buf_reset(@current.dst) # trigger EPIPE
+      else
+        return io.emit("ERR unseekable")
+      end
+    elsif @paused
+      case file = @queue[0]
+      when String
+        @queue[0] = [ file, offset ]
+      when Array
+        file[1] = offset
+      else
+        return io.emit("ERR unseekable")
+      end
+    # unpaused case... what do we do?
+    end
+    io.emit("OK")
+  end
+
+  def restart_pipeline
+    return if @paused
+    __current_requeue
+    @sinks.each_value { |sink| sink.respawn = sink.active }
+    @targets.each { |t| drop_target(t) }.clear
+  end
+
+  def format_handler(io, msg)
+    new_fmt = @format.dup
+    msg.each do |kv|
+      k, v = kv.split(/=/, 2)
+      case k
+      when "type"
+        new_fmt.valid_type?(v) or return io.emit("ERR invalid file type")
+        new_fmt.type = v
+      when "channels", "bits", "rate"
+        rv = set_uint(io, kv, v, false) { |u| new_fmt.__send__("#{k}=", u) }
+        rv == true or return rv
+      when "endian"
+        new_fmt.valid_endian?(v) or return io.emit("ERR invalid endian")
+        new_fmt.endian = v
+      end
+    end
+
+    if new_fmt != @format
+      restart_pipeline # calls __current_requeue
+
+      # we must assign this after __current_requeue since __current_requeue
+      # relies on the old @format for calculation
+      @format = new_fmt
+    end
+    io.emit("OK")
+  end
+
+  def env_handler(io, msg)
+    msg.each do |kv|
+      case kv
+      when %r{\A([^=]+)=(.*)\z}
+        ENV[$1] = $2
+      when %r{\A([^=]+)#}
+        ENV.delete($1)
+      else
+        return io.emit("ERR bad env")
+      end
+    end
+    io.emit("OK")
+  end
+
+  def source_handler(io, msg)
+    case msg.shift
+    when "cat"
+      io.emit({
+        "command" => @srccmd || DTAS::Source::SOURCE_DEFAULTS["command"],
+        "env" => @srcenv,
+      }.to_yaml)
+    when "ed"
+      before = [ @srccmd, @srcenv ].inspect
+      msg.each do |kv|
+        k, v = kv.split(/=/, 2)
+        case k
+        when "command"
+          @srccmd = v.empty? ? nil : v
+        when %r{\Aenv\.([^=]+)\z}
+          @srcenv[$1] = v
+        when %r{\Aenv#([^=]+)\z}
+          v == nil or return io.emit("ERR unset env has no value")
+          @srcenv.delete($1)
+        end
+      end
+      after = [ @srccmd, @srcenv ].inspect
+      __current_requeue if before != after
+      io.emit("OK")
+    else
+      io.emit("ERR unknown source op")
+    end
+  end
+end
diff --git a/lib/dtas/process.rb b/lib/dtas/process.rb
new file mode 100644
index 0000000..35ca6a6
--- /dev/null
+++ b/lib/dtas/process.rb
@@ -0,0 +1,88 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'shellwords'
+require 'io/wait'
+module DTAS::Process
+  PIDS = {}
+
+  def self.reaper
+    begin
+      pid, status = Process.waitpid2(-1, Process::WNOHANG)
+      pid or return
+      obj = PIDS.delete(pid)
+      yield status, obj
+    rescue Errno::ECHILD
+      return
+    end while true
+  end
+
+  # a convienient way to display commands so it's easy to
+  # read, copy and paste to a shell
+  def xs(cmd)
+    cmd.map { |w| Shellwords.escape(w) }.join(' ')
+  end
+
+  # for long-running processes (sox/play/ecasound filters)
+  def dtas_spawn(env, cmd, opts)
+    opts = { close_others: true, pgroup: true }.merge!(opts)
+
+    # stringify env, integer values are easier to type unquoted as strings
+    env.each { |k,v| env[k] = v.to_s }
+
+    pid = begin
+      Process.spawn(env, cmd, opts)
+    rescue Errno::EINTR # Ruby bug?
+      retry
+    end
+    warn [ :spawn, pid, cmd ].inspect if $DEBUG
+    @spawn_at = Time.now.to_f
+    PIDS[pid] = self
+    pid
+  end
+
+  # this is like backtick, but takes an array instead of a string
+  # This will also raise on errors
+  def qx(cmd, opts = {})
+    r, w = IO.pipe
+    opts = opts.merge(out: w)
+    r.binmode
+    if err = opts[:err]
+      re, we = IO.pipe
+      re.binmode
+      opts[:err] = we
+    end
+    pid = begin
+      Process.spawn(*cmd, opts)
+    rescue Errno::EINTR # Ruby bug?
+      retry
+    end
+    w.close
+    if err
+      we.close
+      res = ""
+      want = { r => res, re => err }
+      begin
+        readable = IO.select(want.keys) or next
+        readable[0].each do |io|
+          bytes = io.nread
+          begin
+            want[io] << io.read_nonblock(bytes > 0 ? bytes : 11)
+          rescue Errno::EAGAIN
+            # spurious wakeup, bytes may be zero
+          rescue EOFError
+            want.delete(io)
+          end
+        end
+      end until want.empty?
+      re.close
+    else
+      res = r.read
+    end
+    r.close
+    _, status = Process.waitpid2(pid)
+    return res if status.success?
+    raise RuntimeError, "`#{xs(cmd)}' failed: #{status.inspect}"
+  end
+end
diff --git a/lib/dtas/replaygain.rb b/lib/dtas/replaygain.rb
new file mode 100644
index 0000000..e049b8d
--- /dev/null
+++ b/lib/dtas/replaygain.rb
@@ -0,0 +1,42 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# Represents ReplayGain metadata for a DTAS::Source
+# cleanup/validate values to prevent malicious files from making us
+# run arbitrary commands
+# *_peak values are 0..inf (1.0 being full scale, but >1 is possible
+# *_gain values are specified in dB
+
+class DTAS::ReplayGain
+  ATTRS = %w(reference_loudness track_gain album_gain track_peak album_peak)
+  ATTRS.each { |a| attr_reader a }
+
+  def check_gain(val)
+    /([+-]?\d+(?:\.\d+)?)/ =~ val ? $1 : nil
+  end
+
+  def check_float(val)
+    /(\d+(?:\.\d+)?)/ =~ val ? $1 : nil
+  end
+
+  def initialize(comments)
+    comments or return
+
+    # the replaygain standard specifies 89.0 dB, but maybe some apps are
+    # different...
+    @reference_loudness =
+              check_gain(comments["REPLAYGAIN_REFERENCE_LOUDNESS"]) || "89.0"
+
+    @track_gain = check_gain(comments["REPLAYGAIN_TRACK_GAIN"])
+    @album_gain = check_gain(comments["REPLAYGAIN_ALBUM_GAIN"])
+    @track_peak = check_float(comments["REPLAYGAIN_TRACK_PEAK"])
+    @album_peak = check_float(comments["REPLAYGAIN_ALBUM_PEAK"])
+  end
+
+  def self.new(comments)
+    tmp = super
+    tmp.track_gain ? tmp : nil
+  end
+end
diff --git a/lib/dtas/rg_state.rb b/lib/dtas/rg_state.rb
new file mode 100644
index 0000000..6463be7
--- /dev/null
+++ b/lib/dtas/rg_state.rb
@@ -0,0 +1,100 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# provides support for generating appropriate effects for ReplayGain
+# MAYBE: account for non-standard reference loudness (89.0 dB is standard)
+require_relative '../dtas'
+require_relative 'serialize'
+class DTAS::RGState
+  include DTAS::Serialize
+
+  RG_MODE = {
+    # attribute name => method to use
+    "album_gain" => :rg_vol_gain,
+    "track_gain" => :rg_vol_gain,
+    "album_peak" => :rg_vol_norm,
+    "track_peak" => :rg_vol_norm,
+  }
+
+  RG_DEFAULT = {
+    # skip the effect if the adjustment is too small to be noticeable
+    "gain_threshold" => 0.00000001, # in dB
+    "norm_threshold" => 0.00000001,
+
+    "preamp" => 0, # no extra adjustment
+    # "mode" => "album_gain", # nil: off
+    "mode" => nil, # nil: off
+    "fallback_gain" => -6.0, # adjustment dB if necessary RG tag is missing
+    "fallback_track" => true,
+    "norm_level" => 1.0, # dBFS
+  }
+
+  SIVS = RG_DEFAULT.keys
+  SIVS.each { |iv| attr_accessor iv }
+
+  def initialize
+    RG_DEFAULT.each do |k,v|
+      instance_variable_set("@#{k}", v)
+    end
+  end
+
+  def self.load(hash)
+    rv = new
+    hash.each { |k,v| rv.__send__("#{k}=", v) } if hash
+    rv
+  end
+
+  def to_hash
+    ivars_to_hash(SIVS)
+  end
+
+  def to_hsh
+    # no point in dumping default values, it's just a waste of space
+    to_hash.delete_if { |k,v| RG_DEFAULT[k] == v }
+  end
+
+  # returns a dB argument to the "vol" effect, nil if nothing found
+  def rg_vol_gain(val)
+    val = val.to_f + @preamp
+    return if val.abs < @gain_threshold
+    sprintf('vol %0.8gdB', val)
+  end
+
+  # returns a linear argument to the "vol" effect
+  def rg_vol_norm(val)
+    diff = @norm_level - val.to_f
+    return if (@norm_level - diff).abs < @norm_threshold
+    diff += @norm_level
+    sprintf('vol %0.8g', diff)
+  end
+
+  # The ReplayGain fallback adjustment value (in dB), in case a file is
+  # missing ReplayGain tags.  This is useful to avoid damage to speakers,
+  # eardrums and amplifiers in case a file without then necessary ReplayGain
+  # tag slips into the queue
+  def rg_fallback_effect(reason)
+    @fallback_gain or return
+    warn(reason) if $DEBUG
+    "vol #{@fallback_gain + @preamp}dB"
+  end
+
+  # returns an array (for command-line argument) for the effect needed
+  # to apply ReplayGain
+  # this may return nil
+  def effect(source)
+    return unless @mode
+    rg = source.replaygain or
+      return rg_fallback_effect("ReplayGain tags missing")
+    val = rg.__send__(@mode)
+    if ! val && @fallback_track && @mode =~ /\Aalbum_(\w+)/
+      tag = "track_#$1"
+      val = rg.__send__(tag) or
+        return rg_fallback_effect("ReplayGain tag for #@mode missing")
+      warn("tag for #@mode missing, using #{tag}")
+    end
+    # this may be nil if the adjustment is too small:
+    __send__(RG_MODE[@mode], val)
+  end
+end
diff --git a/lib/dtas/serialize.rb b/lib/dtas/serialize.rb
new file mode 100644
index 0000000..57eb626
--- /dev/null
+++ b/lib/dtas/serialize.rb
@@ -0,0 +1,10 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+module DTAS::Serialize
+  def ivars_to_hash(ivars, rv = {})
+    ivars.each { |k| rv[k] = instance_variable_get("@#{k}") }
+    rv
+  end
+end
diff --git a/lib/dtas/sigevent.rb b/lib/dtas/sigevent.rb
new file mode 100644
index 0000000..ccaec2f
--- /dev/null
+++ b/lib/dtas/sigevent.rb
@@ -0,0 +1,11 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+begin
+  raise LoadError, "no eventfd with _DTAS_POSIX" if ENV["_DTAS_POSIX"]
+  require 'sleepy_penguin'
+  require_relative 'sigevent/efd'
+rescue LoadError
+  require_relative 'sigevent/pipe'
+end
diff --git a/lib/dtas/sigevent/efd.rb b/lib/dtas/sigevent/efd.rb
new file mode 100644
index 0000000..782e383
--- /dev/null
+++ b/lib/dtas/sigevent/efd.rb
@@ -0,0 +1,21 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class DTAS::Sigevent < SleepyPenguin::EventFD
+  include SleepyPenguin
+
+  def self.new
+    super(0, EventFD::CLOEXEC)
+  end
+
+  def signal
+    incr(1)
+  end
+
+  def readable_iter
+    value(true)
+    yield self, nil # calls DTAS::Process.reaper
+    :wait_readable
+  end
+end
diff --git a/lib/dtas/sigevent/pipe.rb b/lib/dtas/sigevent/pipe.rb
new file mode 100644
index 0000000..139aa68
--- /dev/null
+++ b/lib/dtas/sigevent/pipe.rb
@@ -0,0 +1,29 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class DTAS::Sigevent
+  attr_reader :to_io
+
+  def initialize
+    @to_io, @wr = IO.pipe
+  end
+
+  def signal
+    @wr.syswrite('.') rescue nil
+  end
+
+  def readable_iter
+    begin
+      @to_io.read_nonblock(11)
+      yield self, nil # calls DTAS::Process.reaper
+    rescue Errno::EAGAIN
+      return :wait_readable
+    end while true
+  end
+
+  def close
+    @to_io.close
+    @wr.close
+  end
+end
diff --git a/lib/dtas/sink.rb b/lib/dtas/sink.rb
new file mode 100644
index 0000000..7931694
--- /dev/null
+++ b/lib/dtas/sink.rb
@@ -0,0 +1,122 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'yaml'
+require_relative '../dtas'
+require_relative 'pipe'
+require_relative 'process'
+require_relative 'command'
+require_relative 'format'
+require_relative 'serialize'
+require_relative 'writable_iter'
+
+# this is a sink (endpoint, audio enters but never leaves)
+class DTAS::Sink
+  attr_accessor :prio    # any Integer
+  attr_accessor :active  # boolean
+  attr_accessor :name
+  attr_accessor :nonblock
+  attr_accessor :respawn
+
+  include DTAS::Command
+  include DTAS::Process
+  include DTAS::Serialize
+  include DTAS::WritableIter
+
+  SINK_DEFAULTS = COMMAND_DEFAULTS.merge({
+    "name" => nil, # order matters, this is first
+    "command" => "exec play -q $SOXFMT -",
+    "prio" => 0,
+    "nonblock" => false,
+    "pipe_size" => nil,
+    "active" => false,
+    "respawn" => false,
+  })
+
+  DEVFD_RE = %r{/dev/fd/([a-zA-Z]\w*)\b}
+
+  # order matters for Ruby 1.9+, this defines to_hsh serialization so we
+  # can make the state file human-friendly
+  SIVS = %w(name env command prio nonblock pipe_size active)
+
+  def initialize
+    command_init(SINK_DEFAULTS)
+    writable_iter_init
+    @sink = self
+  end
+
+  # allow things that look like audio device names ("hw:1,0" , "/dev/dsp")
+  # or variable names.
+  def valid_name?(s)
+    !!(s =~ %r{\A[\w:,/-]+\z})
+  end
+
+  def self.load(hash)
+    sink = new
+    return sink unless hash
+    (SIVS & hash.keys).each do |k|
+      sink.instance_variable_set("@#{k}", hash[k])
+    end
+    sink.valid_name?(sink.name) or raise ArgumentError, "invalid sink name"
+    sink
+  end
+
+  def parse(str)
+    inputs = {}
+    str.scan(DEVFD_RE) { |w| inputs[w[0]] = nil }
+    inputs
+  end
+
+  def on_death(status)
+    super
+  end
+
+  def spawn(format, opts = {})
+    raise "BUG: #{self.inspect}#spawn called twice" if @pid
+    rv = []
+
+    pclass = @nonblock ? DTAS::PipeNB : DTAS::Pipe
+
+    cmd = command_string
+    inputs = parse(cmd)
+
+    if inputs.empty?
+      # /dev/fd/* not specified in the command, assume one input for stdin
+      r, w = pclass.new
+      w.pipe_size = @pipe_size if @pipe_size
+      inputs[:in] = opts[:in] = r
+      w.sink = self
+      rv << w
+    else
+      # multiple inputs, fun!, we'll tee to them
+      inputs.each_key do |name|
+        r, w = pclass.new
+        w.pipe_size = @pipe_size if @pipe_size
+        inputs[name] = r
+        w.sink = self
+        rv << w
+      end
+      opts[:in] = "/dev/null"
+
+      # map to real /dev/fd/* values and setup proper redirects
+      cmd = cmd.gsub(DEVFD_RE) do
+        read_fd = inputs[$1].fileno
+        opts[read_fd] = read_fd # do not close-on-exec
+        "/dev/fd/#{read_fd}"
+      end
+    end
+
+    @pid = dtas_spawn(format.to_env.merge!(@env), cmd, opts)
+    inputs.each_value { |rpipe| rpipe.close }
+    rv
+  end
+
+  def to_hash
+    ivars_to_hash(SIVS)
+  end
+
+  def to_hsh
+    to_hash.delete_if { |k,v| v == SINK_DEFAULTS[k] }
+  end
+end
diff --git a/lib/dtas/sink_reader_play.rb b/lib/dtas/sink_reader_play.rb
new file mode 100644
index 0000000..17a0190
--- /dev/null
+++ b/lib/dtas/sink_reader_play.rb
@@ -0,0 +1,70 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative '../dtas'
+
+# parses lines from play(1) -S/--show-progress like this:
+#  In:0.00% 00:00:37.34 [00:00:00.00] Out:1.65M [ -====|====  ]        Clip:0
+#
+# The authors of sox probably did not intend for the output of play(1) to
+# be parsed, but we do it anyways.  We need to be ready to update this
+# code in case play(1) output changes.
+# play -S/--show-progress
+class DTAS::SinkReaderPlay
+  attr_reader :time, :out, :meter, :clips, :headroom
+  attr_reader :to_io
+  attr_reader :wr # this is stderr of play(1)
+
+  def initialize
+    @to_io, @wr = IO.pipe
+    reset
+  end
+
+  def readable_iter
+    buf = Thread.current[:dtas_lbuf] ||= ""
+    begin
+      @rbuf << @to_io.read_nonblock(1024, buf)
+
+      # do not OOM in case SoX changes output format on us
+      @rbuf.clear if @rbuf.size > 0x10000
+
+      # don't handle partial read
+      next unless / Clip:\S+ *\z/ =~ @rbuf
+
+      if @rbuf.gsub!(/(.*)\rIn:\S+ (\S+) \S+ Out:(\S+)\s+(\[[^\]]+\]) /m, "")
+        err = $1
+        @time = $2
+        @out = $3
+        @meter = $4
+        if @rbuf.gsub!(/Hd:(\d+\.\d+) Clip:(\S+) */, "")
+          @headroom = $1
+          @clips = $2
+        elsif @rbuf.gsub!(/\s+Clip:(\S+) */, "")
+          @headroom = nil
+          @clips = $1
+        end
+
+        $stderr.write(err)
+      end
+    rescue EOFError
+      return nil
+    rescue Errno::EAGAIN
+      return :wait_readable
+    end while true
+  end
+
+  def close
+    @wr.close unless @wr.closed?
+    @to_io.close
+  end
+
+  def reset
+    @rbuf = ""
+    @time = @out = @meter = @headroom = @clips = nil
+  end
+
+  def closed?
+    @to_io.closed?
+  end
+end
diff --git a/lib/dtas/source.rb b/lib/dtas/source.rb
new file mode 100644
index 0000000..f6dd443
--- /dev/null
+++ b/lib/dtas/source.rb
@@ -0,0 +1,148 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative '../dtas'
+require_relative 'command'
+require_relative 'format'
+require_relative 'replaygain'
+require_relative 'process'
+require_relative 'serialize'
+
+# this is usually one input file
+class DTAS::Source
+  attr_reader :infile
+  attr_reader :offset
+  require_relative 'source/common'
+  require_relative 'source/mp3'
+
+  include DTAS::Command
+  include DTAS::Process
+  include DTAS::Source::Common
+  include DTAS::Source::Mp3
+
+  SOURCE_DEFAULTS = COMMAND_DEFAULTS.merge(
+    "command" => 'exec sox "$INFILE" $SOXFMT - $TRIMFX $RGFX',
+    "comments" => nil,
+  )
+
+  SIVS = %w(infile comments command env)
+
+  def initialize(infile, offset = nil)
+    command_init(SOURCE_DEFAULTS)
+    @format = nil
+    @infile = infile
+    @offset = offset
+    @comments = nil
+    @samples = nil
+  end
+
+  # this exists mainly to make the mpris interface easier, but it's not
+  # necessary, the mpris interface also knows the sample rate
+  def offset_us
+    (offset_samples / format.rate.to_f) * 1000000
+  end
+
+  # returns any offset in samples (relative to the original source file),
+  # likely zero unless seek was used
+  def offset_samples
+    return 0 unless @offset
+    case @offset
+    when /\A\d+s\z/
+      @offset.to_i
+    else
+      format.hhmmss_to_samples(@offset)
+    end
+  end
+
+  def precision
+    qx(%W(soxi -p #@infile), err: "/dev/null").to_i # sox.git f4562efd0aa3
+  rescue # fallback to parsing the whole output
+    s = qx(%W(soxi #@infile), err: "/dev/null")
+    s =~ /Precision\s+:\s*(\d+)-bit/
+    v = $1.to_i
+    return v if v > 0
+    raise TypeError, "could not determine precision for #@infile"
+  end
+
+  def format
+    @format ||= begin
+      fmt = DTAS::Format.new
+      fmt.from_file(@infile)
+      fmt.bits ||= precision
+      fmt
+    end
+  end
+
+  # A user may be downloading the file and start playing
+  # it before the download completes, this refreshes
+  def samples!
+    @samples = nil
+    samples
+  end
+
+  # This is the number of samples according to the samples in the source
+  # file itself, not the decoded output
+  def samples
+    @samples ||= qx(%W(soxi -s #@infile)).to_i
+  rescue => e
+    warn e.message
+    0
+  end
+
+  # just run soxi -a
+  def __load_comments
+    tmp = {}
+    case @infile
+    when String
+      err = ""
+      cmd = %W(soxi -a #@infile)
+      begin
+        qx(cmd, err: err).split(/\n/).each do |line|
+          key, value = line.split(/=/, 2)
+          key && value or next
+          # TODO: multi-line/multi-value/repeated tags
+          tmp[key.upcase] = value
+        end
+      rescue => e
+        if /FAIL formats: no handler for file extension/ =~ err
+          warn("#{xs(cmd)}: #{err}")
+        else
+          warn("#{e.message} (#{e.class})")
+        end
+        # TODO: fallbacks
+      end
+    end
+    tmp
+  end
+
+  def comments
+    @comments ||= __load_comments
+  end
+
+  def replaygain
+    DTAS::ReplayGain.new(comments) || DTAS::ReplayGain.new(mp3gain_comments)
+  end
+
+  def spawn(format, rg_state, opts)
+    raise "BUG: #{self.inspect}#spawn called twice" if @to_io
+    e = format.to_env
+    e["INFILE"] = @infile
+
+    # make sure these are visible to the "current" command...
+    @env["TRIMFX"] = @offset ? "trim #@offset" : nil
+    @env["RGFX"] = rg_state.effect(self) || nil
+
+    @pid = dtas_spawn(e.merge!(@env), command_string, opts)
+  end
+
+  def to_hsh
+    to_hash.delete_if { |k,v| v == SOURCE_DEFAULTS[k] }
+  end
+
+  def to_hash
+    rv = ivars_to_hash(SIVS)
+    rv["samples"] = samples
+    rv
+  end
+end
diff --git a/lib/dtas/source/command.rb b/lib/dtas/source/command.rb
new file mode 100644
index 0000000..30441eb
--- /dev/null
+++ b/lib/dtas/source/command.rb
@@ -0,0 +1,41 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative '../../dtas'
+require_relative '../source'
+require_relative '../command'
+require_relative '../serialize'
+
+class DTAS::Source::Command
+  require_relative '../source/common'
+
+  include DTAS::Command
+  include DTAS::Process
+  include DTAS::Source::Common
+  include DTAS::Serialize
+
+  SIVS = %w(command env)
+
+  def initialize(command)
+    command_init(command: command)
+  end
+
+  def source_dup
+    rv = self.class.new
+    SIVS.each { |iv| rv.__send__("#{iv}=", self.__send__(iv)) }
+    rv
+  end
+
+  def to_hash
+    ivars_to_hash(SIVS)
+  end
+
+  alias to_hsh to_hash
+
+  def spawn(format, rg_state, opts)
+    raise "BUG: #{self.inspect}#spawn called twice" if @to_io
+    e = format.to_env
+    @pid = dtas_spawn(e.merge!(@env), command_string, opts)
+  end
+end
diff --git a/lib/dtas/source/common.rb b/lib/dtas/source/common.rb
new file mode 100644
index 0000000..333e74a
--- /dev/null
+++ b/lib/dtas/source/common.rb
@@ -0,0 +1,14 @@
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+module DTAS::Source::Common
+  attr_reader :dst_zero_byte
+  attr_reader :dst
+  attr_accessor :requeued
+
+  def dst_assoc(buf)
+    @dst = buf
+    @dst_zero_byte = buf.bytes_xfer + buf.inflight
+    @requeued = false
+  end
+end
diff --git a/lib/dtas/source/mp3.rb b/lib/dtas/source/mp3.rb
new file mode 100644
index 0000000..b013bee
--- /dev/null
+++ b/lib/dtas/source/mp3.rb
@@ -0,0 +1,38 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative '../process'
+
+module DTAS::Source::Mp3
+  include DTAS::Process
+  # we use dBFS = 1.0 as scale (not 32768)
+  def __mp3gain_peak(str)
+    sprintf("%0.8g", str.to_f / 32768.0)
+  end
+
+  # massage mp3gain(1) output
+  def mp3gain_comments
+    tmp = {}
+    case @infile
+    when String
+      @infile =~ /\.mp[g23]\z/i or return
+      qx(%W(mp3gain -s c #@infile)).split(/\n/).each do |line|
+        case line
+        when /^Recommended "(Track|Album)" dB change:\s*(\S+)/
+          tmp["REPLAYGAIN_#{$1.upcase}_GAIN"] = $2
+        when /^Max PCM sample at current gain: (\S+)/
+          tmp["REPLAYGAIN_TRACK_PEAK"] = __mp3gain_peak($1)
+        when /^Max Album PCM sample at current gain: (\S+)/
+          tmp["REPLAYGAIN_ALBUM_PEAK"] = __mp3gain_peak($1)
+        end
+      end
+      tmp
+    else
+      raise TypeError, "unsupported type: #{@infile.inspect}"
+    end
+  rescue => e
+    $DEBUG and
+        warn("mp3gain(#{@infile.inspect}) failed: #{e.message} (#{e.class})")
+  end
+end
diff --git a/lib/dtas/state_file.rb b/lib/dtas/state_file.rb
new file mode 100644
index 0000000..cfd83d5
--- /dev/null
+++ b/lib/dtas/state_file.rb
@@ -0,0 +1,34 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'yaml'
+require 'tempfile'
+class DTAS::StateFile
+  def initialize(path, do_fsync = false)
+    @path = path
+    @do_fsync = do_fsync
+  end
+
+  def tryload
+    YAML.load(IO.binread(@path)) if File.readable?(@path)
+  end
+
+  def dump(obj, force_fsync = false)
+    yaml = obj.to_hsh.to_yaml.b
+
+    # do not replace existing state file if there are no changes
+    # this will be racy if we ever do async dumps or shared state
+    # files, but we don't do that...
+    return if File.readable?(@path) && IO.binread(@path) == yaml
+
+    dir = File.dirname(@path)
+    Tempfile.open(%w(player.state .tmp), dir) do |tmp|
+      tmp.binmode
+      tmp.write(yaml)
+      tmp.flush
+      tmp.fsync if @do_fsync || force_fsync
+      File.rename(tmp.path, @path)
+    end
+  end
+end
diff --git a/lib/dtas/unix_accepted.rb b/lib/dtas/unix_accepted.rb
new file mode 100644
index 0000000..6883ee1
--- /dev/null
+++ b/lib/dtas/unix_accepted.rb
@@ -0,0 +1,77 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'socket'
+require 'io/wait'
+
+class DTAS::UNIXAccepted
+  attr_reader :to_io
+
+  def initialize(sock)
+    @to_io = sock
+    @send_buf = []
+  end
+
+  # public API (for DTAS::Player)
+  # returns :wait_readable on success
+  def emit(msg)
+    buffered = @send_buf.size
+    if buffered == 0
+      begin
+        @to_io.sendmsg_nonblock(msg, Socket::MSG_EOR)
+        return :wait_readable
+      rescue Errno::EAGAIN
+        @send_buf << msg
+        return :wait_writable
+      rescue => e
+        return e
+      end
+    elsif buffered > 100
+      return RuntimeError.new("too many messages buffered")
+    else # buffered > 0
+      @send_buf << msg
+      return :wait_writable
+    end
+  end
+
+  # flushes pending data if it got buffered
+  def writable_iter
+    begin
+      msg = @send_buf.shift or return :wait_readable
+      @to_io.send_nonblock(msg, Socket::MSG_EOR)
+    rescue Errno::EAGAIN
+      @send_buf.unshift(msg)
+      return :wait_writable
+    rescue => e
+      return e
+    end while true
+  end
+
+  def readable_iter
+    io = @to_io
+    nread = io.nread
+
+    # EOF, assume no spurious wakeups for SOCK_SEQPACKET
+    return nil if nread == 0
+
+    begin
+      begin
+        msg, _, _ = io.recvmsg_nonblock(nread)
+      rescue EOFError, SystemCallError
+        return nil
+      end
+      yield(self, msg) # DTAS::Player deals with this
+      nread = io.nread
+    end while nread > 0
+    :wait_readable
+  end
+
+  def close
+    @to_io.close
+  end
+
+  def closed?
+    @to_io.closed?
+  end
+end
diff --git a/lib/dtas/unix_client.rb b/lib/dtas/unix_client.rb
new file mode 100644
index 0000000..f46eddf
--- /dev/null
+++ b/lib/dtas/unix_client.rb
@@ -0,0 +1,52 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'dtas'
+require 'socket'
+require 'io/wait'
+require 'shellwords'
+
+class DTAS::UNIXClient
+  attr_reader :to_io
+
+  def self.default_path
+    (ENV["DTAS_PLAYER_SOCK"] || File.expand_path("~/.dtas/player.sock")).b
+  end
+
+  def initialize(path = self.class.default_path)
+    @to_io = begin
+      raise if ENV["_DTAS_NOSEQPACKET"]
+      Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0)
+    rescue
+      warn("get your operating system developers to support " \
+           "SOCK_SEQPACKET for AF_UNIX sockets")
+      warn("falling back to SOCK_DGRAM, reliability possibly compromised")
+      Socket.new(:AF_UNIX, :SOCK_DGRAM, 0)
+    end
+    @to_io.connect(Socket.pack_sockaddr_un(path))
+  end
+
+  def req_start(args)
+    args = Shellwords.join(args) if Array === args
+    @to_io.send(args, Socket::MSG_EOR)
+  end
+
+  def req_ok(args, timeout = nil)
+    res = req(args, timeout)
+    res == "OK" or raise "Unexpected response: #{res}"
+    res
+  end
+
+  def req(args, timeout = nil)
+    req_start(args)
+    res_wait(timeout)
+  end
+
+  def res_wait(timeout = nil)
+    @to_io.wait(timeout)
+    nr = @to_io.nread
+    nr > 0 or raise EOFError, "unexpected EOF from server"
+    @to_io.recvmsg[0]
+  end
+end
diff --git a/lib/dtas/unix_server.rb b/lib/dtas/unix_server.rb
new file mode 100644
index 0000000..90f8479
--- /dev/null
+++ b/lib/dtas/unix_server.rb
@@ -0,0 +1,111 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'socket'
+require_relative '../dtas'
+require_relative 'unix_accepted'
+
+# This uses SOCK_SEQPACKET, unlike ::UNIXServer in Ruby stdlib
+
+# The programming model for the event loop here aims to be compatible
+# with EPOLLONESHOT use with epoll, since that fits my brain far better
+# than existing evented APIs/frameworks.
+# If we cared about scalability to thousands of clients, we'd really use epoll,
+# but IO.select can be just as fast (or faster) with few descriptors and
+# is obviously more portable.
+
+class DTAS::UNIXServer
+  attr_reader :to_io
+
+  def close
+    File.unlink(@path)
+    @to_io.close
+  end
+
+  def initialize(path)
+    @path = path
+    # lock down access by default, arbitrary commands may run as the
+    # same user dtas-player runs as:
+    old_umask = File.umask(0077)
+    @to_io = Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0)
+    addr = Socket.pack_sockaddr_un(path)
+    begin
+      @to_io.bind(addr)
+    rescue Errno::EADDRINUSE
+      # maybe we have an old path leftover from a killed process
+      tmp = Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0)
+      begin
+        tmp.connect(addr)
+        raise RuntimeError, "socket `#{path}' is in use", []
+      rescue Errno::ECONNREFUSED
+        # ok, leftover socket, unlink and rebind anyways
+        File.unlink(path)
+        @to_io.bind(addr)
+      ensure
+        tmp.close
+      end
+    end
+    @to_io.listen(1024)
+    @readers = { self => true }
+    @writers = {}
+  ensure
+    File.umask(old_umask)
+  end
+
+  def write_failed(client, e)
+    warn "failed to write to #{client}: #{e.message} (#{e.class})"
+    client.close
+  end
+
+  def readable_iter
+    # we do not do anything with the block passed to us
+    begin
+      sock, _ = @to_io.accept_nonblock
+      @readers[DTAS::UNIXAccepted.new(sock)] = true
+    rescue Errno::ECONNABORTED # ignore this, it happens
+    rescue Errno::EAGAIN
+      return :wait_readable
+    end while true
+  end
+
+  def wait_ctl(io, err)
+    case err
+    when :wait_readable
+      @readers[io] = true
+    when :wait_writable
+      @writers[io] = true
+    when :delete
+      @readers.delete(io)
+      @writers.delete(io)
+    when :ignore
+      # There are 2 cases for :ignore
+      # - DTAS::Buffer was readable before, but all destinations (e.g. sinks)
+      #   were blocked, so we stop caring for producer (buffer) readability.
+      # - a consumer (e.g. DTAS::Sink) just became writable, but the
+      #   corresponding DTAS::Buffer was already readable in a previous
+      #   call.
+    when nil
+      io.close
+    when StandardError
+      io.close
+    else
+      raise "BUG: wait_ctl invalid: #{io} #{err.inspect}"
+    end
+  end
+
+  def run_once
+    begin
+      # give IO.select one-shot behavior, snapshot and replace the watchlist
+      r = IO.select(@readers.keys, @writers.keys) or return
+      r[1].each do |io|
+        @writers.delete(io)
+        wait_ctl(io, io.writable_iter)
+      end
+      r[0].each do |io|
+        @readers.delete(io)
+        wait_ctl(io, io.readable_iter { |_io, msg| yield(_io, msg) })
+      end
+    end
+  end
+end
diff --git a/lib/dtas/util.rb b/lib/dtas/util.rb
new file mode 100644
index 0000000..03c7ded
--- /dev/null
+++ b/lib/dtas/util.rb
@@ -0,0 +1,16 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative '../dtas'
+
+# in case we need to convert DB values to a linear scale
+module DTAS::Util
+  def db_to_linear(val)
+    Math.exp(val * Math.log(10) * 0.05)
+  end
+
+  def linear_to_db(val)
+    Math.log10(val) * 20
+  end
+end
diff --git a/lib/dtas/writable_iter.rb b/lib/dtas/writable_iter.rb
new file mode 100644
index 0000000..aa02905
--- /dev/null
+++ b/lib/dtas/writable_iter.rb
@@ -0,0 +1,23 @@
+# -*- encoding: binary -*-
+# :stopdoc:
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative '../dtas'
+
+module DTAS::WritableIter
+  attr_accessor :on_writable
+
+  def writable_iter_init
+    @on_writable = nil
+  end
+
+  # this is used to exchange our own writable status for the readable
+  # status of the DTAS::Buffer which triggered us.
+  def writable_iter
+    if owr = @on_writable
+      @on_writable = nil
+      owr.call # this triggers readability watching of DTAS::Buffer
+    end
+    :ignore
+  end
+end