everything related to duct tape audio suite (dtas)
From: Eric Wong <e@80x24.org>
To: dtas-all@nongnu.org
Subject: [PATCH] pipeline: new module for running process pipelines
Date: Fri, 28 Apr 2017 20:08:09 +0000
Message-ID: <20170428200809.28822-1-e@80x24.org>

This should allow us to easily manipulate process pipelines
as an array of arrays.

Originally posted at
http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/435624
---
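
Note (not part of the commit): to illustrate the "array of arrays"
idea, here is a minimal, self-contained sketch that chains external
commands with IO.pipe and Process.spawn. The name sketch_pipeline is
hypothetical and is not the API added by this patch; unlike
run_pipeline it spawns commands front to back, handles only external
commands (no Procs), and returns the final stdout as a String. It
assumes a Unix-like system with echo(1) and tr(1) in PATH.

```ruby
# Hypothetical helper, not part of the patch: run an Array of argv
# Arrays as a pipeline and return whatever the last command writes
# to stdout.
def sketch_pipeline(cmds)
  out_rd, out_wr = IO.pipe
  prev_rd = nil # read end of the pipe feeding the next command
  pids = cmds.each_with_index.map do |argv, i|
    last = (i == cmds.size - 1)
    rd, wr = last ? [nil, out_wr] : IO.pipe
    rdr = { 1 => wr }             # child's stdout
    rdr[0] = prev_rd if prev_rd   # child's stdin (inherited for the first)
    pid = Process.spawn(*argv, rdr)
    # the parent must close its copies so downstream readers see EOF
    wr.close
    prev_rd.close if prev_rd
    prev_rd = rd
    pid
  end
  result = out_rd.read
  out_rd.close
  statuses = pids.map { |pid| Process.waitpid2(pid)[1] }
  # fail if any stage failed, similar in spirit to "set -o pipefail"
  raise "pipeline failed" unless statuses.all?(&:success?)
  result
end

puts sketch_pipeline([%w(echo hello), %w(tr a-z A-Z)]) # prints "HELLO"
```

Since the pipeline is plain data, stages can be added, removed or
reordered with ordinary Array methods before anything is spawned.
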
 lib/dtas/pipeline.rb  | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++
 test/test_pipeline.rb | 47 ++++++++++++++++++++++++++++++++
 2 files changed, 122 insertions(+)
 create mode 100644 lib/dtas/pipeline.rb
 create mode 100644 test/test_pipeline.rb
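
Note (not part of the commit): the Proc branch of pspawn relies on
fork plus IO#reopen, since there is no exec to apply redirects or
FD_CLOEXEC for us. A rough standalone sketch of that technique
follows. The name run_proc_filter is hypothetical, and the helper
assumes a platform with fork(2) and an input small enough to fit in
the pipe buffer.

```ruby
# Hypothetical helper illustrating fork-and-reopen: run a block in a
# child process with its stdin/stdout redirected to pipes.
def run_proc_filter(input, &blk)
  in_rd, in_wr = IO.pipe
  out_rd, out_wr = IO.pipe
  pid = fork do
    # no exec happens here, so close the unused pipe ends by hand
    in_wr.close
    out_rd.close
    $stdin.reopen(in_rd)
    $stdout.reopen(out_wr)
    blk.call
    $stdout.flush
    exit!(0) # skip at_exit handlers inherited from the parent
  end
  # the parent keeps only the ends it uses
  in_rd.close
  out_wr.close
  in_wr.write(input) # assumes input fits in the pipe buffer
  in_wr.close        # lets the child see EOF on stdin
  result = out_rd.read
  out_rd.close
  _, status = Process.waitpid2(pid)
  raise "child failed: #{status.inspect}" unless status.success?
  result
end

reversed = run_proc_filter("hello\n") do
  $stdin.each_line { |l| $stdout.write("#{l.chomp.reverse}\n") }
end
puts reversed # prints "olleh"
```

If the child kept its copy of in_wr open, it would never see EOF on
its own stdin, which is why the patch walks the redirect hash and
closes everything marked :close.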

diff --git a/lib/dtas/pipeline.rb b/lib/dtas/pipeline.rb
new file mode 100644
index 0000000..b04b7f7
--- /dev/null
+++ b/lib/dtas/pipeline.rb
@@ -0,0 +1,75 @@
+# Copyright (C) 2017 all contributors <dtas-all@nongnu.org>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require_relative '../dtas'
+require_relative 'spawn_fix'
+
+module DTAS::Pipeline # :nodoc:
+  include DTAS::SpawnFix
+
+  # Process.spawn wrapper which supports running Proc-like objects in
+  # a separate process, not just external commands.
+  # Returns the pid of the spawned process
+  def pspawn(env, cmd, rdr = {})
+    case cmd
+    when Array
+      spawn(env, *cmd, rdr)
+    else # support running Proc-like objects, too:
+      fork do
+        ENV.update(env) if env
+
+        # setup redirects
+        [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
+          dst = rdr[fd] and io.reopen(dst)
+        end
+
+        # close all other pipes, since we can't rely on FD_CLOEXEC
+        # (as we do not exec, here)
+        rdr.each do |k, v|
+          k.close if v == :close
+        end
+        cmd.call
+      end
+    end
+  end
+
+  # +pipeline+ is an Array of (Arrays or Procs)
+  def run_pipeline(env, pipeline)
+    pids = {} # pid => pipeline index
+    work = pipeline.dup
+    last = work.pop
+    nr = work.size
+    rdr = {} # redirect mapping for Process.spawn
+
+    # we need to make sure pipes are closed in any forked processes
+    # (they are redirected to stdin or stdout, first)
+    pipes = nr.times.map { IO.pipe.each { |io| rdr[io] = :close } }
+
+    # start the first and last commands first, they only have one pipe, each
+    last_pid = pspawn(env, last, rdr.merge(0 => pipes[-1][0]))
+    pids[last_pid] = nr
+    first = work.shift
+    first_pid = pspawn(env, first, rdr.merge(1 => pipes[0][1]))
+    pids[first_pid] = 0
+
+    # start the middle commands, they each have two pipes:
+    work.each_with_index do |cmd, i|
+      pid = pspawn(env, cmd, rdr.merge(0 => pipes[i][0], 1 => pipes[i+1][1]))
+      pids[pid] = i + 1
+    end
+
+    # all pipes handed off to children, close so they see EOF
+    pipes.flatten!.each(&:close).clear
+
+    # wait for children to finish
+    fails = []
+    until pids.empty?
+      pid, status = Process.waitpid2(-1)
+      nr = pids.delete(pid)
+      status.success? or
+        fails << "reaped #{nr} #{pipeline[nr].inspect} #{status.inspect}"
+    end
+    # behave like "set -o pipefail" in bash
+    raise fails.join("\n") if fails[0]
+  end
+end
diff --git a/test/test_pipeline.rb b/test/test_pipeline.rb
new file mode 100644
index 0000000..3cc32cc
--- /dev/null
+++ b/test/test_pipeline.rb
@@ -0,0 +1,47 @@
+# Copyright (C) 2017 all contributors <dtas-all@nongnu.org>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require './test/helper'
+require 'dtas/pipeline'
+
+class TestPipeline < Testcase
+  include DTAS::Pipeline
+  def setup
+    @env = ENV.to_hash
+  end
+
+  def pipeline_result
+    IO.pipe do |rd, wr|
+      begin
+        pid = fork do
+          rd.close
+          $stdout.reopen(wr)
+          yield
+          exit!(0)
+        end
+        wr.close
+        return rd.read
+      ensure
+        _, status = Process.waitpid2(pid)
+        assert_predicate status, :success?
+      end
+    end
+    nil
+  end
+
+  def test_pipeline
+    assert_equal("BYYRU\n", pipeline_result do
+      run_pipeline(@env, [
+        %w(echo hello), # anything which generates something to stdout
+        %w(tr [a-z] [A-Z]), # upcase
+        # this lambda runs inside its own process
+        lambda do
+          $stdin.each_line { |l| $stdout.write("#{l.chomp.reverse}\n") }
+          exit!(0)
+        end,
+        # rot13
+        %w(tr [a-m][n-z][A-M][N-Z] [n-z][a-m][N-Z][A-M])
+      ])
+    end)
+  end
+end
-- 
EW


