diff options
Diffstat (limited to 'lib/PublicInbox')
-rw-r--r-- | lib/PublicInbox/DS.pm | 43 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/ProcessPipe.pm | 42 | ||||
-rw-r--r-- | lib/PublicInbox/Qspawn.pm | 60 | ||||
-rw-r--r-- | lib/PublicInbox/Spawn.pm | 6 |
5 files changed, 96 insertions, 59 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index e4629e97..9563a1cb 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -32,11 +32,12 @@ use PublicInbox::Syscall qw(:epoll); use PublicInbox::Tmpfile; use Errno qw(EAGAIN EINVAL); use Carp qw(carp croak); -our @EXPORT_OK = qw(now msg_more dwaitpid add_timer add_uniq_timer); +our @EXPORT_OK = qw(now msg_more dwaitpid awaitpid add_timer add_uniq_timer); my %Stack; my $nextq; # queue for next_tick my $wait_pids; # list of [ pid, callback, callback_arg ] +my $AWAIT_PIDS; # pid => [ $callback, @args ] my $reap_armed; my $ToClose; # sockets to close when event loop is done our ( @@ -74,11 +75,11 @@ sub Reset { # we may be iterating inside one of these on our stack my @q = delete @Stack{keys %Stack}; for my $q (@q) { @$q = () } - $wait_pids = $nextq = $ToClose = undef; + $AWAIT_PIDS = $wait_pids = $nextq = $ToClose = undef; $ep_io = undef; # closes real $Epoll FD $Epoll = undef; # may call DSKQXS::DESTROY } while (@Timers || keys(%Stack) || $nextq || $wait_pids || - $ToClose || keys(%DescriptorMap) || + $ToClose || keys(%DescriptorMap) || $AWAIT_PIDS || $PostLoopCallback || keys(%UniqTimer)); $reap_armed = undef; @@ -201,6 +202,13 @@ sub block_signals () { $oldset; } +sub await_cb ($;@) { + my ($pid, @cb_args) = @_; + my $cb = shift @cb_args or return; + eval { $cb->($pid, @cb_args) }; + warn "E: awaitpid($pid): $@" if $@; +} + # We can't use waitpid(-1) safely here since it can hit ``, system(), # and other things. So we scan the $wait_pids list, which is hopefully # not too big. We keep $wait_pids small by not calling dwaitpid() @@ -208,10 +216,12 @@ sub block_signals () { sub reap_pids { $reap_armed = undef; - my $tmp = $wait_pids or return; + my $tmp = $wait_pids // []; $wait_pids = undef; $Stack{reap_runq} = $tmp; my $oldset = block_signals(); + + # old API foreach my $ary (@$tmp) { my ($pid, $cb, $arg) = @$ary; my $ret = waitpid($pid, WNOHANG); @@ -226,6 +236,14 @@ sub reap_pids { warn "waitpid($pid, WNOHANG) = $ret, \$!=$!, \$?=$?"; } } + + # new API TODO: convert to waitpid(-1) in the future as long + # as we don't use threads + for my $pid (keys %$AWAIT_PIDS) { + my $wpid = waitpid($pid, WNOHANG) // next; + my $cb_args = delete $AWAIT_PIDS->{$wpid} or next; + await_cb($pid, @$cb_args); + } sig_setmask($oldset); delete $Stack{reap_runq}; } @@ -720,6 +738,23 @@ sub dwaitpid ($;$$) { } } +sub awaitpid { + my ($pid, @cb_args) = @_; + $AWAIT_PIDS->{$pid} //= @cb_args ? \@cb_args : 0; + # provide synchronous API + if (defined(wantarray) || (!$in_loop && !@cb_args)) { + my $ret = waitpid($pid, 0) // -2; + if ($ret == $pid) { + my $cb_args = delete $AWAIT_PIDS->{$pid}; + @cb_args = @$cb_args if !@cb_args && $cb_args; + await_cb($pid, @cb_args); + return $ret; + } + } + # We could've just missed our SIGCHLD, cover it, here: + enqueue_reap() if $in_loop; +} + 1; =head1 AUTHORS (Danga::Socket) diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index b58e2652..1528165a 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -150,8 +150,8 @@ sub git_to_mail { # git->cat_async callback $self->{lei}->fail("$@ (oid=$oid)") if $@; } -sub reap_compress { # dwaitpid callback - my ($lei, $pid) = @_; +sub reap_compress { # awaitpid callback + my ($pid, $lei) = @_; my $cmd = delete $lei->{"pid.$pid"}; return if $? == 0; $lei->fail("@$cmd failed", $? >> 8); diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm index 97e9c268..068631c6 100644 --- a/lib/PublicInbox/ProcessPipe.pm +++ b/lib/PublicInbox/ProcessPipe.pm @@ -1,16 +1,25 @@ -# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # a tied handle for auto reaping of children tied to a pipe, see perltie(1) package PublicInbox::ProcessPipe; -use strict; -use v5.10.1; +use v5.12; use Carp qw(carp); +use PublicInbox::DS qw(awaitpid); + +sub waitcb { # awaitpid callback + my ($pid, $err_ref, $cb, @args) = @_; + $$err_ref = $?; # sets >{pp_chld_err} for _close + $cb->($pid, @args) if $cb; +} sub TIEHANDLE { - my ($class, $pid, $fh, $cb, $arg) = @_; - bless { pid => $pid, fh => $fh, ppid => $$, cb => $cb, arg => $arg }, - $class; + my ($cls, $pid, $fh, @cb_arg) = @_; + my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls; + # we share $err (and not $self) with awaitpid to avoid a ref cycle + $self->{pp_chld_err} = \(my $err); + awaitpid($pid, \&waitcb, \$err, @cb_arg); + $self; } sub BINMODE { binmode(shift->{fh}) } # for IO::Uncompress::Gunzip @@ -33,24 +42,15 @@ sub FILENO { fileno($_[0]->{fh}) } sub _close ($;$) { my ($self, $wait) = @_; - my $fh = delete $self->{fh}; + my ($fh, $pid) = delete(@$self{qw(fh pid)}); my $ret = defined($fh) ? close($fh) : ''; - my ($pid, $cb, $arg) = delete @$self{qw(pid cb arg)}; return $ret unless defined($pid) && $self->{ppid} == $$; if ($wait) { # caller cares about the exit status: - my $wp = waitpid($pid, 0); - if ($wp == $pid) { - $ret = '' if $?; - if ($cb) { - eval { $cb->($arg, $pid) }; - carp "E: cb(arg, $pid): $@" if $@; - } - } else { - carp "waitpid($pid, 0) = $wp, \$!=$!, \$?=$?"; - } - } else { # caller just undef-ed it, let event loop deal with it - require PublicInbox::DS; - PublicInbox::DS::dwaitpid($pid, $cb, $arg); + # synchronous wait via defined(wantarray) on awaitpid: + defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid); + ($? = ${$self->{pp_chld_err}}) and $ret = ''; + } else { + awaitpid($pid); # depends on $in_loop or not } $ret; } diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 779b703a..02357dbf 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -28,6 +28,7 @@ package PublicInbox::Qspawn; use v5.12; use PublicInbox::Spawn qw(popen_rd); use PublicInbox::GzipFilter; +use PublicInbox::DS qw(awaitpid); use Scalar::Util qw(blessed); # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers @@ -57,35 +58,21 @@ sub _do_spawn { } } $self->{cmd} = $o{quiet} ? undef : $cmd; + $o{cb_arg} = [ \&waitpid_err, $self ]; eval { # popen_rd may die on EMFILE, ENFILE - $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o); - - die "E: $!" unless defined($self->{rpipe}); - + $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o) // die "E: $!"; $limiter->{running}++; $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM }; finish($self, $@) if $@; } -sub child_err ($) { - my ($child_error) = @_; # typically $? - my $exitstatus = ($child_error >> 8) or return; - my $sig = $child_error & 127; - my $msg = "exit status=$exitstatus"; - $msg .= " signal=$sig" if $sig; - $msg; -} - -sub finalize ($$) { - my ($self, $err) = @_; - - my ($env, $qx_cb, $qx_arg, $qx_buf) = - delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; +sub finalize ($) { + my ($self) = @_; - # done, spawn whatever's in the queue - my $limiter = $self->{limiter}; + # process is done, spawn whatever's in the queue + my $limiter = delete $self->{limiter} or return; my $running = --$limiter->{running}; if ($running < $limiter->{max}) { @@ -93,14 +80,16 @@ sub finalize ($$) { _do_spawn(@$next, $limiter); } } - - if ($err) { + if (my $err = $self->{_err}) { # set by finish or waitpid_err utf8::decode($err); if (my $dst = $self->{qsp_err}) { $$dst .= $$dst ? " $err" : "; $err"; } warn "@{$self->{cmd}}: $err" if $self->{cmd}; } + + my ($env, $qx_cb, $qx_arg, $qx_buf) = + delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; if ($qx_cb) { eval { $qx_cb->($qx_buf, $qx_arg) }; return unless $@; @@ -115,14 +104,28 @@ sub finalize ($$) { } } -# callback for dwaitpid or ProcessPipe -sub waitpid_err { finalize($_[0], child_err($?)) } +sub waitpid_err { # callback for awaitpid + my (undef, $self) = @_; # $_[0]: pid + $self->{_err} = ''; # for defined check in ->finish + if ($?) { + my $status = $? >> 8; + my $sig = $? & 127; + $self->{_err} .= "exit status=$status"; + $self->{_err} .= " signal=$sig" if $sig; + } + finalize($self) if !$self->{rpipe}; +} sub finish ($;$) { my ($self, $err) = @_; - my $tied_pp = delete($self->{rpipe}) or return finalize($self, $err); - my PublicInbox::ProcessPipe $pp = tied *$tied_pp; - @$pp{qw(cb arg)} = (\&waitpid_err, $self); # for ->DESTROY + $self->{_err} //= $err; # only for $@ + + # we can safely finalize if pipe was closed before, or if + # {_err} is defined by waitpid_err. Deleting {rpipe} will + # trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err, + # but it may not fire right away if inside the event loop. + my $closed_before = !delete($self->{rpipe}); + finalize($self) if $closed_before || defined($self->{_err}); } sub start ($$$) { @@ -247,10 +250,9 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb} if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error if ($async) { # calls rpipe->close && ->event_step $async->close; # PublicInbox::HTTPD::Async::close - } else { # generic PSGI: + } else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE delete($self->{rpipe})->close; event_step($self); - waitpid_err($self); } if (ref($r) eq 'ARRAY') { # error $wcb->($r) diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 7f61d8db..826ee508 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -365,9 +365,9 @@ sub popen_rd { $opt->{1} = fileno($w); my $pid = spawn($cmd, $env, $opt); return ($r, $pid) if wantarray; - my $ret = gensym; - tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)}; - $ret; + my $s = gensym; + tie *$s, 'PublicInbox::ProcessPipe', $pid, $r, @{$opt->{cb_arg} // []}; + $s; } sub run_die ($;$$) { |