# Copyright (C) 2017 all contributors
# License: GPL-3.0+
# frozen_string_literal: true
#
# Origin: commit 60d7f657f7922457c18b46f56bcd58b8d9e56bbf
# Author: Eric Wong, Fri, 28 Apr 2017 18:15:20 +0000
# Subject: pipeline: new module for running process pipelines
#
# This should allow us to easily manipulate process pipelines as an
# array of arrays.  Originally posted at
# http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/435624
require_relative '../dtas'
require_relative 'spawn_fix'

module DTAS::Pipeline # :nodoc:
  include DTAS::SpawnFix

  # Process.spawn wrapper which supports running Proc-like objects in
  # a separate process, not just external commands.
  #
  # +env+ is a Hash of environment variables for the child, or nil for
  # no changes.
  # +cmd+ is either an Array (command and arguments, handed to +spawn+)
  # or any object responding to +call+, which is run in a forked child.
  # +rdr+ is a redirect mapping in the style of Process.spawn.  In the
  # forked case only the integer keys 0, 1 and 2 are applied as
  # redirects, and every key mapped to +:close+ is closed (such keys
  # must respond to +close+).
  #
  # Returns the pid of the spawned process
  def pspawn(env, cmd, rdr = {})
    case cmd
    when Array
      # Process.spawn does not accept nil as the environment, while the
      # fork branch below does; normalize so both branches agree.
      spawn(env || {}, *cmd, rdr)
    else # support running Proc-like objects, too:
      fork do
        ENV.update(env) if env

        # setup redirects
        [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
          dst = rdr[fd]
          io.reopen(dst) if dst
        end

        # close all other pipes, since we can't rely on FD_CLOEXEC
        # (as we do not exec, here)
        rdr.each { |k, v| k.close if v == :close }
        cmd.call
      end
    end
  end

  # Runs +pipeline+, an Array of (Arrays or Procs), as concurrent
  # processes with the stdout of each stage connected to the stdin of
  # the next stage, then waits for every stage to exit.
  #
  # +env+ is handed to each stage via #pspawn.  A pipeline with a single
  # stage is run without any pipes.
  #
  # Raises ArgumentError if +pipeline+ is empty.  Raises RuntimeError
  # (like "set -o pipefail" in bash) with one line per stage which did
  # not exit successfully, in pipeline order.  Returns nil otherwise.
  def run_pipeline(env, pipeline)
    if pipeline.empty?
      raise ArgumentError, 'pipeline must contain at least one command'
    end

    last = pipeline.size - 1 # index of the final stage
    rdr = {} # redirect mapping for Process.spawn
    pipes = []
    pids = {} # pid => pipeline index

    begin
      # we need to make sure pipes are closed in any forked processes
      # (they are redirected to stdin or stdout, first)
      last.times { pipes << IO.pipe.each { |io| rdr[io] = :close } }

      # start the last and first commands first, then the middle ones;
      # uniq collapses the list to one entry for a single-stage pipeline
      [ last, 0, *(1...last) ].uniq.each do |i|
        io_map = rdr.dup
        io_map[0] = pipes[i - 1][0] if i > 0    # read from previous stage
        io_map[1] = pipes[i][1] if i < last     # write to next stage
        pids[pspawn(env, pipeline[i], io_map)] = i
      end
    ensure
      # all pipes handed off to children (or a stage failed to start):
      # close our copies so children see EOF and nothing is leaked
      pipes.flatten.each { |io| io.close unless io.closed? }
    end

    # wait for our own children only, a wildcard wait could reap (and
    # misattribute) unrelated children of this process
    fails = []
    pids.sort_by { |_pid, i| i }.each do |pid, i|
      _, status = Process.waitpid2(pid)
      next if status.success?
      fails << "reaped #{i} #{pipeline[i].inspect} #{status.inspect}"
    end

    # behave like "set -o pipefail" in bash
    raise fails.join("\n") unless fails.empty?
  end
end