* [PATCH 1/8] limiter: split out from qspawn
@ 2023-10-19 1:15 Eric Wong
2023-10-19 1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
` (6 more replies)
0 siblings, 7 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19 1:15 UTC (permalink / raw)
To: spew
It's slightly better organized this way, especially since
`publicinboxLimiter' has its own user-facing config section
and knobs. I may use it in LeiMirror and CodeSearchIdx for
process management.
---
MANIFEST | 1 +
lib/PublicInbox/Config.pm | 4 +--
lib/PublicInbox/GitHTTPBackend.pm | 3 +-
lib/PublicInbox/Inbox.pm | 4 +--
lib/PublicInbox/Limiter.pm | 47 +++++++++++++++++++++++++++++++
lib/PublicInbox/MailDiff.pm | 1 +
lib/PublicInbox/Qspawn.pm | 47 ++-----------------------------
t/qspawn.t | 3 +-
8 files changed, 60 insertions(+), 50 deletions(-)
create mode 100644 lib/PublicInbox/Limiter.pm
diff --git a/MANIFEST b/MANIFEST
index 791d91a7..dcce801c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -287,6 +287,7 @@ lib/PublicInbox/LeiUp.pm
lib/PublicInbox/LeiViewText.pm
lib/PublicInbox/LeiWatch.pm
lib/PublicInbox/LeiXSearch.pm
+lib/PublicInbox/Limiter.pm
lib/PublicInbox/Linkify.pm
lib/PublicInbox/Listener.pm
lib/PublicInbox/Lock.pm
diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm
index 15e0872e..d156b2d3 100644
--- a/lib/PublicInbox/Config.pm
+++ b/lib/PublicInbox/Config.pm
@@ -124,9 +124,9 @@ sub lookup_newsgroup {
sub limiter {
my ($self, $name) = @_;
$self->{-limiters}->{$name} //= do {
- require PublicInbox::Qspawn;
+ require PublicInbox::Limiter;
my $max = $self->{"publicinboxlimiter.$name.max"} || 1;
- my $limiter = PublicInbox::Qspawn::Limiter->new($max);
+ my $limiter = PublicInbox::Limiter->new($max);
$limiter->setup_rlimit($name, $self);
$limiter;
};
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index 74432429..d69f5f8b 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -9,13 +9,14 @@ use v5.10.1;
use Fcntl qw(:seek);
use IO::Handle; # ->flush
use HTTP::Date qw(time2str);
+use PublicInbox::Limiter;
use PublicInbox::Qspawn;
use PublicInbox::Tmpfile;
use PublicInbox::WwwStatic qw(r @NO_CACHE);
use Carp ();
# 32 is same as the git-daemon connection limit
-my $default_limiter = PublicInbox::Qspawn::Limiter->new(32);
+my $default_limiter = PublicInbox::Limiter->new(32);
# n.b. serving "description" and "cloneurl" should be innocuous enough to
# not cause problems. serving "config" might...
diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm
index 9afbb478..3dad7004 100644
--- a/lib/PublicInbox/Inbox.pm
+++ b/lib/PublicInbox/Inbox.pm
@@ -55,8 +55,8 @@ sub _set_limiter ($$$) {
my $val = $self->{$mkey} or return;
my $lim;
if ($val =~ /\A[0-9]+\z/) {
- require PublicInbox::Qspawn;
- $lim = PublicInbox::Qspawn::Limiter->new($val);
+ require PublicInbox::Limiter;
+ $lim = PublicInbox::Limiter->new($val);
} elsif ($val =~ /\A[a-z][a-z0-9]*\z/) {
$lim = $pi_cfg->limiter($val);
warn "$mkey limiter=$val not found\n" if !$lim;
diff --git a/lib/PublicInbox/Limiter.pm b/lib/PublicInbox/Limiter.pm
new file mode 100644
index 00000000..48a2b6a3
--- /dev/null
+++ b/lib/PublicInbox/Limiter.pm
@@ -0,0 +1,47 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+package PublicInbox::Limiter;
+use v5.12;
+use PublicInbox::Spawn;
+
+sub new {
+ my ($class, $max) = @_;
+ bless {
+ # 32 is same as the git-daemon connection limit
+ max => $max || 32,
+ running => 0,
+ run_queue => [],
+ # RLIMIT_CPU => undef,
+ # RLIMIT_DATA => undef,
+ # RLIMIT_CORE => undef,
+ }, $class;
+}
+
+sub setup_rlimit {
+ my ($self, $name, $cfg) = @_;
+ for my $rlim (@PublicInbox::Spawn::RLIMITS) {
+ my $k = lc($rlim);
+ $k =~ tr/_//d;
+ $k = "publicinboxlimiter.$name.$k";
+ my $v = $cfg->{$k} // next;
+ my @rlimit = split(/\s*,\s*/, $v);
+ if (scalar(@rlimit) == 1) {
+ push @rlimit, $rlimit[0];
+ } elsif (scalar(@rlimit) != 2) {
+ warn "could not parse $k: $v\n";
+ }
+ eval { require BSD::Resource };
+ if ($@) {
+ warn "BSD::Resource missing for $rlim";
+ next;
+ }
+ for my $i (0..$#rlimit) {
+ next if $rlimit[$i] ne 'INFINITY';
+ $rlimit[$i] = BSD::Resource::RLIM_INFINITY();
+ }
+ $self->{$rlim} = \@rlimit;
+ }
+}
+
+1;
diff --git a/lib/PublicInbox/MailDiff.pm b/lib/PublicInbox/MailDiff.pm
index 994c7851..c3ce9365 100644
--- a/lib/PublicInbox/MailDiff.pm
+++ b/lib/PublicInbox/MailDiff.pm
@@ -8,6 +8,7 @@ use PublicInbox::MsgIter qw(msg_part_text);
use PublicInbox::ViewDiff qw(flush_diff);
use PublicInbox::GitAsyncCat;
use PublicInbox::ContentDigestDbg;
+use PublicInbox::Qspawn;
sub write_part { # Eml->each_part callback
my ($ary, $self) = @_;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0e52617c..a4d78e49 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -29,6 +29,7 @@ use v5.12;
use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
+use PublicInbox::Limiter;
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -183,7 +184,7 @@ sub psgi_qx {
$self->{qx_arg} = $qx_arg;
$self->{qx_fh} = $qx_fh;
$self->{qx_buf} = \$qx_buf;
- $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
start($self, $limiter, \&psgi_qx_start);
}
@@ -317,7 +318,7 @@ sub psgi_return {
$self->{psgi_env} = $env;
$self->{hdr_buf} = \(my $hdr_buf = '');
$self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
- $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
# the caller already captured the PSGI write callback from
# the PSGI server, so we can call ->start, here:
@@ -334,46 +335,4 @@ sub psgi_return {
}
}
-package PublicInbox::Qspawn::Limiter;
-use v5.12;
-
-sub new {
- my ($class, $max) = @_;
- bless {
- # 32 is same as the git-daemon connection limit
- max => $max || 32,
- running => 0,
- run_queue => [],
- # RLIMIT_CPU => undef,
- # RLIMIT_DATA => undef,
- # RLIMIT_CORE => undef,
- }, $class;
-}
-
-sub setup_rlimit {
- my ($self, $name, $cfg) = @_;
- foreach my $rlim (@PublicInbox::Spawn::RLIMITS) {
- my $k = lc($rlim);
- $k =~ tr/_//d;
- $k = "publicinboxlimiter.$name.$k";
- defined(my $v = $cfg->{$k}) or next;
- my @rlimit = split(/\s*,\s*/, $v);
- if (scalar(@rlimit) == 1) {
- push @rlimit, $rlimit[0];
- } elsif (scalar(@rlimit) != 2) {
- warn "could not parse $k: $v\n";
- }
- eval { require BSD::Resource };
- if ($@) {
- warn "BSD::Resource missing for $rlim";
- next;
- }
- foreach my $i (0..$#rlimit) {
- next if $rlimit[$i] ne 'INFINITY';
- $rlimit[$i] = BSD::Resource::RLIM_INFINITY();
- }
- $self->{$rlim} = \@rlimit;
- }
-}
-
1;
diff --git a/t/qspawn.t b/t/qspawn.t
index 224e20db..507f86a5 100644
--- a/t/qspawn.t
+++ b/t/qspawn.t
@@ -3,6 +3,7 @@
use v5.12;
use Test::More;
use_ok 'PublicInbox::Qspawn';
+use_ok 'PublicInbox::Limiter';
{
my $cmd = [qw(sh -c), 'echo >&2 err; echo out'];
@@ -23,7 +24,7 @@ sub finish_err ($) {
$qsp->{qsp_err} && ${$qsp->{qsp_err}};
}
-my $limiter = PublicInbox::Qspawn::Limiter->new(1);
+my $limiter = PublicInbox::Limiter->new(1);
{
my $x = PublicInbox::Qspawn->new([qw(true)]);
$x->{qsp_err} = \(my $err = '');
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 2/8] spawn: support synchronous run_qx
2023-10-19 1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
@ 2023-10-19 1:15 ` Eric Wong
2023-10-19 1:15 ` [PATCH 3/8] psgi_qx: use a temporary file rather than pipe Eric Wong
` (5 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19 1:15 UTC (permalink / raw)
To: spew
This is similar to `backtick` but supports all our existing spawn
functionality (chdir, env, rlimit, redirects, etc.). It also
supports SCALAR ref redirects like run_script in our test suite
for std{in,out,err}.
We can probably even use :utf8 by default for these redirects.
---
lib/PublicInbox/Git.pm | 6 ++++
lib/PublicInbox/SearchIdx.pm | 19 ++++------
lib/PublicInbox/Spawn.pm | 69 ++++++++++++++++++++++++++----------
t/spawn.t | 13 ++++++-
4 files changed, 76 insertions(+), 31 deletions(-)
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index a460d155..476dcf30 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -69,6 +69,7 @@ sub check_git_exe () {
$GIT_VER = eval("v$1") // die "BUG: bad vstring: $1 ($v)";
$EXE_ST = $st;
}
+ $GIT_EXE;
}
sub git_version {
@@ -422,6 +423,11 @@ sub async_err ($$$$$) {
$async_warn ? carp($msg) : $self->fail($msg);
}
+sub cmd {
+ my $self = shift;
+ [ $GIT_EXE // check_git_exe(), "--git-dir=$self->{git_dir}", @_ ]
+}
+
# $git->popen(qw(show f00)); # or
# $git->popen(qw(show f00), { GIT_CONFIG => ... }, { 2 => ... });
sub popen {
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 8a571cfb..3c64c715 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -22,7 +22,7 @@ use POSIX qw(strftime);
use Fcntl qw(SEEK_SET);
use Time::Local qw(timegm);
use PublicInbox::OverIdx;
-use PublicInbox::Spawn qw(run_wait);
+use PublicInbox::Spawn qw(run_wait run_qx);
use PublicInbox::Git qw(git_unquote);
use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
use PublicInbox::Address;
@@ -351,23 +351,18 @@ sub index_diff ($$$) {
}
sub patch_id {
- my ($self) = @_; # $_[1] is the diff (may be huge)
- open(my $fh, '+>:utf8', undef) or die "open: $!";
- open(my $eh, '+>', undef) or die "open: $!";
- $fh->autoflush(1);
- print $fh $_[1] or die "print: $!";
- sysseek($fh, 0, SEEK_SET) or die "sysseek: $!";
- my $id = ($self->{ibx} // $self->{eidx} // $self)->git->qx(
- [qw(patch-id --stable)], {}, { 0 => $fh, 2 => $eh });
- seek($eh, 0, SEEK_SET) or die "seek: $!";
- while (<$eh>) { warn $_ }
+ my ($self, $sref) = @_;
+ my $git = ($self->{ibx} // $self->{eidx} // $self)->git;
+ my $opt = { 0 => $sref, 2 => \(my $err) };
+ my $id = run_qx($git->cmd(qw(patch-id --stable)), undef, $opt);
+ warn $err if $err;
$id =~ /\A([a-f0-9]{40,})/ ? $1 : undef;
}
sub index_body_text {
my ($self, $doc, $sref) = @_;
if ($$sref =~ /^(?:diff|---|\+\+\+) /ms) {
- my $id = patch_id($self, $$sref);
+ my $id = patch_id($self, $sref);
$doc->add_term('XDFID'.$id) if defined($id);
}
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 106f5e01..1fa7a41f 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -22,8 +22,9 @@ use Fcntl qw(SEEK_SET);
use IO::Handle ();
use Carp qw(croak);
use PublicInbox::ProcessIO;
-our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait);
+our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait run_qx);
our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
+use autodie qw(open pipe read seek sysseek truncate);
BEGIN {
my $all_libc = <<'ALL_LIBC'; # all *nix systems we support
@@ -290,7 +291,6 @@ ALL_LIBC
undef $all_libc unless -d $inline_dir;
if (defined $all_libc) {
local $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
- use autodie;
# CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
my $lk = PublicInbox::Lock->new($inline_dir.
'/.public-inbox.lock');
@@ -301,7 +301,7 @@ ALL_LIBC
open STDERR, '>&', $fh;
STDERR->autoflush(1);
STDOUT->autoflush(1);
- CORE::eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
+ eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
my $err = $@;
open(STDERR, '>&', $olderr);
open(STDOUT, '>&', $oldout);
@@ -332,26 +332,34 @@ sub which ($) {
}
sub spawn ($;$$) {
- my ($cmd, $env, $opts) = @_;
+ my ($cmd, $env, $opt) = @_;
my $f = which($cmd->[0]) // die "$cmd->[0]: command not found\n";
- my @env;
+ my (@env, @rdr);
my %env = (%ENV, $env ? %$env : ());
while (my ($k, $v) = each %env) {
push @env, "$k=$v" if defined($v);
}
- my $redir = [];
for my $child_fd (0..2) {
- my $parent_fd = $opts->{$child_fd};
- if (defined($parent_fd) && $parent_fd !~ /\A[0-9]+\z/) {
- my $fd = fileno($parent_fd) //
- die "$parent_fd not an IO GLOB? $!";
- $parent_fd = $fd;
+ my $pfd = $opt->{$child_fd};
+ if ('SCALAR' eq ref($pfd)) {
+ open my $fh, '+>:utf8', undef;
+ $opt->{"fh.$child_fd"} = $fh;
+ if ($child_fd == 0) {
+ print $fh $$pfd;
+ $fh->flush or die "flush: $!";
+ sysseek($fh, 0, SEEK_SET);
+ }
+ $pfd = fileno($fh);
+ } elsif (defined($pfd) && $pfd !~ /\A[0-9]+\z/) {
+ my $fd = fileno($pfd) //
+ die "$pfd not an IO GLOB? $!";
+ $pfd = $fd;
}
- $redir->[$child_fd] = $parent_fd // $child_fd;
+ $rdr[$child_fd] = $pfd // $child_fd;
}
my $rlim = [];
foreach my $l (@RLIMITS) {
- my $v = $opts->{$l} // next;
+ my $v = $opt->{$l} // next;
my $r = eval "require BSD::Resource; BSD::Resource::$l();";
unless (defined $r) {
warn "$l undefined by BSD::Resource: $@\n";
@@ -359,31 +367,41 @@ sub spawn ($;$$) {
}
push @$rlim, $r, @$v;
}
- my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
- my $pgid = $opts->{pgid} // -1;
- my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd, $pgid);
+ my $cd = $opt->{'-C'} // ''; # undef => NULL mapping doesn't work?
+ my $pgid = $opt->{pgid} // -1;
+ my $pid = pi_fork_exec(\@rdr, $f, $cmd, \@env, $rlim, $cd, $pgid);
die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
$pid;
}
sub popen_rd {
my ($cmd, $env, $opt, @cb_arg) = @_;
- pipe(my $r, local $opt->{1}) or die "pipe: $!\n";
+ pipe(my $r, local $opt->{1});
my $pid = spawn($cmd, $env, $opt);
PublicInbox::ProcessIO->maybe_new($pid, $r, @cb_arg);
}
sub popen_wr {
my ($cmd, $env, $opt, @cb_arg) = @_;
- pipe(local $opt->{0}, my $w) or die "pipe: $!\n";
+ pipe(local $opt->{0}, my $w);
$w->autoflush(1);
my $pid = spawn($cmd, $env, $opt);
PublicInbox::ProcessIO->maybe_new($pid, $w, @cb_arg)
}
+sub read_out_err ($) {
+ my ($opt) = @_;
+ for my $fd (1, 2) { # read stdout/stderr
+ my $fh = delete($opt->{"fh.$fd"}) // next;
+ seek($fh, 0, SEEK_SET);
+ read($fh, ${$opt->{$fd}}, -s $fh, length(${$opt->{$fd}} // ''));
+ }
+}
+
sub run_wait ($;$$) {
my ($cmd, $env, $opt) = @_;
waitpid(spawn($cmd, $env, $opt), 0);
+ read_out_err($opt);
$?
}
@@ -392,4 +410,19 @@ sub run_die ($;$$) {
run_wait($cmd, $env, $rdr) and croak "E: @$cmd failed: \$?=$?";
}
+sub run_qx {
+ my ($cmd, $env, $opt) = @_;
+ my $fh = popen_rd($cmd, $env, $opt);
+ my @ret;
+ if (wantarray) {
+ @ret = <$fh>;
+ } else {
+ local $/;
+ $ret[0] = <$fh>;
+ }
+ close $fh; # caller should check $?
+ read_out_err($opt);
+ wantarray ? @ret : $ret[0];
+}
+
1;
diff --git a/t/spawn.t b/t/spawn.t
index 1af66bda..4b3baae4 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -3,7 +3,7 @@
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
use v5.12;
use Test::More;
-use PublicInbox::Spawn qw(which spawn popen_rd);
+use PublicInbox::Spawn qw(which spawn popen_rd run_qx);
require PublicInbox::Sigfd;
require PublicInbox::DS;
@@ -19,6 +19,17 @@ require PublicInbox::DS;
is($?, 0, 'true exited successfully');
}
+{
+ my $opt = { 0 => \'in', 2 => \(my $e) };
+ my $out = run_qx(['sh', '-c', 'echo e >&2; cat'], undef, $opt);
+ is($e, "e\n", 'captured stderr');
+ is($out, 'in', 'stdin read and stdout captured');
+ $opt->{0} = \"IN\n3\nLINES";
+ my @out = run_qx(['sh', '-c', 'echo E >&2; cat'], undef, $opt);
+ is($e, "e\nE\n", 'captured stderr appended to string');
+ is_deeply(\@out, [ "IN\n", "3\n", 'LINES' ], 'stdout array');
+}
+
SKIP: {
my $pid = spawn(['true'], undef, { pgid => 0 });
ok($pid, 'spawned process with new pgid');
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 3/8] psgi_qx: use a temporary file rather than pipe
2023-10-19 1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
2023-10-19 1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
@ 2023-10-19 1:15 ` Eric Wong
2023-10-19 1:15 ` [PATCH 4/8] www_coderepo: capture uses a flattened list Eric Wong
` (4 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19 1:15 UTC (permalink / raw)
To: spew
A pipe requires more context switches and code to deal with
unpredictable pipe EOF vs waitpid ordering. So just use the
new spawn/aspawn features to automatically handle slurping
output into a string.
---
MANIFEST | 1 +
lib/PublicInbox/Aspawn.pm | 30 ++++++++++
lib/PublicInbox/CodeSearchIdx.pm | 1 +
lib/PublicInbox/Qspawn.pm | 95 +++++++++++---------------------
4 files changed, 65 insertions(+), 62 deletions(-)
create mode 100644 lib/PublicInbox/Aspawn.pm
diff --git a/MANIFEST b/MANIFEST
index dcce801c..f087621c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -161,6 +161,7 @@ lib/PublicInbox/AddressPP.pm
lib/PublicInbox/Admin.pm
lib/PublicInbox/AdminEdit.pm
lib/PublicInbox/AltId.pm
+lib/PublicInbox/Aspawn.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
diff --git a/lib/PublicInbox/Aspawn.pm b/lib/PublicInbox/Aspawn.pm
new file mode 100644
index 00000000..2423ac31
--- /dev/null
+++ b/lib/PublicInbox/Aspawn.pm
@@ -0,0 +1,30 @@
+package PublicInbox::Aspawn;
+use v5.12;
+use parent qw(Exporter);
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::Spawn qw(spawn);
+our @EXPORT_OK = qw(run_await);
+
+sub _await_cb { # awaitpid cb
+ my ($pid, $cmd, $env, $opt, $cb, @args) = @_;
+ PublicInbox::Spawn::read_out_err($opt);
+ if ($? && !$opt->{quiet}) {
+ my ($status, $sig) = ($? >> 8, $? & 127);
+ my $msg = '';
+ $msg .= " (-C=$opt->{-C})" if defined $opt->{-C};
+ $msg .= " status=$status" if $status;
+ $msg .= " signal=$sig" if $sig;
+ warn "E: @$cmd", $msg, "\n";
+ }
+ $cb->($pid, $cmd, $env, $opt, @args) if $cb;
+}
+
+sub run_await {
+ my ($cmd, $env, $opt, $cb, @args) = @_;
+ $opt->{1} //= \(my $out);
+ my $pid = spawn($cmd, $env, $opt);
+ awaitpid($pid, \&_await_cb, $cmd, $env, $opt, $cb, @args);
+ awaitpid($pid); # synchronous for non-$in_loop
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 36d00aea..b9b72ff4 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -55,6 +55,7 @@ use PublicInbox::CidxLogP;
use PublicInbox::CidxComm;
use PublicInbox::Git qw(%OFMT2HEXLEN);
use PublicInbox::Compat qw(uniqstr);
+use PublicInbox::Aspawn qw(run_await);
use Carp ();
use autodie qw(pipe open seek sysseek send);
our (
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index a4d78e49..59d5ed40 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -30,6 +30,7 @@ use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
+use PublicInbox::Aspawn qw(run_await);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -48,29 +49,30 @@ sub new {
sub _do_spawn {
my ($self, $start_cb, $limiter) = @_;
- my $err;
my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
my %o = %{$opt || {}};
$self->{limiter} = $limiter;
- foreach my $k (@PublicInbox::Spawn::RLIMITS) {
- if (defined(my $rlimit = $limiter->{$k})) {
- $o{$k} = $rlimit;
- }
+ for my $k (@PublicInbox::Spawn::RLIMITS) {
+ $o{$k} = $limiter->{$k} // next;
}
$self->{cmd} = $cmd;
$self->{-quiet} = 1 if $o{quiet};
- eval {
- # popen_rd may die on EMFILE, ENFILE
- $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self);
- $limiter->{running}++;
- $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
- };
+ $limiter->{running}++;
+ if ($start_cb) {
+ eval { # popen_rd may die on EMFILE, ENFILE
+ $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
+ \&waitpid_err, $self);
+ $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
+ };
+ } else {
+ eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+ warn "E: $@" if $@;
+ }
finish($self, $@) if $@;
}
-sub finalize ($) {
- my ($self) = @_;
+sub finalize ($;$) {
+ my ($self, $opt) = @_;
# process is done, spawn whatever's in the queue
my $limiter = delete $self->{limiter} or return;
@@ -89,10 +91,10 @@ sub finalize ($) {
warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
}
- my ($env, $qx_cb, $qx_arg, $qx_buf) =
- delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
- if ($qx_cb) {
- eval { $qx_cb->($qx_buf, $qx_arg) };
+ my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+ if ($qx_cb_arg) {
+ my $cb = shift @$qx_cb_arg;
+ eval { $cb->($opt->{1}, @$qx_cb_arg) };
return unless $@;
warn "E: $@"; # hope qspawn.wcb can handle it
}
@@ -108,15 +110,20 @@ sub finalize ($) {
sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
sub waitpid_err { # callback for awaitpid
- my (undef, $self) = @_; # $_[0]: pid
+ my (undef, $self, $opt) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
- if ($?) {
+ if ($?) { # FIXME: redundant
my $status = $? >> 8;
my $sig = $? & 127;
$self->{_err} .= "exit status=$status";
$self->{_err} .= " signal=$sig" if $sig;
}
- finalize($self) if !$self->{rpipe};
+ finalize($self, $opt) if !$self->{rpipe};
+}
+
+sub wait_await { # run_await cb
+ my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
+ waitpid_err($pid, $self, $opt);
}
sub finish ($;$) {
@@ -140,52 +147,16 @@ sub start ($$$) {
}
}
-sub psgi_qx_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
- my ($self) = @_;
- my ($r, $buf);
-reread:
- $r = sysread($self->{rpipe}, $buf, 65536);
- if (!defined($r)) {
- return if $! == EAGAIN; # try again when notified
- goto reread if $! == EINTR;
- event_step($self, $!);
- } elsif (my $as = delete $self->{async}) { # PublicInbox::HTTPD::Async
- $as->async_pass($self->{psgi_env}->{'psgix.io'},
- $self->{qx_fh}, \$buf);
- } elsif ($r) { # generic PSGI:
- print { $self->{qx_fh} } $buf;
- } else { # EOF
- event_step($self, undef);
- }
-}
-
-sub psgi_qx_start {
- my ($self) = @_;
- if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
- # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
- $self->{async} = $async->($self->{rpipe},
- \&psgi_qx_init_cb, $self, $self);
- # init_cb will call ->async_pass or ->close
- } else { # generic PSGI
- psgi_qx_init_cb($self) while $self->{qx_fh};
- }
-}
-
-# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
+# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
# the stdout of the given command when done; but respects the given limiter
# $env is the PSGI env. As with ``/qx; only use this when output is small
# and safe to slurp.
sub psgi_qx {
- my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+ my ($self, $env, $limiter, @qx_cb_arg) = @_;
$self->{psgi_env} = $env;
- my $qx_buf = '';
- open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
- $self->{qx_cb} = $qx_cb;
- $self->{qx_arg} = $qx_arg;
- $self->{qx_fh} = $qx_fh;
- $self->{qx_buf} = \$qx_buf;
+ $self->{qx_cb_arg} = \@qx_cb_arg;
$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
- start($self, $limiter, \&psgi_qx_start);
+ start($self, $limiter, undef);
}
# this is called on pipe EOF to reap the process, may be called
@@ -195,7 +166,7 @@ sub event_step {
my ($self, $err) = @_; # $err: $!
warn "psgi_{return,qx} $err" if defined($err);
finish($self);
- my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)});
+ my $fh = delete $self->{qfh};
$fh->close if $fh; # async-only (psgi_return)
}
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 4/8] www_coderepo: capture uses a flattened list
2023-10-19 1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
2023-10-19 1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
2023-10-19 1:15 ` [PATCH 3/8] psgi_qx: use a temporary file rather than pipe Eric Wong
@ 2023-10-19 1:15 ` Eric Wong
2023-10-19 1:15 ` [PATCH 5/8] qspawn: psgi_return allows list for callback args Eric Wong
` (3 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19 1:15 UTC (permalink / raw)
To: spew
We no longer need a multi-dimensional list to pass multiple
arguments to the psgi_qx callback.
---
lib/PublicInbox/WwwCoderepo.pm | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm
index e8c340b5..68c4c86d 100644
--- a/lib/PublicInbox/WwwCoderepo.pm
+++ b/lib/PublicInbox/WwwCoderepo.pm
@@ -178,9 +178,8 @@ EOM
}
sub capture { # psgi_qx callback to capture git-for-each-ref
- my ($bref, $arg) = @_; # arg = [ctx, key, OnDestroy(summary_END)]
- utf8_maybe($$bref);
- $arg->[0]->{qx_res}->{$arg->[1]} = $$bref;
+ my ($bref, $ctx, $key) = @_; # $_[3] = OnDestroy(summary_END)
+ $ctx->{qx_res}->{$key} = $$bref;
# summary_END may be called via OnDestroy $arg->[2]
}
@@ -220,8 +219,7 @@ sub summary ($$) {
my ($k, $cmd) = @$_;
my $qsp = PublicInbox::Qspawn->new($cmd, \%env, \%opt);
$qsp->{qsp_err} = $qsp_err;
- $qsp->psgi_qx($ctx->{env}, undef, \&capture,
- [$ctx, $k, $END]);
+ $qsp->psgi_qx($ctx->{env}, undef, \&capture, $ctx, $k, $END);
}
$tip //= 'HEAD';
my @try = ("$tip:README", "$tip:README.md"); # TODO: configurable
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 5/8] qspawn: psgi_return allows list for callback args
2023-10-19 1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
` (2 preceding siblings ...)
2023-10-19 1:15 ` [PATCH 4/8] www_coderepo: capture uses a flattened list Eric Wong
@ 2023-10-19 1:15 ` Eric Wong
2023-10-19 1:15 ` [PATCH 6/8] qspawn: drop unused err arg for ->event_step Eric Wong
` (2 subsequent siblings)
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19 1:15 UTC (permalink / raw)
To: spew
This slightly simplifies our GitHTTPBackend wrapper.
We can also use shorter variable names to avoid wrapping some
lines.
---
lib/PublicInbox/GitHTTPBackend.pm | 6 +++---
lib/PublicInbox/Qspawn.pm | 19 ++++++++-----------
2 files changed, 11 insertions(+), 14 deletions(-)
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index d69f5f8b..edbc0157 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -80,9 +80,9 @@ sub serve_dumb {
}
sub git_parse_hdr { # {parse_hdr} for Qspawn
- my ($r, $bref, $dumb_args) = @_;
+ my ($r, $bref, @dumb_args) = @_;
my $res = parse_cgi_headers($r, $bref) or return; # incomplete
- $res->[0] == 403 ? serve_dumb(@$dumb_args) : $res;
+ $res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
}
# returns undef if 403 so it falls back to dumb HTTP
@@ -106,7 +106,7 @@ sub serve_smart {
$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
- $qsp->psgi_return($env, $limiter, \&git_parse_hdr, [$env, $git, $path]);
+ $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
}
sub input_prepare {
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 59d5ed40..0f900691 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -174,16 +174,13 @@ sub rd_hdr ($) {
my ($self) = @_;
# typically used for reading CGI headers
# We also need to check EINTR for generic PSGI servers.
- my $ret;
- my $total_rd = 0;
- my $hdr_buf = $self->{hdr_buf};
- my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}};
+ my ($ret, $total_rd);
+ my ($bref, $ph_cb, @ph_arg) = ($self->{hdr_buf}, @{$self->{parse_hdr}});
until (defined($ret)) {
- my $r = sysread($self->{rpipe}, $$hdr_buf, 4096,
- length($$hdr_buf));
+ my $r = sysread($self->{rpipe}, $$bref, 4096, length($$bref));
if (defined($r)) {
$total_rd += $r;
- eval { $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg) };
+ eval { $ret = $ph_cb->($total_rd, $bref, @ph_arg) };
if ($@) {
warn "parse_hdr: $@";
$ret = [ 500, [], [ "Internal error\n" ] ];
@@ -207,7 +204,7 @@ EOM
sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
my ($self) = @_;
- my $r = rd_hdr($self) or return;
+ my $r = rd_hdr($self) or return; # incomplete
my $env = $self->{psgi_env};
my $filter;
@@ -277,7 +274,7 @@ sub psgi_return_start { # may run later, much later...
#
# $limiter - the Limiter object to use (uses the def_limiter if not given)
#
-# $parse_hdr - Initial read function; often for parsing CGI header output.
+# @parse_hdr_arg - Initial read cb+args; often for parsing CGI header output.
# It will be given the return value of sysread from the pipe
# and a string ref of the current buffer. Returns an arrayref
# for PSGI responses. 2-element arrays in PSGI mean the
@@ -285,10 +282,10 @@ sub psgi_return_start { # may run later, much later...
# psgix.io. 3-element arrays means the body is available
# immediately (or streamed via ->getline (pull-based)).
sub psgi_return {
- my ($self, $env, $limiter, $parse_hdr, $hdr_arg) = @_;
+ my ($self, $env, $limiter, @parse_hdr_arg)= @_;
$self->{psgi_env} = $env;
$self->{hdr_buf} = \(my $hdr_buf = '');
- $self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
+ $self->{parse_hdr} = \@parse_hdr_arg;
$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
# the caller already captured the PSGI write callback from
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 6/8] qspawn: drop unused err arg for ->event_step
2023-10-19 1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
` (3 preceding siblings ...)
2023-10-19 1:15 ` [PATCH 5/8] qspawn: psgi_return allows list for callback args Eric Wong
@ 2023-10-19 1:15 ` Eric Wong
2023-10-19 1:15 ` [PATCH 7/8] httpd/async: require IO arg Eric Wong
2023-10-19 1:15 ` [PATCH 8/8] qspawn: introduce psgi_yield API Eric Wong
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19 1:15 UTC (permalink / raw)
To: spew
The $err argument is no longer needed since psgi_qx doesn't use a pipe anymore.
---
lib/PublicInbox/Qspawn.pm | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0f900691..9a7e8734 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -163,8 +163,7 @@ sub psgi_qx {
# via PublicInbox::DS event loop OR via GetlineBody for generic
# PSGI servers.
sub event_step {
- my ($self, $err) = @_; # $err: $!
- warn "psgi_{return,qx} $err" if defined($err);
+ my ($self) = @_;
finish($self);
my $fh = delete $self->{qfh};
$fh->close if $fh; # async-only (psgi_return)
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 7/8] httpd/async: require IO arg
2023-10-19 1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
` (4 preceding siblings ...)
2023-10-19 1:15 ` [PATCH 6/8] qspawn: drop unused err arg for ->event_step Eric Wong
@ 2023-10-19 1:15 ` Eric Wong
2023-10-19 1:15 ` [PATCH 8/8] qspawn: introduce psgi_yield API Eric Wong
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19 1:15 UTC (permalink / raw)
To: spew
Callers that want to requeue can call PublicInbox::DS::requeue
directly and not go through the convoluted argument handling
via PublicInbox::HTTPD::Async->new.
---
lib/PublicInbox/HTTPD/Async.pm | 8 --------
lib/PublicInbox/MailDiff.pm | 7 +++----
lib/PublicInbox/SolverGit.pm | 10 +++-------
3 files changed, 6 insertions(+), 19 deletions(-)
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b73d0c4b..2e4d8baa 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -25,14 +25,6 @@ use PublicInbox::ProcessIONBF;
# bidirectional socket in the future.
sub new {
my ($class, $io, $cb, $arg, $end_obj) = @_;
-
- # no $io? call $cb at the top of the next event loop to
- # avoid recursion:
- unless (defined($io)) {
- PublicInbox::DS::requeue($cb ? $cb : $arg);
- die '$end_obj unsupported w/o $io' if $end_obj;
- return;
- }
my $self = bless {
cb => $cb, # initial read callback
arg => $arg, # arg for $cb
diff --git a/lib/PublicInbox/MailDiff.pm b/lib/PublicInbox/MailDiff.pm
index c3ce9365..908f223c 100644
--- a/lib/PublicInbox/MailDiff.pm
+++ b/lib/PublicInbox/MailDiff.pm
@@ -59,8 +59,7 @@ sub next_smsg ($) {
$ctx->write($ctx->_html_end);
return $ctx->close;
}
- my $async = $self->{ctx}->{env}->{'pi-httpd.async'};
- $async->(undef, undef, $self) if $async # PublicInbox::HTTPD::Async->new
+ PublicInbox::DS::requeue($self) if $ctx->{env}->{'pi-httpd.async'};
}
sub emit_msg_diff {
@@ -125,8 +124,8 @@ sub event_step {
sub begin_mail_diff {
my ($self) = @_;
- if (my $async = $self->{ctx}->{env}->{'pi-httpd.async'}) {
- $async->(undef, undef, $self); # PublicInbox::HTTPD::Async->new
+ if ($self->{ctx}->{env}->{'pi-httpd.async'}) {
+ PublicInbox::DS::requeue($self);
} else {
event_step($self) while $self->{smsg};
}
diff --git a/lib/PublicInbox/SolverGit.pm b/lib/PublicInbox/SolverGit.pm
index 5f317f51..23d4d3d1 100644
--- a/lib/PublicInbox/SolverGit.pm
+++ b/lib/PublicInbox/SolverGit.pm
@@ -386,12 +386,9 @@ sub event_step ($) {
}
sub next_step ($) {
- my ($self) = @_;
# if outside of public-inbox-httpd, caller is expected to be
# looping event_step, anyways
- my $async = $self->{psgi_env}->{'pi-httpd.async'} or return;
- # PublicInbox::HTTPD::Async->new
- $async->(undef, undef, $self);
+ PublicInbox::DS::requeue($_[0]) if $_[0]->{psgi_env}->{'pi-httpd.async'}
}
sub mark_found ($$$) {
@@ -690,9 +687,8 @@ sub solve ($$$$$) {
$self->{found} = {}; # { abbr => [ ::Git, oid, type, size, $di ] }
dbg($self, "solving $oid_want ...");
- if (my $async = $env->{'pi-httpd.async'}) {
- # PublicInbox::HTTPD::Async->new
- $async->(undef, undef, $self);
+ if ($env->{'pi-httpd.async'}) {
+ PublicInbox::DS::requeue($self);
} else {
event_step($self) while $self->{user_cb};
}
^ permalink raw reply related [flat|nested] 8+ messages in thread
* [PATCH 8/8] qspawn: introduce psgi_yield API
2023-10-19 1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
` (5 preceding siblings ...)
2023-10-19 1:15 ` [PATCH 7/8] httpd/async: require IO arg Eric Wong
@ 2023-10-19 1:15 ` Eric Wong
6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19 1:15 UTC (permalink / raw)
To: spew
This may eventually replace psgi_return and HTTPD/Async entirely.
---
lib/PublicInbox/GitHTTPBackend.pm | 4 +-
lib/PublicInbox/InputPipe.pm | 11 ++--
lib/PublicInbox/LEI.pm | 2 +-
lib/PublicInbox/Qspawn.pm | 97 ++++++++++++++++++++++++++++++-
4 files changed, 105 insertions(+), 9 deletions(-)
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index edbc0157..d7e0bced 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -79,7 +79,7 @@ sub serve_dumb {
PublicInbox::WwwStatic::response($env, $h, $path, $type);
}
-sub git_parse_hdr { # {parse_hdr} for Qspawn
+sub ghb_parse_hdr { # header parser for Qspawn
my ($r, $bref, @dumb_args) = @_;
my $res = parse_cgi_headers($r, $bref) or return; # incomplete
$res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
@@ -106,7 +106,7 @@ sub serve_smart {
$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
- $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
+ $qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path);
}
sub input_prepare {
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index b38d8270..614360c2 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -39,14 +39,15 @@ sub consume {
if ($@) { # regular file (but not w/ select|IO::Poll backends)
$self->{-need_rq} = 1;
$self->requeue;
- } elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+ } elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
+ } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
$in->blocking(0);
} elsif (-t $in) { # isatty(3) can't use `_' stat cache
unblock_tty($self);
}
}
-sub close {
+sub close { # idempotent
my ($self) = @_;
if (my $t = delete($self->{restore_termios})) {
my $fd = fileno($self->{sock} // return);
@@ -60,16 +61,16 @@ sub event_step {
my $r = sysread($self->{sock} // return, my $rbuf, 65536);
eval {
if ($r) {
- $self->{cb}->(@{$self->{args}}, $rbuf);
+ $self->{cb}->($self, @{$self->{args}}, $rbuf);
$self->requeue if $self->{-need_rq};
} elsif (defined($r)) { # EOF
- $self->{cb}->(@{$self->{args}}, '');
+ $self->{cb}->($self, @{$self->{args}}, '');
$self->close
} elsif ($!{EAGAIN}) { # rely on EPOLLIN
} elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
$self->requeue if $self->{-need_rq};
} else { # another error
- $self->{cb}->(@{$self->{args}}, undef);
+ $self->{cb}->($self, @{$self->{args}}, undef);
$self->close;
}
};
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 3ccdd4f7..c1e965c8 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1566,7 +1566,7 @@ sub request_umask {
}
sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
- my ($lei, $cb) = @_; # $_[-1] = $rbuf
+ my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
$_[1] // return $lei->fail("error reading stdin: $!");
$lei->{stdin_buf} .= $_[-1];
do_env($lei, $cb) if $_[-1] eq '';
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 9a7e8734..c598e863 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -31,6 +31,9 @@ use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(confess);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -61,7 +64,7 @@ sub _do_spawn {
if ($start_cb) {
eval { # popen_rd may die on EMFILE, ENFILE
$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self);
+ \&waitpid_err, $self, \%o);
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
} else {
@@ -126,6 +129,13 @@ sub wait_await { # run_await cb
waitpid_err($pid, $self, $opt);
}
+sub yield_chunk { # $_[-1] is sysread buffer (or undef)
+ my ($self, $ipipe) = @_;
+ my $qfh = $self->{qfh}; # PublicInbox::HTTP::(Chunked|Identity)
+ ($_[-1] // '') eq '' ? $qfh->close : $qfh->write($_[-1]);
+ delete($self->{rpipe}) if ($_[-1] // '') eq ''; # all done
+}
+
sub finish ($;$) {
my ($self, $err) = @_;
$self->{_err} //= $err; # only for $@
@@ -201,6 +211,33 @@ EOM
$ret;
}
+sub yield_pass {
+ my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+ my $env = $self->{psgi_env};
+ my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
+ if (ref($res) eq 'CODE') { # chain another command
+ $ipipe->close;
+ $res->($wcb);
+ $self->{passed} = 1;
+ return; # all done
+ }
+ confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
+
+ my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+ pop(@$res) : delete($env->{'qspawn.filter'});
+ $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
+
+ if (scalar(@$res) == 3) { # done early
+ $ipipe->close;
+ return $wcb->($res); # all done
+ }
+
+ scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+ my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+ $qfh = $filter->attach($qfh) if $filter;
+ $self->{qfh} = $qfh; # keep $ipipe open
+}
+
sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
my ($self) = @_;
my $r = rd_hdr($self) or return; # incomplete
@@ -257,6 +294,46 @@ sub psgi_return_start { # may run later, much later...
}
}
+sub _ipipe_cb { # InputPipe callback
+ my ($ipipe, $self) = @_; # $_[-1] rbuf
+ return yield_chunk($self, $ipipe, $_[-1]) if $self->{qfh}; # stream body
+
+ if (!defined($_[-1])) {
+ warn "error reading header: $!";
+ } elsif ($_[-1] eq '') {
+ warn <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+ } else { # attempt to parse headers
+ my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+ $$bref .= $_[-1];
+ my $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
+ if ($ret) { # success
+ my $qfh = yield_pass($self, $ipipe, $ret);
+ delete $self->{yield_parse_hdr};
+ $qfh->write($$bref) if $qfh;
+ return;
+ }
+ return unless $@; # incomplete, not an error
+ warn "parse_hdr ($ph_cb): $@";
+ }
+ delete $self->{yield_parse_hdr};
+ yield_pass($self, $ipipe, [ 500, [], [ "Internal error\n" ] ])
+}
+
+sub _yield_start { # may run later, much later...
+ my ($self) = @_;
+ my $async = !!$self->{psgi_env}->{'pi-httpd.async'};
+ my $rpipe = $self->{rpipe};
+ if ($async) {
+ require PublicInbox::ProcessIONBF;
+ PublicInbox::ProcessIONBF->replace($rpipe);
+ }
+ my $ipipe = PublicInbox::InputPipe::consume($rpipe, \&_ipipe_cb, $self);
+ return if $async;
+ $ipipe->event_step while $ipipe->{sock};
+}
+
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
@@ -302,4 +379,22 @@ sub psgi_return {
}
}
+sub psgi_yield {
+ my ($self, $env, $limiter, @parse_hdr_arg)= @_;
+ $self->{psgi_env} = $env;
+ $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
+
+ # the caller already captured the PSGI write callback from
+ # the PSGI server, so we can call ->start, here:
+ $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+ # the caller will return this sub to the PSGI server, so
+ # it can set the response callback (that is, for
+ # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+ # but other HTTP servers are supported:
+ $env->{'qspawn.wcb'} = $_[0];
+ start($self, $limiter, \&_yield_start);
+ }
+}
+
1;
^ permalink raw reply related [flat|nested] 8+ messages in thread
end of thread, other threads:[~2023-10-19 1:15 UTC | newest]
Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-19 1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
2023-10-19 1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
2023-10-19 1:15 ` [PATCH 3/8] psgi_qx: use a temporary file rather than pipe Eric Wong
2023-10-19 1:15 ` [PATCH 4/8] www_coderepo: capture uses a flattened list Eric Wong
2023-10-19 1:15 ` [PATCH 5/8] qspawn: psgi_return allows list for callback args Eric Wong
2023-10-19 1:15 ` [PATCH 6/8] qspawn: drop unused err arg for ->event_step Eric Wong
2023-10-19 1:15 ` [PATCH 7/8] httpd/async: require IO arg Eric Wong
2023-10-19 1:15 ` [PATCH 8/8] qspawn: introduce psgi_yield API Eric Wong
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).