dtas.git  about / heads / tags
duct tape audio suite for *nix
blob b900feeb03d3fc65dff5d560c03a9fbc15fb071d 2338 bytes (raw)
$ git show v0.16.1:lib/dtas/pipeline.rb	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
 
# Copyright (C) 2017-2019 all contributors <dtas-all@nongnu.org>
# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
# frozen_string_literal: true
require_relative '../dtas'
require_relative 'spawn_fix'

module DTAS::Pipeline # :nodoc:
  include DTAS::SpawnFix

  # Process.spawn wrapper which supports running Proc-like objects in
  # a separate process, not just external commands.
  #
  # +env+ is a Hash of environment variables, +cmd+ is either an Array
  # (an external command with its arguments) or an object responding to
  # +call+, and +rdr+ is a redirect mapping in the style of Process.spawn.
  #
  # When +cmd+ is not an Array, only a subset of +rdr+ is honored in the
  # forked child: the keys 0, 1 and 2 (the matching standard stream is
  # reopened onto the given IO) and <code>io => :close</code> entries.
  #
  # Returns the pid of the spawned process
  def pspawn(env, cmd, rdr = {})
    case cmd
    when Array
      spawn(env, *cmd, rdr)
    else # support running Proc-like objects, too:
      fork do
        ENV.update(env) if env

        # setup redirects
        [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
          dst = rdr[fd]
          io.reopen(dst) if dst
        end

        # close all other pipes, since we can't rely on FD_CLOEXEC
        # (as we do not exec, here)
        rdr.each do |k, v|
          k.close if v == :close
        end
        cmd.call
      end
    end
  end

  # Runs every element of +pipeline+ in its own process (via #pspawn),
  # with stdout of each command piped into stdin of the following one,
  # and waits for all of them to finish.
  #
  # +env+ is passed to #pspawn for every command.
  # +pipeline+ is an Array of (Arrays or Procs).  A single-element
  # pipeline runs that command without any pipes; an empty pipeline
  # is a no-op.
  #
  # Returns nil.  Raises RuntimeError with one line per failed command
  # if any command does not exit successfully.
  def run_pipeline(env, pipeline)
    return if pipeline.empty? # nothing to run

    pids = {} # pid => pipeline index
    nr = pipeline.size - 1 # index of the last command == number of pipes
    rdr = {} # redirect mapping for Process.spawn

    # we need to make sure pipes are closed in any forked processes
    # (they are redirected to stdin or stdout, first)
    pipes = Array.new(nr) { IO.pipe.each { |io| rdr[io] = :close } }

    begin
      # start the last and first commands first (they only have one pipe,
      # each), then the middle commands (they have two pipes, each)
      [ nr, *(0...nr) ].each do |i|
        stdio = {}
        stdio[0] = pipes[i - 1][0] if i > 0 # read from the previous command
        stdio[1] = pipes[i][1] if i < nr # write to the next command
        pid = pspawn(env, pipeline[i], rdr.merge(stdio))
        pids[pid] = i
      end
    rescue
      # a command could not be started; we will never wait on the ones
      # already running, so let background threads reap them
      pids.each_key { |pid| Process.detach(pid) }
      raise
    ensure
      # all pipes handed off to children, close so they see EOF.
      # (non-destructive flatten: flatten! returns nil for an empty Array)
      pipes.flatten.each(&:close)
    end

    # wait for our children to finish; only wait on pids we started so
    # unrelated children of this process are never reaped here
    fails = []
    pids.each do |pid, idx|
      _, status = Process.waitpid2(pid)
      next if status.success?
      fails << "reaped #{idx} #{pipeline[idx].inspect} #{status.inspect}"
    end
    # behave like "set -o pipefail" in bash
    raise fails.join("\n") if fails[0]
  end
end

git clone git://80x24.org/dtas.git
git clone https://80x24.org/dtas.git