everything related to duct tape audio suite (dtas)
* [PATCH] pipeline: new module for running process pipelines
@ 2017-04-28 20:08 Eric Wong
From: Eric Wong @ 2017-04-28 20:08 UTC (permalink / raw)
  To: dtas-all

This should allow us to easily manipulate process pipelines
as an Array of Arrays (or Procs), one element per command.
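
To illustrate the idea, here is a simplified sketch of running an
Array of Arrays as a pipeline.  It is not the module in this patch:
run_argv_pipeline and its "out:" keyword are hypothetical names used
only for illustration, it handles external commands only (no Procs),
and it returns a boolean instead of raising on failure.

```ruby
# Hypothetical helper (not part of this patch): run an Array of Arrays
# as a shell-like pipeline.  Each inner Array is one command (argv);
# stdout of each command feeds stdin of the next one.
def run_argv_pipeline(cmds, out: $stdout)
  last = cmds.size - 1

  # one pipe between each adjacent pair of commands
  pipes = Array.new(last) { IO.pipe }

  pids = cmds.each_with_index.map do |cmd, i|
    rdr = {}
    rdr[:in] = pipes[i - 1][0] if i > 0        # read from previous command
    rdr[:out] = i < last ? pipes[i][1] : out   # write to next command or +out+
    Process.spawn(*cmd, rdr)
  end

  # the parent must close its copies so readers see EOF
  pipes.flatten.each(&:close)

  # true only if every command exited successfully
  pids.map { |pid| Process.waitpid2(pid)[1].success? }.all?
end

# Usage: capture the output of the final command through a pipe
rd, wr = IO.pipe
ok = run_argv_pipeline([%w(echo hello), %w(tr a-z A-Z)], out: wr)
wr.close
result = rd.read
rd.close
```

The sketch relies on Ruby setting close-on-exec on pipe descriptors by
default, so each child only keeps the ends it was redirected to.  The
fork-based Proc support in the patch cannot rely on that, which is why
it closes the other pipe ends explicitly.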

Originally posted at
http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/435624
---
 lib/dtas/pipeline.rb  | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++
 test/test_pipeline.rb | 47 ++++++++++++++++++++++++++++++++
 2 files changed, 122 insertions(+)
 create mode 100644 lib/dtas/pipeline.rb
 create mode 100644 test/test_pipeline.rb

diff --git a/lib/dtas/pipeline.rb b/lib/dtas/pipeline.rb
new file mode 100644
index 0000000..b04b7f7
--- /dev/null
+++ b/lib/dtas/pipeline.rb
@@ -0,0 +1,75 @@
+# Copyright (C) 2017 all contributors <dtas-all@nongnu.org>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require_relative '../dtas'
+require_relative 'spawn_fix'
+
+module DTAS::Pipeline # :nodoc:
+  include DTAS::SpawnFix
+
+  # Process.spawn wrapper which supports running Proc-like objects in
+  # a separate process, not just external commands.
+  # Returns the pid of the spawned process
+  def pspawn(env, cmd, rdr = {})
+    case cmd
+    when Array
+      spawn(env, *cmd, rdr)
+    else # support running Proc-like objects, too:
+      fork do
+        ENV.update(env) if env
+
+        # setup redirects
+        [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
+          dst = rdr[fd] and io.reopen(dst)
+        end
+
+        # close all other pipes, since we can't rely on FD_CLOEXEC
+        # (as we do not exec, here)
+        rdr.each do |k, v|
+          k.close if v == :close
+        end
+        cmd.call
+      end
+    end
+  end
+
+  # +pipeline+ is an Array of (Arrays or Procs)
+  def run_pipeline(env, pipeline)
+    pids = {} # pid => pipeline index
+    work = pipeline.dup
+    last = work.pop
+    nr = work.size
+    rdr = {} # redirect mapping for Process.spawn
+
+    # we need to make sure pipes are closed in any forked processes
+    # (they are redirected to stdin or stdout, first)
+    pipes = nr.times.map { IO.pipe.each { |io| rdr[io] = :close } }
+
+    # start the first and last commands first, they only have one pipe, each
+    last_pid = pspawn(env, last, rdr.merge(0 => pipes[-1][0]))
+    pids[last_pid] = nr
+    first = work.shift
+    first_pid = pspawn(env, first, rdr.merge(1 => pipes[0][1]))
+    pids[first_pid] = 0
+
+    # start the middle commands, they each have two pipes:
+    work.each_with_index do |cmd, i|
+      pid = pspawn(env, cmd, rdr.merge(0 => pipes[i][0], 1 => pipes[i+1][1]))
+      pids[pid] = i + 1
+    end
+
+    # all pipes handed off to children, close so they see EOF
+    pipes.flatten!.each(&:close).clear
+
+    # wait for children to finish
+    fails = []
+    until pids.empty?
+      pid, status = Process.waitpid2(-1)
+      nr = pids.delete(pid)
+      status.success? or
+        fails << "reaped #{nr} #{pipeline[nr].inspect} #{status.inspect}"
+    end
+    # behave like "set -o pipefail" in bash
+    raise fails.join("\n") if fails[0]
+  end
+end
diff --git a/test/test_pipeline.rb b/test/test_pipeline.rb
new file mode 100644
index 0000000..3cc32cc
--- /dev/null
+++ b/test/test_pipeline.rb
@@ -0,0 +1,47 @@
+# Copyright (C) 2017 all contributors <dtas-all@nongnu.org>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require './test/helper'
+require 'dtas/pipeline'
+
+class TestPipeline < Testcase
+  include DTAS::Pipeline
+  def setup
+    @env = ENV.to_hash
+  end
+
+  def pipeline_result
+    IO.pipe do |rd, wr|
+      begin
+        pid = fork do
+          rd.close
+          $stdout.reopen(wr)
+          yield
+          exit!(0)
+        end
+        wr.close
+        return rd.read
+      ensure
+        _, status = Process.waitpid2(pid)
+        assert_predicate status, :success?
+      end
+    end
+    nil
+  end
+
+  def test_pipeline
+    assert_equal("BYYRU\n", pipeline_result do
+      run_pipeline(@env, [
+        %w(echo hello), # anything which generates something to stdout
+        %w(tr [a-z] [A-Z]), # upcase
+        # this lambda runs inside its own process
+        lambda do
+          $stdin.each_line { |l| $stdout.write("#{l.chomp.reverse}\n") }
+          exit!(0)
+        end,
+        # rot13
+        %w(tr [a-m][n-z][A-M][N-Z] [n-z][a-m][N-Z][A-M])
+      ])
+    end)
+  end
+end
-- 
EW




Code repositories for project(s) associated with this public inbox

	https://80x24.org/dtas.git/
