everything related to duct tape audio suite (dtas)
From: Eric Wong <e@80x24.org>
To: dtas-all@nongnu.org
Subject: [PATCH] dtas-readahead: new script for -player users on Linux
Date: Sun, 20 Sep 2015 23:02:37 +0000
Message-ID: <20150920230237.GA29471@dcvr.yhbt.net>
In-Reply-To: <20150907214034.28039-1-e@80x24.org>

This depends on Linux /proc (specifically, the "pos:" field
of /proc/$PID/fdinfo/$FD).

This was written to avoid seek latencies on a remote FUSE
filesystem with occasional packet loss.
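
For readers unfamiliar with fdinfo, here is a minimal sketch of reading
the "pos:" field for one descriptor.  The helper name fdinfo_pos is
hypothetical and not part of this patch; it returns nil where
/proc/$PID/fdinfo is unavailable (non-Linux, or the process is gone).

```ruby
require 'tempfile'

# Hypothetical helper (not in the patch): the kernel's current file
# offset for descriptor +fd+ of process +pid+, or nil if fdinfo cannot
# be read (non-Linux system, dead process, closed descriptor).
def fdinfo_pos(pid, fd)
  File.read("/proc/#{pid}/fdinfo/#{fd}")[/^pos:\s*(\d+)$/, 1]&.to_i
rescue SystemCallError
  nil
end

# Demo against our own process: seek in a scratch file, then ask /proc
# where that descriptor currently points.
Tempfile.create('fdinfo-demo') do |tmp|
  tmp.write('x' * 4096)
  tmp.flush
  tmp.seek(1234)
  p fdinfo_pos(Process.pid, tmp.fileno)
end
```

On Linux the demo should print the offset that was just seeked to;
elsewhere it prints nil.  The patch applies the same idea to the
decoder process rather than to its own PID.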
---
  Eric Wong <e@80x24.org> wrote:
  > I'll be adding userspace readahead for playback on slow network
  > filesystems.  It'll start off as Linux-only and will stay as a
  > separate process/executable to:
  > 
  > a) allow tuning/modification without interrupting playback in
  >    dtas-player
  > 
  > b) not place additional demands on the weak, non-RT (mainline)
  >    Ruby 2.x threading/GC system we use for dtas-player.
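
As a rough illustration of what "userspace readahead" means here, the
following sketch hints the kernel with IO#advise and then copies a
byte range to the null device so the data ends up in the page cache.
The name warm_cache and its chunk parameter are assumptions made for
this example; the sketch also leaves out the event check and the
background thread for the willneed hint that the patch uses.

```ruby
require 'tempfile'

# Hypothetical helper (not in the patch): read up to +len+ bytes of
# +path+ starting at +pos+ and discard them, so that later reads by a
# player hit the page cache.  Returns the number of bytes read.
def warm_cache(path, pos, len, chunk = 8192)
  total = 0
  File.open(path, 'rb') do |fp|
    len = [len, fp.size - pos].min # never read past end of file
    return 0 if len <= 0
    # Advisory only; a no-op on platforms without posix_fadvise.
    fp.advise(:sequential, pos, len)
    fp.advise(:willneed, pos, len)
    File.open(File::NULL, 'wb') do |null|
      while total < len
        want = [chunk, len - total].min
        # Passing a source offset leaves fp's own position untouched.
        n = IO.copy_stream(fp, null, want, pos + total)
        break if n.zero?
        total += n
      end
    end
  end
  total
end

Tempfile.create('ra-demo') do |tmp|
  tmp.write('x' * 20_000)
  tmp.flush
  p warm_cache(tmp.path, 5_000, 1_000_000)
end
```

The demo asks for more than the file holds, so the request is clamped
to the 15000 bytes that remain after the starting offset.  Copying in
small chunks is what lets a real implementation stop early when the
player reports an event.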

 bin/dtas-readahead | 207 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 207 insertions(+)
 create mode 100644 bin/dtas-readahead

diff --git a/bin/dtas-readahead b/bin/dtas-readahead
new file mode 100644
index 0000000..f02bc35
--- /dev/null
+++ b/bin/dtas-readahead
@@ -0,0 +1,207 @@
+#!/usr/bin/env ruby
+# Copyright (C) 2015 all contributors <dtas-all@nongnu.org>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# Really janky readahead script.  Requires dtas-player to be
+# running and unlikely to work outside of Linux as it depends on
+# the contents of /proc
+unless RUBY_PLATFORM =~ /linux/
+  warn "this relies on Linux /proc and probably does not work well for you"
+end
+
+require 'yaml'
+require 'io/wait'
+require 'dtas/unix_client'
+require 'dtas/process'
+
+include DTAS::Process
+include DTAS::SpawnFix
+trap(:CHLD) { DTAS::Process.reaper {} }
+trap(:INT) { exit(0) }
+trap(:TERM) { exit(0) }
+w = DTAS::UNIXClient.new
+w.req_ok('watch')
+c = DTAS::UNIXClient.new
+@max_ra = 30 * 1024 * 1024
+null = DTAS.null
+@redir = { err: null, out: null, in: null }.freeze
+require 'pp'
+
+if RUBY_VERSION.to_r >= '2.3'.to_r
+  # Old Rubies did FIONREAD, which breaks on SOCK_SEQPACKET
+  def wait_read(w, timeout)
+    w.to_io.wait_readable(timeout)
+  end
+else
+  def wait_read(w, timeout)
+    r = IO.select([w], nil, nil, timeout)
+    r ? r[0] : nil
+  end
+end
+
+def seek_to_cur_pos(cur_pid, fp)
+  cur_fd = []
+  fpst = fp.stat
+  begin
+    Dir["/proc/#{cur_pid}/fd/*"].each do |l|
+      path = File.readlink(l)
+      begin
+        st = File.stat(path)
+        if st.dev == fpst.dev && st.ino == fpst.ino
+          cur_fd << l.split('/')[-1]
+        end
+      rescue Errno::ENOENT, Errno::EPERM
+      end
+    end
+  rescue Errno::ENOENT => e # race, process is dead
+    return false
+  rescue => e
+    warn "error reading FDs for PID:#{cur_pid}: #{e.message}"
+  end
+  pos = 0
+  # find the furthest offset the process (e.g. sox) has reached in this file
+  cur_fd.each do |fd|
+    if File.read("/proc/#{cur_pid}/fdinfo/#{fd}") =~ /^pos:\s*(\d+)$/
+      n = $1.to_i
+      pos = n if n > pos
+    end
+  end
+  pos
+rescue Errno::ENOENT => e # race, process is dead
+  return false
+end
+
+def children_of(ppid)
+  `ps h -o pid --ppid=#{ppid}`.split.map(&:to_i)
+end
+
+def expand_pid(pid)
+  to_scan = Array(pid)
+  pids = []
+  while pid = to_scan.shift
+    pid > 0 or next
+    to_scan.concat(children_of(pid))
+    pids << pid
+  end
+  pids.uniq
+end
+
+def do_ra(fp, pos, w)
+  size = fp.size
+  len = size - pos
+  len = @todo_ra if len > @todo_ra
+  return if len <= 0
+  path = fp.path
+  pp({start_ra: File.basename(path),
+      len: '%.3f' % (len / (1024 * 1024.0)),
+      pos: pos })
+  Process.spawn('soxi', path, @redir)
+  Process.spawn('avprobe', path, @redir)
+  Process.spawn('ffprobe', path, @redir)
+  fp.advise(:sequential, pos, len)
+  Thread.new(fp.dup) { |d| d.advise(:willneed, pos, len); d.close }
+
+  at_once = 8192
+  adj = len
+  while len > 0
+    n = len > at_once ? at_once : len
+    n = IO.copy_stream(fp, DTAS.null, n, pos)
+    pos += n
+    len -= n
+
+    # stop reading immediately if there's an event
+    if wait_read(w, 0)
+      adj = @todo_ra
+      pos += size
+      break
+    end
+  end
+  @todo_ra -= adj
+  (pos + len) >= size ? fp.close : nil
+end
+
+def do_open(path)
+  if path =~ /\.ya?ml\z/
+    File.open(path) do |fp|
+      buf = fp.read(4)
+      case buf
+      when "---\n"
+        buf << fp.read(fp.size - 4)
+        Dir.chdir(File.dirname(path)) do
+          yml = YAML.load(buf)
+          x = yml['infile'] and return File.open(File.expand_path(x).freeze)
+        end
+      end
+    end
+  end
+  File.open(path)
+end
+
+begin
+  work = {}
+  cur_pid = nil
+  @todo_ra = @max_ra
+  t0 = DTAS.now
+  fp = nil
+  cur = YAML.load(c.req('current'))
+  while @todo_ra > 0 && fp.nil?
+    if current = cur['current']
+      track = current['infile'].freeze
+      work[track] ||= fp = do_open(track)
+      cur_pid = current['pid']
+      if fp
+        pos = expand_pid(cur_pid).map do |pid|
+          seek_to_cur_pos(pid, fp)
+        end.compact.max
+        pos and fp = do_ra(fp, pos, w)
+      end
+    else
+      break
+    end
+
+    # the queue has priority, so work on it first
+    queue = YAML.load(c.req('queue cat'))
+    while @todo_ra > 0 && track = queue.shift
+      fp = nil
+      begin
+        work[track] ||= fp = do_open(track)
+      rescue SystemCallError
+      end
+      fp = do_ra(fp, 0, w) if fp
+    end
+    break if @todo_ra <= 0
+
+    # the normal tracklist
+    ids = c.req('tl tracks').split
+    ids.shift # ignore count
+    idx = ids.find_index(c.req('tl current-id'))
+    repeat = c.req('tl repeat').split[-1]
+    while @todo_ra > 0 && idx && (cid = ids[idx])
+      fp = nil
+      track = c.req("tl get #{cid}").sub!(/\A1 \d+=/, '').freeze
+      begin
+        work[track] ||= fp = do_open(track)
+      rescue SystemCallError
+      end
+      fp = do_ra(fp, 0, w) if fp
+      if @todo_ra > 0 && fp.nil? && ids[idx += 1].nil?
+        idx = repeat == 'true' ? 0 : nil
+      end
+    end
+    idx or break
+    cur = YAML.load(c.req('current'))
+    cur['current'] or break
+  end
+  elapsed = DTAS.now - t0
+  p [:elapsed, elapsed]
+  timeout = 5 - elapsed
+  timeout = 0 if timeout < 0
+  r = wait_read(w, timeout)
+  p w.res_wait if r
+rescue EOFError
+  abort "dtas-player exited"
+rescue => e
+  warn "#{e.message} (#{e.class})"
+  e.backtrace.each {|l| warn l }
+  sleep 5
+end while true
-- 
EW



Thread overview: 5+ messages
2015-09-07 21:40 [PATCH 0/2] minor updates for readahead for -player Eric Wong
2015-09-07 21:40 ` [PATCH 1/2] use a common /dev/null Eric Wong
2015-09-07 21:40 ` [PATCH 2/2] player: add "queue cat" command Eric Wong
2015-09-20 23:02 ` Eric Wong [this message]
2015-10-03  9:38   ` [PATCH] dtas-readahead: make executable Eric Wong
