about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--MANIFEST1
-rw-r--r--lib/PublicInbox/Aspawn.pm34
-rw-r--r--lib/PublicInbox/Qspawn.pm95
3 files changed, 68 insertions, 62 deletions
diff --git a/MANIFEST b/MANIFEST
index dcce801c..f087621c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -161,6 +161,7 @@ lib/PublicInbox/AddressPP.pm
 lib/PublicInbox/Admin.pm
 lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
+lib/PublicInbox/Aspawn.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CidxComm.pm
diff --git a/lib/PublicInbox/Aspawn.pm b/lib/PublicInbox/Aspawn.pm
new file mode 100644
index 00000000..49f8651a
--- /dev/null
+++ b/lib/PublicInbox/Aspawn.pm
@@ -0,0 +1,34 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# async system()/qx() which takes callback
+package PublicInbox::Aspawn;
+use v5.12;
+use parent qw(Exporter);
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::Spawn qw(spawn);
+our @EXPORT_OK = qw(run_await);
+
+sub _await_cb { # awaitpid cb
+        my ($pid, $cmd, $env, $opt, $cb, @args) = @_;
+        PublicInbox::Spawn::read_out_err($opt);
+        if ($? && !$opt->{quiet}) {
+                my ($status, $sig) = ($? >> 8, $? & 127);
+                my $msg = '';
+                $msg .= " (-C=$opt->{-C})" if defined $opt->{-C};
+                $msg .= " status=$status" if $status;
+                $msg .= " signal=$sig" if $sig;
+                warn "E: @$cmd", $msg, "\n";
+        }
+        $cb->($pid, $cmd, $env, $opt, @args) if $cb;
+}
+
+sub run_await {
+        my ($cmd, $env, $opt, $cb, @args) = @_;
+        $opt->{1} //= \(my $out);
+        my $pid = spawn($cmd, $env, $opt);
+        awaitpid($pid, \&_await_cb, $cmd, $env, $opt, $cb, @args);
+        awaitpid($pid); # synchronous for non-$in_loop
+}
+
+1;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index a4d78e49..59d5ed40 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -30,6 +30,7 @@ use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
 use PublicInbox::Limiter;
+use PublicInbox::Aspawn qw(run_await);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -48,29 +49,30 @@ sub new {
 
 sub _do_spawn {
         my ($self, $start_cb, $limiter) = @_;
-        my $err;
         my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
         my %o = %{$opt || {}};
         $self->{limiter} = $limiter;
-        foreach my $k (@PublicInbox::Spawn::RLIMITS) {
-                if (defined(my $rlimit = $limiter->{$k})) {
-                        $o{$k} = $rlimit;
-                }
+        for my $k (@PublicInbox::Spawn::RLIMITS) {
+                $o{$k} = $limiter->{$k} // next;
         }
         $self->{cmd} = $cmd;
         $self->{-quiet} = 1 if $o{quiet};
-        eval {
-                # popen_rd may die on EMFILE, ENFILE
-                $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
-                                        \&waitpid_err, $self);
-                $limiter->{running}++;
-                $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
-        };
+        $limiter->{running}++;
+        if ($start_cb) {
+                eval { # popen_rd may die on EMFILE, ENFILE
+                        $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
+                                                \&waitpid_err, $self);
+                        $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
+                };
+        } else {
+                eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+                warn "E: $@" if $@;
+        }
         finish($self, $@) if $@;
 }
 
-sub finalize ($) {
-        my ($self) = @_;
+sub finalize ($;$) {
+        my ($self, $opt) = @_;
 
         # process is done, spawn whatever's in the queue
         my $limiter = delete $self->{limiter} or return;
@@ -89,10 +91,10 @@ sub finalize ($) {
                 warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
         }
 
-        my ($env, $qx_cb, $qx_arg, $qx_buf) =
-                delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
-        if ($qx_cb) {
-                eval { $qx_cb->($qx_buf, $qx_arg) };
+        my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+        if ($qx_cb_arg) {
+                my $cb = shift @$qx_cb_arg;
+                eval { $cb->($opt->{1}, @$qx_cb_arg) };
                 return unless $@;
                 warn "E: $@"; # hope qspawn.wcb can handle it
         }
@@ -108,15 +110,20 @@ sub finalize ($) {
 sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
 
 sub waitpid_err { # callback for awaitpid
-        my (undef, $self) = @_; # $_[0]: pid
+        my (undef, $self, $opt) = @_; # $_[0]: pid
         $self->{_err} = ''; # for defined check in ->finish
-        if ($?) {
+        if ($?) { # FIXME: redundant
                 my $status = $? >> 8;
                 my $sig = $? & 127;
                 $self->{_err} .= "exit status=$status";
                 $self->{_err} .= " signal=$sig" if $sig;
         }
-        finalize($self) if !$self->{rpipe};
+        finalize($self, $opt) if !$self->{rpipe};
+}
+
+sub wait_await { # run_await cb
+        my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
+        waitpid_err($pid, $self, $opt);
 }
 
 sub finish ($;$) {
@@ -140,52 +147,16 @@ sub start ($$$) {
         }
 }
 
-sub psgi_qx_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
-        my ($self) = @_;
-        my ($r, $buf);
-reread:
-        $r = sysread($self->{rpipe}, $buf, 65536);
-        if (!defined($r)) {
-                return if $! == EAGAIN; # try again when notified
-                goto reread if $! == EINTR;
-                event_step($self, $!);
-        } elsif (my $as = delete $self->{async}) { # PublicInbox::HTTPD::Async
-                $as->async_pass($self->{psgi_env}->{'psgix.io'},
-                                $self->{qx_fh}, \$buf);
-        } elsif ($r) { # generic PSGI:
-                print { $self->{qx_fh} } $buf;
-        } else { # EOF
-                event_step($self, undef);
-        }
-}
-
-sub psgi_qx_start {
-        my ($self) = @_;
-        if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
-                # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
-                $self->{async} = $async->($self->{rpipe},
-                                        \&psgi_qx_init_cb, $self, $self);
-                # init_cb will call ->async_pass or ->close
-        } else { # generic PSGI
-                psgi_qx_init_cb($self) while $self->{qx_fh};
-        }
-}
-
-# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
+# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
 # and safe to slurp.
 sub psgi_qx {
-        my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+        my ($self, $env, $limiter, @qx_cb_arg) = @_;
         $self->{psgi_env} = $env;
-        my $qx_buf = '';
-        open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
-        $self->{qx_cb} = $qx_cb;
-        $self->{qx_arg} = $qx_arg;
-        $self->{qx_fh} = $qx_fh;
-        $self->{qx_buf} = \$qx_buf;
+        $self->{qx_cb_arg} = \@qx_cb_arg;
         $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
-        start($self, $limiter, \&psgi_qx_start);
+        start($self, $limiter, undef);
 }
 
 # this is called on pipe EOF to reap the process, may be called
@@ -195,7 +166,7 @@ sub event_step {
         my ($self, $err) = @_; # $err: $!
         warn "psgi_{return,qx} $err" if defined($err);
         finish($self);
-        my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)});
+        my $fh = delete $self->{qfh};
         $fh->close if $fh; # async-only (psgi_return)
 }