1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright (C) all contributors <dtas-all@nongnu.org>
# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
# frozen_string_literal: true
# encoding: binary
require_relative '../../dtas'
require_relative '../source'
require_relative '../replaygain'
require_relative '../xs'
# this is usually one input file
class DTAS::Source::Sox # :nodoc:
  require_relative 'file'

  include DTAS::Source::File
  include DTAS::XS
  extend DTAS::XS

  # COMMAND_DEFAULTS with the sox decode command line and a "tryorder"
  # of 0 layered on top.
  SOX_DEFAULTS = COMMAND_DEFAULTS.merge(
    "command" => 'exec sox "$INFILE" $SOXFMT - $TRIMFX $RGFX',
    "tryorder" => 0,
  )

  # Emits a warning that soxi could not handle +infile+.
  # Nothing is emitted when the most recently reported failure was for
  # the same +infile+, so repeated failures on one file warn only once.
  #
  # +msg+ may be a Process::Status (its exit status is reported), the
  # Integer 0 (reported as zero samples), or anything else, which is
  # interpolated into the warning as-is.
  def soxi_failed(infile, msg)
    return if @last_failed == infile

    @last_failed = infile
    reason =
      case msg
      when Process::Status then "failed with #{msg.exitstatus}"
      when 0 then 'detected zero samples'
      else msg
      end
    warn("soxi #{infile}: #{reason}\n")
  end

  # Sets up the command state from SOX_DEFAULTS.
  # NOTE: +mcache+ is accepted but not used here; @mcache always starts
  # out nil and is created on demand by #mcache_lookup.
  def initialize(mcache = nil)
    @mcache = @last_failed = nil
    command_init(SOX_DEFAULTS)
  end

  # Looks +infile+ up in @mcache, creating the cache on first use.
  # The block given to the cache runs `soxi` on the input and fills +dst+
  # with 'samples', and, when present in the soxi output, 'channels',
  # 'rate', 'bits' and 'comments' (a Hash of upcased tag name => value).
  # presumably the cache only invokes the block on a miss -- confirm
  # against DTAS::Mcache#lookup.
  #
  # On a soxi failure (non-success status, a "soxi FAIL formats:" message
  # on stderr, or a zero sample count) this method returns the result of
  # #soxi_failed directly from inside the block, bypassing the cache.
  def mcache_lookup(infile)
    @mcache ||= DTAS::Mcache.new
    @mcache.lookup(infile) do |input, dst|
      err = ''.b
      out = qx(@env.dup, %W(soxi #{input}), err_str: err, no_raise: true)
      return soxi_failed(infile, out) if Process::Status === out
      return soxi_failed(infile, err) if err =~ /soxi FAIL formats:/

      # a missing Duration line leaves no match, and nil.to_i is 0
      out =~ /^Duration\s*:[^=]*= (\d+) samples /n
      samples = Regexp.last_match(1).to_i
      dst['samples'] = samples
      return soxi_failed(infile, 0) if samples == 0

      # optional numeric fields, stored only when soxi printed them
      [
        ['channels', /^Channels\s*:\s*(\d+)/n],
        ['rate', /^Sample Rate\s*:\s*(\d+)/n],
        ['bits', /^Precision\s*:\s*(\d+)-bit/n],
      ].each do |field, pattern|
        dst[field] = Regexp.last_match(1).to_i if out =~ pattern
      end

      enc = Encoding.default_external # typically Encoding::UTF_8
      if out =~ /\nComments\s*:[ \t]*\n?(.*)\z/mn
        tags = dst['comments'] = {}
        tag = nil
        Regexp.last_match(1).split(/\n/n).each do |row|
          # "NAME=value" starts a new tag; any other non-empty row is
          # appended to the value of the tag seen last
          if row.sub!(/^([^=]+)=/ni, '')
            tag = DTAS.try_enc(Regexp.last_match(1).upcase, enc)
          end
          next if row.empty?

          (tags[tag] ||= ''.b) << "#{row}\n"
        end
        tags.each_pair do |name, value|
          value.chomp!
          DTAS.try_enc(value, enc)
          tags[name] = value
        end
      end
      dst
    end
  end

  # Returns a copy of this source (via source_file_dup) for +infile+ with
  # @samples, @format and @comments preloaded from the cached metadata,
  # or nil when #mcache_lookup yields nothing for +infile+.
  def try(infile, offset = nil, trim = nil)
    meta = mcache_lookup(infile)
    return unless meta

    source_file_dup(infile, offset, trim).tap do |src|
      src.instance_variable_set(:@samples, meta['samples'])
      src.instance_variable_set(:@format, DTAS::Format.load(meta))
      src.instance_variable_set(:@comments, meta['comments'])
    end
  end

  # Memoized DTAS::Format built from the cached metadata of @infile;
  # nil when the lookup yields nothing.
  def format
    return @format if @format

    meta = mcache_lookup(@infile)
    @format = meta ? DTAS::Format.load(meta) : nil
  end

  # Sample count divided by the format's rate, as a Float.
  def duration
    total = samples
    rate = format.rate.to_f
    total / rate
  end

  # Sample count as reported for the source file itself, not for the
  # decoded output.  Memoized; 0 when no count is available.
  def samples
    unless @samples
      meta = mcache_lookup(@infile)
      @samples = meta ? meta['samples'] : nil
    end
    @samples || 0
  end

  # Comments from the cached metadata when @infile is a String and the
  # lookup succeeds (nil if the entry has no 'comments' key); otherwise
  # a new empty Hash.
  def __load_comments
    return {} unless String === @infile

    meta = mcache_lookup(@infile)
    meta ? meta['comments'] : {}
  end

  # Spawns the decode command and stores the result of dtas_spawn in @pid.
  # player_format.to_env is merged into @env in place before INFILE,
  # TRIMFX and RGFX are set; @rg.to_env is merged as well when @rg is set.
  # Raises RuntimeError if @to_io is already set.
  def src_spawn(player_format, rg_state, opts)
    raise "BUG: #{self.inspect}#src_spawn called twice" if @to_io

    env = @env.merge!(player_format.to_env)
    env["INFILE"] = @infile

    # make sure these are visible to the "current" command...
    env["TRIMFX"] = trimfx
    env["RGFX"] = rg_state.effect(self) || nil
    env.merge!(@rg.to_env) if @rg

    @pid = dtas_spawn(env, command_string, opts)
  end

  # The result of to_hash with every entry equal to its SOX_DEFAULTS
  # counterpart deleted from it.
  def to_hsh
    hsh = to_hash
    hsh.delete_if { |key, val| val == SOX_DEFAULTS[key] }
  end

  # The defaults Hash for this source type.
  def source_defaults
    SOX_DEFAULTS
  end
end
|