From mboxrd@z Thu Jan 1 00:00:00 1970
From: Eric Wong
To: dtas-all@nongnu.org
Subject: [PATCH] pipeline: new module for running process pipelines
Date: Fri, 28 Apr 2017 20:08:09 +0000
Message-Id: <20170428200809.28822-1-e@80x24.org>
List-Id: duct tape audio suite
Errors-To: dtas-all-bounces+e=80x24.org@nongnu.org
Sender: "dtas-all"

This should allow us easily to manipulate process pipelines
as an array of arrays.

Originally posted at
http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-talk/435624
---
 lib/dtas/pipeline.rb  | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++
 test/test_pipeline.rb | 47 ++++++++++++++++++++++++++++++++
 2 files changed, 122 insertions(+)
 create mode 100644 lib/dtas/pipeline.rb
 create mode 100644 test/test_pipeline.rb

diff --git a/lib/dtas/pipeline.rb b/lib/dtas/pipeline.rb
new file mode 100644
index 0000000..b04b7f7
--- /dev/null
+++ b/lib/dtas/pipeline.rb
@@ -0,0 +1,75 @@
+# Copyright (C) 2017 all contributors
+# License: GPL-3.0+
+# frozen_string_literal: true
+require_relative '../dtas'
+require_relative 'spawn_fix'
+
+module DTAS::Pipeline # :nodoc:
+  include DTAS::SpawnFix
+
+  # Process.spawn wrapper which supports running Proc-like objects in
+  # a separate process, not just external commands.
+  # Returns the pid of the spawned process
+  def pspawn(env, cmd, rdr = {})
+    case cmd
+    when Array
+      spawn(env, *cmd, rdr)
+    else # support running Proc-like objects, too:
+      fork do
+        ENV.update(env) if env
+
+        # setup redirects
+        [ $stdin, $stdout, $stderr ].each_with_index do |io, fd|
+          dst = rdr[fd] and io.reopen(dst)
+        end
+
+        # close all other pipes, since we can't rely on FD_CLOEXEC
+        # (as we do not exec, here)
+        rdr.each do |k, v|
+          k.close if v == :close
+        end
+        cmd.call
+      end
+    end
+  end
+
+  # +pipeline+ is an Array of (Arrays or Procs)
+  def run_pipeline(env, pipeline)
+    pids = {} # pid => pipeline index
+    work = pipeline.dup
+    last = work.pop
+    nr = work.size
+    rdr = {} # redirect mapping for Process.spawn
+
+    # we need to make sure pipes are closed in any forked processes
+    # (they are redirected to stdin or stdout, first)
+    pipes = nr.times.map { IO.pipe.each { |io| rdr[io] = :close } }
+
+    # start the first and last commands first, they only have one pipe, each
+    last_pid = pspawn(env, last, rdr.merge(0 => pipes[-1][0]))
+    pids[last_pid] = nr
+    first = work.shift
+    first_pid = pspawn(env, first, rdr.merge(1 => pipes[0][1]))
+    pids[first_pid] = 0
+
+    # start the middle commands, they both have two pipes:
+    work.each_with_index do |cmd, i|
+      pid = pspawn(env, cmd, rdr.merge(0 => pipes[i][0], 1 => pipes[i+1][1]))
+      pids[pid] = i + 1
+    end
+
+    # all pipes handed off to children, close so they see EOF
+    pipes.flatten!.each(&:close).clear
+
+    # wait for children to finish
+    fails = []
+    until pids.empty?
+      pid, status = Process.waitpid2(-1)
+      nr = pids.delete(pid)
+      status.success? or
+        fails << "reaped #{nr} #{pipeline[nr].inspect} #{status.inspect}"
+    end
+    # behave like "set -o pipefail" in bash
+    raise fails.join("\n") if fails[0]
+  end
+end
diff --git a/test/test_pipeline.rb b/test/test_pipeline.rb
new file mode 100644
index 0000000..3cc32cc
--- /dev/null
+++ b/test/test_pipeline.rb
@@ -0,0 +1,47 @@
+# Copyright (C) 2017 all contributors
+# License: GPL-3.0+
+# frozen_string_literal: true
+require './test/helper'
+require 'dtas/pipeline'
+
+class TestPipeline < Testcase
+  include DTAS::Pipeline
+  def setup
+    @env = ENV.to_hash
+  end
+
+  def pipeline_result
+    IO.pipe do |rd, wr|
+      begin
+        pid = fork do
+          rd.close
+          $stdout.reopen(wr)
+          yield
+          exit!(0)
+        end
+        wr.close
+        return rd.read
+      ensure
+        _, status = Process.waitpid2(pid)
+        assert_predicate status, :success?
+      end
+    end
+    nil
+  end
+
+  def test_pipeline
+    assert_equal("BYYRU\n", pipeline_result do
+      run_pipeline(@env, [
+        %w(echo hello), # anything which generates something to stdout
+        %w(tr [a-z] [A-Z]), # upcase
+        # this lambda runs inside its own process
+        lambda do
+          $stdin.each_line { |l| $stdout.write("#{l.chomp.reverse}\n") }
+          exit!(0)
+        end,
+        # rot13
+        %w(tr [a-m][n-z][A-M][N-Z] [n-z][a-m][N-Z][A-M])
+      ])
+    end)
+  end
+end
-- 
EW
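
Not part of the patch: a minimal standalone sketch of the array-of-arrays idea, for readers who want to try it without the dtas tree. It assumes a POSIX system with echo, tr, false and cat in PATH. The name sketch_pipeline is hypothetical. Unlike run_pipeline in the patch, it accepts only argv arrays (no Procs), captures the last command's stdout, and returns the exit statuses instead of raising.

```ruby
# Hypothetical helper (not from the patch): connect argv arrays with
# pipes, return the last command's stdout and every exit status.
def sketch_pipeline(cmds)
  raise ArgumentError, "empty pipeline" if cmds.empty?

  out_rd, out_wr = IO.pipe # captures stdout of the last command
  prev_rd = nil            # read end feeding the next command's stdin

  pids = cmds.each_with_index.map do |argv, i|
    last = (i == cmds.size - 1)
    rd, wr = last ? [nil, out_wr] : IO.pipe
    opts = { out: wr }
    opts[:in] = prev_rd if prev_rd
    pid = Process.spawn(*argv, opts)

    # the parent drops its copies so downstream readers can see EOF
    wr.close
    prev_rd&.close
    prev_rd = rd
    pid
  end

  output = out_rd.read
  out_rd.close
  statuses = pids.map { |pid| Process.waitpid2(pid)[1] }
  [output, statuses]
end

out, statuses = sketch_pipeline([%w(echo hello), %w(tr a-z A-Z)])
p out # => "HELLO\n"

# a caller wanting "set -o pipefail" behavior could check every status:
failed = statuses.reject(&:success?)
raise "pipeline failed: #{failed.inspect}" unless failed.empty?
```

The sketch spawns commands in order and relies on Ruby marking pipe descriptors close-on-exec, which is enough when every stage is an exec'd command. The patch cannot rely on that for Proc stages, which is why it tracks every pipe end in rdr with :close.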