From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 3968C1F513 for ; Mon, 23 Oct 2023 08:48:38 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1698050918; bh=JSCme8V0NyzR7COh/ekkty8EXsINBxvUXs9a5bWWiDQ=; h=From:To:Subject:Date:In-Reply-To:References:From; b=KTEBRdHxjfxb2KnuIyct/pfrPyYTtS/Jopy4Yc79Z3ZGYq66BAAcWeyGMZn4p62Hu GUWNQvxa9D6P/M0RJ6mVtKkABM9skMFtZY1tUC1zj55MUOT4RLrHkJOwEv0IZfgJFW cAU0cmwOmmbe+VNBCBvPeVCKYAJJVmD84MLrF57I= From: Eric Wong To: spew@80x24.org Subject: [PATCH 03/18] psgi_qx: use a temporary file rather than pipe Date: Mon, 23 Oct 2023 08:48:22 +0000 Message-ID: <20231023084837.2804687-3-e@80x24.org> In-Reply-To: <20231023084837.2804687-1-e@80x24.org> References: <20231023084837.2804687-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: A pipe requires more context switches and code to deal with unpredictable pipe EOF vs waitpid ordering. So just use the new spawn/aspawn features to automatically handle slurping output into a string. --- MANIFEST | 1 + lib/PublicInbox/Aspawn.pm | 30 ++++++++++ lib/PublicInbox/CodeSearchIdx.pm | 1 + lib/PublicInbox/Qspawn.pm | 95 +++++++++++--------------------- 4 files changed, 65 insertions(+), 62 deletions(-) create mode 100644 lib/PublicInbox/Aspawn.pm diff --git a/MANIFEST b/MANIFEST index dcce801c..f087621c 100644 --- a/MANIFEST +++ b/MANIFEST @@ -161,6 +161,7 @@ lib/PublicInbox/AddressPP.pm lib/PublicInbox/Admin.pm lib/PublicInbox/AdminEdit.pm lib/PublicInbox/AltId.pm +lib/PublicInbox/Aspawn.pm lib/PublicInbox/AutoReap.pm lib/PublicInbox/Cgit.pm lib/PublicInbox/CidxComm.pm diff --git a/lib/PublicInbox/Aspawn.pm b/lib/PublicInbox/Aspawn.pm new file mode 100644 index 00000000..2423ac31 --- /dev/null +++ b/lib/PublicInbox/Aspawn.pm @@ -0,0 +1,30 @@ +package PublicInbox::Aspawn; +use v5.12; +use parent qw(Exporter); +use PublicInbox::DS qw(awaitpid); +use PublicInbox::Spawn qw(spawn); +our @EXPORT_OK = qw(run_await); + +sub _await_cb { # awaitpid cb + my ($pid, $cmd, $env, $opt, $cb, @args) = @_; + PublicInbox::Spawn::read_out_err($opt); + if ($? && !$opt->{quiet}) { + my ($status, $sig) = ($? >> 8, $? & 127); + my $msg = ''; + $msg .= " (-C=$opt->{-C})" if defined $opt->{-C}; + $msg .= " status=$status" if $status; + $msg .= " signal=$sig" if $sig; + warn "E: @$cmd", $msg, "\n"; + } + $cb->($pid, $cmd, $env, $opt, @args) if $cb; +} + +sub run_await { + my ($cmd, $env, $opt, $cb, @args) = @_; + $opt->{1} //= \(my $out); + my $pid = spawn($cmd, $env, $opt); + awaitpid($pid, \&_await_cb, $cmd, $env, $opt, $cb, @args); + awaitpid($pid); # synchronous for non-$in_loop +} + +1; diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 36d00aea..b9b72ff4 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -55,6 +55,7 @@ use PublicInbox::CidxLogP; use PublicInbox::CidxComm; use PublicInbox::Git qw(%OFMT2HEXLEN); use PublicInbox::Compat qw(uniqstr); +use PublicInbox::Aspawn qw(run_await); use Carp (); use autodie qw(pipe open seek sysseek send); our ( diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index a4d78e49..59d5ed40 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -30,6 +30,7 @@ use PublicInbox::Spawn qw(popen_rd); use PublicInbox::GzipFilter; use Scalar::Util qw(blessed); use PublicInbox::Limiter; +use PublicInbox::Aspawn qw(run_await); # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers use Errno qw(EAGAIN EINTR); @@ -48,29 +49,30 @@ sub new { sub _do_spawn { my ($self, $start_cb, $limiter) = @_; - my $err; my ($cmd, $cmd_env, $opt) = @{delete $self->{args}}; my %o = %{$opt || {}}; $self->{limiter} = $limiter; - foreach my $k (@PublicInbox::Spawn::RLIMITS) { - if (defined(my $rlimit = $limiter->{$k})) { - $o{$k} = $rlimit; - } + for my $k (@PublicInbox::Spawn::RLIMITS) { + $o{$k} = $limiter->{$k} // next; } $self->{cmd} = $cmd; $self->{-quiet} = 1 if $o{quiet}; - eval { - # popen_rd may die on EMFILE, ENFILE - $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o, - \&waitpid_err, $self); - $limiter->{running}++; - $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM - }; + $limiter->{running}++; + if ($start_cb) { + eval { # popen_rd may die on EMFILE, ENFILE + $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o, + \&waitpid_err, $self); + $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM + }; + } else { + eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) }; + warn "E: $@" if $@; + } finish($self, $@) if $@; } -sub finalize ($) { - my ($self) = @_; +sub finalize ($;$) { + my ($self, $opt) = @_; # process is done, spawn whatever's in the queue my $limiter = delete $self->{limiter} or return; @@ -89,10 +91,10 @@ sub finalize ($) { warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet}; } - my ($env, $qx_cb, $qx_arg, $qx_buf) = - delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)}; - if ($qx_cb) { - eval { $qx_cb->($qx_buf, $qx_arg) }; + my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)}; + if ($qx_cb_arg) { + my $cb = shift @$qx_cb_arg; + eval { $cb->($opt->{1}, @$qx_cb_arg) }; return unless $@; warn "E: $@"; # hope qspawn.wcb can handle it } @@ -108,15 +110,20 @@ sub finalize ($) { sub DESTROY { finalize($_[0]) } # ->finalize is idempotent sub waitpid_err { # callback for awaitpid - my (undef, $self) = @_; # $_[0]: pid + my (undef, $self, $opt) = @_; # $_[0]: pid $self->{_err} = ''; # for defined check in ->finish - if ($?) { + if ($?) { # FIXME: redundant my $status = $? >> 8; my $sig = $? & 127; $self->{_err} .= "exit status=$status"; $self->{_err} .= " signal=$sig" if $sig; } - finalize($self) if !$self->{rpipe}; + finalize($self, $opt) if !$self->{rpipe}; +} + +sub wait_await { # run_await cb + my ($pid, $cmd, $cmd_env, $opt, $self) = @_; + waitpid_err($pid, $self, $opt); } sub finish ($;$) { @@ -140,52 +147,16 @@ sub start ($$$) { } } -sub psgi_qx_init_cb { # this may be PublicInbox::HTTPD::Async {cb} - my ($self) = @_; - my ($r, $buf); -reread: - $r = sysread($self->{rpipe}, $buf, 65536); - if (!defined($r)) { - return if $! == EAGAIN; # try again when notified - goto reread if $! == EINTR; - event_step($self, $!); - } elsif (my $as = delete $self->{async}) { # PublicInbox::HTTPD::Async - $as->async_pass($self->{psgi_env}->{'psgix.io'}, - $self->{qx_fh}, \$buf); - } elsif ($r) { # generic PSGI: - print { $self->{qx_fh} } $buf; - } else { # EOF - event_step($self, undef); - } -} - -sub psgi_qx_start { - my ($self) = @_; - if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) { - # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj) - $self->{async} = $async->($self->{rpipe}, - \&psgi_qx_init_cb, $self, $self); - # init_cb will call ->async_pass or ->close - } else { # generic PSGI - psgi_qx_init_cb($self) while $self->{qx_fh}; - } -} - -# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with +# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with # the stdout of the given command when done; but respects the given limiter # $env is the PSGI env. As with ``/qx; only use this when output is small # and safe to slurp. sub psgi_qx { - my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_; + my ($self, $env, $limiter, @qx_cb_arg) = @_; $self->{psgi_env} = $env; - my $qx_buf = ''; - open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar - $self->{qx_cb} = $qx_cb; - $self->{qx_arg} = $qx_arg; - $self->{qx_fh} = $qx_fh; - $self->{qx_buf} = \$qx_buf; + $self->{qx_cb_arg} = \@qx_cb_arg; $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32); - start($self, $limiter, \&psgi_qx_start); + start($self, $limiter, undef); } # this is called on pipe EOF to reap the process, may be called @@ -195,7 +166,7 @@ sub event_step { my ($self, $err) = @_; # $err: $! warn "psgi_{return,qx} $err" if defined($err); finish($self); - my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)}); + my $fh = delete $self->{qfh}; $fh->close if $fh; # async-only (psgi_return) }