From: Eric Wong <e@80x24.org>
To: dtas-all@nongnu.org
Subject: [PATCH] dtas-readahead: new script for -player users on Linux
Date: Sun, 20 Sep 2015 23:02:37 +0000
Message-ID: <20150920230237.GA29471@dcvr.yhbt.net>
In-Reply-To: <20150907214034.28039-1-e@80x24.org>
This depends on Linux /proc (specifically, the "pos:" field
of /proc/$PID/fdinfo/$FD).
It was written to avoid seek latencies on a remote FUSE
filesystem with occasional packet loss.
---
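
For reference, a minimal sketch of the fdinfo lookup described above,
run against the current process instead of a sox child. It assumes
Linux with /proc mounted. The helper names fd_pos and demo_fd_pos are
made up for this illustration and are not part of the patch.

```ruby
require 'tempfile'

# Return the kernel-side file offset of descriptor +fd+ in process +pid+
# by parsing the "pos:" line of /proc/PID/fdinfo/FD (Linux-only).
# Returns nil if the process or descriptor is gone or unreadable.
def fd_pos(pid, fd)
  info = File.read("/proc/#{pid}/fdinfo/#{fd}")
  info[/^pos:\s*(\d+)$/, 1]&.to_i
rescue Errno::ENOENT, Errno::EACCES
  nil
end

# Hypothetical demo: seek a temporary file and read the offset back
# through /proc instead of asking the IO object.
def demo_fd_pos
  Tempfile.create('fdinfo-demo') do |tmp|
    tmp.write('x' * 4096)
    tmp.flush
    tmp.seek(1234) # performs lseek(2), so the kernel offset changes
    fd_pos(Process.pid, tmp.fileno)
  end
end
```

The script in the patch does the same parse, but first has to find
which descriptors of the player's child processes refer to the track
by comparing device and inode numbers.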
Eric Wong <e@80x24.org> wrote:
> I'll be adding userspace readahead for playback on slow network
> filesystems. It'll start off as Linux-only and will stay as a
> separate process/executable to:
>
> a) allow tuning/modification without interrupting playback in
> dtas-player
>
> b) not place additional demands on the weak, non-RT (mainline)
> Ruby 2.x threading/GC system we use for dtas-player.
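
The userspace readahead mentioned in the quoted plan boils down to
pulling file data into the page cache ahead of the player, in small
steps so the work can be abandoned as soon as something changes. A
rough sketch under those assumptions follows. warm_cache is a
hypothetical name, the 8192-byte step is arbitrary, and the optional
block stands in for whatever "stop now" check the caller wants.

```ruby
# Read +len+ bytes of +path+ starting at byte offset +pos+ and discard
# them, so the kernel keeps the data in its page cache.
# Copies in +chunk+-sized steps; if a block is given and returns true
# after a step, the loop stops early.
# Returns the number of bytes actually read.
def warm_cache(path, pos, len, chunk = 8192)
  done = 0
  File.open(path, 'rb') do |fp|
    File.open(File::NULL, 'wb') do |null|
      while done < len
        want = [chunk, len - done].min
        # passing a source offset leaves the file position of fp untouched
        n = IO.copy_stream(fp, null, want, pos + done)
        break if n == 0 # hit EOF (file shorter than pos + len)
        done += n
        break if block_given? && yield(done)
      end
    end
  end
  done
end
```

Calling warm_cache(track, offset, 30 * 1024 * 1024) would warm up to
30 MiB past offset. Doing this in a separate process, as planned above,
means a slow read only stalls the helper and not dtas-player.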
bin/dtas-readahead | 207 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 207 insertions(+)
create mode 100644 bin/dtas-readahead
diff --git a/bin/dtas-readahead b/bin/dtas-readahead
new file mode 100644
index 0000000..f02bc35
--- /dev/null
+++ b/bin/dtas-readahead
@@ -0,0 +1,207 @@
+#!/usr/bin/env ruby
+# Copyright (C) 2015 all contributors <dtas-all@nongnu.org>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# Really janky readahead script. Requires dtas-player to be
+# running and is unlikely to work outside of Linux, as it depends on
+# the contents of /proc.
+unless RUBY_PLATFORM =~ /linux/
+ warn "this relies on Linux /proc and probably does not work well for you"
+end
+
+require 'yaml'
+require 'io/wait'
+require 'dtas/unix_client'
+require 'dtas/process'
+
+include DTAS::Process
+include DTAS::SpawnFix
+trap(:CHLD) { DTAS::Process.reaper {} }
+trap(:INT) { exit(0) }
+trap(:TERM) { exit(0) }
+w = DTAS::UNIXClient.new
+w.req_ok('watch')
+c = DTAS::UNIXClient.new
+@max_ra = 30 * 1024 * 1024
+null = DTAS.null
+@redir = { err: null, out: null, in: null }.freeze
+require 'pp'
+
+if RUBY_VERSION.to_r >= '2.3'.to_r
+ # Old Rubies did FIONREAD, which breaks on SOCK_SEQPACKET
+ def wait_read(w, timeout)
+ w.to_io.wait_readable(timeout)
+ end
+else
+ def wait_read(w, timeout)
+ r = IO.select([w], nil, nil, timeout)
+ r ? r[0] : nil
+ end
+end
+
+def seek_to_cur_pos(cur_pid, fp)
+ cur_fd = []
+ fpst = fp.stat
+ begin
+ Dir["/proc/#{cur_pid}/fd/*"].each do |l|
+ path = File.readlink(l)
+ begin
+ st = File.stat(path)
+ if st.dev == fpst.dev && st.ino == fpst.ino
+ cur_fd << l.split('/')[-1]
+ end
+ rescue Errno::ENOENT, Errno::EPERM
+ end
+ end
+ rescue Errno::ENOENT => e # race, process is dead
+ return false
+ rescue => e
+ warn "error reading FDs for PID:#{cur_pid}: #{e.message}"
+ end
+ pos = 0
+ # find how far the sox process has read (largest "pos:" among matching FDs)
+ cur_fd.each do |fd|
+ if File.read("/proc/#{cur_pid}/fdinfo/#{fd}") =~ /^pos:\s*(\d+)$/
+ n = $1.to_i
+ pos = n if n > pos
+ end
+ end
+ pos
+rescue Errno::ENOENT => e # race, process is dead
+ return false
+end
+
+def children_of(ppid)
+ `ps h -o pid --ppid=#{ppid}`.split(/\s+/s).map(&:to_i)
+end
+
+def expand_pid(pid)
+ to_scan = Array(pid)
+ pids = []
+ while pid = to_scan.shift
+ pid > 0 or next
+ to_scan.concat(children_of(pid))
+ pids << pid
+ end
+ pids.uniq
+end
+
+def do_ra(fp, pos, w)
+ size = fp.size
+ len = size - pos
+ len = @todo_ra if len > @todo_ra
+ return if len <= 0
+ path = fp.path
+ pp({start_ra: File.basename(path),
+ len: '%.3f' % (len / (1024 * 1024.0)),
+ pos: pos })
+ Process.spawn('soxi', path, @redir)
+ Process.spawn('avprobe', path, @redir)
+ Process.spawn('ffprobe', path, @redir)
+ fp.advise(:sequential, pos, len)
+ Thread.new(fp.dup) { |d| d.advise(:willneed, pos, len); d.close }
+
+ at_once = 8192
+ adj = len
+ while len > 0
+ n = len > at_once ? at_once : len
+ n = IO.copy_stream(fp, DTAS.null, n, pos)
+ pos += n
+ len -= n
+
+ # stop reading immediately if there's an event
+ if wait_read(w, 0)
+ adj = @todo_ra
+ pos += size
+ break
+ end
+ end
+ @todo_ra -= adj
+ (pos + len) >= size ? fp.close : nil
+end
+
+def do_open(path)
+ if path =~ /\.ya?ml\z/
+ File.open(path) do |fp|
+ buf = fp.read(4)
+ case buf
+ when "---\n"
+ buf << fp.read(fp.size - 4)
+ Dir.chdir(File.dirname(path)) do
+ yml = YAML.load(buf)
+ x = yml['infile'] and return File.open(File.expand_path(x).freeze)
+ end
+ end
+ end
+ end
+ File.open(path)
+end
+
+begin
+ work = {}
+ cur_pid = nil
+ @todo_ra = @max_ra
+ t0 = DTAS.now
+ fp = nil
+ cur = YAML.load(c.req('current'))
+ while @todo_ra > 0 && fp.nil?
+ if current = cur['current']
+ track = current['infile'].freeze
+ work[track] ||= fp = do_open(track)
+ cur_pid = current['pid']
+ if fp
+ pos = expand_pid(cur_pid).map do |pid|
+ seek_to_cur_pos(pid, fp)
+ end.compact.max
+ pos and fp = do_ra(fp, pos, w)
+ end
+ else
+ break
+ end
+
+ # the queue has priority, so work on it first
+ queue = YAML.load(c.req('queue cat'))
+ while @todo_ra > 0 && track = queue.shift
+ fp = nil
+ begin
+ work[track] ||= fp = do_open(track)
+ rescue SystemCallError
+ end
+ fp = do_ra(fp, 0, w) if fp
+ end
+ break if @todo_ra <= 0
+
+ # the normal tracklist
+ ids = c.req('tl tracks').split
+ ids.shift # ignore count
+ idx = ids.find_index(c.req('tl current-id'))
+ repeat = c.req('tl repeat').split[-1]
+ while @todo_ra > 0 && idx && (cid = ids[idx])
+ fp = nil
+ track = c.req("tl get #{cid}").sub!(/\A1 \d+=/, '').freeze
+ begin
+ work[track] ||= fp = do_open(track)
+ rescue SystemCallError
+ end
+ fp = do_ra(fp, 0, w) if fp
+ if @todo_ra > 0 && fp.nil? && ids[idx += 1].nil?
+ idx = repeat == 'true' ? 0 : nil
+ end
+ end
+ idx or break
+ cur = YAML.load(c.req('current'))
+ cur['current'] or break
+ end
+ elapsed = DTAS.now - t0
+ p [:elapsed, elapsed]
+ timeout = 5 - elapsed
+ timeout = 0 if timeout < 0
+ r = wait_read(w, timeout)
+ p w.res_wait if r
+rescue EOFError
+ abort "dtas-player exited"
+rescue => e
+ warn "#{e.message} (#{e.class})"
+ e.backtrace.each {|l| warn l }
+ sleep 5
+end while true
--
EW
Thread overview:
2015-09-07 21:40 [PATCH 0/2] minor updates for readahead for -player Eric Wong
2015-09-07 21:40 ` [PATCH 1/2] use a common /dev/null Eric Wong
2015-09-07 21:40 ` [PATCH 2/2] player: add "queue cat" command Eric Wong
2015-09-20 23:02 ` Eric Wong [this message]
2015-10-03 9:38 ` [PATCH] dtas-readahead: make executable Eric Wong