about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2015-09-20 22:56:48 +0000
committerEric Wong <e@80x24.org>2015-09-20 22:58:03 +0000
commite90e414ebf4b143b3ddbaa16460987f485ad8117 (patch)
treea58ae0ef9552f680e24d576afb0a71f19683714f
parentf9e3dd47c25534af631a13e57f86fc69b03db9be (diff)
downloaddtas-e90e414ebf4b143b3ddbaa16460987f485ad8117.tar.gz
This is dependent on Linux /proc/ (the "pos: " field
of /proc/$PID/fdinfo/$FD to be exact).

This was written to avoid seek latencies on a remote FUSE
filesystem with occasional packet loss.
-rw-r--r--bin/dtas-readahead207
1 files changed, 207 insertions, 0 deletions
diff --git a/bin/dtas-readahead b/bin/dtas-readahead
new file mode 100644
index 0000000..f02bc35
--- /dev/null
+++ b/bin/dtas-readahead
@@ -0,0 +1,207 @@
+#!/usr/bin/env ruby
+# Copyright (C) 2015 all contributors <dtas-all@nongnu.org>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# Really janky readahead script.  Requires dtas-player to be
+# running and unlikely to work outside of Linux as it depends on
+# the contents of /proc
+unless RUBY_PLATFORM =~ /linux/
+  warn "this relies on Linux /proc and probably does not work well for you"
+end
+
+require 'yaml'
+require 'io/wait'
+require 'dtas/unix_client'
+require 'dtas/process'
+
+include DTAS::Process
+include DTAS::SpawnFix
+trap(:CHLD) { DTAS::Process.reaper {} }
+trap(:INT) { exit(0) }
+trap(:TERM) { exit(0) }
+w = DTAS::UNIXClient.new
+w.req_ok('watch')
+c = DTAS::UNIXClient.new
+@max_ra = 30 * 1024 * 1024
+null = DTAS.null
+@redir = { err: null, out: null, in: null }.freeze
+require 'pp'
+
+if RUBY_VERSION.to_r >= '2.3'.to_r
+  # Old Rubies did FIONREAD, which breaks on SOCK_SEQPACKET
+  def wait_read(w, timeout)
+    w.to_io.wait_readable(timeout)
+  end
+else
+  def wait_read(w, timeout)
+    r = IO.select([w], nil, nil, timeout)
+    r ? r[0] : nil
+  end
+end
+
+def seek_to_cur_pos(cur_pid, fp)
+  cur_fd = []
+  fpst = fp.stat
+  begin
+    Dir["/proc/#{cur_pid}/fd/*"].each do |l|
+      path = File.readlink(l)
+      begin
+        st = File.stat(path)
+        if st.dev == fpst.dev && st.ino == fpst.ino
+          cur_fd << l.split('/')[-1]
+        end
+      rescue Errno::ENOENT, Errno::EPERM
+      end
+    end
+  rescue Errno::ENOENT => e # race, process is dead
+    return false
+  rescue => e
+    warn "error reading FDs from for PID:#{cur_pid}: #{e.message}"
+  end
+  pos = 0
+  # get the position of the file of the sox process
+  cur_fd.each do |fd|
+    if File.read("/proc/#{cur_pid}/fdinfo/#{fd}") =~ /^pos:\s*(\d+)$/
+      n = $1.to_i
+      pos = n if n > pos
+    end
+  end
+  pos
+rescue Errno::ENOENT => e # race, process is dead
+  return false
+end
+
+def children_of(ppid)
+  `ps h -o pid --ppid=#{ppid}`.split(/\s+/s).map(&:to_i)
+end
+
+def expand_pid(pid)
+  to_scan = Array(pid)
+  pids = []
+  while pid = to_scan.shift
+    pid > 0 or next
+    to_scan.concat(children_of(pid))
+    pids << pid
+  end
+  pids.uniq
+end
+
+def do_ra(fp, pos, w)
+  size = fp.size
+  len = size - pos
+  len = @todo_ra if len > @todo_ra
+  return if len <= 0
+  path = fp.path
+  pp({start_ra: File.basename(path),
+      len: '%.3f' % (len / (1024 * 1024.0)),
+      pos: pos })
+  Process.spawn('soxi', path, @redir)
+  Process.spawn('avprobe', path, @redir)
+  Process.spawn('ffprobe', path, @redir)
+  fp.advise(:sequential, pos, len)
+  Thread.new(fp.dup) { |d| d.advise(:willneed, pos, len); d.close }
+
+  at_once = 8192
+  adj = len
+  while len > 0
+    n = len > at_once ? at_once : len
+    n = IO.copy_stream(fp, DTAS.null, n, pos)
+    pos += n
+    len -= n
+
+    # stop reading immediately if there's an event
+    if wait_read(w, 0)
+      adj = @todo_ra
+      pos += size
+      break
+    end
+  end
+  @todo_ra -= adj
+  (pos + len) >= size ? fp.close : nil
+end
+
+def do_open(path)
+  if path =~ /\.ya?ml\z/
+    File.open(path) do |fp|
+      buf = fp.read(4)
+      case buf
+      when "---\n"
+        buf << fp.read(fp.size - 4)
+        Dir.chdir(File.dirname(path)) do
+          yml = YAML.load(buf)
+          x = yml['infile'] and return File.open(File.expand_path(x).freeze)
+        end
+      end
+    end
+  end
+  File.open(path)
+end
+
+begin
+  work = {}
+  cur_pid = nil
+  @todo_ra = @max_ra
+  t0 = DTAS.now
+  fp = nil
+  cur = YAML.load(c.req('current'))
+  while @todo_ra > 0 && fp.nil?
+    if current = cur['current']
+      track = current['infile'].freeze
+      work[track] ||= fp = do_open(track)
+      cur_pid = current['pid']
+      if fp
+        pos = expand_pid(cur_pid).map do |pid|
+          seek_to_cur_pos(pid, fp)
+        end.compact.max
+        pos and fp = do_ra(fp, pos, w)
+      end
+    else
+      break
+    end
+
+    # queue has priority, work on it, first
+    queue = YAML.load(c.req('queue cat'))
+    while @todo_ra > 0 && track = queue.shift
+      fp = nil
+      begin
+        work[track] ||= fp = do_open(track)
+      rescue SystemCallError
+      end
+      fp = do_ra(fp, 0, w) if fp
+    end
+    break if @todo_ra <= 0
+
+    # the normal tracklist
+    ids = c.req('tl tracks').split
+    ids.shift # ignore count
+    idx = ids.find_index(c.req('tl current-id'))
+    repeat = c.req('tl repeat').split[-1]
+    while @todo_ra > 0 && idx && (cid = ids[idx])
+      fp = nil
+      track = c.req("tl get #{cid}").sub!(/\A1 \d+=/, '').freeze
+      begin
+        work[track] ||= fp = do_open(track)
+      rescue SystemCallError
+      end
+      fp = do_ra(fp, 0, w) if fp
+      if @todo_ra > 0 && fp.nil? && ids[idx += 1].nil?
+        idx = repeat == 'true' ? 0 : nil
+      end
+    end
+    idx or break
+    cur = YAML.load(c.req('current'))
+    cur['current'] or break
+  end
+  elapsed = DTAS.now - t0
+  p [:elapsed, elapsed]
+  timeout = 5 - elapsed
+  timeout = 0 if timeout < 0
+  r = wait_read(w, timeout)
+  p w.res_wait if r
+rescue EOFError
+  abort "dtas-player exited"
+rescue => e
+  warn "#{e.message} #{e.class})"
+  e.backtrace.each {|l| warn l }
+  sleep 5
+end while true