about summary refs log tree commit homepage
path: root/lib/dtas/pipeline.rb
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2017-04-28 18:15:20 +0000
committer Eric Wong <e@80x24.org> 2017-04-28 18:38:55 +0000
commit 60d7f657f7922457c18b46f56bcd58b8d9e56bbf (patch)
tree 0cf45e25f0ae5b8bb955c6fbce80d3e4915a723b /lib/dtas/pipeline.rb
parent 668eac36b811d9c321d9a48ad784807f43171882 (diff)
download dtas-60d7f657f7922457c18b46f56bcd58b8d9e56bbf.tar.gz
This should allow us easily to manipulate process pipelines
as an array of arrays.

Originally posted at
http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/435624
Diffstat (limited to 'lib/dtas/pipeline.rb')
-rw-r--r-- lib/dtas/pipeline.rb 75
1 files changed, 75 insertions, 0 deletions
diff --git a/lib/dtas/pipeline.rb b/lib/dtas/pipeline.rb
new file mode 100644
index 0000000..b04b7f7
--- /dev/null
+++ b/lib/dtas/pipeline.rb
@@ -0,0 +1,75 @@
+# Copyright (C) 2017 all contributors <dtas-all@nongnu.org>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require_relative '../dtas'
+require_relative 'spawn_fix'
+
+module DTAS::Pipeline # :nodoc:
+  include DTAS::SpawnFix
+
+  # Process.spawn wrapper which supports running Proc-like objects in
+  # a separate process, not just external commands.
+  #
+  # +env+ is a Hash of environment overrides, or nil for none.
+  # +cmd+ is either an Array (an external command and its arguments)
+  # or any object responding to +call+.
+  # +rdr+ is a redirect mapping in the style of Process.spawn.  For
+  # Proc-like objects, only redirects of fds 0, 1 and 2 and
+  # <code>io => :close</code> entries are acted upon.
+  #
+  # Returns the pid of the spawned process
+  def pspawn(env, cmd, rdr = {})
+    case cmd
+    when Array
+      # an empty Hash stands in for a nil env, so nil is accepted here
+      # just as it is in the fork branch below
+      spawn(env || {}, *cmd, rdr)
+    else # support running Proc-like objects, too:
+      fork do
+        ENV.update(env) if env
+
+        # setup redirects
+        [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
+          dst = rdr[fd]
+          io.reopen(dst) if dst
+        end
+
+        # close all other pipes, since we can't rely on FD_CLOEXEC
+        # (as we do not exec, here)
+        rdr.each do |io, action|
+          io.close if action == :close
+        end
+        cmd.call
+      end
+    end
+  end
+
+  # Starts every command in +pipeline+, connecting the stdout of each
+  # command to the stdin of the next one with a pipe, then waits for
+  # all of them to exit.
+  #
+  # +pipeline+ is an Array of (Arrays or Procs), each one is started
+  # via pspawn with +env+.  A single-element pipeline is run without
+  # any pipe redirects, and an empty pipeline is a no-op.
+  #
+  # Returns nil.  Raises RuntimeError with one line per command which
+  # did not exit successfully.
+  def run_pipeline(env, pipeline)
+    return if pipeline.empty?
+
+    last = pipeline.size - 1
+    rdr = {} # redirect mapping for Process.spawn
+
+    # we need to make sure pipes are closed in any forked processes
+    # (they are redirected to stdin or stdout, first)
+    # pipes[i] connects command i (writer) to command i + 1 (reader)
+    pipes = Array.new(last) { IO.pipe.each { |io| rdr[io] = :close } }
+
+    pids = {} # pid => pipeline index
+    begin
+      pipeline.each_with_index do |cmd, i|
+        redir = rdr.dup
+        redir[0] = pipes[i - 1][0] if i > 0 # all but the first read a pipe
+        redir[1] = pipes[i][1] if i < last # all but the last write a pipe
+        pids[pspawn(env, cmd, redir)] = i
+      end
+    ensure
+      # all pipes handed off to children, close so they see EOF
+      # (and so nothing is leaked here if a command fails to start).
+      # non-bang flatten: flatten! returns nil when there are no pipes
+      pipes.flatten.each(&:close)
+    end
+
+    # wait for children to finish; wait on our own pids only, so
+    # unrelated children of this process are never reaped here
+    fails = []
+    pids.each do |pid, nr|
+      _, status = Process.waitpid2(pid)
+      next if status.success?
+      fails << "reaped #{nr} #{pipeline[nr].inspect} #{status.inspect}"
+    end
+    # behave like "set -o pipefail" in bash
+    raise fails.join("\n") if fails[0]
+  end
+end