about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/DS.pm43
-rw-r--r--lib/PublicInbox/LeiToMail.pm4
-rw-r--r--lib/PublicInbox/ProcessPipe.pm42
-rw-r--r--lib/PublicInbox/Qspawn.pm60
-rw-r--r--lib/PublicInbox/Spawn.pm6
-rw-r--r--t/spawn.t12
6 files changed, 104 insertions, 63 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index e4629e97..9563a1cb 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
 use Errno qw(EAGAIN EINVAL);
 use Carp qw(carp croak);
-our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer);
+our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer);
 
 my %Stack;
 my $nextq; # queue for next_tick
 my $wait_pids; # list of [ pid, callback, callback_arg ]
+my $AWAIT_PIDS; # pid => [ $callback, @args ]
 my $reap_armed;
 my $ToClose; # sockets to close when event loop is done
 our (
@@ -74,11 +75,11 @@ sub Reset {
                 # we may be iterating inside one of these on our stack
                 my @q = delete @Stack{keys %Stack};
                 for my $q (@q) { @$q = () }
-                $wait_pids = $nextq = $ToClose = undef;
+                $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef;
                 $ep_io = undef; # closes real $Epoll FD
                 $Epoll = undef; # may call DSKQXS::DESTROY
         } while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
-                $ToClose || keys(%DescriptorMap) ||
+                $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS ||
                 $PostLoopCallback || keys(%UniqTimer));
 
         $reap_armed = undef;
@@ -201,6 +202,13 @@ sub block_signals () {
         $oldset;
 }
 
+sub await_cb ($;@) { # run one awaitpid callback as $cb->($pid, @args)
+        my ($pid, @cb_args) = @_;
+        my $cb = shift @cb_args or return; # no callback given: nothing to run
+        eval { $cb->($pid, @cb_args) };
+        warn "E: awaitpid($pid): $@" if $@; # callback died: warn, don't rethrow
+}
+
 # We can't use waitpid(-1) safely here since it can hit ``, system(),
 # and other things.  So we scan the $wait_pids list, which is hopefully
 # not too big.  We keep $wait_pids small by not calling dwaitpid()
@@ -208,10 +216,12 @@ sub block_signals () {
 
 sub reap_pids {
         $reap_armed = undef;
-        my $tmp = $wait_pids or return;
+        my $tmp = $wait_pids // [];
         $wait_pids = undef;
         $Stack{reap_runq} = $tmp;
         my $oldset = block_signals();
+
+        # old API
         foreach my $ary (@$tmp) {
                 my ($pid, $cb, $arg) = @$ary;
                 my $ret = waitpid($pid, WNOHANG);
@@ -226,6 +236,14 @@ sub reap_pids {
                         warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
                 }
         }
+
+        # new API TODO: convert to waitpid(-1) in the future as long
+        # as we don't use threads
+        for my $pid (keys %$AWAIT_PIDS) {
+                my $wpid = waitpid($pid, WNOHANG) // next;
+                my $cb_args = delete $AWAIT_PIDS->{$wpid} or next;
+                await_cb($pid, @$cb_args);
+        }
         sig_setmask($oldset);
         delete $Stack{reap_runq};
 }
@@ -720,6 +738,23 @@ sub dwaitpid ($;$$) {
         }
 }
 
+sub awaitpid { # args: ($pid, [$cb, @args]); callback is optional
+        my ($pid, @cb_args) = @_;
+        $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0; # 0: no callback
+        # synchronous API: non-void context, or !$in_loop with no callback
+        if (defined(wantarray) || (!$in_loop && !@cb_args)) {
+                my $ret = waitpid($pid, 0) // -2;
+                if ($ret == $pid) {
+                        my $cb_args = delete $AWAIT_PIDS->{$pid};
+                        @cb_args = @$cb_args if !@cb_args && $cb_args; # use stored cb
+                        await_cb($pid, @cb_args);
+                        return $ret;
+                }
+        }
+        # We may have just missed our SIGCHLD; cover that case here:
+        enqueue_reap() if $in_loop;
+}
+
 1;
 
 =head1 AUTHORS (Danga::Socket)
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index b58e2652..1528165a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -150,8 +150,8 @@ sub git_to_mail { # git->cat_async callback
         $self->{lei}->fail("$@ (oid=$oid)") if $@;
 }
 
-sub reap_compress { # dwaitpid callback
-        my ($lei, $pid) = @_;
+sub reap_compress { # awaitpid callback
+        my ($pid, $lei) = @_;
         my $cmd = delete $lei->{"pid.$pid"};
         return if $? == 0;
         $lei->fail("@$cmd failed", $? >> 8);
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index 97e9c268..068631c6 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -1,16 +1,25 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # a tied handle for auto reaping of children tied to a pipe, see perltie(1)
 package PublicInbox::ProcessPipe;
-use strict;
-use v5.10.1;
+use v5.12;
 use Carp qw(carp);
+use PublicInbox::DS qw(awaitpid);
+
+sub waitcb { # awaitpid callback
+        my ($pid, $err_ref, $cb, @args) = @_;
+        $$err_ref = $?; # sets ->{pp_chld_err} for _close
+        $cb->($pid, @args) if $cb; # optional callback passed to TIEHANDLE
+}
 
 sub TIEHANDLE {
-        my ($class, $pid, $fh, $cb, $arg) = @_;
-        bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg },
-                $class;
+        my ($cls, $pid, $fh, @cb_arg) = @_; # @cb_arg: optional ($cb, @args)
+        my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls;
+        # we share $err (and not $self) with awaitpid to avoid a ref cycle
+        $self->{pp_chld_err} = \(my $err);
+        awaitpid($pid, \&waitcb, \$err, @cb_arg); # waitcb stores $? in $err
+        $self;
 }
 
 sub BINMODE { binmode(shift->{fh}) } # for IO::Uncompress::Gunzip
@@ -33,24 +42,15 @@ sub FILENO { fileno($_[0]->{fh}) }
 
 sub _close ($;$) {
         my ($self, $wait) = @_;
-        my $fh = delete $self->{fh};
+        my ($fh, $pid) = delete(@$self{qw(fh pid)});
         my $ret = defined($fh) ? close($fh) : '';
-        my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)};
         return $ret unless defined($pid) && $self->{ppid} == $$;
         if ($wait) { # caller cares about the exit status:
-                my $wp = waitpid($pid, 0);
-                if ($wp == $pid) {
-                        $ret = '' if $?;
-                        if ($cb) {
-                                eval { $cb->($arg, $pid) };
-                                carp "E: cb(arg, $pid): $@" if $@;
-                        }
-                } else {
-                        carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?";
-                }
-        } else { # caller just undef-ed it, let event loop deal with it
-                require PublicInbox::DS;
-                PublicInbox::DS::dwaitpid($pid, $cb, $arg);
+                # synchronous wait via defined(wantarray) on awaitpid:
+                defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid);
+                ($? = ${$self->{pp_chld_err}}) and $ret = ''; # nonzero $? => false
+        } else {
+                awaitpid($pid); # void context: waits synchronously unless $in_loop
         }
         $ret;
 }
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 779b703a..02357dbf 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -28,6 +28,7 @@ package PublicInbox::Qspawn;
 use v5.12;
 use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
+use PublicInbox::DS qw(awaitpid);
 use Scalar::Util qw(blessed);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
@@ -57,35 +58,21 @@ sub _do_spawn {
                 }
         }
         $self->{cmd} = $o{quiet} ? undef : $cmd;
+        $o{cb_arg} = [ \&waitpid_err, $self ];
         eval {
                 # popen_rd may die on EMFILE, ENFILE
-                $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o);
-
-                die "E: $!" unless defined($self->{rpipe});
-
+                $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o) // die "E: $!";
                 $limiter->{running}++;
                 $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
         };
         finish($self, $@) if $@;
 }
 
-sub child_err ($) {
-        my ($child_error) = @_; # typically $?
-        my $exitstatus = ($child_error >> 8) or return;
-        my $sig = $child_error & 127;
-        my $msg = "exit status=$exitstatus";
-        $msg .= " signal=$sig" if $sig;
-        $msg;
-}
-
-sub finalize ($$) {
-        my ($self, $err) = @_;
-
-        my ($env, $qx_cb, $qx_arg, $qx_buf) =
-                delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+sub finalize ($) {
+        my ($self) = @_;
 
-        # done, spawn whatever's in the queue
-        my $limiter = $self->{limiter};
+        # process is done, spawn whatever's in the queue
+        my $limiter = delete $self->{limiter} or return;
         my $running = --$limiter->{running};
 
         if ($running < $limiter->{max}) {
@@ -93,14 +80,16 @@ sub finalize ($$) {
                         _do_spawn(@$next, $limiter);
                 }
         }
-
-        if ($err) {
+        if (my $err = $self->{_err}) { # set by finish or waitpid_err
                 utf8::decode($err);
                 if (my $dst = $self->{qsp_err}) {
                         $$dst .= $$dst ? " $err" : "; $err";
                 }
                 warn "@{$self->{cmd}}: $err" if $self->{cmd};
         }
+
+        my ($env, $qx_cb, $qx_arg, $qx_buf) =
+                delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
         if ($qx_cb) {
                 eval { $qx_cb->($qx_buf, $qx_arg) };
                 return unless $@;
@@ -115,14 +104,28 @@ sub finalize ($$) {
         }
 }
 
-# callback for dwaitpid or ProcessPipe
-sub waitpid_err { finalize($_[0], child_err($?)) }
+sub waitpid_err { # callback for awaitpid
+        my (undef, $self) = @_; # $_[0]: pid
+        $self->{_err} = ''; # for defined check in ->finish
+        if ($?) { # nonzero wait status: describe it in {_err}
+                my $status = $? >> 8; # exit code
+                my $sig = $? & 127; # terminating signal, if any
+                $self->{_err} .= "exit status=$status";
+                $self->{_err} .= " signal=$sig" if $sig;
+        }
+        finalize($self) if !$self->{rpipe}; # {rpipe} still set: left to ->finish
+}
 
 sub finish ($;$) {
         my ($self, $err) = @_;
-        my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err);
-        my PublicInbox::ProcessPipe $pp = tied *$tied_pp;
-        @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY
+        $self->{_err} //= $err; # only for $@; keeps {_err} set by waitpid_err
+
+        # we can safely finalize if pipe was closed before, or if
+        # {_err} is defined by waitpid_err.  Deleting {rpipe} will
+        # trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err,
+        # but it may not fire right away if inside the event loop.
+        my $closed_before = !delete($self->{rpipe});
+        finalize($self) if $closed_before || defined($self->{_err}); # else waitpid_err does
 }
 
 sub start ($$$) {
@@ -247,10 +250,9 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
         if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
                 if ($async) { # calls rpipe->close && ->event_step
                         $async->close; # PublicInbox::HTTPD::Async::close
-                } else { # generic PSGI:
+                } else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE
                         delete($self->{rpipe})->close;
                         event_step($self);
-                        waitpid_err($self);
                 }
                 if (ref($r) eq 'ARRAY') { # error
                         $wcb->($r)
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 7f61d8db..826ee508 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -365,9 +365,9 @@ sub popen_rd {
         $opt->{1} = fileno($w);
         my $pid = spawn($cmd, $env, $opt);
         return ($r, $pid) if wantarray;
-        my $ret = gensym;
-        tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
-        $ret;
+        my $s = gensym;
+        tie *$s, 'PublicInbox::ProcessPipe', $pid, $r, @{$opt->{cb_arg} // []};
+        $s;
 }
 
 sub run_die ($;$$) {
diff --git a/t/spawn.t b/t/spawn.t
index 5fc99a2a..c22cfcfc 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -140,13 +140,13 @@ EOF
 
 { # ->CLOSE vs ->DESTROY waitpid caller distinction
         my @c;
-        my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+        my $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
         ok(close($fh), '->CLOSE fired and successful');
         ok(scalar(@c), 'callback fired by ->CLOSE');
         ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS');
 
         @c = ();
-        $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+        $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
         undef $fh; # ->DESTROY
         ok(scalar(@c), 'callback fired by ->DESTROY');
         ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
@@ -156,8 +156,9 @@ EOF
 { # children don't wait on siblings
         use POSIX qw(_exit);
         pipe(my ($r, $w)) or BAIL_OUT $!;
-        my $cb = sub { warn "x=$$\n" };
-        my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb });
+        my @arg;
+        my $cb = [ sub { @arg = @_; warn "x=$$\n" }, 'hi' ];
+        my $fh = popen_rd(['cat'], undef, { 0 => $r, cb_arg => $cb });
         my $pp = tied *$fh;
         my $pid = fork // BAIL_OUT $!;
         local $SIG{__WARN__} = sub { _exit(1) };
@@ -173,6 +174,9 @@ EOF
         close $w;
         close $fh;
         is($?, 0, 'cat exited');
+        is(scalar(@arg), 2, 'callback got args');
+        is($arg[1], 'hi', 'passed arg');
+        like($arg[0], qr/\A\d+\z/, 'PID');
         is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner');
 }