From 3e09ac0c10c95bb24a08af62393b4f761e2743d0 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 24 Aug 2013 09:54:45 +0000 Subject: initial commit --- lib/dtas/buffer.rb | 91 ++++++++ lib/dtas/buffer/read_write.rb | 103 +++++++++ lib/dtas/buffer/splice.rb | 143 ++++++++++++ lib/dtas/command.rb | 44 ++++ lib/dtas/compat_onenine.rb | 19 ++ lib/dtas/disclaimer.rb | 14 ++ lib/dtas/format.rb | 152 +++++++++++++ lib/dtas/pipe.rb | 40 ++++ lib/dtas/player.rb | 386 ++++++++++++++++++++++++++++++++ lib/dtas/player/client_handler.rb | 452 ++++++++++++++++++++++++++++++++++++++ lib/dtas/process.rb | 88 ++++++++ lib/dtas/replaygain.rb | 42 ++++ lib/dtas/rg_state.rb | 100 +++++++++ lib/dtas/serialize.rb | 10 + lib/dtas/sigevent.rb | 11 + lib/dtas/sigevent/efd.rb | 21 ++ lib/dtas/sigevent/pipe.rb | 29 +++ lib/dtas/sink.rb | 122 ++++++++++ lib/dtas/sink_reader_play.rb | 70 ++++++ lib/dtas/source.rb | 148 +++++++++++++ lib/dtas/source/command.rb | 41 ++++ lib/dtas/source/common.rb | 14 ++ lib/dtas/source/mp3.rb | 38 ++++ lib/dtas/state_file.rb | 34 +++ lib/dtas/unix_accepted.rb | 77 +++++++ lib/dtas/unix_client.rb | 52 +++++ lib/dtas/unix_server.rb | 111 ++++++++++ lib/dtas/util.rb | 16 ++ lib/dtas/writable_iter.rb | 23 ++ 29 files changed, 2491 insertions(+) create mode 100644 lib/dtas/buffer.rb create mode 100644 lib/dtas/buffer/read_write.rb create mode 100644 lib/dtas/buffer/splice.rb create mode 100644 lib/dtas/command.rb create mode 100644 lib/dtas/compat_onenine.rb create mode 100644 lib/dtas/disclaimer.rb create mode 100644 lib/dtas/format.rb create mode 100644 lib/dtas/pipe.rb create mode 100644 lib/dtas/player.rb create mode 100644 lib/dtas/player/client_handler.rb create mode 100644 lib/dtas/process.rb create mode 100644 lib/dtas/replaygain.rb create mode 100644 lib/dtas/rg_state.rb create mode 100644 lib/dtas/serialize.rb create mode 100644 lib/dtas/sigevent.rb create mode 100644 lib/dtas/sigevent/efd.rb create mode 100644 lib/dtas/sigevent/pipe.rb create mode 100644 lib/dtas/sink.rb create mode 100644 lib/dtas/sink_reader_play.rb create mode 100644 lib/dtas/source.rb create mode 100644 lib/dtas/source/command.rb create mode 100644 lib/dtas/source/common.rb create mode 100644 lib/dtas/source/mp3.rb create mode 100644 lib/dtas/state_file.rb create mode 100644 lib/dtas/unix_accepted.rb create mode 100644 lib/dtas/unix_client.rb create mode 100644 lib/dtas/unix_server.rb create mode 100644 lib/dtas/util.rb create mode 100644 lib/dtas/writable_iter.rb (limited to 'lib/dtas') diff --git a/lib/dtas/buffer.rb b/lib/dtas/buffer.rb new file mode 100644 index 0000000..d02e8a6 --- /dev/null +++ b/lib/dtas/buffer.rb @@ -0,0 +1,91 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative '../dtas' + +class DTAS::Buffer + begin + raise LoadError, "no splice with _DTAS_POSIX" if ENV["_DTAS_POSIX"] + require 'io/splice' # splice is only in Linux for now... + require_relative 'buffer/splice' + include DTAS::Buffer::Splice + rescue LoadError + require_relative 'buffer/read_write' + include DTAS::Buffer::ReadWrite + end + + attr_reader :to_io # call nread on this + attr_reader :wr # processes (sources) should redirect to this + attr_accessor :bytes_xfer + + def initialize + @bytes_xfer = 0 + @buffer_size = nil + @to_io, @wr = DTAS::Pipe.new + end + + def self.load(hash) + buf = new + if hash + bs = hash["buffer_size"] and buf.buffer_size = bs + end + buf + end + + def to_hsh + @buffer_size ? { "buffer_size" => @buffer_size } : {} + end + + def __dst_error(dst, e) + warn "dropping #{dst.inspect} due to error: #{e.message} (#{e.class})" + dst.close unless dst.closed? + end + + # This will modify targets + # returns one of: + # - :wait_readable + # - subset of targets array for :wait_writable + # - some type of StandardError + # - nil + def broadcast(targets) + bytes = inflight + return :wait_readable if 0 == bytes # spurious wakeup + + case targets.size + when 0 + :ignore # this will pause decoders + when 1 + broadcast_one(targets, bytes) + else # infinity + broadcast_inf(targets, bytes) + end + end + + def readable_iter + # this calls DTAS::Buffer#broadcast from DTAS::Player + yield(self, nil) + end + + def inflight + @to_io.nread + end + + # don't really close the pipes under normal circumstances, just clear data + def close + bytes = inflight + discard(bytes) if bytes > 0 + end + + def buf_reset + close! + @bytes_xfer = 0 + @to_io, @wr = DTAS::Pipe.new + @wr.pipe_size = @buffer_size if @buffer_size + end + + def close! + @to_io.close + @wr.close + end +end diff --git a/lib/dtas/buffer/read_write.rb b/lib/dtas/buffer/read_write.rb new file mode 100644 index 0000000..93380d1 --- /dev/null +++ b/lib/dtas/buffer/read_write.rb @@ -0,0 +1,103 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'io/wait' +require 'io/nonblock' +require_relative '../../dtas' +require_relative '../pipe' + +module DTAS::Buffer::ReadWrite + MAX_AT_ONCE = 512 # min PIPE_BUF value in POSIX + attr_accessor :buffer_size + + def _rbuf + Thread.current[:dtas_pbuf] ||= "" + end + + # be sure to only call this with nil when all writers to @wr are done + def discard(bytes) + buf = _rbuf + begin + @to_io.read(bytes, buf) or break # EOF + bytes -= buf.bytesize + end until bytes == 0 + end + + # always block when we have a single target + def broadcast_one(targets, bytes) + buf = _rbuf + @to_io.read(bytes, buf) + n = targets[0].write(buf) # IO#write has write-in-full behavior + @bytes_xfer += n + :wait_readable + rescue Errno::EPIPE, IOError => e + __dst_error(targets[0], e) + targets.clear + nil # do not return error here, we already spewed an error message + end + + def broadcast_inf(targets, bytes) + nr_nb = targets.count { |sink| sink.nonblock? } + if nr_nb == 0 || nr_nb == targets.size + # if all targets are full, don't start until they're all writable + r = IO.select(nil, targets, nil, 0) or return targets + blocked = targets - r[1] + + # tell DTAS::UNIXServer#run_once to wait on the blocked targets + return blocked if blocked[0] + + # all writable, yay! + else + blocked = [] + end + + again = {} + + # don't pin too much on one target + bytes = bytes > MAX_AT_ONCE ? MAX_AT_ONCE : bytes + buf = _rbuf + @to_io.read(bytes, buf) + @bytes_xfer += buf.bytesize + + targets.delete_if do |dst| + begin + if dst.nonblock? + w = dst.write_nonblock(buf) + again[dst] = buf.byteslice(w, n) if w < n + else + dst.write(buf) + end + false + rescue Errno::EAGAIN + blocked << dst + false + rescue IOError, Errno::EPIPE => e + again.delete(dst) + __dst_error(dst, e) + true + end + end + + # try to write as much as possible + again.delete_if do |dst, sbuf| + begin + w = dst.write_nonblock(sbuf) + n = sbuf.bytesize + if w < n + again[dst] = sbuf.byteslice(w, n) + false + else + true + end + rescue Errno::EAGAIN + blocked << dst + true + rescue IOError, Errno::EPIPE => e + __dst_error(dst, e) + true + end + end until again.empty? + targets[0] ? :wait_readable : nil + end +end diff --git a/lib/dtas/buffer/splice.rb b/lib/dtas/buffer/splice.rb new file mode 100644 index 0000000..c2540bd --- /dev/null +++ b/lib/dtas/buffer/splice.rb @@ -0,0 +1,143 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'io/wait' +require 'io/nonblock' +require 'io/splice' +require_relative '../../dtas' +require_relative '../pipe' + +module DTAS::Buffer::Splice + MAX_AT_ONCE = 4096 # page size in Linux + MAX_SIZE = File.read("/proc/sys/fs/pipe-max-size").to_i + DEVNULL = File.open("/dev/null", "r+") + F_MOVE = IO::Splice::F_MOVE + WAITALL = IO::Splice::WAITALL + + def buffer_size + @to_io.pipe_size + end + + # nil is OK, won't reset existing pipe, either... + def buffer_size=(bytes) + @to_io.pipe_size = bytes if bytes + @buffer_size = bytes + end + + # be sure to only call this with nil when all writers to @wr are done + def discard(bytes) + IO.splice(@to_io, nil, DEVNULL, nil, bytes) + end + + def broadcast_one(targets, bytes) + # single output is always non-blocking + s = IO.trysplice(@to_io, nil, targets[0], nil, bytes, F_MOVE) + if Symbol === s + targets # our one and only target blocked on write + else + @bytes_xfer += s + :wait_readable # we want to read more from @to_io soon + end + rescue Errno::EPIPE, IOError => e + __dst_error(targets[0], e) + targets.clear + nil # do not return error here, we already spewed an error message + end + + # returns the largest value we teed + def __broadcast_tee(blocked, targets, chunk_size) + most_teed = 0 + targets.delete_if do |dst| + begin + t = dst.nonblock? ? + IO.trytee(@to_io, dst, chunk_size) : + IO.tee(@to_io, dst, chunk_size, WAITALL) + if Integer === t + most_teed = t if t > most_teed + else + blocked << dst + end + false + rescue IOError, Errno::EPIPE => e + __dst_error(dst, e) + true + end + end + most_teed + end + + def broadcast_inf(targets, bytes) + if targets.none? { |sink| sink.nonblock? } + # if all targets are blocking, don't start until they're all writable + r = IO.select(nil, targets, nil, 0) or return targets + blocked = targets - r[1] + + # tell DTAS::UNIXServer#run_once to wait on the blocked targets + return blocked if blocked[0] + + # all writable, yay! + else + blocked = [] + end + + # don't pin too much on one target + bytes = bytes > MAX_AT_ONCE ? MAX_AT_ONCE : bytes + + last = targets.pop # we splice to the last one, tee to the rest + most_teed = __broadcast_tee(blocked, targets, bytes) + + # don't splice more than the largest amount we successfully teed + bytes = most_teed if most_teed > 0 + + begin + targets << last + if last.nonblock? + s = IO.trysplice(@to_io, nil, last, nil, bytes, F_MOVE) + if Symbol === s + blocked << last + + # we accomplished nothing! + # If _all_ writers are blocked, do not discard data, + # stay blocked on :wait_writable + return blocked if most_teed == 0 + + # the tees targets win, drop data intended for last + if most_teed > 0 + discard(most_teed) + @bytes_xfer += most_teed + # do not watch for writability of last, last is non-blocking + return :wait_readable + end + end + else + # the blocking case is simple + s = IO.splice(@to_io, nil, last, nil, bytes, WAITALL|F_MOVE) + end + @bytes_xfer += s + + # if we can't splice everything + # discard it so the early targets do not get repeated data + if s < bytes && most_teed > 0 + discard(bytes - s) + end + :wait_readable + rescue IOError, Errno::EPIPE => e # last failed, drop it + __dst_error(last, e) + targets.pop # we're no longer a valid target + + if most_teed == 0 + # nothing accomplished, watch any targets + return blocked if blocked[0] + else + # some progress, discard the data we could not splice + @bytes_xfer += most_teed + discard(most_teed) + end + + # stop decoding if we're completely errored out + # returning nil will trigger close + return targets[0] ? :wait_readable : nil + end + end +end diff --git a/lib/dtas/command.rb b/lib/dtas/command.rb new file mode 100644 index 0000000..b957567 --- /dev/null +++ b/lib/dtas/command.rb @@ -0,0 +1,44 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +# common code for wrapping SoX/ecasound/... commands +require_relative 'serialize' +require 'shellwords' + +module DTAS::Command + include DTAS::Serialize + attr_reader :pid + attr_reader :to_io + attr_accessor :command + attr_accessor :env + attr_accessor :spawn_at + + COMMAND_DEFAULTS = { + "env" => {}, + "command" => nil, + } + + def command_init(defaults = {}) + @pid = nil + @to_io = nil + @spawn_at = nil + COMMAND_DEFAULTS.merge(defaults).each do |k,v| + v = v.dup if Hash === v || Array === v + instance_variable_set("@#{k}", v) + end + end + + def kill(sig = :TERM) + # always kill the pgroup since we run subcommands in their own shell + Process.kill(sig, -@pid) + end + + def on_death(status) + @pid = nil + end + + def command_string + @command + end +end diff --git a/lib/dtas/compat_onenine.rb b/lib/dtas/compat_onenine.rb new file mode 100644 index 0000000..98be8c9 --- /dev/null +++ b/lib/dtas/compat_onenine.rb @@ -0,0 +1,19 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) + +# Make Ruby 1.9.3 look like Ruby 2.0.0 to us +# This exists for Debian wheezy users using the stock Ruby 1.9.3 install. +# We'll drop this interface when Debian wheezy (7.0) becomes unsupported. +class String + def b + dup.force_encoding(Encoding::BINARY) + end +end unless String.method_defined?(:b) + +def IO + def self.pipe + super.map! { |io| io.close_on_exec = true; io } + end +end if RUBY_VERSION.to_f <= 1.9 diff --git a/lib/dtas/disclaimer.rb b/lib/dtas/disclaimer.rb new file mode 100644 index 0000000..c25ba77 --- /dev/null +++ b/lib/dtas/disclaimer.rb @@ -0,0 +1,14 @@ +DTAS_DISCLAIMER = < +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +# class represents an audio format (type/bits/channels/sample rate/...) +require_relative '../dtas' +require_relative 'process' +require_relative 'serialize' + +class DTAS::Format + include DTAS::Process + include DTAS::Serialize + NATIVE_ENDIAN = [1].pack("l") == [1].pack("l>") ? "big" : "little" + + attr_accessor :type # s32, f32, f64 ... any point in others? + attr_accessor :channels # 1..666 + attr_accessor :rate # 44100, 48000, 88200, 96000, 176400, 192000 ... + attr_accessor :bits # only set for playback on 16-bit DACs + attr_accessor :endian + + FORMAT_DEFAULTS = { + "type" => "s32", + "channels" => 2, + "rate" => 44100, + "bits" => nil, # default: implied from type + "endian" => nil, # unspecified + } + SIVS = FORMAT_DEFAULTS.keys + + def self.load(hash) + fmt = new + return fmt unless hash + (SIVS & hash.keys).each do |k| + fmt.instance_variable_set("@#{k}", hash[k]) + end + fmt + end + + def initialize + FORMAT_DEFAULTS.each do |k,v| + instance_variable_set("@#{k}", v) + end + end + + def to_sox_arg + rv = %W(-t#@type -c#@channels -r#@rate) + rv.concat(%W(-b#@bits)) if @bits # needed for play(1) to 16-bit DACs + rv + end + + # returns 'be' or 'le' depending on endianess + def endian2 + case e = @endian || NATIVE_ENDIAN + when "big" + "be" + when "little" + "le" + else + raise"unsupported endian=#{e}" + end + end + + def to_eca_arg + %W(-f #{@type}_#{endian2},#@channels,#@rate) + end + + def inspect + "<#{self.class}(#{xs(to_sox_arg)})>" + end + + def to_hsh + to_hash.delete_if { |k,v| v == FORMAT_DEFAULTS[k] } + end + + def to_hash + ivars_to_hash(SIVS) + end + + def from_file(path) + @channels = qx(%W(soxi -c #{path})).to_i + @type = qx(%W(soxi -t #{path})).strip + @rate = qx(%W(soxi -r #{path})).to_i + # we don't need to care for bits, do we? + end + + # for the _decoded_ output + def bits_per_sample + return @bits if @bits + /\A[fst](8|16|24|32|64)\z/ =~ @type or + raise TypeError, "invalid type=#@type (must be s32/f32/f64)" + $1.to_i + end + + def bytes_per_sample + bits_per_sample / 8 + end + + def to_env + rv = { + "SOX_FILETYPE" => @type, + "CHANNELS" => @channels.to_s, + "RATE" => @rate.to_s, + "ENDIAN" => @endian || NATIVE_ENDIAN, + "SOXFMT" => to_sox_arg.join(' '), + "ECAFMT" => to_eca_arg.join(' '), + "ENDIAN2" => endian2, + } + begin # don't set these if we can't get them, SOX_FILETYPE may be enough + rv["BITS_PER_SAMPLE"] = bits_per_sample.to_s + rescue TypeError + end + rv + end + + def bytes_to_samples(bytes) + bytes / bytes_per_sample / @channels + end + + def bytes_to_time(bytes) + Time.at(bytes_to_samples(bytes) / @rate.to_f) + end + + def valid_type?(type) + !!(type =~ %r{\A[us](?:8|16|24|32)\z} || type =~ %r{\Af?:(32|64)}) + end + + def valid_endian?(endian) + !!(endian =~ %r{\A(?:big|little|swap)\z}) + end + + # HH:MM:SS.frac (don't bother with more complex times, too much code) + # part of me wants to drop this feature from playq, feels like bloat... + def hhmmss_to_samples(hhmmss) + time = hhmmss.dup + rv = 0 + if time.sub!(/\.(\d+)\z/, "") + # convert fractional second to sample count: + rv = ("0.#$1".to_f * @rate).to_i + end + + # deal with HH:MM:SS + t = time.split(/:/) + raise ArgumentError, "Bad time format: #{hhmmss}" if t.size > 3 + + mult = 1 + while part = t.pop + rv += part.to_i * mult * @rate + mult *= 60 + end + rv + end +end diff --git a/lib/dtas/pipe.rb b/lib/dtas/pipe.rb new file mode 100644 index 0000000..891e9cd --- /dev/null +++ b/lib/dtas/pipe.rb @@ -0,0 +1,40 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +begin + require 'io/splice' +rescue LoadError +end +require_relative '../dtas' +require_relative 'writable_iter' + +class DTAS::Pipe < IO + include DTAS::WritableIter + attr_accessor :sink + + def self.new + _, w = rv = pipe + w.writable_iter_init + rv + end + + # create no-op methods for non-Linux + unless method_defined?(:pipe_size=) + def pipe_size=(_) + end + + def pipe_size + end + end +end + +# for non-blocking sinks, this avoids extra fcntl(..., F_GETFL) syscalls +# We don't need fcntl at all for splice/tee in Linux +# For non-Linux, we write_nonblock/read_nonblock already call fcntl() +# behind our backs, so there's no need to repeat it. +class DTAS::PipeNB < DTAS::Pipe + def nonblock? + true + end +end diff --git a/lib/dtas/player.rb b/lib/dtas/player.rb new file mode 100644 index 0000000..b26303c --- /dev/null +++ b/lib/dtas/player.rb @@ -0,0 +1,386 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'yaml' +require 'shellwords' +require_relative '../dtas' +require_relative 'source' +require_relative 'source/command' +require_relative 'sink' +require_relative 'unix_server' +require_relative 'buffer' +require_relative 'sigevent' +require_relative 'rg_state' +require_relative 'state_file' + +class DTAS::Player + require_relative 'player/client_handler' + include DTAS::Player::ClientHandler + attr_accessor :state_file + attr_accessor :socket + attr_reader :sinks + + def initialize + @state_file = nil + @socket = nil + @srv = nil + @queue = [] # sources + @paused = false + @format = DTAS::Format.new + @srccmd = nil + @srcenv = {} + + @sinks = {} # { user-defined name => sink } + @targets = [] # order matters + @rg = DTAS::RGState.new + + # sits in between shared effects (if any) and sinks + @sink_buf = DTAS::Buffer.new + @current = nil + @watchers = {} + end + + def echo(msg) + msg = Shellwords.join(msg) if Array === msg + @watchers.delete_if do |io, _| + if io.closed? + true + else + case io.emit(msg) + when :wait_readable, :wait_writable + false + else + true + end + end + end + $stdout.write(msg << "\n") + end + + def to_hsh + rv = {} + rv["socket"] = @socket + rv["paused"] = @paused if @paused + src = rv["source"] = {} + src["command"] = @srccmd if @srccmd + src["env"] = @srcenv if @srcenv.size > 0 + + # Arrays + rv["queue"] = @queue + + %w(rg sink_buf format).each do |k| + rv[k] = instance_variable_get("@#{k}").to_hsh + end + + # no empty hashes or arrays + rv.delete_if do |k,v| + case v + when Hash, Array + v.empty? + else + false + end + end + + unless @sinks.empty? + sinks = rv["sinks"] = [] + # sort sinks by name for human viewability + @sinks.keys.sort.each do |name| + sinks << @sinks[name].to_hsh + end + end + + rv + end + + def self.load(hash) + rv = new + rv.instance_eval do + @rg = DTAS::RGState.load(hash["rg"]) + if v = hash["sink_buf"] + v = v["buffer_size"] + @sink_buf.buffer_size = v + end + %w(socket queue paused).each do |k| + v = hash[k] or next + instance_variable_set("@#{k}", v) + end + if v = hash["source"] + @srccmd = v["command"] + e = v["env"] and @srcenv = e + end + + if sinks = hash["sinks"] + sinks.each do |sink_hsh| + sink = DTAS::Sink.load(sink_hsh) + @sinks[sink.name] = sink + end + end + end + rv + end + + def enq_handler(io, msg) + # check @queue[0] in case we have no sinks + if @current || @queue[0] || @paused + @queue << msg + else + next_source(msg) + end + io.emit("OK") + end + + def do_enq_head(io, msg) + # check @queue[0] in case we have no sinks + if @current || @queue[0] || @paused + @queue.unshift(msg) + else + next_source(msg) + end + io.emit("OK") + end + + # yielded from readable_iter + def client_iter(io, msg) + msg = Shellwords.split(msg) + command = msg.shift + case command + when "enq" + enq_handler(io, msg[0]) + when "enq-head" + do_enq_head(io, msg) + when "enq-cmd" + enq_handler(io, { "command" => msg[0]}) + when "pause", "play", "play_pause" + play_pause_handler(io, command) + when "seek" + do_seek(io, msg[0]) + when "clear" + @queue.clear + echo("clear") + io.emit("OK") + when "rg" + rg_handler(io, msg) + when "skip" + skip_handler(io, msg) + when "sink" + sink_handler(io, msg) + when "current" + current_handler(io, msg) + when "watch" + @watchers[io] = true + io.emit("OK") + when "format" + format_handler(io, msg) + when "env" + env_handler(io, msg) + when "restart" + restart_pipeline + io.emit("OK") + when "source" + source_handler(io, msg) + end + end + + def event_loop_iter + @srv.run_once do |io, msg| # readability handler, request/response + case io + when @sink_buf + sink_iter + when DTAS::UNIXAccepted + client_iter(io, msg) + when DTAS::Sigevent # signal received + reap_iter + else + raise "BUG: unknown event: #{io.class} #{io.inspect} #{msg.inspect}" + end + end + end + + def reap_iter + DTAS::Process.reaper do |status, obj| + warn [ :reap, obj, status ].inspect if $DEBUG + obj.on_death(status) if obj.respond_to?(:on_death) + case obj + when @current + next_source(@paused ? nil : @queue.shift) + when DTAS::Sink # on unexpected sink death + sink_death(obj, status) + end + end + :wait_readable + end + + def sink_death(sink, status) + deleted = [] + @targets.delete_if do |t| + if t.sink == sink + deleted << t + else + false + end + end + + if deleted[0] + warn("#{sink.name} died unexpectedly: #{status.inspect}") + deleted.each { |t| drop_target(t) } + __current_drop unless @targets[0] + end + + return unless sink.active + + if @queue[0] && !@paused + # we get here if source/sinks are all killed in restart_pipeline + __sink_activate(sink) + next_source(@queue.shift) + elsif sink.respawn + __sink_activate(sink) if @current + end + ensure + sink.respawn = false + end + + # returns a wait_ctl arg for self + def broadcast_iter(buf, targets) + case rv = buf.broadcast(targets) + when Array # array of blocked sinks + # have sinks wake up the this buffer when they're writable + trade_ctl = proc { @srv.wait_ctl(buf, :wait_readable) } + rv.each do |dst| + dst.on_writable = trade_ctl + @srv.wait_ctl(dst, :wait_writable) + end + + # this @sink_buf hibernates until trade_ctl is called + # via DTAS::Sink#writable_iter + :ignore + else # :wait_readable or nil + rv + end + end + + def bind + @srv = DTAS::UNIXServer.new(@socket) + end + + # only used on new installations where no sink exists + def create_default_sink + return unless @sinks.empty? + s = DTAS::Sink.new + s.name = "default" + s.active = true + @sinks[s.name] = s + end + + # called when the player is leaving idle state + def spawn_sinks(source_spec) + return true if @targets[0] + @sinks.each_value do |sink| + sink.active or next + next if sink.pid + @targets.concat(sink.spawn(@format)) + end + if @targets[0] + @targets.sort_by! { |t| t.sink.prio } + true + else + # fail, no active sink + @queue.unshift(source_spec) + false + end + end + + def next_source(source_spec) + @current = nil + if source_spec + # restart sinks iff we were idle + spawn_sinks(source_spec) or return + + case source_spec + when String + @current = DTAS::Source.new(source_spec) + echo(%W(file #{@current.infile})) + when Array + @current = DTAS::Source.new(*source_spec) + echo(%W(file #{@current.infile} #{@current.offset_samples}s)) + else + @current = DTAS::Source::Command.new(source_spec["command"]) + echo(%W(command #{@current.command_string})) + end + + if DTAS::Source === @current + @current.command = @srccmd if @srccmd + @current.env = @srcenv.dup unless @srcenv.empty? + end + + dst = @sink_buf + @current.dst_assoc(dst) + @current.spawn(@format, @rg, out: dst.wr, in: "/dev/null") + @srv.wait_ctl(dst, :wait_readable) + else + stop_sinks if @sink_buf.inflight == 0 + echo "idle" + end + end + + def drop_target(target) + @srv.wait_ctl(target, :delete) + target.close + end + + def stop_sinks + @targets.each { |t| drop_target(t) }.clear + end + + # only call on unrecoverable errors (or "skip") + def __current_drop(src = @current) + __buf_reset(src.dst) if src && src.pid + end + + # pull data from sink_buf into @targets, source feeds into sink_buf + def sink_iter + wait_iter = broadcast_iter(@sink_buf, @targets) + __current_drop if nil == wait_iter # sink error, stop source + return wait_iter if @current + + # no source left to feed sink_buf, drain the remaining data + sink_bytes = @sink_buf.inflight + if sink_bytes > 0 + return wait_iter if @targets[0] # play what is leftover + + # discard the buffer if no sinks + @sink_buf.discard(sink_bytes) + end + + # nothing left inflight, stop the sinks until we have a source + stop_sinks + :ignore + end + + # the main loop + def run + sev = DTAS::Sigevent.new + @srv.wait_ctl(sev, :wait_readable) + old_chld = trap(:CHLD) { sev.signal } + create_default_sink + next_source(@paused ? nil : @queue.shift) + begin + event_loop_iter + rescue => e # just in case... + warn "E: #{e.message} (#{e.class})" + e.backtrace.each { |l| warn l } + end while true + ensure + __current_requeue + trap(:CHLD, old_chld) + sev.close if sev + # for state file + end + + def close + @srv = @srv.close if @srv + @sink_buf.close! + @state_file.dump(self, true) if @state_file + end +end diff --git a/lib/dtas/player/client_handler.rb b/lib/dtas/player/client_handler.rb new file mode 100644 index 0000000..aabd1ab --- /dev/null +++ b/lib/dtas/player/client_handler.rb @@ -0,0 +1,452 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +module DTAS::Player::ClientHandler + + # returns true on success, wait_ctl arg on error + def set_bool(io, kv, v) + case v + when "false" then yield(false) + when "true" then yield(true) + else + return io.emit("ERR #{kv} must be true or false") + end + true + end + + def adjust_numeric(io, obj, k, v) + negate = !!v.sub!(/\A-/, '') + case v + when %r{\A\+?\d*\.\d+\z} + num = v.to_f + when %r{\A\+?\d+\z} + num = v.to_i + else + return io.emit("ERR #{k}=#{v} must be a float") + end + num = -num if negate + + if k.sub!(/\+\z/, '') # increment existing + num += obj.__send__(k) + elsif k.sub!(/-\z/, '') # decrement existing + num = obj.__send__(k) - num + # else # normal assignment + end + obj.__send__("#{k}=", num) + true + end + + # returns true on success, wait_ctl arg on error + def set_int(io, kv, v, null_ok) + case v + when %r{\A-?\d+\z} + yield(v.to_i) + when "" + null_ok or return io.emit("ERR #{kv} must be defined") + yield(nil) + else + return io.emit("ERR #{kv} must an integer") + end + true + end + + # returns true on success, wait_ctl arg on error + def set_uint(io, kv, v, null_ok) + case v + when %r{\A\d+\z} + yield(v.to_i) + when %r{\A0x[0-9a-fA-F]+\z}i # hex + yield(v.to_i(16)) + when "" + null_ok or return io.emit("ERR #{kv} must be defined") + yield(nil) + else + return io.emit("ERR #{kv} must an non-negative integer") + end + true + end + + def __sink_activate(sink) + return if sink.pid + @targets.concat(sink.spawn(@format)) + @targets.sort_by! { |t| t.sink.prio } + end + + def drop_sink(sink) + @targets.delete_if do |t| + if t.sink == sink + drop_target(t) + true + else + false + end + end + end + + # called to activate/deactivate a sink + def __sink_switch(sink) + if sink.active + if @current + # maybe it's still alive for now, but it's just being killed + # do not reactivate it until we've reaped it + if sink.pid + drop_sink(sink) + sink.respawn = true + else + __sink_activate(sink) + end + end + else + drop_sink(sink) + end + # if we change any sinks, make sure the event loop watches it for + # readability again, since we new sinks should be writable, and + # we've stopped waiting on killed sinks + @srv.wait_ctl(@sink_buf, :wait_readable) + end + + # returns a wait_ctl arg + def sink_handler(io, msg) + name = msg[1] + case msg[0] + when "ls" + io.emit(Shellwords.join(@sinks.keys.sort)) + when "rm" + sink = @sinks.delete(name) or return io.emit("ERR #{name} not found") + drop_sink(sink) + io.emit("OK") + when "ed" + sink = @sinks[name] || (new_sink = DTAS::Sink.new) + + # allow things that look like audio device names ("hw:1,0" , "/dev/dsp") + # or variable names. + sink.valid_name?(name) or return io.emit("ERR sink name invalid") + + sink.name = name + active_before = sink.active + + # multiple changes may be made at once + msg[2..-1].each do |kv| + k, v = kv.split(/=/, 2) + case k + when %r{\Aenv\.([^=]+)\z} + sink.env[$1] = v + when %r{\Aenv#([^=]+)\z} + v == nil or return io.emit("ERR unset env has no value") + sink.env.delete($1) + when "prio" + rv = set_int(io, kv, v, false) { |i| sink.prio = i } + rv == true or return rv + @targets.sort_by! { |t| t.sink.prio } if sink.active + when "nonblock", "active" + rv = set_bool(io, kv, v) { |b| sink.__send__("#{k}=", b) } + rv == true or return rv + when "pipe_size" + rv = set_uint(io, kv, v, true) { |u| sink.__send__("#{k}=", u) } + rv == true or return rv + when "command" # nothing to validate, this could be "rm -rf /" :> + sink.command = v.empty? ? DTAS::Sink::SINK_DEFAULTS["command"] : v + end + end + + @sinks[name] = new_sink if new_sink # no errors? it's a new sink! + + # start or stop a sink if its active= flag changed. Additionally, + # account for a crashed-but-marked-active sink. The user may have + # fixed the command to not crash it. + if (active_before != sink.active) || (sink.active && !sink.pid) + __sink_switch(sink) + end + io.emit("OK") + when "cat" + sink = @sinks[name] or return io.emit("ERR #{name} not found") + io.emit(sink.to_hsh.to_yaml) + else + io.emit("ERR unknown sink op #{msg[0]}") + end + end + + def bytes_decoded(src = @current) + bytes = src.dst.bytes_xfer - src.dst_zero_byte + bytes = bytes < 0 ? 0 : bytes # maybe negative in case of sink errors + end + + def __seek_offset_adj(dir, offset) + if offset.sub!(/s\z/, '') + offset = offset.to_i + else # time + offset = @current.format.hhmmss_to_samples(offset) + end + n = __current_decoded_samples + (dir * offset) + n = 0 if n < 0 + "#{n}s" + end + + def __current_decoded_samples + initial = @current.offset_samples + decoded = @format.bytes_to_samples(bytes_decoded) + decoded = out_samples(decoded, @format, @current.format) + initial + decoded + end + + def __current_requeue + return unless @current && @current.pid + + # no need to requeue if we're already due to die + return if @current.requeued + @current.requeued = true + + dst = @current.dst + # prepare to seek to the desired point based on the number of bytes which + # passed through dst buffer we want the offset for the @current file, + # which may have a different rate than our internal @format + if @current.respond_to?(:infile) + # this offset in the @current.format (not player @format) + @queue.unshift([ @current.infile, "#{__current_decoded_samples}s" ]) + else + # DTAS::Source::Command (hash), just rerun it + @queue.unshift(@current.to_hsh) + end + # We also want to hard drop the buffer so we do not get repeated audio. + __buf_reset(dst) + end + + def out_samples(in_samples, infmt, outfmt) + in_rate = infmt.rate + out_rate = outfmt.rate + return in_samples if in_rate == out_rate # easy! + (in_samples * out_rate / in_rate.to_f).round + end + + # returns the number of samples we expect from the source + # this takes into account sample rate differences between the source + # and internal player format + def current_expect_samples(in_samples) # @current.samples + out_samples(in_samples, @current.format, @format) + end + + def rg_handler(io, msg) + return io.emit(@rg.to_hsh.to_yaml) if msg.empty? + before = @rg.to_hsh + msg.each do |kv| + k, v = kv.split(/=/, 2) + case k + when "mode" + case v + when "off" + @rg.mode = nil + else + DTAS::RGState::RG_MODE.include?(v) or + return io.emit("ERR rg mode invalid") + @rg.mode = v + end + when "fallback_track" + rv = set_bool(io, kv, v) { |b| @rg.fallback_track = b } + rv == true or return rv + when %r{(?:gain_threshold|norm_threshold| + preamp|norm_level|fallback_gain)[+-]?\z}x + rv = adjust_numeric(io, @rg, k, v) + rv == true or return rv + end + end + after = @rg.to_hsh + __current_requeue if before != after + io.emit("OK") + end + + def active_sinks + sinks = @targets.map { |t| t.sink } + sinks.uniq! + sinks + end + + # show current info about what's playing + # returns non-blocking iterator retval + def current_handler(io, msg) + tmp = {} + if @current + tmp["current"] = s = @current.to_hsh + s["spawn_at"] = @current.spawn_at + s["pid"] = @current.pid + + # this offset and samples in the player @format (not @current.format) + decoded = @format.bytes_to_samples(bytes_decoded) + if @current.respond_to?(:infile) + initial = tmp["current_initial"] = @current.offset_samples + initial = out_samples(initial, @current.format, @format) + tmp["current_expect"] = current_expect_samples(s["samples"]) + s["format"] = @current.format.to_hash.delete_if { |_,v| v.nil? } + else + initial = 0 + tmp["current_expect"] = nil + s["format"] = @format.to_hash.delete_if { |_,v| v.nil? } + end + tmp["current_offset"] = initial + decoded + end + tmp["current_inflight"] = @sink_buf.inflight + tmp["format"] = @format.to_hash.delete_if { |_,v| v.nil? } + tmp["paused"] = @paused + rg = @rg.to_hsh + tmp["rg"] = rg unless rg.empty? + if @targets[0] + sinks = active_sinks + tmp["sinks"] = sinks.map! do |sink| + h = sink.to_hsh + h["pid"] = sink.pid + h + end + end + io.emit(tmp.to_yaml) + end + + def __buf_reset(buf) + @srv.wait_ctl(buf, :ignore) + buf.buf_reset + @srv.wait_ctl(buf, :wait_readable) + end + + def skip_handler(io, msg) + __current_drop + echo("skip") + io.emit("OK") + end + + def play_pause_handler(io, command) + prev = @paused + __send__("do_#{command}") + io.emit({ + "paused" => { + "before" => prev, + "after" => @paused, + } + }.to_yaml) + end + + def do_pause + return if @paused + echo("pause") + @paused = true + __current_requeue + end + + def do_play + # no echo, next_source will echo on new track + @paused = false + return if @current && @current.pid + next_source(@queue.shift) + end + + def do_play_pause + @paused ? do_play : do_pause + end + + def do_seek(io, offset) + if @current && @current.pid + if @current.respond_to?(:infile) + begin + if offset.sub!(/\A\+/, '') + offset = __seek_offset_adj(1, offset) + elsif offset.sub!(/\A-/, '') + offset = __seek_offset_adj(-1, offset) + # else: pass to sox directly + end + rescue ArgumentError + return io.emit("ERR bad time format") + end + @queue.unshift([ @current.infile, offset ]) + __buf_reset(@current.dst) # trigger EPIPE + else + return io.emit("ERR unseekable") + end + elsif @paused + case file = @queue[0] + when String + @queue[0] = [ file, offset ] + when Array + file[1] = offset + else + return io.emit("ERR unseekable") + end + # unpaused case... what do we do? + end + io.emit("OK") + end + + def restart_pipeline + return if @paused + __current_requeue + @sinks.each_value { |sink| sink.respawn = sink.active } + @targets.each { |t| drop_target(t) }.clear + end + + def format_handler(io, msg) + new_fmt = @format.dup + msg.each do |kv| + k, v = kv.split(/=/, 2) + case k + when "type" + new_fmt.valid_type?(v) or return io.emit("ERR invalid file type") + new_fmt.type = v + when "channels", "bits", "rate" + rv = set_uint(io, kv, v, false) { |u| new_fmt.__send__("#{k}=", u) } + rv == true or return rv + when "endian" + new_fmt.valid_endian?(v) or return io.emit("ERR invalid endian") + new_fmt.endian = v + end + end + + if new_fmt != @format + restart_pipeline # calls __current_requeue + + # we must assign this after __current_requeue since __current_requeue + # relies on the old @format for calculation + @format = new_fmt + end + io.emit("OK") + end + + def env_handler(io, msg) + msg.each do |kv| + case kv + when %r{\A([^=]+)=(.*)\z} + ENV[$1] = $2 + when %r{\A([^=]+)#} + ENV.delete($1) + else + return io.emit("ERR bad env") + end + end + io.emit("OK") + end + + def source_handler(io, msg) + case msg.shift + when "cat" + io.emit({ + "command" => @srccmd || DTAS::Source::SOURCE_DEFAULTS["command"], + "env" => @srcenv, + }.to_yaml) + when "ed" + before = [ @srccmd, @srcenv ].inspect + msg.each do |kv| + k, v = kv.split(/=/, 2) + case k + when "command" + @srccmd = v.empty? ? nil : v + when %r{\Aenv\.([^=]+)\z} + @srcenv[$1] = v + when %r{\Aenv#([^=]+)\z} + v == nil or return io.emit("ERR unset env has no value") + @srcenv.delete($1) + end + end + after = [ @srccmd, @srcenv ].inspect + __current_requeue if before != after + io.emit("OK") + else + io.emit("ERR unknown source op") + end + end +end diff --git a/lib/dtas/process.rb b/lib/dtas/process.rb new file mode 100644 index 0000000..35ca6a6 --- /dev/null +++ b/lib/dtas/process.rb @@ -0,0 +1,88 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'shellwords' +require 'io/wait' +module DTAS::Process + PIDS = {} + + def self.reaper + begin + pid, status = Process.waitpid2(-1, Process::WNOHANG) + pid or return + obj = PIDS.delete(pid) + yield status, obj + rescue Errno::ECHILD + return + end while true + end + + # a convienient way to display commands so it's easy to + # read, copy and paste to a shell + def xs(cmd) + cmd.map { |w| Shellwords.escape(w) }.join(' ') + end + + # for long-running processes (sox/play/ecasound filters) + def dtas_spawn(env, cmd, opts) + opts = { close_others: true, pgroup: true }.merge!(opts) + + # stringify env, integer values are easier to type unquoted as strings + env.each { |k,v| env[k] = v.to_s } + + pid = begin + Process.spawn(env, cmd, opts) + rescue Errno::EINTR # Ruby bug? + retry + end + warn [ :spawn, pid, cmd ].inspect if $DEBUG + @spawn_at = Time.now.to_f + PIDS[pid] = self + pid + end + + # this is like backtick, but takes an array instead of a string + # This will also raise on errors + def qx(cmd, opts = {}) + r, w = IO.pipe + opts = opts.merge(out: w) + r.binmode + if err = opts[:err] + re, we = IO.pipe + re.binmode + opts[:err] = we + end + pid = begin + Process.spawn(*cmd, opts) + rescue Errno::EINTR # Ruby bug? + retry + end + w.close + if err + we.close + res = "" + want = { r => res, re => err } + begin + readable = IO.select(want.keys) or next + readable[0].each do |io| + bytes = io.nread + begin + want[io] << io.read_nonblock(bytes > 0 ? bytes : 11) + rescue Errno::EAGAIN + # spurious wakeup, bytes may be zero + rescue EOFError + want.delete(io) + end + end + end until want.empty? + re.close + else + res = r.read + end + r.close + _, status = Process.waitpid2(pid) + return res if status.success? + raise RuntimeError, "`#{xs(cmd)}' failed: #{status.inspect}" + end +end diff --git a/lib/dtas/replaygain.rb b/lib/dtas/replaygain.rb new file mode 100644 index 0000000..e049b8d --- /dev/null +++ b/lib/dtas/replaygain.rb @@ -0,0 +1,42 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +# +# Represents ReplayGain metadata for a DTAS::Source +# cleanup/validate values to prevent malicious files from making us +# run arbitrary commands +# *_peak values are 0..inf (1.0 being full scale, but >1 is possible +# *_gain values are specified in dB + +class DTAS::ReplayGain + ATTRS = %w(reference_loudness track_gain album_gain track_peak album_peak) + ATTRS.each { |a| attr_reader a } + + def check_gain(val) + /([+-]?\d+(?:\.\d+)?)/ =~ val ? $1 : nil + end + + def check_float(val) + /(\d+(?:\.\d+)?)/ =~ val ? $1 : nil + end + + def initialize(comments) + comments or return + + # the replaygain standard specifies 89.0 dB, but maybe some apps are + # different... + @reference_loudness = + check_gain(comments["REPLAYGAIN_REFERENCE_LOUDNESS"]) || "89.0" + + @track_gain = check_gain(comments["REPLAYGAIN_TRACK_GAIN"]) + @album_gain = check_gain(comments["REPLAYGAIN_ALBUM_GAIN"]) + @track_peak = check_float(comments["REPLAYGAIN_TRACK_PEAK"]) + @album_peak = check_float(comments["REPLAYGAIN_ALBUM_PEAK"]) + end + + def self.new(comments) + tmp = super + tmp.track_gain ? tmp : nil + end +end diff --git a/lib/dtas/rg_state.rb b/lib/dtas/rg_state.rb new file mode 100644 index 0000000..6463be7 --- /dev/null +++ b/lib/dtas/rg_state.rb @@ -0,0 +1,100 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +# +# provides support for generating appropriate effects for ReplayGain +# MAYBE: account for non-standard reference loudness (89.0 dB is standard) +require_relative '../dtas' +require_relative 'serialize' +class DTAS::RGState + include DTAS::Serialize + + RG_MODE = { + # attribute name => method to use + "album_gain" => :rg_vol_gain, + "track_gain" => :rg_vol_gain, + "album_peak" => :rg_vol_norm, + "track_peak" => :rg_vol_norm, + } + + RG_DEFAULT = { + # skip the effect if the adjustment is too small to be noticeable + "gain_threshold" => 0.00000001, # in dB + "norm_threshold" => 0.00000001, + + "preamp" => 0, # no extra adjustment + # "mode" => "album_gain", # nil: off + "mode" => nil, # nil: off + "fallback_gain" => -6.0, # adjustment dB if necessary RG tag is missing + "fallback_track" => true, + "norm_level" => 1.0, # dBFS + } + + SIVS = RG_DEFAULT.keys + SIVS.each { |iv| attr_accessor iv } + + def initialize + RG_DEFAULT.each do |k,v| + instance_variable_set("@#{k}", v) + end + end + + def self.load(hash) + rv = new + hash.each { |k,v| rv.__send__("#{k}=", v) } if hash + rv + end + + def to_hash + ivars_to_hash(SIVS) + end + + def to_hsh + # no point in dumping default values, it's just a waste of space + to_hash.delete_if { |k,v| RG_DEFAULT[k] == v } + end + + # returns a dB argument to the "vol" effect, nil if nothing found + def rg_vol_gain(val) + val = val.to_f + @preamp + return if val.abs < @gain_threshold + sprintf('vol %0.8gdB', val) + end + + # returns a linear argument to the "vol" effect + def rg_vol_norm(val) + diff = @norm_level - val.to_f + return if (@norm_level - diff).abs < @norm_threshold + diff += @norm_level + sprintf('vol %0.8g', diff) + end + + # The ReplayGain fallback adjustment value (in dB), in case a file is + # missing ReplayGain tags. This is useful to avoid damage to speakers, + # eardrums and amplifiers in case a file without then necessary ReplayGain + # tag slips into the queue + def rg_fallback_effect(reason) + @fallback_gain or return + warn(reason) if $DEBUG + "vol #{@fallback_gain + @preamp}dB" + end + + # returns an array (for command-line argument) for the effect needed + # to apply ReplayGain + # this may return nil + def effect(source) + return unless @mode + rg = source.replaygain or + return rg_fallback_effect("ReplayGain tags missing") + val = rg.__send__(@mode) + if ! val && @fallback_track && @mode =~ /\Aalbum_(\w+)/ + tag = "track_#$1" + val = rg.__send__(tag) or + return rg_fallback_effect("ReplayGain tag for #@mode missing") + warn("tag for #@mode missing, using #{tag}") + end + # this may be nil if the adjustment is too small: + __send__(RG_MODE[@mode], val) + end +end diff --git a/lib/dtas/serialize.rb b/lib/dtas/serialize.rb new file mode 100644 index 0000000..57eb626 --- /dev/null +++ b/lib/dtas/serialize.rb @@ -0,0 +1,10 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +module DTAS::Serialize + def ivars_to_hash(ivars, rv = {}) + ivars.each { |k| rv[k] = instance_variable_get("@#{k}") } + rv + end +end diff --git a/lib/dtas/sigevent.rb b/lib/dtas/sigevent.rb new file mode 100644 index 0000000..ccaec2f --- /dev/null +++ b/lib/dtas/sigevent.rb @@ -0,0 +1,11 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +begin + raise LoadError, "no eventfd with _DTAS_POSIX" if ENV["_DTAS_POSIX"] + require 'sleepy_penguin' + require_relative 'sigevent/efd' +rescue LoadError + require_relative 'sigevent/pipe' +end diff --git a/lib/dtas/sigevent/efd.rb b/lib/dtas/sigevent/efd.rb new file mode 100644 index 0000000..782e383 --- /dev/null +++ b/lib/dtas/sigevent/efd.rb @@ -0,0 +1,21 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +class DTAS::Sigevent < SleepyPenguin::EventFD + include SleepyPenguin + + def self.new + super(0, EventFD::CLOEXEC) + end + + def signal + incr(1) + end + + def readable_iter + value(true) + yield self, nil # calls DTAS::Process.reaper + :wait_readable + end +end diff --git a/lib/dtas/sigevent/pipe.rb b/lib/dtas/sigevent/pipe.rb new file mode 100644 index 0000000..139aa68 --- /dev/null +++ b/lib/dtas/sigevent/pipe.rb @@ -0,0 +1,29 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +class DTAS::Sigevent + attr_reader :to_io + + def initialize + @to_io, @wr = IO.pipe + end + + def signal + @wr.syswrite('.') rescue nil + end + + def readable_iter + begin + @to_io.read_nonblock(11) + yield self, nil # calls DTAS::Process.reaper + rescue Errno::EAGAIN + return :wait_readable + end while true + end + + def close + @to_io.close + @wr.close + end +end diff --git a/lib/dtas/sink.rb b/lib/dtas/sink.rb new file mode 100644 index 0000000..7931694 --- /dev/null +++ b/lib/dtas/sink.rb @@ -0,0 +1,122 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'yaml' +require_relative '../dtas' +require_relative 'pipe' +require_relative 'process' +require_relative 'command' +require_relative 'format' +require_relative 'serialize' +require_relative 'writable_iter' + +# this is a sink (endpoint, audio enters but never leaves) +class DTAS::Sink + attr_accessor :prio # any Integer + attr_accessor :active # boolean + attr_accessor :name + attr_accessor :nonblock + attr_accessor :respawn + + include DTAS::Command + include DTAS::Process + include DTAS::Serialize + include DTAS::WritableIter + + SINK_DEFAULTS = COMMAND_DEFAULTS.merge({ + "name" => nil, # order matters, this is first + "command" => "exec play -q $SOXFMT -", + "prio" => 0, + "nonblock" => false, + "pipe_size" => nil, + "active" => false, + "respawn" => false, + }) + + DEVFD_RE = %r{/dev/fd/([a-zA-Z]\w*)\b} + + # order matters for Ruby 1.9+, this defines to_hsh serialization so we + # can make the state file human-friendly + SIVS = %w(name env command prio nonblock pipe_size active) + + def initialize + command_init(SINK_DEFAULTS) + writable_iter_init + @sink = self + end + + # allow things that look like audio device names ("hw:1,0" , "/dev/dsp") + # or variable names. + def valid_name?(s) + !!(s =~ %r{\A[\w:,/-]+\z}) + end + + def self.load(hash) + sink = new + return sink unless hash + (SIVS & hash.keys).each do |k| + sink.instance_variable_set("@#{k}", hash[k]) + end + sink.valid_name?(sink.name) or raise ArgumentError, "invalid sink name" + sink + end + + def parse(str) + inputs = {} + str.scan(DEVFD_RE) { |w| inputs[w[0]] = nil } + inputs + end + + def on_death(status) + super + end + + def spawn(format, opts = {}) + raise "BUG: #{self.inspect}#spawn called twice" if @pid + rv = [] + + pclass = @nonblock ? DTAS::PipeNB : DTAS::Pipe + + cmd = command_string + inputs = parse(cmd) + + if inputs.empty? + # /dev/fd/* not specified in the command, assume one input for stdin + r, w = pclass.new + w.pipe_size = @pipe_size if @pipe_size + inputs[:in] = opts[:in] = r + w.sink = self + rv << w + else + # multiple inputs, fun!, we'll tee to them + inputs.each_key do |name| + r, w = pclass.new + w.pipe_size = @pipe_size if @pipe_size + inputs[name] = r + w.sink = self + rv << w + end + opts[:in] = "/dev/null" + + # map to real /dev/fd/* values and setup proper redirects + cmd = cmd.gsub(DEVFD_RE) do + read_fd = inputs[$1].fileno + opts[read_fd] = read_fd # do not close-on-exec + "/dev/fd/#{read_fd}" + end + end + + @pid = dtas_spawn(format.to_env.merge!(@env), cmd, opts) + inputs.each_value { |rpipe| rpipe.close } + rv + end + + def to_hash + ivars_to_hash(SIVS) + end + + def to_hsh + to_hash.delete_if { |k,v| v == SINK_DEFAULTS[k] } + end +end diff --git a/lib/dtas/sink_reader_play.rb b/lib/dtas/sink_reader_play.rb new file mode 100644 index 0000000..17a0190 --- /dev/null +++ b/lib/dtas/sink_reader_play.rb @@ -0,0 +1,70 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative '../dtas' + +# parses lines from play(1) -S/--show-progress like this: +# In:0.00% 00:00:37.34 [00:00:00.00] Out:1.65M [ -====|==== ] Clip:0 +# +# The authors of sox probably did not intend for the output of play(1) to +# be parsed, but we do it anyways. We need to be ready to update this +# code in case play(1) output changes. +# play -S/--show-progress +class DTAS::SinkReaderPlay + attr_reader :time, :out, :meter, :clips, :headroom + attr_reader :to_io + attr_reader :wr # this is stderr of play(1) + + def initialize + @to_io, @wr = IO.pipe + reset + end + + def readable_iter + buf = Thread.current[:dtas_lbuf] ||= "" + begin + @rbuf << @to_io.read_nonblock(1024, buf) + + # do not OOM in case SoX changes output format on us + @rbuf.clear if @rbuf.size > 0x10000 + + # don't handle partial read + next unless / Clip:\S+ *\z/ =~ @rbuf + + if @rbuf.gsub!(/(.*)\rIn:\S+ (\S+) \S+ Out:(\S+)\s+(\[[^\]]+\]) /m, "") + err = $1 + @time = $2 + @out = $3 + @meter = $4 + if @rbuf.gsub!(/Hd:(\d+\.\d+) Clip:(\S+) */, "") + @headroom = $1 + @clips = $2 + elsif @rbuf.gsub!(/\s+Clip:(\S+) */, "") + @headroom = nil + @clips = $1 + end + + $stderr.write(err) + end + rescue EOFError + return nil + rescue Errno::EAGAIN + return :wait_readable + end while true + end + + def close + @wr.close unless @wr.closed? + @to_io.close + end + + def reset + @rbuf = "" + @time = @out = @meter = @headroom = @clips = nil + end + + def closed? + @to_io.closed? + end +end diff --git a/lib/dtas/source.rb b/lib/dtas/source.rb new file mode 100644 index 0000000..f6dd443 --- /dev/null +++ b/lib/dtas/source.rb @@ -0,0 +1,148 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative '../dtas' +require_relative 'command' +require_relative 'format' +require_relative 'replaygain' +require_relative 'process' +require_relative 'serialize' + +# this is usually one input file +class DTAS::Source + attr_reader :infile + attr_reader :offset + require_relative 'source/common' + require_relative 'source/mp3' + + include DTAS::Command + include DTAS::Process + include DTAS::Source::Common + include DTAS::Source::Mp3 + + SOURCE_DEFAULTS = COMMAND_DEFAULTS.merge( + "command" => 'exec sox "$INFILE" $SOXFMT - $TRIMFX $RGFX', + "comments" => nil, + ) + + SIVS = %w(infile comments command env) + + def initialize(infile, offset = nil) + command_init(SOURCE_DEFAULTS) + @format = nil + @infile = infile + @offset = offset + @comments = nil + @samples = nil + end + + # this exists mainly to make the mpris interface easier, but it's not + # necessary, the mpris interface also knows the sample rate + def offset_us + (offset_samples / format.rate.to_f) * 1000000 + end + + # returns any offset in samples (relative to the original source file), + # likely zero unless seek was used + def offset_samples + return 0 unless @offset + case @offset + when /\A\d+s\z/ + @offset.to_i + else + format.hhmmss_to_samples(@offset) + end + end + + def precision + qx(%W(soxi -p #@infile), err: "/dev/null").to_i # sox.git f4562efd0aa3 + rescue # fallback to parsing the whole output + s = qx(%W(soxi #@infile), err: "/dev/null") + s =~ /Precision\s+:\s*(\d+)-bit/ + v = $1.to_i + return v if v > 0 + raise TypeError, "could not determine precision for #@infile" + end + + def format + @format ||= begin + fmt = DTAS::Format.new + fmt.from_file(@infile) + fmt.bits ||= precision + fmt + end + end + + # A user may be downloading the file and start playing + # it before the download completes, this refreshes + def samples! + @samples = nil + samples + end + + # This is the number of samples according to the samples in the source + # file itself, not the decoded output + def samples + @samples ||= qx(%W(soxi -s #@infile)).to_i + rescue => e + warn e.message + 0 + end + + # just run soxi -a + def __load_comments + tmp = {} + case @infile + when String + err = "" + cmd = %W(soxi -a #@infile) + begin + qx(cmd, err: err).split(/\n/).each do |line| + key, value = line.split(/=/, 2) + key && value or next + # TODO: multi-line/multi-value/repeated tags + tmp[key.upcase] = value + end + rescue => e + if /FAIL formats: no handler for file extension/ =~ err + warn("#{xs(cmd)}: #{err}") + else + warn("#{e.message} (#{e.class})") + end + # TODO: fallbacks + end + end + tmp + end + + def comments + @comments ||= __load_comments + end + + def replaygain + DTAS::ReplayGain.new(comments) || DTAS::ReplayGain.new(mp3gain_comments) + end + + def spawn(format, rg_state, opts) + raise "BUG: #{self.inspect}#spawn called twice" if @to_io + e = format.to_env + e["INFILE"] = @infile + + # make sure these are visible to the "current" command... + @env["TRIMFX"] = @offset ? "trim #@offset" : nil + @env["RGFX"] = rg_state.effect(self) || nil + + @pid = dtas_spawn(e.merge!(@env), command_string, opts) + end + + def to_hsh + to_hash.delete_if { |k,v| v == SOURCE_DEFAULTS[k] } + end + + def to_hash + rv = ivars_to_hash(SIVS) + rv["samples"] = samples + rv + end +end diff --git a/lib/dtas/source/command.rb b/lib/dtas/source/command.rb new file mode 100644 index 0000000..30441eb --- /dev/null +++ b/lib/dtas/source/command.rb @@ -0,0 +1,41 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative '../../dtas' +require_relative '../source' +require_relative '../command' +require_relative '../serialize' + +class DTAS::Source::Command + require_relative '../source/common' + + include DTAS::Command + include DTAS::Process + include DTAS::Source::Common + include DTAS::Serialize + + SIVS = %w(command env) + + def initialize(command) + command_init(command: command) + end + + def source_dup + rv = self.class.new + SIVS.each { |iv| rv.__send__("#{iv}=", self.__send__(iv)) } + rv + end + + def to_hash + ivars_to_hash(SIVS) + end + + alias to_hsh to_hash + + def spawn(format, rg_state, opts) + raise "BUG: #{self.inspect}#spawn called twice" if @to_io + e = format.to_env + @pid = dtas_spawn(e.merge!(@env), command_string, opts) + end +end diff --git a/lib/dtas/source/common.rb b/lib/dtas/source/common.rb new file mode 100644 index 0000000..333e74a --- /dev/null +++ b/lib/dtas/source/common.rb @@ -0,0 +1,14 @@ +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +module DTAS::Source::Common + attr_reader :dst_zero_byte + attr_reader :dst + attr_accessor :requeued + + def dst_assoc(buf) + @dst = buf + @dst_zero_byte = buf.bytes_xfer + buf.inflight + @requeued = false + end +end diff --git a/lib/dtas/source/mp3.rb b/lib/dtas/source/mp3.rb new file mode 100644 index 0000000..b013bee --- /dev/null +++ b/lib/dtas/source/mp3.rb @@ -0,0 +1,38 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative '../process' + +module DTAS::Source::Mp3 + include DTAS::Process + # we use dBFS = 1.0 as scale (not 32768) + def __mp3gain_peak(str) + sprintf("%0.8g", str.to_f / 32768.0) + end + + # massage mp3gain(1) output + def mp3gain_comments + tmp = {} + case @infile + when String + @infile =~ /\.mp[g23]\z/i or return + qx(%W(mp3gain -s c #@infile)).split(/\n/).each do |line| + case line + when /^Recommended "(Track|Album)" dB change:\s*(\S+)/ + tmp["REPLAYGAIN_#{$1.upcase}_GAIN"] = $2 + when /^Max PCM sample at current gain: (\S+)/ + tmp["REPLAYGAIN_TRACK_PEAK"] = __mp3gain_peak($1) + when /^Max Album PCM sample at current gain: (\S+)/ + tmp["REPLAYGAIN_ALBUM_PEAK"] = __mp3gain_peak($1) + end + end + tmp + else + raise TypeError, "unsupported type: #{@infile.inspect}" + end + rescue => e + $DEBUG and + warn("mp3gain(#{@infile.inspect}) failed: #{e.message} (#{e.class})") + end +end diff --git a/lib/dtas/state_file.rb b/lib/dtas/state_file.rb new file mode 100644 index 0000000..cfd83d5 --- /dev/null +++ b/lib/dtas/state_file.rb @@ -0,0 +1,34 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'yaml' +require 'tempfile' +class DTAS::StateFile + def initialize(path, do_fsync = false) + @path = path + @do_fsync = do_fsync + end + + def tryload + YAML.load(IO.binread(@path)) if File.readable?(@path) + end + + def dump(obj, force_fsync = false) + yaml = obj.to_hsh.to_yaml.b + + # do not replace existing state file if there are no changes + # this will be racy if we ever do async dumps or shared state + # files, but we don't do that... + return if File.readable?(@path) && IO.binread(@path) == yaml + + dir = File.dirname(@path) + Tempfile.open(%w(player.state .tmp), dir) do |tmp| + tmp.binmode + tmp.write(yaml) + tmp.flush + tmp.fsync if @do_fsync || force_fsync + File.rename(tmp.path, @path) + end + end +end diff --git a/lib/dtas/unix_accepted.rb b/lib/dtas/unix_accepted.rb new file mode 100644 index 0000000..6883ee1 --- /dev/null +++ b/lib/dtas/unix_accepted.rb @@ -0,0 +1,77 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'socket' +require 'io/wait' + +class DTAS::UNIXAccepted + attr_reader :to_io + + def initialize(sock) + @to_io = sock + @send_buf = [] + end + + # public API (for DTAS::Player) + # returns :wait_readable on success + def emit(msg) + buffered = @send_buf.size + if buffered == 0 + begin + @to_io.sendmsg_nonblock(msg, Socket::MSG_EOR) + return :wait_readable + rescue Errno::EAGAIN + @send_buf << msg + return :wait_writable + rescue => e + return e + end + elsif buffered > 100 + return RuntimeError.new("too many messages buffered") + else # buffered > 0 + @send_buf << msg + return :wait_writable + end + end + + # flushes pending data if it got buffered + def writable_iter + begin + msg = @send_buf.shift or return :wait_readable + @to_io.send_nonblock(msg, Socket::MSG_EOR) + rescue Errno::EAGAIN + @send_buf.unshift(msg) + return :wait_writable + rescue => e + return e + end while true + end + + def readable_iter + io = @to_io + nread = io.nread + + # EOF, assume no spurious wakeups for SOCK_SEQPACKET + return nil if nread == 0 + + begin + begin + msg, _, _ = io.recvmsg_nonblock(nread) + rescue EOFError, SystemCallError + return nil + end + yield(self, msg) # DTAS::Player deals with this + nread = io.nread + end while nread > 0 + :wait_readable + end + + def close + @to_io.close + end + + def closed? + @to_io.closed? + end +end diff --git a/lib/dtas/unix_client.rb b/lib/dtas/unix_client.rb new file mode 100644 index 0000000..f46eddf --- /dev/null +++ b/lib/dtas/unix_client.rb @@ -0,0 +1,52 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'dtas' +require 'socket' +require 'io/wait' +require 'shellwords' + +class DTAS::UNIXClient + attr_reader :to_io + + def self.default_path + (ENV["DTAS_PLAYER_SOCK"] || File.expand_path("~/.dtas/player.sock")).b + end + + def initialize(path = self.class.default_path) + @to_io = begin + raise if ENV["_DTAS_NOSEQPACKET"] + Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0) + rescue + warn("get your operating system developers to support " \ + "SOCK_SEQPACKET for AF_UNIX sockets") + warn("falling back to SOCK_DGRAM, reliability possibly compromised") + Socket.new(:AF_UNIX, :SOCK_DGRAM, 0) + end + @to_io.connect(Socket.pack_sockaddr_un(path)) + end + + def req_start(args) + args = Shellwords.join(args) if Array === args + @to_io.send(args, Socket::MSG_EOR) + end + + def req_ok(args, timeout = nil) + res = req(args, timeout) + res == "OK" or raise "Unexpected response: #{res}" + res + end + + def req(args, timeout = nil) + req_start(args) + res_wait(timeout) + end + + def res_wait(timeout = nil) + @to_io.wait(timeout) + nr = @to_io.nread + nr > 0 or raise EOFError, "unexpected EOF from server" + @to_io.recvmsg[0] + end +end diff --git a/lib/dtas/unix_server.rb b/lib/dtas/unix_server.rb new file mode 100644 index 0000000..90f8479 --- /dev/null +++ b/lib/dtas/unix_server.rb @@ -0,0 +1,111 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'socket' +require_relative '../dtas' +require_relative 'unix_accepted' + +# This uses SOCK_SEQPACKET, unlike ::UNIXServer in Ruby stdlib + +# The programming model for the event loop here aims to be compatible +# with EPOLLONESHOT use with epoll, since that fits my brain far better +# than existing evented APIs/frameworks. +# If we cared about scalability to thousands of clients, we'd really use epoll, +# but IO.select can be just as fast (or faster) with few descriptors and +# is obviously more portable. + +class DTAS::UNIXServer + attr_reader :to_io + + def close + File.unlink(@path) + @to_io.close + end + + def initialize(path) + @path = path + # lock down access by default, arbitrary commands may run as the + # same user dtas-player runs as: + old_umask = File.umask(0077) + @to_io = Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0) + addr = Socket.pack_sockaddr_un(path) + begin + @to_io.bind(addr) + rescue Errno::EADDRINUSE + # maybe we have an old path leftover from a killed process + tmp = Socket.new(:AF_UNIX, :SOCK_SEQPACKET, 0) + begin + tmp.connect(addr) + raise RuntimeError, "socket `#{path}' is in use", [] + rescue Errno::ECONNREFUSED + # ok, leftover socket, unlink and rebind anyways + File.unlink(path) + @to_io.bind(addr) + ensure + tmp.close + end + end + @to_io.listen(1024) + @readers = { self => true } + @writers = {} + ensure + File.umask(old_umask) + end + + def write_failed(client, e) + warn "failed to write to #{client}: #{e.message} (#{e.class})" + client.close + end + + def readable_iter + # we do not do anything with the block passed to us + begin + sock, _ = @to_io.accept_nonblock + @readers[DTAS::UNIXAccepted.new(sock)] = true + rescue Errno::ECONNABORTED # ignore this, it happens + rescue Errno::EAGAIN + return :wait_readable + end while true + end + + def wait_ctl(io, err) + case err + when :wait_readable + @readers[io] = true + when :wait_writable + @writers[io] = true + when :delete + @readers.delete(io) + @writers.delete(io) + when :ignore + # There are 2 cases for :ignore + # - DTAS::Buffer was readable before, but all destinations (e.g. sinks) + # were blocked, so we stop caring for producer (buffer) readability. + # - a consumer (e.g. DTAS::Sink) just became writable, but the + # corresponding DTAS::Buffer was already readable in a previous + # call. + when nil + io.close + when StandardError + io.close + else + raise "BUG: wait_ctl invalid: #{io} #{err.inspect}" + end + end + + def run_once + begin + # give IO.select one-shot behavior, snapshot and replace the watchlist + r = IO.select(@readers.keys, @writers.keys) or return + r[1].each do |io| + @writers.delete(io) + wait_ctl(io, io.writable_iter) + end + r[0].each do |io| + @readers.delete(io) + wait_ctl(io, io.readable_iter { |_io, msg| yield(_io, msg) }) + end + end + end +end diff --git a/lib/dtas/util.rb b/lib/dtas/util.rb new file mode 100644 index 0000000..03c7ded --- /dev/null +++ b/lib/dtas/util.rb @@ -0,0 +1,16 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative '../dtas' + +# in case we need to convert DB values to a linear scale +module DTAS::Util + def db_to_linear(val) + Math.exp(val * Math.log(10) * 0.05) + end + + def linear_to_db(val) + Math.log10(val) * 20 + end +end diff --git a/lib/dtas/writable_iter.rb b/lib/dtas/writable_iter.rb new file mode 100644 index 0000000..aa02905 --- /dev/null +++ b/lib/dtas/writable_iter.rb @@ -0,0 +1,23 @@ +# -*- encoding: binary -*- +# :stopdoc: +# Copyright (C) 2013, Eric Wong +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative '../dtas' + +module DTAS::WritableIter + attr_accessor :on_writable + + def writable_iter_init + @on_writable = nil + end + + # this is used to exchange our own writable status for the readable + # status of the DTAS::Buffer which triggered us. + def writable_iter + if owr = @on_writable + @on_writable = nil + owr.call # this triggers readability watching of DTAS::Buffer + end + :ignore + end +end -- cgit v1.2.3-24-ge0c7