about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-01-17 07:19:03 +0000
committerEric Wong <e@80x24.org>2023-01-18 23:25:48 +0000
commit6e9397d12635eae55c9114ed9689413154fed8ce (patch)
tree4ffa8148f76b2fd4c71651df0f2e682f16dfe8d0
parenta1ee3e0d84fedc4a2dd4e16e7054ee6fdfbe111a (diff)
downloadpublic-inbox-6e9397d12635eae55c9114ed9689413154fed8ce.tar.gz
awaitpid is the new API which will eventually replace dwaitpid.
It enables early registration of callback handlers.  Eventually
(once dwaitpid is gone) it'll be able to use fewer waitpid
calls.

The avoidance of waitpid(-1) in our earlier days was driven by
the belief that threads may eventually become relevant for Perl 5,
but that's extremely unlikely at this stage.  I will still
introduce optional threads via C, but they definitely won't be
spawning/reaping processes.

Argument order to callbacks is swapped (PID first) to allow
flattened multiple arguments more naturally.  The previous API
(allowing only a single argument, as influenced by
pthread_create(3)) was more tedious as it involved packing
multiple arguments into yet another array.
-rw-r--r--lib/PublicInbox/DS.pm43
-rw-r--r--lib/PublicInbox/LeiToMail.pm4
-rw-r--r--lib/PublicInbox/ProcessPipe.pm42
-rw-r--r--lib/PublicInbox/Qspawn.pm60
-rw-r--r--lib/PublicInbox/Spawn.pm6
-rw-r--r--t/spawn.t12
6 files changed, 104 insertions, 63 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index e4629e97..9563a1cb 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll);
 use PublicInbox::Tmpfile;
 use Errno qw(EAGAIN EINVAL);
 use Carp qw(carp croak);
-our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer);
+our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer);
 
 my %Stack;
 my $nextq; # queue for next_tick
 my $wait_pids; # list of [ pid, callback, callback_arg ]
+my $AWAIT_PIDS; # pid => [ $callback, @args ]
 my $reap_armed;
 my $ToClose; # sockets to close when event loop is done
 our (
@@ -74,11 +75,11 @@ sub Reset {
                 # we may be iterating inside one of these on our stack
                 my @q = delete @Stack{keys %Stack};
                 for my $q (@q) { @$q = () }
-                $wait_pids = $nextq = $ToClose = undef;
+                $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef;
                 $ep_io = undef; # closes real $Epoll FD
                 $Epoll = undef; # may call DSKQXS::DESTROY
         } while (@Timers || keys(%Stack) || $nextq || $wait_pids ||
-                $ToClose || keys(%DescriptorMap) ||
+                $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS ||
                 $PostLoopCallback || keys(%UniqTimer));
 
         $reap_armed = undef;
@@ -201,6 +202,13 @@ sub block_signals () {
         $oldset;
 }
 
+sub await_cb ($;@) {
+        my ($pid, @cb_args) = @_;
+        my $cb = shift @cb_args or return;
+        eval { $cb->($pid, @cb_args) };
+        warn "E: awaitpid($pid): $@" if $@;
+}
+
 # We can't use waitpid(-1) safely here since it can hit ``, system(),
 # and other things.  So we scan the $wait_pids list, which is hopefully
 # not too big.  We keep $wait_pids small by not calling dwaitpid()
@@ -208,10 +216,12 @@ sub block_signals () {
 
 sub reap_pids {
         $reap_armed = undef;
-        my $tmp = $wait_pids or return;
+        my $tmp = $wait_pids // [];
         $wait_pids = undef;
         $Stack{reap_runq} = $tmp;
         my $oldset = block_signals();
+
+        # old API
         foreach my $ary (@$tmp) {
                 my ($pid, $cb, $arg) = @$ary;
                 my $ret = waitpid($pid, WNOHANG);
@@ -226,6 +236,14 @@ sub reap_pids {
                         warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?";
                 }
         }
+
+        # new API TODO: convert to waitpid(-1) in the future as long
+        # as we don't use threads
+        for my $pid (keys %$AWAIT_PIDS) {
+                my $wpid = waitpid($pid, WNOHANG) // next;
+                my $cb_args = delete $AWAIT_PIDS->{$wpid} or next;
+                await_cb($pid, @$cb_args);
+        }
         sig_setmask($oldset);
         delete $Stack{reap_runq};
 }
@@ -720,6 +738,23 @@ sub dwaitpid ($;$$) {
         }
 }
 
+sub awaitpid {
+        my ($pid, @cb_args) = @_;
+        $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0;
+        # provide synchronous API
+        if (defined(wantarray) || (!$in_loop && !@cb_args)) {
+                my $ret = waitpid($pid, 0) // -2;
+                if ($ret == $pid) {
+                        my $cb_args = delete $AWAIT_PIDS->{$pid};
+                        @cb_args = @$cb_args if !@cb_args && $cb_args;
+                        await_cb($pid, @cb_args);
+                        return $ret;
+                }
+        }
+        # We could've just missed our SIGCHLD, cover it, here:
+        enqueue_reap() if $in_loop;
+}
+
 1;
 
 =head1 AUTHORS (Danga::Socket)
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index b58e2652..1528165a 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -150,8 +150,8 @@ sub git_to_mail { # git->cat_async callback
         $self->{lei}->fail("$@ (oid=$oid)") if $@;
 }
 
-sub reap_compress { # dwaitpid callback
-        my ($lei, $pid) = @_;
+sub reap_compress { # awaitpid callback
+        my ($pid, $lei) = @_;
         my $cmd = delete $lei->{"pid.$pid"};
         return if $? == 0;
         $lei->fail("@$cmd failed", $? >> 8);
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index 97e9c268..068631c6 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -1,16 +1,25 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # a tied handle for auto reaping of children tied to a pipe, see perltie(1)
 package PublicInbox::ProcessPipe;
-use strict;
-use v5.10.1;
+use v5.12;
 use Carp qw(carp);
+use PublicInbox::DS qw(awaitpid);
+
+sub waitcb { # awaitpid callback
+        my ($pid, $err_ref, $cb, @args) = @_;
+        $$err_ref = $?; # sets >{pp_chld_err} for _close
+        $cb->($pid, @args) if $cb;
+}
 
 sub TIEHANDLE {
-        my ($class, $pid, $fh, $cb, $arg) = @_;
-        bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg },
-                $class;
+        my ($cls, $pid, $fh, @cb_arg) = @_;
+        my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls;
+        # we share $err (and not $self) with awaitpid to avoid a ref cycle
+        $self->{pp_chld_err} = \(my $err);
+        awaitpid($pid, \&waitcb, \$err, @cb_arg);
+        $self;
 }
 
 sub BINMODE { binmode(shift->{fh}) } # for IO::Uncompress::Gunzip
@@ -33,24 +42,15 @@ sub FILENO { fileno($_[0]->{fh}) }
 
 sub _close ($;$) {
         my ($self, $wait) = @_;
-        my $fh = delete $self->{fh};
+        my ($fh, $pid) = delete(@$self{qw(fh pid)});
         my $ret = defined($fh) ? close($fh) : '';
-        my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)};
         return $ret unless defined($pid) && $self->{ppid} == $$;
         if ($wait) { # caller cares about the exit status:
-                my $wp = waitpid($pid, 0);
-                if ($wp == $pid) {
-                        $ret = '' if $?;
-                        if ($cb) {
-                                eval { $cb->($arg, $pid) };
-                                carp "E: cb(arg, $pid): $@" if $@;
-                        }
-                } else {
-                        carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?";
-                }
-        } else { # caller just undef-ed it, let event loop deal with it
-                require PublicInbox::DS;
-                PublicInbox::DS::dwaitpid($pid, $cb, $arg);
+                # synchronous wait via defined(wantarray) on awaitpid:
+                defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid);
+                ($? = ${$self->{pp_chld_err}}) and $ret = '';
+        } else {
+                awaitpid($pid); # depends on $in_loop or not
         }
         $ret;
 }
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 779b703a..02357dbf 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -28,6 +28,7 @@ package PublicInbox::Qspawn;
 use v5.12;
 use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
+use PublicInbox::DS qw(awaitpid);
 use Scalar::Util qw(blessed);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
@@ -57,35 +58,21 @@ sub _do_spawn {
                 }
         }
         $self->{cmd} = $o{quiet} ? undef : $cmd;
+        $o{cb_arg} = [ \&waitpid_err, $self ];
         eval {
                 # popen_rd may die on EMFILE, ENFILE
-                $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o);
-
-                die "E: $!" unless defined($self->{rpipe});
-
+                $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o) // die "E: $!";
                 $limiter->{running}++;
                 $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
         };
         finish($self, $@) if $@;
 }
 
-sub child_err ($) {
-        my ($child_error) = @_; # typically $?
-        my $exitstatus = ($child_error >> 8) or return;
-        my $sig = $child_error & 127;
-        my $msg = "exit status=$exitstatus";
-        $msg .= " signal=$sig" if $sig;
-        $msg;
-}
-
-sub finalize ($$) {
-        my ($self, $err) = @_;
-
-        my ($env, $qx_cb, $qx_arg, $qx_buf) =
-                delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
+sub finalize ($) {
+        my ($self) = @_;
 
-        # done, spawn whatever's in the queue
-        my $limiter = $self->{limiter};
+        # process is done, spawn whatever's in the queue
+        my $limiter = delete $self->{limiter} or return;
         my $running = --$limiter->{running};
 
         if ($running < $limiter->{max}) {
@@ -93,14 +80,16 @@ sub finalize ($$) {
                         _do_spawn(@$next, $limiter);
                 }
         }
-
-        if ($err) {
+        if (my $err = $self->{_err}) { # set by finish or waitpid_err
                 utf8::decode($err);
                 if (my $dst = $self->{qsp_err}) {
                         $$dst .= $$dst ? " $err" : "; $err";
                 }
                 warn "@{$self->{cmd}}: $err" if $self->{cmd};
         }
+
+        my ($env, $qx_cb, $qx_arg, $qx_buf) =
+                delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
         if ($qx_cb) {
                 eval { $qx_cb->($qx_buf, $qx_arg) };
                 return unless $@;
@@ -115,14 +104,28 @@ sub finalize ($$) {
         }
 }
 
-# callback for dwaitpid or ProcessPipe
-sub waitpid_err { finalize($_[0], child_err($?)) }
+sub waitpid_err { # callback for awaitpid
+        my (undef, $self) = @_; # $_[0]: pid
+        $self->{_err} = ''; # for defined check in ->finish
+        if ($?) {
+                my $status = $? >> 8;
+                my $sig = $? & 127;
+                $self->{_err} .= "exit status=$status";
+                $self->{_err} .= " signal=$sig" if $sig;
+        }
+        finalize($self) if !$self->{rpipe};
+}
 
 sub finish ($;$) {
         my ($self, $err) = @_;
-        my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err);
-        my PublicInbox::ProcessPipe $pp = tied *$tied_pp;
-        @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY
+        $self->{_err} //= $err; # only for $@
+
+        # we can safely finalize if pipe was closed before, or if
+        # {_err} is defined by waitpid_err.  Deleting {rpipe} will
+        # trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err,
+        # but it may not fire right away if inside the event loop.
+        my $closed_before = !delete($self->{rpipe});
+        finalize($self) if $closed_before || defined($self->{_err});
 }
 
 sub start ($$$) {
@@ -247,10 +250,9 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
         if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
                 if ($async) { # calls rpipe->close && ->event_step
                         $async->close; # PublicInbox::HTTPD::Async::close
-                } else { # generic PSGI:
+                } else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE
                         delete($self->{rpipe})->close;
                         event_step($self);
-                        waitpid_err($self);
                 }
                 if (ref($r) eq 'ARRAY') { # error
                         $wcb->($r)
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 7f61d8db..826ee508 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -365,9 +365,9 @@ sub popen_rd {
         $opt->{1} = fileno($w);
         my $pid = spawn($cmd, $env, $opt);
         return ($r, $pid) if wantarray;
-        my $ret = gensym;
-        tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
-        $ret;
+        my $s = gensym;
+        tie *$s, 'PublicInbox::ProcessPipe', $pid, $r, @{$opt->{cb_arg} // []};
+        $s;
 }
 
 sub run_die ($;$$) {
diff --git a/t/spawn.t b/t/spawn.t
index 5fc99a2a..c22cfcfc 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -140,13 +140,13 @@ EOF
 
 { # ->CLOSE vs ->DESTROY waitpid caller distinction
         my @c;
-        my $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+        my $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
         ok(close($fh), '->CLOSE fired and successful');
         ok(scalar(@c), 'callback fired by ->CLOSE');
         ok(grep(!m[/PublicInbox/DS\.pm\z], @c), 'callback not invoked by DS');
 
         @c = ();
-        $fh = popen_rd(['true'], undef, { cb => sub { @c = caller } });
+        $fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
         undef $fh; # ->DESTROY
         ok(scalar(@c), 'callback fired by ->DESTROY');
         ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
@@ -156,8 +156,9 @@ EOF
 { # children don't wait on siblings
         use POSIX qw(_exit);
         pipe(my ($r, $w)) or BAIL_OUT $!;
-        my $cb = sub { warn "x=$$\n" };
-        my $fh = popen_rd(['cat'], undef, { 0 => $r, cb => $cb });
+        my @arg;
+        my $cb = [ sub { @arg = @_; warn "x=$$\n" }, 'hi' ];
+        my $fh = popen_rd(['cat'], undef, { 0 => $r, cb_arg => $cb });
         my $pp = tied *$fh;
         my $pid = fork // BAIL_OUT $!;
         local $SIG{__WARN__} = sub { _exit(1) };
@@ -173,6 +174,9 @@ EOF
         close $w;
         close $fh;
         is($?, 0, 'cat exited');
+        is(scalar(@arg), 2, 'callback got args');
+        is($arg[1], 'hi', 'passed arg');
+        like($arg[0], qr/\A\d+\z/, 'PID');
         is_deeply(\@w, [ "x=$$\n" ], 'callback fired from owner');
 }