#!/usr/bin/env ruby # Copyright (C) 2015 all contributors # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) # # Really janky readahead script. Requires dtas-player to be # running and unlikely to work outside of Linux as it depends on # the contents of /proc unless RUBY_PLATFORM =~ /linux/ warn "this relies on Linux /proc and probably does not work well for you" end require 'yaml' require 'io/wait' require 'dtas/unix_client' require 'dtas/process' include DTAS::Process include DTAS::SpawnFix trap(:CHLD) { DTAS::Process.reaper {} } trap(:INT) { exit(0) } trap(:TERM) { exit(0) } w = DTAS::UNIXClient.new w.req_ok('watch') c = DTAS::UNIXClient.new @max_ra = 30 * 1024 * 1024 null = DTAS.null @redir = { err: null, out: null, in: null }.freeze require 'pp' if RUBY_VERSION.to_r >= '2.3'.to_r # Old Rubies did FIONREAD, which breaks on SOCK_SEQPACKET def wait_read(w, timeout) w.to_io.wait_readable(timeout) end else def wait_read(w, timeout) r = IO.select([w], nil, nil, timeout) r ? r[0] : nil end end def seek_to_cur_pos(cur_pid, fp) cur_fd = [] fpst = fp.stat begin Dir["/proc/#{cur_pid}/fd/*"].each do |l| path = File.readlink(l) begin st = File.stat(path) if st.dev == fpst.dev && st.ino == fpst.ino cur_fd << l.split('/')[-1] end rescue Errno::ENOENT, Errno::EPERM end end rescue Errno::ENOENT => e # race, process is dead return false rescue => e warn "error reading FDs from for PID:#{cur_pid}: #{e.message}" end pos = 0 # get the position of the file of the sox process cur_fd.each do |fd| if File.read("/proc/#{cur_pid}/fdinfo/#{fd}") =~ /^pos:\s*(\d+)$/ n = $1.to_i pos = n if n > pos end end pos rescue Errno::ENOENT => e # race, process is dead return false end def children_of(ppid) `ps h -o pid --ppid=#{ppid}`.split(/\s+/s).map(&:to_i) end def expand_pid(pid) to_scan = Array(pid) pids = [] while pid = to_scan.shift pid > 0 or next to_scan.concat(children_of(pid)) pids << pid end pids.uniq end def do_ra(fp, pos, w) size = fp.size len = size - pos len = @todo_ra if len > @todo_ra return if len <= 0 path = fp.path pp({start_ra: File.basename(path), len: '%.3f' % (len / (1024 * 1024.0)), pos: pos }) spawn('soxi', path, @redir) spawn('avprobe', path, @redir) spawn('ffprobe', path, @redir) fp.advise(:sequential, pos, len) Thread.new(fp.dup) { |d| d.advise(:willneed, pos, len); d.close } at_once = 8192 adj = len while len > 0 n = len > at_once ? at_once : len n = IO.copy_stream(fp, DTAS.null, n, pos) pos += n len -= n # stop reading immediately if there's an event if wait_read(w, 0) adj = @todo_ra pos += size break end end @todo_ra -= adj (pos + len) >= size ? fp.close : nil end def do_open(path) if path =~ /\.ya?ml\z/ File.open(path) do |fp| buf = fp.read(4) case buf when "---\n" buf << fp.read(fp.size - 4) Dir.chdir(File.dirname(path)) do yml = YAML.load(buf) x = yml['infile'] and return File.open(File.expand_path(x).freeze) end end end end File.open(path) end begin work = {} cur_pid = nil @todo_ra = @max_ra t0 = DTAS.now fp = nil cur = YAML.load(c.req('current')) while @todo_ra > 0 && fp.nil? if current = cur['current'] track = current['infile'].freeze work[track] ||= fp = do_open(track) cur_pid = current['pid'] if fp pos = expand_pid(cur_pid).map do |pid| seek_to_cur_pos(pid, fp) end.compact.max pos and fp = do_ra(fp, pos, w) end else break end # queue has priority, work on it, first queue = YAML.load(c.req('queue cat')) while @todo_ra > 0 && track = queue.shift fp = nil begin work[track] ||= fp = do_open(track) rescue SystemCallError end fp = do_ra(fp, 0, w) if fp end break if @todo_ra <= 0 # the normal tracklist ids = c.req('tl tracks').split ids.shift # ignore count idx = ids.find_index(c.req('tl current-id')) repeat = c.req('tl repeat').split[-1] while @todo_ra > 0 && idx && (cid = ids[idx]) fp = nil track = c.req("tl get #{cid}").sub!(/\A1 \d+=/, '').freeze begin work[track] ||= fp = do_open(track) rescue SystemCallError end fp = do_ra(fp, 0, w) if fp if @todo_ra > 0 && fp.nil? && ids[idx += 1].nil? idx = repeat == 'true' ? 0 : nil end end idx or break cur = YAML.load(c.req('current')) current = cur['current'] or break end if current elapsed = DTAS.now - t0 p [:elapsed, elapsed] timeout = 5 - elapsed timeout = 0 if timeout < 0 else timeout = nil end r = wait_read(w, timeout) p w.res_wait if r rescue EOFError abort "dtas-player exited" rescue => e warn "#{e.message} #{e.class})" e.backtrace.each {|l| warn l } sleep 5 end while true