From e90e414ebf4b143b3ddbaa16460987f485ad8117 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 20 Sep 2015 22:56:48 +0000 Subject: dtas-readahead: new script for -player users on Linux This is dependent on Linux /proc/ (the "pos: " field of /proc/$PID/fdinfo/$FD to be exact). This was written to avoid seek latencies on a remote FUSE filesystem with occasional packet loss. --- bin/dtas-readahead | 207 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 bin/dtas-readahead (limited to 'bin') diff --git a/bin/dtas-readahead b/bin/dtas-readahead new file mode 100644 index 0000000..f02bc35 --- /dev/null +++ b/bin/dtas-readahead @@ -0,0 +1,207 @@ +#!/usr/bin/env ruby +# Copyright (C) 2015 all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +# +# Really janky readahead script. Requires dtas-player to be +# running and unlikely to work outside of Linux as it depends on +# the contents of /proc +unless RUBY_PLATFORM =~ /linux/ + warn "this relies on Linux /proc and probably does not work well for you" +end + +require 'yaml' +require 'io/wait' +require 'dtas/unix_client' +require 'dtas/process' + +include DTAS::Process +include DTAS::SpawnFix +trap(:CHLD) { DTAS::Process.reaper {} } +trap(:INT) { exit(0) } +trap(:TERM) { exit(0) } +w = DTAS::UNIXClient.new +w.req_ok('watch') +c = DTAS::UNIXClient.new +@max_ra = 30 * 1024 * 1024 +null = DTAS.null +@redir = { err: null, out: null, in: null }.freeze +require 'pp' + +if RUBY_VERSION.to_r >= '2.3'.to_r + # Old Rubies did FIONREAD, which breaks on SOCK_SEQPACKET + def wait_read(w, timeout) + w.to_io.wait_readable(timeout) + end +else + def wait_read(w, timeout) + r = IO.select([w], nil, nil, timeout) + r ? r[0] : nil + end +end + +def seek_to_cur_pos(cur_pid, fp) + cur_fd = [] + fpst = fp.stat + begin + Dir["/proc/#{cur_pid}/fd/*"].each do |l| + path = File.readlink(l) + begin + st = File.stat(path) + if st.dev == fpst.dev && st.ino == fpst.ino + cur_fd << l.split('/')[-1] + end + rescue Errno::ENOENT, Errno::EPERM + end + end + rescue Errno::ENOENT => e # race, process is dead + return false + rescue => e + warn "error reading FDs from for PID:#{cur_pid}: #{e.message}" + end + pos = 0 + # get the position of the file of the sox process + cur_fd.each do |fd| + if File.read("/proc/#{cur_pid}/fdinfo/#{fd}") =~ /^pos:\s*(\d+)$/ + n = $1.to_i + pos = n if n > pos + end + end + pos +rescue Errno::ENOENT => e # race, process is dead + return false +end + +def children_of(ppid) + `ps h -o pid --ppid=#{ppid}`.split(/\s+/s).map(&:to_i) +end + +def expand_pid(pid) + to_scan = Array(pid) + pids = [] + while pid = to_scan.shift + pid > 0 or next + to_scan.concat(children_of(pid)) + pids << pid + end + pids.uniq +end + +def do_ra(fp, pos, w) + size = fp.size + len = size - pos + len = @todo_ra if len > @todo_ra + return if len <= 0 + path = fp.path + pp({start_ra: File.basename(path), + len: '%.3f' % (len / (1024 * 1024.0)), + pos: pos }) + Process.spawn('soxi', path, @redir) + Process.spawn('avprobe', path, @redir) + Process.spawn('ffprobe', path, @redir) + fp.advise(:sequential, pos, len) + Thread.new(fp.dup) { |d| d.advise(:willneed, pos, len); d.close } + + at_once = 8192 + adj = len + while len > 0 + n = len > at_once ? at_once : len + n = IO.copy_stream(fp, DTAS.null, n, pos) + pos += n + len -= n + + # stop reading immediately if there's an event + if wait_read(w, 0) + adj = @todo_ra + pos += size + break + end + end + @todo_ra -= adj + (pos + len) >= size ? fp.close : nil +end + +def do_open(path) + if path =~ /\.ya?ml\z/ + File.open(path) do |fp| + buf = fp.read(4) + case buf + when "---\n" + buf << fp.read(fp.size - 4) + Dir.chdir(File.dirname(path)) do + yml = YAML.load(buf) + x = yml['infile'] and return File.open(File.expand_path(x).freeze) + end + end + end + end + File.open(path) +end + +begin + work = {} + cur_pid = nil + @todo_ra = @max_ra + t0 = DTAS.now + fp = nil + cur = YAML.load(c.req('current')) + while @todo_ra > 0 && fp.nil? + if current = cur['current'] + track = current['infile'].freeze + work[track] ||= fp = do_open(track) + cur_pid = current['pid'] + if fp + pos = expand_pid(cur_pid).map do |pid| + seek_to_cur_pos(pid, fp) + end.compact.max + pos and fp = do_ra(fp, pos, w) + end + else + break + end + + # queue has priority, work on it, first + queue = YAML.load(c.req('queue cat')) + while @todo_ra > 0 && track = queue.shift + fp = nil + begin + work[track] ||= fp = do_open(track) + rescue SystemCallError + end + fp = do_ra(fp, 0, w) if fp + end + break if @todo_ra <= 0 + + # the normal tracklist + ids = c.req('tl tracks').split + ids.shift # ignore count + idx = ids.find_index(c.req('tl current-id')) + repeat = c.req('tl repeat').split[-1] + while @todo_ra > 0 && idx && (cid = ids[idx]) + fp = nil + track = c.req("tl get #{cid}").sub!(/\A1 \d+=/, '').freeze + begin + work[track] ||= fp = do_open(track) + rescue SystemCallError + end + fp = do_ra(fp, 0, w) if fp + if @todo_ra > 0 && fp.nil? && ids[idx += 1].nil? + idx = repeat == 'true' ? 0 : nil + end + end + idx or break + cur = YAML.load(c.req('current')) + cur['current'] or break + end + elapsed = DTAS.now - t0 + p [:elapsed, elapsed] + timeout = 5 - elapsed + timeout = 0 if timeout < 0 + r = wait_read(w, timeout) + p w.res_wait if r +rescue EOFError + abort "dtas-player exited" +rescue => e + warn "#{e.message} #{e.class})" + e.backtrace.each {|l| warn l } + sleep 5 +end while true -- cgit v1.2.3-24-ge0c7