* [PATCH 11/18] repo_snapshot: psgi_yield
2023-10-19 12:40 [PATCH 01/18] limiter: split out from qspawn Eric Wong
@ 2023-10-19 12:40 ` Eric Wong
0 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-19 12:40 UTC (permalink / raw)
To: spew
---
lib/PublicInbox/RepoSnapshot.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/RepoSnapshot.pm b/lib/PublicInbox/RepoSnapshot.pm
index ebcbbd81..6b7441b0 100644
--- a/lib/PublicInbox/RepoSnapshot.pm
+++ b/lib/PublicInbox/RepoSnapshot.pm
@@ -58,7 +58,7 @@ sub ver_check { # git->check_async callback
"--git-dir=$ctx->{git}->{git_dir}", 'archive',
"--prefix=$ctx->{snap_pfx}/",
"--format=$ctx->{snap_fmt}", $treeish]);
- $qsp->psgi_return($ctx->{env}, undef, \&archive_hdr, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&archive_hdr, $ctx);
}
}
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 01/18] limiter: split out from qspawn
@ 2023-10-23 8:48 Eric Wong
2023-10-23 8:48 ` [PATCH 02/18] spawn: support synchronous run_qx Eric Wong
` (16 more replies)
0 siblings, 17 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
It's slightly better organized this way, especially since
`publicinboxLimiter` has its own user-facing config section
and knobs. I may use it in LeiMirror and CodeSearchIdx for
process management.
---
MANIFEST | 1 +
lib/PublicInbox/Config.pm | 4 +--
lib/PublicInbox/GitHTTPBackend.pm | 3 +-
lib/PublicInbox/Inbox.pm | 4 +--
lib/PublicInbox/Limiter.pm | 47 +++++++++++++++++++++++++++++++
lib/PublicInbox/MailDiff.pm | 1 +
lib/PublicInbox/Qspawn.pm | 47 ++-----------------------------
t/qspawn.t | 3 +-
8 files changed, 60 insertions(+), 50 deletions(-)
create mode 100644 lib/PublicInbox/Limiter.pm
diff --git a/MANIFEST b/MANIFEST
index 791d91a7..dcce801c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -287,6 +287,7 @@ lib/PublicInbox/LeiUp.pm
lib/PublicInbox/LeiViewText.pm
lib/PublicInbox/LeiWatch.pm
lib/PublicInbox/LeiXSearch.pm
+lib/PublicInbox/Limiter.pm
lib/PublicInbox/Linkify.pm
lib/PublicInbox/Listener.pm
lib/PublicInbox/Lock.pm
diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm
index 15e0872e..d156b2d3 100644
--- a/lib/PublicInbox/Config.pm
+++ b/lib/PublicInbox/Config.pm
@@ -124,9 +124,9 @@ sub lookup_newsgroup {
sub limiter {
my ($self, $name) = @_;
$self->{-limiters}->{$name} //= do {
- require PublicInbox::Qspawn;
+ require PublicInbox::Limiter;
my $max = $self->{"publicinboxlimiter.$name.max"} || 1;
- my $limiter = PublicInbox::Qspawn::Limiter->new($max);
+ my $limiter = PublicInbox::Limiter->new($max);
$limiter->setup_rlimit($name, $self);
$limiter;
};
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index 74432429..d69f5f8b 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -9,13 +9,14 @@ use v5.10.1;
use Fcntl qw(:seek);
use IO::Handle; # ->flush
use HTTP::Date qw(time2str);
+use PublicInbox::Limiter;
use PublicInbox::Qspawn;
use PublicInbox::Tmpfile;
use PublicInbox::WwwStatic qw(r @NO_CACHE);
use Carp ();
# 32 is same as the git-daemon connection limit
-my $default_limiter = PublicInbox::Qspawn::Limiter->new(32);
+my $default_limiter = PublicInbox::Limiter->new(32);
# n.b. serving "description" and "cloneurl" should be innocuous enough to
# not cause problems. serving "config" might...
diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm
index 9afbb478..3dad7004 100644
--- a/lib/PublicInbox/Inbox.pm
+++ b/lib/PublicInbox/Inbox.pm
@@ -55,8 +55,8 @@ sub _set_limiter ($$$) {
my $val = $self->{$mkey} or return;
my $lim;
if ($val =~ /\A[0-9]+\z/) {
- require PublicInbox::Qspawn;
- $lim = PublicInbox::Qspawn::Limiter->new($val);
+ require PublicInbox::Limiter;
+ $lim = PublicInbox::Limiter->new($val);
} elsif ($val =~ /\A[a-z][a-z0-9]*\z/) {
$lim = $pi_cfg->limiter($val);
warn "$mkey limiter=$val not found\n" if !$lim;
diff --git a/lib/PublicInbox/Limiter.pm b/lib/PublicInbox/Limiter.pm
new file mode 100644
index 00000000..48a2b6a3
--- /dev/null
+++ b/lib/PublicInbox/Limiter.pm
@@ -0,0 +1,47 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+package PublicInbox::Limiter;
+use v5.12;
+use PublicInbox::Spawn;
+
+sub new {
+ my ($class, $max) = @_;
+ bless {
+ # 32 is same as the git-daemon connection limit
+ max => $max || 32,
+ running => 0,
+ run_queue => [],
+ # RLIMIT_CPU => undef,
+ # RLIMIT_DATA => undef,
+ # RLIMIT_CORE => undef,
+ }, $class;
+}
+
+sub setup_rlimit {
+ my ($self, $name, $cfg) = @_;
+ for my $rlim (@PublicInbox::Spawn::RLIMITS) {
+ my $k = lc($rlim);
+ $k =~ tr/_//d;
+ $k = "publicinboxlimiter.$name.$k";
+ my $v = $cfg->{$k} // next;
+ my @rlimit = split(/\s*,\s*/, $v);
+ if (scalar(@rlimit) == 1) {
+ push @rlimit, $rlimit[0];
+ } elsif (scalar(@rlimit) != 2) {
+ warn "could not parse $k: $v\n";
+ }
+ eval { require BSD::Resource };
+ if ($@) {
+ warn "BSD::Resource missing for $rlim";
+ next;
+ }
+ for my $i (0..$#rlimit) {
+ next if $rlimit[$i] ne 'INFINITY';
+ $rlimit[$i] = BSD::Resource::RLIM_INFINITY();
+ }
+ $self->{$rlim} = \@rlimit;
+ }
+}
+
+1;
diff --git a/lib/PublicInbox/MailDiff.pm b/lib/PublicInbox/MailDiff.pm
index 994c7851..c3ce9365 100644
--- a/lib/PublicInbox/MailDiff.pm
+++ b/lib/PublicInbox/MailDiff.pm
@@ -8,6 +8,7 @@ use PublicInbox::MsgIter qw(msg_part_text);
use PublicInbox::ViewDiff qw(flush_diff);
use PublicInbox::GitAsyncCat;
use PublicInbox::ContentDigestDbg;
+use PublicInbox::Qspawn;
sub write_part { # Eml->each_part callback
my ($ary, $self) = @_;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0e52617c..a4d78e49 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -29,6 +29,7 @@ use v5.12;
use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
+use PublicInbox::Limiter;
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -183,7 +184,7 @@ sub psgi_qx {
$self->{qx_arg} = $qx_arg;
$self->{qx_fh} = $qx_fh;
$self->{qx_buf} = \$qx_buf;
- $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
start($self, $limiter, \&psgi_qx_start);
}
@@ -317,7 +318,7 @@ sub psgi_return {
$self->{psgi_env} = $env;
$self->{hdr_buf} = \(my $hdr_buf = '');
$self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
- $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
# the caller already captured the PSGI write callback from
# the PSGI server, so we can call ->start, here:
@@ -334,46 +335,4 @@ sub psgi_return {
}
}
-package PublicInbox::Qspawn::Limiter;
-use v5.12;
-
-sub new {
- my ($class, $max) = @_;
- bless {
- # 32 is same as the git-daemon connection limit
- max => $max || 32,
- running => 0,
- run_queue => [],
- # RLIMIT_CPU => undef,
- # RLIMIT_DATA => undef,
- # RLIMIT_CORE => undef,
- }, $class;
-}
-
-sub setup_rlimit {
- my ($self, $name, $cfg) = @_;
- foreach my $rlim (@PublicInbox::Spawn::RLIMITS) {
- my $k = lc($rlim);
- $k =~ tr/_//d;
- $k = "publicinboxlimiter.$name.$k";
- defined(my $v = $cfg->{$k}) or next;
- my @rlimit = split(/\s*,\s*/, $v);
- if (scalar(@rlimit) == 1) {
- push @rlimit, $rlimit[0];
- } elsif (scalar(@rlimit) != 2) {
- warn "could not parse $k: $v\n";
- }
- eval { require BSD::Resource };
- if ($@) {
- warn "BSD::Resource missing for $rlim";
- next;
- }
- foreach my $i (0..$#rlimit) {
- next if $rlimit[$i] ne 'INFINITY';
- $rlimit[$i] = BSD::Resource::RLIM_INFINITY();
- }
- $self->{$rlim} = \@rlimit;
- }
-}
-
1;
diff --git a/t/qspawn.t b/t/qspawn.t
index 224e20db..507f86a5 100644
--- a/t/qspawn.t
+++ b/t/qspawn.t
@@ -3,6 +3,7 @@
use v5.12;
use Test::More;
use_ok 'PublicInbox::Qspawn';
+use_ok 'PublicInbox::Limiter';
{
my $cmd = [qw(sh -c), 'echo >&2 err; echo out'];
@@ -23,7 +24,7 @@ sub finish_err ($) {
$qsp->{qsp_err} && ${$qsp->{qsp_err}};
}
-my $limiter = PublicInbox::Qspawn::Limiter->new(1);
+my $limiter = PublicInbox::Limiter->new(1);
{
my $x = PublicInbox::Qspawn->new([qw(true)]);
$x->{qsp_err} = \(my $err = '');
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 02/18] spawn: support synchronous run_qx
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 03/18] psgi_qx: use a temporary file rather than pipe Eric Wong
` (15 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
This is similar to `backtick` but supports all our existing spawn
functionality (chdir, env, rlimit, redirects, etc.). It also
supports SCALAR ref redirects like run_script in our test suite
for std{in,out,err}.
We can probably use :utf8 by default for these redirects, even.
---
lib/PublicInbox/Git.pm | 6 ++++
lib/PublicInbox/SearchIdx.pm | 19 ++++------
lib/PublicInbox/Spawn.pm | 69 ++++++++++++++++++++++++++----------
t/spawn.t | 13 ++++++-
4 files changed, 76 insertions(+), 31 deletions(-)
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index a460d155..476dcf30 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -69,6 +69,7 @@ sub check_git_exe () {
$GIT_VER = eval("v$1") // die "BUG: bad vstring: $1 ($v)";
$EXE_ST = $st;
}
+ $GIT_EXE;
}
sub git_version {
@@ -422,6 +423,11 @@ sub async_err ($$$$$) {
$async_warn ? carp($msg) : $self->fail($msg);
}
+sub cmd {
+ my $self = shift;
+ [ $GIT_EXE // check_git_exe(), "--git-dir=$self->{git_dir}", @_ ]
+}
+
# $git->popen(qw(show f00)); # or
# $git->popen(qw(show f00), { GIT_CONFIG => ... }, { 2 => ... });
sub popen {
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 8a571cfb..3c64c715 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -22,7 +22,7 @@ use POSIX qw(strftime);
use Fcntl qw(SEEK_SET);
use Time::Local qw(timegm);
use PublicInbox::OverIdx;
-use PublicInbox::Spawn qw(run_wait);
+use PublicInbox::Spawn qw(run_wait run_qx);
use PublicInbox::Git qw(git_unquote);
use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
use PublicInbox::Address;
@@ -351,23 +351,18 @@ sub index_diff ($$$) {
}
sub patch_id {
- my ($self) = @_; # $_[1] is the diff (may be huge)
- open(my $fh, '+>:utf8', undef) or die "open: $!";
- open(my $eh, '+>', undef) or die "open: $!";
- $fh->autoflush(1);
- print $fh $_[1] or die "print: $!";
- sysseek($fh, 0, SEEK_SET) or die "sysseek: $!";
- my $id = ($self->{ibx} // $self->{eidx} // $self)->git->qx(
- [qw(patch-id --stable)], {}, { 0 => $fh, 2 => $eh });
- seek($eh, 0, SEEK_SET) or die "seek: $!";
- while (<$eh>) { warn $_ }
+ my ($self, $sref) = @_;
+ my $git = ($self->{ibx} // $self->{eidx} // $self)->git;
+ my $opt = { 0 => $sref, 2 => \(my $err) };
+ my $id = run_qx($git->cmd(qw(patch-id --stable)), undef, $opt);
+ warn $err if $err;
$id =~ /\A([a-f0-9]{40,})/ ? $1 : undef;
}
sub index_body_text {
my ($self, $doc, $sref) = @_;
if ($$sref =~ /^(?:diff|---|\+\+\+) /ms) {
- my $id = patch_id($self, $$sref);
+ my $id = patch_id($self, $sref);
$doc->add_term('XDFID'.$id) if defined($id);
}
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 106f5e01..1fa7a41f 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -22,8 +22,9 @@ use Fcntl qw(SEEK_SET);
use IO::Handle ();
use Carp qw(croak);
use PublicInbox::ProcessIO;
-our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait);
+our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait run_qx);
our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
+use autodie qw(open pipe read seek sysseek truncate);
BEGIN {
my $all_libc = <<'ALL_LIBC'; # all *nix systems we support
@@ -290,7 +291,6 @@ ALL_LIBC
undef $all_libc unless -d $inline_dir;
if (defined $all_libc) {
local $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
- use autodie;
# CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
my $lk = PublicInbox::Lock->new($inline_dir.
'/.public-inbox.lock');
@@ -301,7 +301,7 @@ ALL_LIBC
open STDERR, '>&', $fh;
STDERR->autoflush(1);
STDOUT->autoflush(1);
- CORE::eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
+ eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
my $err = $@;
open(STDERR, '>&', $olderr);
open(STDOUT, '>&', $oldout);
@@ -332,26 +332,34 @@ sub which ($) {
}
sub spawn ($;$$) {
- my ($cmd, $env, $opts) = @_;
+ my ($cmd, $env, $opt) = @_;
my $f = which($cmd->[0]) // die "$cmd->[0]: command not found\n";
- my @env;
+ my (@env, @rdr);
my %env = (%ENV, $env ? %$env : ());
while (my ($k, $v) = each %env) {
push @env, "$k=$v" if defined($v);
}
- my $redir = [];
for my $child_fd (0..2) {
- my $parent_fd = $opts->{$child_fd};
- if (defined($parent_fd) && $parent_fd !~ /\A[0-9]+\z/) {
- my $fd = fileno($parent_fd) //
- die "$parent_fd not an IO GLOB? $!";
- $parent_fd = $fd;
+ my $pfd = $opt->{$child_fd};
+ if ('SCALAR' eq ref($pfd)) {
+ open my $fh, '+>:utf8', undef;
+ $opt->{"fh.$child_fd"} = $fh;
+ if ($child_fd == 0) {
+ print $fh $$pfd;
+ $fh->flush or die "flush: $!";
+ sysseek($fh, 0, SEEK_SET);
+ }
+ $pfd = fileno($fh);
+ } elsif (defined($pfd) && $pfd !~ /\A[0-9]+\z/) {
+ my $fd = fileno($pfd) //
+ die "$pfd not an IO GLOB? $!";
+ $pfd = $fd;
}
- $redir->[$child_fd] = $parent_fd // $child_fd;
+ $rdr[$child_fd] = $pfd // $child_fd;
}
my $rlim = [];
foreach my $l (@RLIMITS) {
- my $v = $opts->{$l} // next;
+ my $v = $opt->{$l} // next;
my $r = eval "require BSD::Resource; BSD::Resource::$l();";
unless (defined $r) {
warn "$l undefined by BSD::Resource: $@\n";
@@ -359,31 +367,41 @@ sub spawn ($;$$) {
}
push @$rlim, $r, @$v;
}
- my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
- my $pgid = $opts->{pgid} // -1;
- my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd, $pgid);
+ my $cd = $opt->{'-C'} // ''; # undef => NULL mapping doesn't work?
+ my $pgid = $opt->{pgid} // -1;
+ my $pid = pi_fork_exec(\@rdr, $f, $cmd, \@env, $rlim, $cd, $pgid);
die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
$pid;
}
sub popen_rd {
my ($cmd, $env, $opt, @cb_arg) = @_;
- pipe(my $r, local $opt->{1}) or die "pipe: $!\n";
+ pipe(my $r, local $opt->{1});
my $pid = spawn($cmd, $env, $opt);
PublicInbox::ProcessIO->maybe_new($pid, $r, @cb_arg);
}
sub popen_wr {
my ($cmd, $env, $opt, @cb_arg) = @_;
- pipe(local $opt->{0}, my $w) or die "pipe: $!\n";
+ pipe(local $opt->{0}, my $w);
$w->autoflush(1);
my $pid = spawn($cmd, $env, $opt);
PublicInbox::ProcessIO->maybe_new($pid, $w, @cb_arg)
}
+sub read_out_err ($) {
+ my ($opt) = @_;
+ for my $fd (1, 2) { # read stdout/stderr
+ my $fh = delete($opt->{"fh.$fd"}) // next;
+ seek($fh, 0, SEEK_SET);
+ read($fh, ${$opt->{$fd}}, -s $fh, length(${$opt->{$fd}} // ''));
+ }
+}
+
sub run_wait ($;$$) {
my ($cmd, $env, $opt) = @_;
waitpid(spawn($cmd, $env, $opt), 0);
+ read_out_err($opt);
$?
}
@@ -392,4 +410,19 @@ sub run_die ($;$$) {
run_wait($cmd, $env, $rdr) and croak "E: @$cmd failed: \$?=$?";
}
+sub run_qx {
+ my ($cmd, $env, $opt) = @_;
+ my $fh = popen_rd($cmd, $env, $opt);
+ my @ret;
+ if (wantarray) {
+ @ret = <$fh>;
+ } else {
+ local $/;
+ $ret[0] = <$fh>;
+ }
+ close $fh; # caller should check $?
+ read_out_err($opt);
+ wantarray ? @ret : $ret[0];
+}
+
1;
diff --git a/t/spawn.t b/t/spawn.t
index 1af66bda..4b3baae4 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -3,7 +3,7 @@
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
use v5.12;
use Test::More;
-use PublicInbox::Spawn qw(which spawn popen_rd);
+use PublicInbox::Spawn qw(which spawn popen_rd run_qx);
require PublicInbox::Sigfd;
require PublicInbox::DS;
@@ -19,6 +19,17 @@ require PublicInbox::DS;
is($?, 0, 'true exited successfully');
}
+{
+ my $opt = { 0 => \'in', 2 => \(my $e) };
+ my $out = run_qx(['sh', '-c', 'echo e >&2; cat'], undef, $opt);
+ is($e, "e\n", 'captured stderr');
+ is($out, 'in', 'stdin read and stdout captured');
+ $opt->{0} = \"IN\n3\nLINES";
+ my @out = run_qx(['sh', '-c', 'echo E >&2; cat'], undef, $opt);
+ is($e, "e\nE\n", 'captured stderr appended to string');
+ is_deeply(\@out, [ "IN\n", "3\n", 'LINES' ], 'stdout array');
+}
+
SKIP: {
my $pid = spawn(['true'], undef, { pgid => 0 });
ok($pid, 'spawned process with new pgid');
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 03/18] psgi_qx: use a temporary file rather than pipe
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
2023-10-23 8:48 ` [PATCH 02/18] spawn: support synchronous run_qx Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 04/18] www_coderepo: capture uses a flattened list Eric Wong
` (14 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
A pipe requires more context switches and code to deal with
unpredictable pipe EOF vs waitpid ordering. So just use the
new spawn/aspawn features to automatically handle slurping
output into a string.
---
MANIFEST | 1 +
lib/PublicInbox/Aspawn.pm | 30 ++++++++++
lib/PublicInbox/CodeSearchIdx.pm | 1 +
lib/PublicInbox/Qspawn.pm | 95 +++++++++++---------------------
4 files changed, 65 insertions(+), 62 deletions(-)
create mode 100644 lib/PublicInbox/Aspawn.pm
diff --git a/MANIFEST b/MANIFEST
index dcce801c..f087621c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -161,6 +161,7 @@ lib/PublicInbox/AddressPP.pm
lib/PublicInbox/Admin.pm
lib/PublicInbox/AdminEdit.pm
lib/PublicInbox/AltId.pm
+lib/PublicInbox/Aspawn.pm
lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
diff --git a/lib/PublicInbox/Aspawn.pm b/lib/PublicInbox/Aspawn.pm
new file mode 100644
index 00000000..2423ac31
--- /dev/null
+++ b/lib/PublicInbox/Aspawn.pm
@@ -0,0 +1,30 @@
+package PublicInbox::Aspawn;
+use v5.12;
+use parent qw(Exporter);
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::Spawn qw(spawn);
+our @EXPORT_OK = qw(run_await);
+
+sub _await_cb { # awaitpid cb
+ my ($pid, $cmd, $env, $opt, $cb, @args) = @_;
+ PublicInbox::Spawn::read_out_err($opt);
+ if ($? && !$opt->{quiet}) {
+ my ($status, $sig) = ($? >> 8, $? & 127);
+ my $msg = '';
+ $msg .= " (-C=$opt->{-C})" if defined $opt->{-C};
+ $msg .= " status=$status" if $status;
+ $msg .= " signal=$sig" if $sig;
+ warn "E: @$cmd", $msg, "\n";
+ }
+ $cb->($pid, $cmd, $env, $opt, @args) if $cb;
+}
+
+sub run_await {
+ my ($cmd, $env, $opt, $cb, @args) = @_;
+ $opt->{1} //= \(my $out);
+ my $pid = spawn($cmd, $env, $opt);
+ awaitpid($pid, \&_await_cb, $cmd, $env, $opt, $cb, @args);
+ awaitpid($pid); # synchronous for non-$in_loop
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 36d00aea..b9b72ff4 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -55,6 +55,7 @@ use PublicInbox::CidxLogP;
use PublicInbox::CidxComm;
use PublicInbox::Git qw(%OFMT2HEXLEN);
use PublicInbox::Compat qw(uniqstr);
+use PublicInbox::Aspawn qw(run_await);
use Carp ();
use autodie qw(pipe open seek sysseek send);
our (
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index a4d78e49..59d5ed40 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -30,6 +30,7 @@ use PublicInbox::Spawn qw(popen_rd);
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
+use PublicInbox::Aspawn qw(run_await);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -48,29 +49,30 @@ sub new {
sub _do_spawn {
my ($self, $start_cb, $limiter) = @_;
- my $err;
my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
my %o = %{$opt || {}};
$self->{limiter} = $limiter;
- foreach my $k (@PublicInbox::Spawn::RLIMITS) {
- if (defined(my $rlimit = $limiter->{$k})) {
- $o{$k} = $rlimit;
- }
+ for my $k (@PublicInbox::Spawn::RLIMITS) {
+ $o{$k} = $limiter->{$k} // next;
}
$self->{cmd} = $cmd;
$self->{-quiet} = 1 if $o{quiet};
- eval {
- # popen_rd may die on EMFILE, ENFILE
- $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self);
- $limiter->{running}++;
- $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
- };
+ $limiter->{running}++;
+ if ($start_cb) {
+ eval { # popen_rd may die on EMFILE, ENFILE
+ $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
+ \&waitpid_err, $self);
+ $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
+ };
+ } else {
+ eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+ warn "E: $@" if $@;
+ }
finish($self, $@) if $@;
}
-sub finalize ($) {
- my ($self) = @_;
+sub finalize ($;$) {
+ my ($self, $opt) = @_;
# process is done, spawn whatever's in the queue
my $limiter = delete $self->{limiter} or return;
@@ -89,10 +91,10 @@ sub finalize ($) {
warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
}
- my ($env, $qx_cb, $qx_arg, $qx_buf) =
- delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
- if ($qx_cb) {
- eval { $qx_cb->($qx_buf, $qx_arg) };
+ my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+ if ($qx_cb_arg) {
+ my $cb = shift @$qx_cb_arg;
+ eval { $cb->($opt->{1}, @$qx_cb_arg) };
return unless $@;
warn "E: $@"; # hope qspawn.wcb can handle it
}
@@ -108,15 +110,20 @@ sub finalize ($) {
sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
sub waitpid_err { # callback for awaitpid
- my (undef, $self) = @_; # $_[0]: pid
+ my (undef, $self, $opt) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
- if ($?) {
+ if ($?) { # FIXME: redundant
my $status = $? >> 8;
my $sig = $? & 127;
$self->{_err} .= "exit status=$status";
$self->{_err} .= " signal=$sig" if $sig;
}
- finalize($self) if !$self->{rpipe};
+ finalize($self, $opt) if !$self->{rpipe};
+}
+
+sub wait_await { # run_await cb
+ my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
+ waitpid_err($pid, $self, $opt);
}
sub finish ($;$) {
@@ -140,52 +147,16 @@ sub start ($$$) {
}
}
-sub psgi_qx_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
- my ($self) = @_;
- my ($r, $buf);
-reread:
- $r = sysread($self->{rpipe}, $buf, 65536);
- if (!defined($r)) {
- return if $! == EAGAIN; # try again when notified
- goto reread if $! == EINTR;
- event_step($self, $!);
- } elsif (my $as = delete $self->{async}) { # PublicInbox::HTTPD::Async
- $as->async_pass($self->{psgi_env}->{'psgix.io'},
- $self->{qx_fh}, \$buf);
- } elsif ($r) { # generic PSGI:
- print { $self->{qx_fh} } $buf;
- } else { # EOF
- event_step($self, undef);
- }
-}
-
-sub psgi_qx_start {
- my ($self) = @_;
- if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
- # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
- $self->{async} = $async->($self->{rpipe},
- \&psgi_qx_init_cb, $self, $self);
- # init_cb will call ->async_pass or ->close
- } else { # generic PSGI
- psgi_qx_init_cb($self) while $self->{qx_fh};
- }
-}
-
-# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
+# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
# the stdout of the given command when done; but respects the given limiter
# $env is the PSGI env. As with ``/qx; only use this when output is small
# and safe to slurp.
sub psgi_qx {
- my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+ my ($self, $env, $limiter, @qx_cb_arg) = @_;
$self->{psgi_env} = $env;
- my $qx_buf = '';
- open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
- $self->{qx_cb} = $qx_cb;
- $self->{qx_arg} = $qx_arg;
- $self->{qx_fh} = $qx_fh;
- $self->{qx_buf} = \$qx_buf;
+ $self->{qx_cb_arg} = \@qx_cb_arg;
$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
- start($self, $limiter, \&psgi_qx_start);
+ start($self, $limiter, undef);
}
# this is called on pipe EOF to reap the process, may be called
@@ -195,7 +166,7 @@ sub event_step {
my ($self, $err) = @_; # $err: $!
warn "psgi_{return,qx} $err" if defined($err);
finish($self);
- my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)});
+ my $fh = delete $self->{qfh};
$fh->close if $fh; # async-only (psgi_return)
}
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 04/18] www_coderepo: capture uses a flattened list
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
2023-10-23 8:48 ` [PATCH 02/18] spawn: support synchronous run_qx Eric Wong
2023-10-23 8:48 ` [PATCH 03/18] psgi_qx: use a temporary file rather than pipe Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 05/18] qspawn: psgi_return allows list for callback args Eric Wong
` (13 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
We no longer need a multi-dimensional list to pass multiple
arguments to the psgi_qx callback. This simplifies usage
and reduces allocations.
---
lib/PublicInbox/WwwCoderepo.pm | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm
index e8c340b5..68c4c86d 100644
--- a/lib/PublicInbox/WwwCoderepo.pm
+++ b/lib/PublicInbox/WwwCoderepo.pm
@@ -178,9 +178,8 @@ EOM
}
sub capture { # psgi_qx callback to capture git-for-each-ref
- my ($bref, $arg) = @_; # arg = [ctx, key, OnDestroy(summary_END)]
- utf8_maybe($$bref);
- $arg->[0]->{qx_res}->{$arg->[1]} = $$bref;
+ my ($bref, $ctx, $key) = @_; # $_[3] = OnDestroy(summary_END)
+ $ctx->{qx_res}->{$key} = $$bref;
# summary_END may be called via OnDestroy $arg->[2]
}
@@ -220,8 +219,7 @@ sub summary ($$) {
my ($k, $cmd) = @$_;
my $qsp = PublicInbox::Qspawn->new($cmd, \%env, \%opt);
$qsp->{qsp_err} = $qsp_err;
- $qsp->psgi_qx($ctx->{env}, undef, \&capture,
- [$ctx, $k, $END]);
+ $qsp->psgi_qx($ctx->{env}, undef, \&capture, $ctx, $k, $END);
}
$tip //= 'HEAD';
my @try = ("$tip:README", "$tip:README.md"); # TODO: configurable
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 05/18] qspawn: psgi_return allows list for callback args
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (2 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 04/18] www_coderepo: capture uses a flattened list Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 06/18] qspawn: drop unused err arg for ->event_step Eric Wong
` (12 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
This slightly simplifies our GitHTTPBackend wrapper.
We can also use shorter variable names to avoid wrapping some
lines.
---
lib/PublicInbox/GitHTTPBackend.pm | 6 +++---
lib/PublicInbox/Qspawn.pm | 19 ++++++++-----------
2 files changed, 11 insertions(+), 14 deletions(-)
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index d69f5f8b..edbc0157 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -80,9 +80,9 @@ sub serve_dumb {
}
sub git_parse_hdr { # {parse_hdr} for Qspawn
- my ($r, $bref, $dumb_args) = @_;
+ my ($r, $bref, @dumb_args) = @_;
my $res = parse_cgi_headers($r, $bref) or return; # incomplete
- $res->[0] == 403 ? serve_dumb(@$dumb_args) : $res;
+ $res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
}
# returns undef if 403 so it falls back to dumb HTTP
@@ -106,7 +106,7 @@ sub serve_smart {
$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
- $qsp->psgi_return($env, $limiter, \&git_parse_hdr, [$env, $git, $path]);
+ $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
}
sub input_prepare {
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 59d5ed40..0f900691 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -174,16 +174,13 @@ sub rd_hdr ($) {
my ($self) = @_;
# typically used for reading CGI headers
# We also need to check EINTR for generic PSGI servers.
- my $ret;
- my $total_rd = 0;
- my $hdr_buf = $self->{hdr_buf};
- my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}};
+ my ($ret, $total_rd);
+ my ($bref, $ph_cb, @ph_arg) = ($self->{hdr_buf}, @{$self->{parse_hdr}});
until (defined($ret)) {
- my $r = sysread($self->{rpipe}, $$hdr_buf, 4096,
- length($$hdr_buf));
+ my $r = sysread($self->{rpipe}, $$bref, 4096, length($$bref));
if (defined($r)) {
$total_rd += $r;
- eval { $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg) };
+ eval { $ret = $ph_cb->($total_rd, $bref, @ph_arg) };
if ($@) {
warn "parse_hdr: $@";
$ret = [ 500, [], [ "Internal error\n" ] ];
@@ -207,7 +204,7 @@ EOM
sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
my ($self) = @_;
- my $r = rd_hdr($self) or return;
+ my $r = rd_hdr($self) or return; # incomplete
my $env = $self->{psgi_env};
my $filter;
@@ -277,7 +274,7 @@ sub psgi_return_start { # may run later, much later...
#
# $limiter - the Limiter object to use (uses the def_limiter if not given)
#
-# $parse_hdr - Initial read function; often for parsing CGI header output.
+# @parse_hdr_arg - Initial read cb+args; often for parsing CGI header output.
# It will be given the return value of sysread from the pipe
# and a string ref of the current buffer. Returns an arrayref
# for PSGI responses. 2-element arrays in PSGI mean the
@@ -285,10 +282,10 @@ sub psgi_return_start { # may run later, much later...
# psgix.io. 3-element arrays means the body is available
# immediately (or streamed via ->getline (pull-based)).
sub psgi_return {
- my ($self, $env, $limiter, $parse_hdr, $hdr_arg) = @_;
+ my ($self, $env, $limiter, @parse_hdr_arg)= @_;
$self->{psgi_env} = $env;
$self->{hdr_buf} = \(my $hdr_buf = '');
- $self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
+ $self->{parse_hdr} = \@parse_hdr_arg;
$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
# the caller already captured the PSGI write callback from
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 06/18] qspawn: drop unused err arg for ->event_step
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (3 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 05/18] qspawn: psgi_return allows list for callback args Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 07/18] httpd/async: require IO arg Eric Wong
` (11 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
It's no longer needed since psgi_qx doesn't use a pipe anymore.
---
lib/PublicInbox/Qspawn.pm | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0f900691..9a7e8734 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -163,8 +163,7 @@ sub psgi_qx {
# via PublicInbox::DS event loop OR via GetlineBody for generic
# PSGI servers.
sub event_step {
- my ($self, $err) = @_; # $err: $!
- warn "psgi_{return,qx} $err" if defined($err);
+ my ($self) = @_;
finish($self);
my $fh = delete $self->{qfh};
$fh->close if $fh; # async-only (psgi_return)
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 07/18] httpd/async: require IO arg
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (4 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 06/18] qspawn: drop unused err arg for ->event_step Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 08/18] xt/check-run: call DS->Reset after all tests Eric Wong
` (10 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
Callers that want to requeue can call PublicInbox::DS::requeue
directly and not go through the convoluted argument handling
via PublicInbox::HTTPD::Async->new.
---
lib/PublicInbox/HTTPD/Async.pm | 8 --------
lib/PublicInbox/MailDiff.pm | 7 +++----
lib/PublicInbox/SolverGit.pm | 10 +++-------
3 files changed, 6 insertions(+), 19 deletions(-)
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b73d0c4b..2e4d8baa 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -25,14 +25,6 @@ use PublicInbox::ProcessIONBF;
# bidirectional socket in the future.
sub new {
my ($class, $io, $cb, $arg, $end_obj) = @_;
-
- # no $io? call $cb at the top of the next event loop to
- # avoid recursion:
- unless (defined($io)) {
- PublicInbox::DS::requeue($cb ? $cb : $arg);
- die '$end_obj unsupported w/o $io' if $end_obj;
- return;
- }
my $self = bless {
cb => $cb, # initial read callback
arg => $arg, # arg for $cb
diff --git a/lib/PublicInbox/MailDiff.pm b/lib/PublicInbox/MailDiff.pm
index c3ce9365..908f223c 100644
--- a/lib/PublicInbox/MailDiff.pm
+++ b/lib/PublicInbox/MailDiff.pm
@@ -59,8 +59,7 @@ sub next_smsg ($) {
$ctx->write($ctx->_html_end);
return $ctx->close;
}
- my $async = $self->{ctx}->{env}->{'pi-httpd.async'};
- $async->(undef, undef, $self) if $async # PublicInbox::HTTPD::Async->new
+ PublicInbox::DS::requeue($self) if $ctx->{env}->{'pi-httpd.async'};
}
sub emit_msg_diff {
@@ -125,8 +124,8 @@ sub event_step {
sub begin_mail_diff {
my ($self) = @_;
- if (my $async = $self->{ctx}->{env}->{'pi-httpd.async'}) {
- $async->(undef, undef, $self); # PublicInbox::HTTPD::Async->new
+ if ($self->{ctx}->{env}->{'pi-httpd.async'}) {
+ PublicInbox::DS::requeue($self);
} else {
event_step($self) while $self->{smsg};
}
diff --git a/lib/PublicInbox/SolverGit.pm b/lib/PublicInbox/SolverGit.pm
index 5f317f51..23d4d3d1 100644
--- a/lib/PublicInbox/SolverGit.pm
+++ b/lib/PublicInbox/SolverGit.pm
@@ -386,12 +386,9 @@ sub event_step ($) {
}
sub next_step ($) {
- my ($self) = @_;
# if outside of public-inbox-httpd, caller is expected to be
# looping event_step, anyways
- my $async = $self->{psgi_env}->{'pi-httpd.async'} or return;
- # PublicInbox::HTTPD::Async->new
- $async->(undef, undef, $self);
+ PublicInbox::DS::requeue($_[0]) if $_[0]->{psgi_env}->{'pi-httpd.async'}
}
sub mark_found ($$$) {
@@ -690,9 +687,8 @@ sub solve ($$$$$) {
$self->{found} = {}; # { abbr => [ ::Git, oid, type, size, $di ] }
dbg($self, "solving $oid_want ...");
- if (my $async = $env->{'pi-httpd.async'}) {
- # PublicInbox::HTTPD::Async->new
- $async->(undef, undef, $self);
+ if ($env->{'pi-httpd.async'}) {
+ PublicInbox::DS::requeue($self);
} else {
event_step($self) while $self->{user_cb};
}
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 08/18] xt/check-run: call DS->Reset after all tests
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (5 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 07/18] httpd/async: require IO arg Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 09/18] qspawn: introduce new psgi_yield API Eric Wong
` (9 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
This ensures reused processes get a clean start and
avoids surprises as we develop more code around the
DS event loop.
---
t/dir_idle.t | 1 -
t/fake_inotify.t | 2 --
xt/check-run.t | 2 ++
3 files changed, 2 insertions(+), 3 deletions(-)
diff --git a/t/dir_idle.t b/t/dir_idle.t
index 35c800f9..14aad7a1 100644
--- a/t/dir_idle.t
+++ b/t/dir_idle.t
@@ -41,5 +41,4 @@ is(scalar(@x), 1, 'got an event') and
ok($x[0]->[0]->IN_DELETE_SELF || $x[0]->[0]->IN_MOVE_SELF,
'IN_DELETE_SELF set on move');
-PublicInbox::DS->Reset;
done_testing;
diff --git a/t/fake_inotify.t b/t/fake_inotify.t
index 56f64588..8221e092 100644
--- a/t/fake_inotify.t
+++ b/t/fake_inotify.t
@@ -48,6 +48,4 @@ is_deeply([map{ $_->fullname }@events], ["$tmpdir/new/tst"], 'unlink detected')
diag explain(\@events);
ok($events[0]->IN_DELETE, 'IN_DELETE set on unlink');
-PublicInbox::DS->Reset;
-
done_testing;
diff --git a/xt/check-run.t b/xt/check-run.t
index 6eefcb7d..cda839fe 100755
--- a/xt/check-run.t
+++ b/xt/check-run.t
@@ -14,6 +14,7 @@ use v5.12;
use IO::Handle; # ->autoflush
use PublicInbox::TestCommon;
use PublicInbox::Spawn;
+use PublicInbox::DS; # already loaded by Spawn via ProcessIO
use Getopt::Long qw(:config gnu_getopt no_ignore_case auto_abbrev);
use Errno qw(EINTR);
use Fcntl qw(:seek);
@@ -187,6 +188,7 @@ my $start_worker = sub {
DIE "short read $r" if $r != UINT_SIZE;
my $t = unpack('I', $buf);
run_test($todo->[$t]);
+ PublicInbox::DS->Reset;
$tb->reset;
}
kill 'USR1', $producer if !$eof; # sets $eof in $producer
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 09/18] qspawn: introduce new psgi_yield API
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (6 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 08/18] xt/check-run: call DS->Reset after all tests Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 10/18] repo_atom: switch to psgi_yield Eric Wong
` (8 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
This is intended to replace psgi_return and HTTPD/Async
entirely, hopefully making our code less convoluted while
maintaining the ability to handle slow clients on
memory-constrained systems.
This was made possible by the philosophy shift in commit 21a539a2df0c
(httpd/async: switch to buffering-as-fast-as-possible, 2019-06-28).
We'll still support generic PSGI via the `pull' model with a
GetlineResponse class which is similar to the old GetlineBody.
---
MANIFEST | 1 +
lib/PublicInbox/GetlineResponse.pm | 40 ++++++++++
lib/PublicInbox/GitHTTPBackend.pm | 4 +-
lib/PublicInbox/GzipFilter.pm | 3 +-
lib/PublicInbox/HTTP.pm | 8 +-
lib/PublicInbox/InputPipe.pm | 12 +--
lib/PublicInbox/LEI.pm | 2 +-
lib/PublicInbox/Qspawn.pm | 119 ++++++++++++++++++++++++++++-
8 files changed, 176 insertions(+), 13 deletions(-)
create mode 100644 lib/PublicInbox/GetlineResponse.pm
diff --git a/MANIFEST b/MANIFEST
index f087621c..420b40a1 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -204,6 +204,7 @@ lib/PublicInbox/Filter/Vger.pm
lib/PublicInbox/Gcf2.pm
lib/PublicInbox/Gcf2Client.pm
lib/PublicInbox/GetlineBody.pm
+lib/PublicInbox/GetlineResponse.pm
lib/PublicInbox/Git.pm
lib/PublicInbox/GitAsyncCat.pm
lib/PublicInbox/GitCredential.pm
diff --git a/lib/PublicInbox/GetlineResponse.pm b/lib/PublicInbox/GetlineResponse.pm
new file mode 100644
index 00000000..290cce74
--- /dev/null
+++ b/lib/PublicInbox/GetlineResponse.pm
@@ -0,0 +1,40 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# For generic PSGI servers (not public-inbox-httpd/netd) which assumes their
+# getline response bodies can be backpressure-aware for slow clients
+# This depends on rpipe being _blocking_ on getline.
+package PublicInbox::GetlineResponse;
+use v5.12;
+
+sub response {
+ my ($qsp) = @_;
+ my ($res, $rbuf);
+ do { # read header synchronously
+ sysread($qsp->{rpipe}, $rbuf, 65536);
+ $res = $qsp->parse_hdr_done($rbuf); # fills $bref
+ } until defined($res);
+ my ($wcb, $filter) = $qsp->yield_pass(undef, $res) or return;
+ my $self = $res->[2] = bless {
+ qsp => $qsp,
+ filter => $filter,
+ }, __PACKAGE__;
+ my ($bref) = @{delete $qsp->{yield_parse_hdr}};
+ $self->{rbuf} = $$bref if $$bref ne '';
+ $wcb->($res);
+}
+
+sub getline {
+ my ($self) = @_;
+ my $rpipe = $self->{qsp}->{rpipe} // do {
+ delete($self->{qsp})->finish;
+ return; # EOF was set on previous call
+ };
+ my $buf = delete($self->{rbuf}) // $rpipe->getline;
+ $buf // delete($self->{qsp}->{rpipe}); # set EOF for next call
+ $self->{filter} ? $self->{filter}->translate($buf) : $buf;
+}
+
+sub close {}
+
+1;
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index edbc0157..d7e0bced 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -79,7 +79,7 @@ sub serve_dumb {
PublicInbox::WwwStatic::response($env, $h, $path, $type);
}
-sub git_parse_hdr { # {parse_hdr} for Qspawn
+sub ghb_parse_hdr { # header parser for Qspawn
my ($r, $bref, @dumb_args) = @_;
my $res = parse_cgi_headers($r, $bref) or return; # incomplete
$res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
@@ -106,7 +106,7 @@ sub serve_smart {
$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
- $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
+ $qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path);
}
sub input_prepare {
diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm
index db8e8397..d6ecd5ba 100644
--- a/lib/PublicInbox/GzipFilter.pm
+++ b/lib/PublicInbox/GzipFilter.pm
@@ -123,9 +123,10 @@ sub http_out ($) {
};
}
+# returns undef if HTTP client disconnected, may return 0
+# because ->translate can return ''
sub write {
my $self = shift;
- # my $ret = bytes::length($_[1]); # XXX does anybody care?
http_out($self)->write($self->translate(@_));
}
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index ca162939..edc88fe8 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -455,11 +455,12 @@ sub next_step {
# They may be exposed to the PSGI application when the PSGI app
# returns a CODE ref for "push"-based responses
package PublicInbox::HTTP::Chunked;
-use strict;
+use v5.12;
sub write {
# ([$http], $buf) = @_;
- PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1])
+ PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1]);
+ $_[0]->[0]->{sock} ? length($_[1]) : undef;
}
sub close {
@@ -468,12 +469,13 @@ sub close {
}
package PublicInbox::HTTP::Identity;
-use strict;
+use v5.12;
our @ISA = qw(PublicInbox::HTTP::Chunked);
sub write {
# ([$http], $buf) = @_;
PublicInbox::HTTP::identity_write($_[0]->[0], $_[1]);
+ $_[0]->[0]->{sock} ? length($_[1]) : undef;
}
1;
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index b38d8270..f4d57e7d 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -39,14 +39,16 @@ sub consume {
if ($@) { # regular file (but not w/ select|IO::Poll backends)
$self->{-need_rq} = 1;
$self->requeue;
- } elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+ } elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
+ } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
$in->blocking(0);
} elsif (-t $in) { # isatty(3) can't use `_' stat cache
unblock_tty($self);
}
+ $self;
}
-sub close {
+sub close { # idempotent
my ($self) = @_;
if (my $t = delete($self->{restore_termios})) {
my $fd = fileno($self->{sock} // return);
@@ -60,16 +62,16 @@ sub event_step {
my $r = sysread($self->{sock} // return, my $rbuf, 65536);
eval {
if ($r) {
- $self->{cb}->(@{$self->{args}}, $rbuf);
+ $self->{cb}->($self, @{$self->{args}}, $rbuf);
$self->requeue if $self->{-need_rq};
} elsif (defined($r)) { # EOF
- $self->{cb}->(@{$self->{args}}, '');
+ $self->{cb}->($self, @{$self->{args}}, '');
$self->close
} elsif ($!{EAGAIN}) { # rely on EPOLLIN
} elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
$self->requeue if $self->{-need_rq};
} else { # another error
- $self->{cb}->(@{$self->{args}}, undef);
+ $self->{cb}->($self, @{$self->{args}}, undef);
$self->close;
}
};
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 56e4c001..7bc7b2dc 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1567,7 +1567,7 @@ sub request_umask {
}
sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
- my ($lei, $cb) = @_; # $_[-1] = $rbuf
+ my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
$_[1] // return $lei->fail("error reading stdin: $!");
$lei->{stdin_buf} .= $_[-1];
do_env($lei, $cb) if $_[-1] eq '';
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 9a7e8734..203d8f41 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -31,6 +31,9 @@ use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(carp confess);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -61,7 +64,7 @@ sub _do_spawn {
if ($start_cb) {
eval { # popen_rd may die on EMFILE, ENFILE
$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self);
+ \&waitpid_err, $self, \%o);
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
} else {
@@ -126,6 +129,20 @@ sub wait_await { # run_await cb
waitpid_err($pid, $self, $opt);
}
+sub yield_chunk { # $_[-1] is sysread buffer (or undef)
+ my ($self, $ipipe) = @_;
+ if (!defined($_[-1])) {
+ warn "error reading body: $!";
+ } elsif ($_[-1] eq '') { # normal EOF
+ $self->finish;
+ $self->{qfh}->close;
+ } elsif (defined($self->{qfh}->write($_[-1]))) {
+ return; # continue while HTTP client is reading our writes
+ } # else { # HTTP client disconnected
+ delete $self->{rpipe};
+ $ipipe->close;
+}
+
sub finish ($;$) {
my ($self, $err) = @_;
$self->{_err} //= $err; # only for $@
@@ -201,6 +218,39 @@ EOM
$ret;
}
+sub yield_pass {
+ my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+ my $env = $self->{psgi_env};
+ my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
+ if (ref($res) eq 'CODE') { # chain another command
+ delete $self->{rpipe};
+ $ipipe->close if $ipipe;
+ $res->($wcb);
+ $self->{passed} = 1;
+ return; # all done
+ }
+ confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
+
+ my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+ pop(@$res) : delete($env->{'qspawn.filter'});
+ $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
+
+ if (scalar(@$res) == 3) { # done early (likely error or static file)
+ delete $self->{rpipe};
+ $ipipe->close if $ipipe;
+ $wcb->($res); # all done
+ return;
+ }
+ scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+ return ($wcb, $filter) if !$ipipe; # generic PSGI
+ # streaming response
+ my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+ $qfh = $filter->attach($qfh) if $filter;
+ my ($bref) = @{delete $self->{yield_parse_hdr}};
+ $qfh->write($$bref) if $$bref ne '';
+ $self->{qfh} = $qfh; # keep $ipipe open
+}
+
sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
my ($self) = @_;
my $r = rd_hdr($self) or return; # incomplete
@@ -257,6 +307,55 @@ sub psgi_return_start { # may run later, much later...
}
}
+sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
+
+sub parse_hdr_done ($$) {
+ my ($self) = @_;
+ my $ret;
+ if (defined $_[-1]) {
+ my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+ $$bref .= $_[-1];
+ $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
+ if ($@) {
+ carp "parse_hdr (@{$self->{cmd}}): $@\n";
+ $ret = r500();
+ } elsif (!$ret && $_[-1] eq '') {
+ carp <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+ $ret = r500();
+ }
+ } else {
+ carp <<EOM;
+E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+ $ret = r500();
+ }
+ $ret; # undef if headers incomplete
+}
+
+sub ipipe_cb { # InputPipe callback
+ my ($ipipe, $self) = @_; # $_[-1] rbuf
+ if ($self->{qfh}) { # already streaming
+ yield_chunk($self, $ipipe, $_[-1]);
+ } elsif (my $res = parse_hdr_done($self, $_[-1])) {
+ yield_pass($self, $ipipe, $res);
+ } # else: headers incomplete, keep reading
+}
+
+sub _yield_start { # may run later, much later...
+ my ($self) = @_;
+ if ($self->{psgi_env}->{'pi-httpd.async'}) {
+ require PublicInbox::ProcessIONBF;
+ my $rpipe = $self->{rpipe};
+ PublicInbox::ProcessIONBF->replace($rpipe);
+ PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
+ } else {
+ require PublicInbox::GetlineResponse;
+ PublicInbox::GetlineResponse::response($self);
+ }
+}
+
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
@@ -302,4 +401,22 @@ sub psgi_return {
}
}
+sub psgi_yield {
+ my ($self, $env, $limiter, @parse_hdr_arg)= @_;
+ $self->{psgi_env} = $env;
+ $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
+
+ # the caller already captured the PSGI write callback from
+ # the PSGI server, so we can call ->start, here:
+ $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+ # the caller will return this sub to the PSGI server, so
+ # it can set the response callback (that is, for
+ # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+ # but other HTTP servers are supported:
+ $env->{'qspawn.wcb'} = $_[0];
+ start($self, $limiter, \&_yield_start);
+ }
+}
+
1;
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 10/18] repo_atom: switch to psgi_yield
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (7 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 09/18] qspawn: introduce new psgi_yield API Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 11/18] repo_snapshot: psgi_yield Eric Wong
` (7 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
psgi_yield appears to be a drop-in replacement for psgi_return.
---
lib/PublicInbox/RepoAtom.pm | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/lib/PublicInbox/RepoAtom.pm b/lib/PublicInbox/RepoAtom.pm
index 79b76c12..b7179511 100644
--- a/lib/PublicInbox/RepoAtom.pm
+++ b/lib/PublicInbox/RepoAtom.pm
@@ -100,7 +100,7 @@ sub srv_tags_atom {
$ctx->{-feed_title} = "$ctx->{git}->{nick} tags";
my $qsp = PublicInbox::Qspawn->new(\@cmd);
$ctx->{-is_tag} = 1;
- $qsp->psgi_return($ctx->{env}, undef, \&atom_ok, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&atom_ok, $ctx);
}
sub srv_atom {
@@ -122,7 +122,7 @@ sub srv_atom {
push @cmd, $path if $path ne '';
my $qsp = PublicInbox::Qspawn->new(\@cmd, undef,
{ quiet => 1, 2 => $ctx->{lh} });
- $qsp->psgi_return($ctx->{env}, undef, \&atom_ok, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&atom_ok, $ctx);
}
1;
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 11/18] repo_snapshot: psgi_yield
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (8 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 10/18] repo_atom: switch to psgi_yield Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 12/18] viewvcs: psgi_yield Eric Wong
` (6 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
Another drop-in replacement
---
lib/PublicInbox/RepoSnapshot.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/RepoSnapshot.pm b/lib/PublicInbox/RepoSnapshot.pm
index ebcbbd81..6b7441b0 100644
--- a/lib/PublicInbox/RepoSnapshot.pm
+++ b/lib/PublicInbox/RepoSnapshot.pm
@@ -58,7 +58,7 @@ sub ver_check { # git->check_async callback
"--git-dir=$ctx->{git}->{git_dir}", 'archive',
"--prefix=$ctx->{snap_pfx}/",
"--format=$ctx->{snap_fmt}", $treeish]);
- $qsp->psgi_return($ctx->{env}, undef, \&archive_hdr, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&archive_hdr, $ctx);
}
}
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 12/18] viewvcs: psgi_yield
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (9 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 11/18] repo_snapshot: psgi_yield Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 13/18] www_altid: switch to psgi_yield Eric Wong
` (5 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
Another drop-in replacement, though I took the opportunity
to avoid unnecessarily bumping the refcount of $ctx->{env}.
---
lib/PublicInbox/ViewVCS.pm | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/lib/PublicInbox/ViewVCS.pm b/lib/PublicInbox/ViewVCS.pm
index 86c46e69..6c588ddf 100644
--- a/lib/PublicInbox/ViewVCS.pm
+++ b/lib/PublicInbox/ViewVCS.pm
@@ -101,9 +101,8 @@ sub stream_large_blob ($$) {
my ($git, $oid, $type, $size, $di) = @$res;
my $cmd = ['git', "--git-dir=$git->{git_dir}", 'cat-file', $type, $oid];
my $qsp = PublicInbox::Qspawn->new($cmd);
- my $env = $ctx->{env};
- $env->{'qspawn.wcb'} = $ctx->{-wcb};
- $qsp->psgi_return($env, undef, \&stream_blob_parse_hdr, $ctx);
+ $ctx->{env}->{'qspawn.wcb'} = $ctx->{-wcb};
+ $qsp->psgi_yield($ctx->{env}, undef, \&stream_blob_parse_hdr, $ctx);
}
sub show_other_result ($$) { # future-proofing
@@ -341,7 +340,7 @@ sub show_patch ($$) {
my $qsp = PublicInbox::Qspawn->new(\@cmd);
$ctx->{env}->{'qspawn.wcb'} = $ctx->{-wcb};
$ctx->{patch_oid} = $oid;
- $qsp->psgi_return($ctx->{env}, undef, \&stream_patch_parse_hdr, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&stream_patch_parse_hdr, $ctx);
}
sub show_commit ($$) {
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 13/18] www_altid: switch to psgi_yield
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (10 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 12/18] viewvcs: psgi_yield Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 14/18] cgit: " Eric Wong
` (4 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
Another drop-in replacement for psgi_return, this one utilizing
an unconditional qspawn.filter for gzip.
---
lib/PublicInbox/WwwAltId.pm | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/lib/PublicInbox/WwwAltId.pm b/lib/PublicInbox/WwwAltId.pm
index 47056160..48520142 100644
--- a/lib/PublicInbox/WwwAltId.pm
+++ b/lib/PublicInbox/WwwAltId.pm
@@ -1,9 +1,9 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# dumps using the ".dump" command of sqlite3(1)
package PublicInbox::WwwAltId;
-use strict;
+use v5.12;
use PublicInbox::Qspawn;
use PublicInbox::WwwStream qw(html_oneshot);
use PublicInbox::AltId;
@@ -71,7 +71,7 @@ EOF
my $qsp = PublicInbox::Qspawn->new([$sqlite3, $fn], undef, { 0 => $r });
$ctx->{altid_pfx} = $altid_pfx;
$env->{'qspawn.filter'} = PublicInbox::GzipFilter->new;
- $qsp->psgi_return($env, undef, \&check_output, $ctx);
+ $qsp->psgi_yield($env, undef, \&check_output, $ctx);
}
1;
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 14/18] cgit: switch to psgi_yield
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (11 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 13/18] www_altid: switch to psgi_yield Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 15/18] www_coderepo: psgi_yield Eric Wong
` (3 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
Another drop-in replacement.
---
lib/PublicInbox/Cgit.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/Cgit.pm b/lib/PublicInbox/Cgit.pm
index 4265cfb2..10cad57a 100644
--- a/lib/PublicInbox/Cgit.pm
+++ b/lib/PublicInbox/Cgit.pm
@@ -110,7 +110,7 @@ sub call {
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new($self->{cmd}, $cgi_env, $rdr);
my $limiter = $self->{pi_cfg}->limiter('-cgit');
- $qsp->psgi_return($env, $limiter, $parse_cgi_headers, $ctx);
+ $qsp->psgi_yield($env, $limiter, $parse_cgi_headers, $ctx);
}
1;
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 15/18] www_coderepo: psgi_yield
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (12 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 14/18] cgit: " Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 16/18] drop psgi_return, httpd/async and GetlineBody Eric Wong
` (2 subsequent siblings)
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
Yet another drop-in replacement for psgi_return.
---
lib/PublicInbox/WwwCoderepo.pm | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm
index 68c4c86d..6e19fc02 100644
--- a/lib/PublicInbox/WwwCoderepo.pm
+++ b/lib/PublicInbox/WwwCoderepo.pm
@@ -277,7 +277,7 @@ sub refs_foo { # /$REPO/refs/{heads,tags} endpoints
$ctx->{-heads} = 1 if $pfx eq 'refs/heads';
my $qsp = PublicInbox::Qspawn->new([@EACH_REF, $pfx ],
{ GIT_DIR => $ctx->{git}->{git_dir} });
- $qsp->psgi_return($ctx->{env}, undef, \&_refs_parse_hdr, $ctx);
+ $qsp->psgi_yield($ctx->{env}, undef, \&_refs_parse_hdr, $ctx);
}
sub srv { # endpoint called by PublicInbox::WWW
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 16/18] drop psgi_return, httpd/async and GetlineBody
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (13 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 15/18] www_coderepo: psgi_yield Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 17/18] qspawn: use WwwStatic for fallbacks and error code Eric Wong
2023-10-23 8:48 ` [PATCH 18/18] qspawn: simplify argument passing Eric Wong
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
Now that psgi_yield is used everywhere, the more complex
psgi_return and its helper bits can be removed. We'll also fix
some outdated comments now that everything using psgi_return
switched to psgi_yield. GetlineResponse replaces GetlineBody
and does a better job of isolating generic PSGI-only code.
---
MANIFEST | 2 -
lib/PublicInbox/GetlineBody.pm | 46 ------------
lib/PublicInbox/GitHTTPBackend.pm | 6 +-
lib/PublicInbox/GzipFilter.pm | 2 +-
lib/PublicInbox/HTTPD.pm | 5 +-
lib/PublicInbox/HTTPD/Async.pm | 101 -------------------------
lib/PublicInbox/Qspawn.pm | 121 +-----------------------------
lib/PublicInbox/RepoAtom.pm | 2 +-
lib/PublicInbox/WwwCoderepo.pm | 2 +-
t/httpd-corner.psgi | 14 ++--
t/httpd-corner.t | 12 +--
11 files changed, 19 insertions(+), 294 deletions(-)
delete mode 100644 lib/PublicInbox/GetlineBody.pm
delete mode 100644 lib/PublicInbox/HTTPD/Async.pm
diff --git a/MANIFEST b/MANIFEST
index 420b40a1..3df48667 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -203,7 +203,6 @@ lib/PublicInbox/Filter/SubjectTag.pm
lib/PublicInbox/Filter/Vger.pm
lib/PublicInbox/Gcf2.pm
lib/PublicInbox/Gcf2Client.pm
-lib/PublicInbox/GetlineBody.pm
lib/PublicInbox/GetlineResponse.pm
lib/PublicInbox/Git.pm
lib/PublicInbox/GitAsyncCat.pm
@@ -212,7 +211,6 @@ lib/PublicInbox/GitHTTPBackend.pm
lib/PublicInbox/GzipFilter.pm
lib/PublicInbox/HTTP.pm
lib/PublicInbox/HTTPD.pm
-lib/PublicInbox/HTTPD/Async.pm
lib/PublicInbox/HlMod.pm
lib/PublicInbox/Hval.pm
lib/PublicInbox/IMAP.pm
diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm
deleted file mode 100644
index 0e781224..00000000
--- a/lib/PublicInbox/GetlineBody.pm
+++ /dev/null
@@ -1,46 +0,0 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Wrap a pipe or file for PSGI streaming response bodies and calls the
-# end callback when the object goes out-of-scope.
-# This depends on rpipe being _blocking_ on getline.
-#
-# This is only used by generic PSGI servers and not public-inbox-httpd
-package PublicInbox::GetlineBody;
-use strict;
-use warnings;
-
-sub new {
- my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_;
- bless {
- rpipe => $rpipe,
- end => $end,
- end_arg => $end_arg,
- initial_buf => $buf,
- filter => $filter,
- }, $class;
-}
-
-# close should always be called after getline returns undef,
-# but a client aborting a connection can ruin our day; so lets
-# hope our underlying PSGI server does not leak references, here.
-sub DESTROY { $_[0]->close }
-
-sub getline {
- my ($self) = @_;
- my $rpipe = $self->{rpipe} or return; # EOF was set on previous call
- my $buf = delete($self->{initial_buf}) // $rpipe->getline;
- delete($self->{rpipe}) unless defined $buf; # set EOF for next call
- if (my $filter = $self->{filter}) {
- $buf = $filter->translate($buf);
- }
- $buf;
-}
-
-sub close {
- my ($self) = @_;
- my ($end, $end_arg) = delete @$self{qw(end end_arg)};
- $end->($end_arg) if $end;
-}
-
-1;
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index d7e0bced..7228555b 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -145,16 +145,12 @@ sub parse_cgi_headers { # {parse_hdr} for Qspawn
}
}
- # fallback to WwwCoderepo if cgit 404s. Duplicating $ctx prevents
- # ->finalize from the current Qspawn from using qspawn.wcb.
- # This makes qspawn skip ->async_pass and causes
- # PublicInbox::HTTPD::Async::event_step to close shortly after
+ # fallback to WwwCoderepo if cgit 404s
if ($code == 404 && $ctx->{www} && !$ctx->{_coderepo_tried}++) {
my $wcb = delete $ctx->{env}->{'qspawn.wcb'};
$ctx->{env}->{'plack.skip-deflater'} = 1; # prevent 2x gzip
$ctx->{env}->{'qspawn.fallback'} = $code;
my $res = $ctx->{www}->coderepo->srv($ctx);
- # for ->psgi_return_init_cb
$ctx->{env}->{'qspawn.wcb'} = $wcb;
$res; # CODE or ARRAY ref
} else {
diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm
index d6ecd5ba..fc471ea2 100644
--- a/lib/PublicInbox/GzipFilter.pm
+++ b/lib/PublicInbox/GzipFilter.pm
@@ -93,7 +93,7 @@ sub gone { # what: search/over/mm
undef;
}
-# for GetlineBody (via Qspawn) when NOT using $env->{'pi-httpd.async'}
+# for GetlineResponse (via Qspawn) when NOT using $env->{'pi-httpd.async'}
# Also used for ->getline callbacks
sub translate {
my $self = shift; # $_[1] => input
diff --git a/lib/PublicInbox/HTTPD.pm b/lib/PublicInbox/HTTPD.pm
index bae7281b..6a6347d8 100644
--- a/lib/PublicInbox/HTTPD.pm
+++ b/lib/PublicInbox/HTTPD.pm
@@ -9,9 +9,6 @@ use strict;
use Plack::Util ();
use Plack::Builder;
use PublicInbox::HTTP;
-use PublicInbox::HTTPD::Async;
-
-sub pi_httpd_async { PublicInbox::HTTPD::Async->new(@_) }
# we have a different env for ever listener socket for
# SERVER_NAME, SERVER_PORT and psgi.url_scheme
@@ -45,7 +42,7 @@ sub env_for ($$$) {
# this to limit git-http-backend(1) parallelism.
# We also check for the truthiness of this to
# detect when to use async paths for slow blobs
- 'pi-httpd.async' => \&pi_httpd_async,
+ 'pi-httpd.async' => 1,
'pi-httpd.app' => $self->{app},
'pi-httpd.warn_cb' => $self->{warn_cb},
}
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
deleted file mode 100644
index 2e4d8baa..00000000
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ /dev/null
@@ -1,101 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-#
-# XXX This is a totally unstable API for public-inbox internal use only
-# This is exposed via the 'pi-httpd.async' key in the PSGI env hash.
-# The name of this key is not even stable!
-# Currently intended for use with read-only pipes with expensive
-# processes such as git-http-backend(1), cgit(1)
-#
-# fields:
-# http: PublicInbox::HTTP ref
-# fh: PublicInbox::HTTP::{Identity,Chunked} ref (can ->write + ->close)
-# cb: initial read callback
-# arg: arg for {cb}
-# end_obj: CODE or object which responds to ->event_step when ->close is called
-package PublicInbox::HTTPD::Async;
-use v5.12;
-use parent qw(PublicInbox::DS);
-use Errno qw(EAGAIN);
-use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::ProcessIONBF;
-
-# This is called via: $env->{'pi-httpd.async'}->()
-# $io is a read-only pipe ($rpipe) for now, but may be a
-# bidirectional socket in the future.
-sub new {
- my ($class, $io, $cb, $arg, $end_obj) = @_;
- my $self = bless {
- cb => $cb, # initial read callback
- arg => $arg, # arg for $cb
- end_obj => $end_obj, # like END{}, can ->event_step
- }, $class;
- PublicInbox::ProcessIONBF->replace($io);
- $self->SUPER::new($io, EPOLLIN);
-}
-
-sub event_step {
- my ($self) = @_;
- if (defined $self->{cb}) {
- # this may call async_pass when headers are done
- $self->{cb}->($self->{arg});
- } elsif (my $sock = $self->{sock}) {
- # $http may be undef if discarding body output from cgit on 404
- my $http = $self->{http} or return $self->close;
- # $self->{sock} is a read pipe for git-http-backend or cgit
- # and 65536 is the default Linux pipe size
- my $r = sysread($sock, my $buf, 65536);
- if ($r) {
- $self->{ofh}->write($buf); # may call $http->close
- # let other clients get some work done, too
- return if $http->{sock}; # !closed
-
- # else: fall through to close below...
- } elsif (!defined $r && $! == EAGAIN) {
- return; # EPOLLIN means we'll be notified
- }
-
- # Done! Error handling will happen in $self->{ofh}->close
- # called by end_obj->event_step handler
- delete $http->{forward};
- $self->close; # queues end_obj->event_step to be called
- } # else { # we may've been requeued but closed by $http
-}
-
-# once this is called, all data we read is passed to the
-# to the PublicInbox::HTTP instance ($http) via $ofh->write
-# $ofh is typically PublicInbox::HTTP::{Chunked,Identity}, but
-# may be PublicInbox::GzipFilter or $PublicInbox::Qspawn::qx_fh
-sub async_pass {
- my ($self, $http, $ofh, $bref) = @_;
- delete @$self{qw(cb arg)};
- # In case the client HTTP connection ($http) dies, it
- # will automatically close this ($self) object.
- $http->{forward} = $self;
-
- # write anything we overread when we were reading headers.
- # This is typically PublicInbox:HTTP::{chunked,identity}_wcb,
- # but may be PublicInbox::GzipFilter::write. PSGI requires
- # *_wcb methods respond to ->write (and ->close), not ->print
- $ofh->write($$bref);
-
- $self->{http} = $http;
- $self->{ofh} = $ofh;
-}
-
-# may be called as $forward->close in PublicInbox::HTTP or EOF (event_step)
-sub close {
- my $self = $_[0];
- $self->SUPER::close; # DS::close
- delete @$self{qw(cb arg)};
-
- # we defer this to the next timer loop since close is deferred
- if (my $end_obj = delete $self->{end_obj}) {
- # this calls $end_obj->event_step
- # (likely PublicInbox::Qspawn::event_step,
- # NOT PublicInbox::HTTPD::Async::event_step)
- PublicInbox::DS::requeue($end_obj);
- }
-}
-
-1;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 203d8f41..a6e1d58b 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -176,48 +176,6 @@ sub psgi_qx {
start($self, $limiter, undef);
}
-# this is called on pipe EOF to reap the process, may be called
-# via PublicInbox::DS event loop OR via GetlineBody for generic
-# PSGI servers.
-sub event_step {
- my ($self) = @_;
- finish($self);
- my $fh = delete $self->{qfh};
- $fh->close if $fh; # async-only (psgi_return)
-}
-
-sub rd_hdr ($) {
- my ($self) = @_;
- # typically used for reading CGI headers
- # We also need to check EINTR for generic PSGI servers.
- my ($ret, $total_rd);
- my ($bref, $ph_cb, @ph_arg) = ($self->{hdr_buf}, @{$self->{parse_hdr}});
- until (defined($ret)) {
- my $r = sysread($self->{rpipe}, $$bref, 4096, length($$bref));
- if (defined($r)) {
- $total_rd += $r;
- eval { $ret = $ph_cb->($total_rd, $bref, @ph_arg) };
- if ($@) {
- warn "parse_hdr: $@";
- $ret = [ 500, [], [ "Internal error\n" ] ];
- } elsif (!defined($ret) && !$r) {
- warn <<EOM;
-EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
-EOM
- $ret = [ 500, [], [ "Internal error\n" ] ];
- }
- } else {
- # caller should notify us when it's ready:
- return if $! == EAGAIN;
- next if $! == EINTR; # immediate retry
- warn "error reading header: $!";
- $ret = [ 500, [], [ "Internal error\n" ] ];
- }
- }
- delete $self->{parse_hdr}; # done parsing headers
- $ret;
-}
-
sub yield_pass {
my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
my $env = $self->{psgi_env};
@@ -251,62 +209,6 @@ sub yield_pass {
$self->{qfh} = $qfh; # keep $ipipe open
}
-sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
- my ($self) = @_;
- my $r = rd_hdr($self) or return; # incomplete
- my $env = $self->{psgi_env};
- my $filter;
-
- # this is for RepoAtom since that can fire after parse_cgi_headers
- if (ref($r) eq 'ARRAY' && blessed($r->[2]) && $r->[2]->can('attach')) {
- $filter = pop @$r;
- }
- $filter //= delete($env->{'qspawn.filter'}) // (ref($r) eq 'ARRAY' ?
- PublicInbox::GzipFilter::qsp_maybe($r->[1], $env) : undef);
-
- my $wcb = delete $env->{'qspawn.wcb'};
- my $async = delete $self->{async}; # PublicInbox::HTTPD::Async
- if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
- if ($async) { # calls rpipe->close && ->event_step
- $async->close; # PublicInbox::HTTPD::Async::close
- } else { # generic PSGI, use PublicInbox::ProcessIO::CLOSE
- delete($self->{rpipe})->close;
- event_step($self);
- }
- if (ref($r) eq 'ARRAY') { # error
- $wcb->($r)
- } elsif (ref($r) eq 'CODE') { # chain another command
- $r->($wcb);
- $self->{passed} = 1;
- }
- # else do nothing
- } elsif ($async) {
- # done reading headers, handoff to read body
- my $fh = $wcb->($r); # scalar @$r == 2
- $fh = $filter->attach($fh) if $filter;
- $self->{qfh} = $fh;
- $async->async_pass($env->{'psgix.io'}, $fh,
- delete($self->{hdr_buf}));
- } else { # for synchronous PSGI servers
- require PublicInbox::GetlineBody;
- my $buf = delete $self->{hdr_buf};
- $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe},
- \&event_step, $self, $$buf, $filter);
- $wcb->($r);
- }
-}
-
-sub psgi_return_start { # may run later, much later...
- my ($self) = @_;
- if (my $cb = $self->{psgi_env}->{'pi-httpd.async'}) {
- # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj)
- $self->{async} = $cb->($self->{rpipe},
- \&psgi_return_init_cb, $self, $self);
- } else { # generic PSGI
- psgi_return_init_cb($self) while $self->{parse_hdr};
- }
-}
-
sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
sub parse_hdr_done ($$) {
@@ -363,7 +265,7 @@ sub _yield_start { # may run later, much later...
# $env->{'qspawn.wcb'} - the write callback from the PSGI server
# optional, use this if you've already
# captured it elsewhere. If not given,
-# psgi_return will return an anonymous
+# psgi_yield will return an anonymous
# sub for the PSGI server to call
#
# $env->{'qspawn.filter'} - filter object, responds to ->attach for
@@ -379,27 +281,6 @@ sub _yield_start { # may run later, much later...
# body will be streamed, later, via writes (push-based) to
# psgix.io. 3-element arrays means the body is available
# immediately (or streamed via ->getline (pull-based)).
-sub psgi_return {
- my ($self, $env, $limiter, @parse_hdr_arg)= @_;
- $self->{psgi_env} = $env;
- $self->{hdr_buf} = \(my $hdr_buf = '');
- $self->{parse_hdr} = \@parse_hdr_arg;
- $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
-
- # the caller already captured the PSGI write callback from
- # the PSGI server, so we can call ->start, here:
- $env->{'qspawn.wcb'} and
- return start($self, $limiter, \&psgi_return_start);
-
- # the caller will return this sub to the PSGI server, so
- # it can set the response callback (that is, for
- # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
- # but other HTTP servers are supported:
- sub {
- $env->{'qspawn.wcb'} = $_[0];
- start($self, $limiter, \&psgi_return_start);
- }
-}
sub psgi_yield {
my ($self, $env, $limiter, @parse_hdr_arg)= @_;
diff --git a/lib/PublicInbox/RepoAtom.pm b/lib/PublicInbox/RepoAtom.pm
index b7179511..c1649d0a 100644
--- a/lib/PublicInbox/RepoAtom.pm
+++ b/lib/PublicInbox/RepoAtom.pm
@@ -40,7 +40,7 @@ EOM
# called by GzipFilter->close
sub zflush { $_[0]->SUPER::zflush('</feed>') }
-# called by GzipFilter->write or GetlineBody->getline
+# called by GzipFilter->write or GetlineResponse->getline
sub translate {
my $self = shift;
my $rec = $_[0] // return $self->zflush; # getline
diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm
index 6e19fc02..0eb4a2d6 100644
--- a/lib/PublicInbox/WwwCoderepo.pm
+++ b/lib/PublicInbox/WwwCoderepo.pm
@@ -230,7 +230,7 @@ sub summary ($$) {
# called by GzipFilter->close after translate
sub zflush { $_[0]->SUPER::zflush('</pre>', $_[0]->_html_end) }
-# called by GzipFilter->write or GetlineBody->getline
+# called by GzipFilter->write or GetlineResponse->getline
sub translate {
my $ctx = shift;
my $rec = $_[0] // return zflush($ctx); # getline
diff --git a/t/httpd-corner.psgi b/t/httpd-corner.psgi
index 1e96d7b1..e29fd87b 100644
--- a/t/httpd-corner.psgi
+++ b/t/httpd-corner.psgi
@@ -92,34 +92,34 @@ my $app = sub {
my $rdr = { 2 => fileno($null) };
my $cmd = [qw(dd if=/dev/zero count=30 bs=1024k)];
my $qsp = PublicInbox::Qspawn->new($cmd, undef, $rdr);
- return $qsp->psgi_return($env, undef, sub {
+ return $qsp->psgi_yield($env, undef, sub {
my ($r, $bref) = @_;
# make $rd_hdr retry sysread + $parse_hdr in Qspawn:
return until length($$bref) > 8000;
close $null;
[ 200, [ qw(Content-Type application/octet-stream) ]];
});
- } elsif ($path eq '/psgi-return-gzip') {
+ } elsif ($path eq '/psgi-yield-gzip') {
require PublicInbox::Qspawn;
require PublicInbox::GzipFilter;
my $cmd = [qw(echo hello world)];
my $qsp = PublicInbox::Qspawn->new($cmd);
$env->{'qspawn.filter'} = PublicInbox::GzipFilter->new;
- return $qsp->psgi_return($env, undef, sub {
+ return $qsp->psgi_yield($env, undef, sub {
[ 200, [ qw(Content-Type application/octet-stream)]]
});
- } elsif ($path eq '/psgi-return-compressible') {
+ } elsif ($path eq '/psgi-yield-compressible') {
require PublicInbox::Qspawn;
my $cmd = [qw(echo goodbye world)];
my $qsp = PublicInbox::Qspawn->new($cmd);
- return $qsp->psgi_return($env, undef, sub {
+ return $qsp->psgi_yield($env, undef, sub {
[200, [qw(Content-Type text/plain)]]
});
- } elsif ($path eq '/psgi-return-enoent') {
+ } elsif ($path eq '/psgi-yield-enoent') {
require PublicInbox::Qspawn;
my $cmd = [ 'this-better-not-exist-in-PATH'.rand ];
my $qsp = PublicInbox::Qspawn->new($cmd);
- return $qsp->psgi_return($env, undef, sub {
+ return $qsp->psgi_yield($env, undef, sub {
[ 200, [ qw(Content-Type application/octet-stream)]]
});
} elsif ($path eq '/pid') {
diff --git a/t/httpd-corner.t b/t/httpd-corner.t
index aab3635c..2d2d1061 100644
--- a/t/httpd-corner.t
+++ b/t/httpd-corner.t
@@ -374,13 +374,13 @@ SKIP: {
is($non_zero, 0, 'read all zeros');
require_mods(@zmods, 4);
- my $buf = xqx([$curl, '-gsS', "$base/psgi-return-gzip"]);
+ my $buf = xqx([$curl, '-gsS', "$base/psgi-yield-gzip"]);
is($?, 0, 'curl succesful');
IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out));
is($out, "hello world\n");
my $curl_rdr = { 2 => \(my $curl_err = '') };
$buf = xqx([$curl, qw(-gsSv --compressed),
- "$base/psgi-return-compressible"], undef, $curl_rdr);
+ "$base/psgi-yield-compressible"], undef, $curl_rdr);
is($?, 0, 'curl --compressed successful');
is($buf, "goodbye world\n", 'gzipped response as expected');
like($curl_err, qr/\bContent-Encoding: gzip\b/,
@@ -388,8 +388,8 @@ SKIP: {
}
{
- my $conn = conn_for($sock, 'psgi_return ENOENT');
- print $conn "GET /psgi-return-enoent HTTP/1.1\r\n\r\n" or die;
+ my $conn = conn_for($sock, 'psgi_yield ENOENT');
+ print $conn "GET /psgi-yield-enoent HTTP/1.1\r\n\r\n" or die;
my $buf = '';
sysread($conn, $buf, 16384, length($buf)) until $buf =~ /\r\n\r\n/;
like($buf, qr!HTTP/1\.[01] 500\b!, 'got 500 error on ENOENT');
@@ -678,13 +678,13 @@ SKIP: {
my $app = require $psgi;
test_psgi($app, sub {
my ($cb) = @_;
- my $req = GET('http://example.com/psgi-return-gzip');
+ my $req = GET('http://example.com/psgi-yield-gzip');
my $res = $cb->($req);
my $buf = $res->content;
IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out));
is($out, "hello world\n", 'got expected output');
- $req = GET('http://example.com/psgi-return-enoent');
+ $req = GET('http://example.com/psgi-yield-enoent');
$res = $cb->($req);
is($res->code, 500, 'got error on ENOENT');
seek($tmperr, 0, SEEK_SET) or die;
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 17/18] qspawn: use WwwStatic for fallbacks and error code
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (14 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 16/18] drop psgi_return, httpd/async and GetlineBody Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
2023-10-23 8:48 ` [PATCH 18/18] qspawn: simplify argument passing Eric Wong
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
This ensures we set directives to disable caching since
errors are always transient.
---
lib/PublicInbox/Qspawn.pm | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index a6e1d58b..0bb02081 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -74,6 +74,11 @@ sub _do_spawn {
finish($self, $@) if $@;
}
+sub psgi_status_err { # Qspawn itself is useful w/o PSGI
+ require PublicInbox::WwwStatic;
+ PublicInbox::WwwStatic::r($_[0] // 500);
+}
+
sub finalize ($;$) {
my ($self, $opt) = @_;
@@ -104,9 +109,7 @@ sub finalize ($;$) {
return if $self->{passed}; # another command chained it
if (my $wcb = delete $env->{'qspawn.wcb'}) {
# have we started writing, yet?
- my $code = delete $env->{'qspawn.fallback'} // 500;
- require PublicInbox::WwwStatic;
- $wcb->(PublicInbox::WwwStatic::r($code));
+ $wcb->(psgi_status_err($env->{'qspawn.fallback'}));
}
}
@@ -209,8 +212,6 @@ sub yield_pass {
$self->{qfh} = $qfh; # keep $ipipe open
}
-sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
-
sub parse_hdr_done ($$) {
my ($self) = @_;
my $ret;
@@ -220,18 +221,18 @@ sub parse_hdr_done ($$) {
$ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
if ($@) {
carp "parse_hdr (@{$self->{cmd}}): $@\n";
- $ret = r500();
+ $ret = psgi_status_err();
} elsif (!$ret && $_[-1] eq '') {
carp <<EOM;
EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
EOM
- $ret = r500();
+ $ret = psgi_status_err();
}
} else {
carp <<EOM;
E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
EOM
- $ret = r500();
+ $ret = psgi_status_err();
}
$ret; # undef if headers incomplete
}
^ permalink raw reply related [flat|nested] 19+ messages in thread
* [PATCH 18/18] qspawn: simplify argument passing
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
` (15 preceding siblings ...)
2023-10-23 8:48 ` [PATCH 17/18] qspawn: use WwwStatic for fallbacks and error code Eric Wong
@ 2023-10-23 8:48 ` Eric Wong
16 siblings, 0 replies; 19+ messages in thread
From: Eric Wong @ 2023-10-23 8:48 UTC (permalink / raw)
To: spew
Now that psgi_return is gone, we can further simplify our
internals to support only psgi_qx and psgi_yield. Internal
argument passing is reduced and we keep the command, its env, and its
redirects in the Qspawn object for as long as it's alive.
I wanted to get rid of finalize() entirely, but it seems
trickier to do when having to support generic PSGI.
---
lib/PublicInbox/Qspawn.pm | 50 +++++++++++++++++++--------------------
1 file changed, 24 insertions(+), 26 deletions(-)
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0bb02081..a03e1b01 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -47,28 +47,27 @@ my $def_limiter;
# {qsp_err} is an optional error buffer callers may access themselves
sub new {
my ($class, $cmd, $cmd_env, $opt) = @_;
- bless { args => [ $cmd, $cmd_env, $opt ] }, $class;
+ bless { args => [ $cmd, $cmd_env, $opt ? { %$opt } : {} ] }, $class;
}
sub _do_spawn {
my ($self, $start_cb, $limiter) = @_;
- my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
+ my ($cmd, $cmd_env, $opt) = @{$self->{args}};
my %o = %{$opt || {}};
$self->{limiter} = $limiter;
for my $k (@PublicInbox::Spawn::RLIMITS) {
- $o{$k} = $limiter->{$k} // next;
+ $opt->{$k} = $limiter->{$k} // next;
}
- $self->{cmd} = $cmd;
$self->{-quiet} = 1 if $o{quiet};
$limiter->{running}++;
if ($start_cb) {
eval { # popen_rd may die on EMFILE, ENFILE
- $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self, \%o);
+ $self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
+ \&waitpid_err, $self);
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
} else {
- eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+ eval { run_await($cmd, $cmd_env, $opt, \&wait_await, $self) };
warn "E: $@" if $@;
}
finish($self, $@) if $@;
@@ -79,8 +78,8 @@ sub psgi_status_err { # Qspawn itself is useful w/o PSGI
PublicInbox::WwwStatic::r($_[0] // 500);
}
-sub finalize ($;$) {
- my ($self, $opt) = @_;
+sub finalize ($) {
+ my ($self) = @_;
# process is done, spawn whatever's in the queue
my $limiter = delete $self->{limiter} or return;
@@ -96,13 +95,13 @@ sub finalize ($;$) {
if (my $dst = $self->{qsp_err}) {
$$dst .= $$dst ? " $err" : "; $err";
}
- warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
+ warn "E: @{$self->{args}->[0]}: $err\n" if !$self->{-quiet};
}
my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
if ($qx_cb_arg) {
my $cb = shift @$qx_cb_arg;
- eval { $cb->($opt->{1}, @$qx_cb_arg) };
+ eval { $cb->($self->{args}->[2]->{1}, @$qx_cb_arg) };
return unless $@;
warn "E: $@"; # hope qspawn.wcb can handle it
}
@@ -113,23 +112,21 @@ sub finalize ($;$) {
}
}
-sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
-
sub waitpid_err { # callback for awaitpid
- my (undef, $self, $opt) = @_; # $_[0]: pid
+ my (undef, $self) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
- if ($?) { # FIXME: redundant
+ if ($?) { # XXX this may be redundant
my $status = $? >> 8;
my $sig = $? & 127;
$self->{_err} .= "exit status=$status";
$self->{_err} .= " signal=$sig" if $sig;
}
- finalize($self, $opt) if !$self->{rpipe};
+ finalize($self) if !$self->{rpipe};
}
sub wait_await { # run_await cb
my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
- waitpid_err($pid, $self, $opt);
+ waitpid_err($pid, $self);
}
sub yield_chunk { # $_[-1] is sysread buffer (or undef)
@@ -214,26 +211,24 @@ sub yield_pass {
sub parse_hdr_done ($$) {
my ($self) = @_;
- my $ret;
+ my ($ret, $err);
if (defined $_[-1]) {
my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
$$bref .= $_[-1];
$ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
- if ($@) {
- carp "parse_hdr (@{$self->{cmd}}): $@\n";
+ if (($err = $@)) {
$ret = psgi_status_err();
} elsif (!$ret && $_[-1] eq '') {
- carp <<EOM;
-EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
-EOM
+ $err = 'EOF';
$ret = psgi_status_err();
}
} else {
- carp <<EOM;
-E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
-EOM
+ $err = "$!";
$ret = psgi_status_err();
}
+ carp <<EOM if $err;
+E: $err @{$self->{args}->[0]} ($self->{psgi_env}->{REQUEST_URI})
+EOM
$ret; # undef if headers incomplete
}
@@ -301,4 +296,7 @@ sub psgi_yield {
}
}
+no warnings 'once';
+*DESTROY = \&finalize; # ->finalize is idempotent
+
1;
^ permalink raw reply related [flat|nested] 19+ messages in thread
end of thread, other threads:[~2023-10-23 8:48 UTC | newest]
Thread overview: 19+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-23 8:48 [PATCH 01/18] limiter: split out from qspawn Eric Wong
2023-10-23 8:48 ` [PATCH 02/18] spawn: support synchronous run_qx Eric Wong
2023-10-23 8:48 ` [PATCH 03/18] psgi_qx: use a temporary file rather than pipe Eric Wong
2023-10-23 8:48 ` [PATCH 04/18] www_coderepo: capture uses a flattened list Eric Wong
2023-10-23 8:48 ` [PATCH 05/18] qspawn: psgi_return allows list for callback args Eric Wong
2023-10-23 8:48 ` [PATCH 06/18] qspawn: drop unused err arg for ->event_step Eric Wong
2023-10-23 8:48 ` [PATCH 07/18] httpd/async: require IO arg Eric Wong
2023-10-23 8:48 ` [PATCH 08/18] xt/check-run: call DS->Reset after all tests Eric Wong
2023-10-23 8:48 ` [PATCH 09/18] qspawn: introduce new psgi_yield API Eric Wong
2023-10-23 8:48 ` [PATCH 10/18] repo_atom: switch to psgi_yield Eric Wong
2023-10-23 8:48 ` [PATCH 11/18] repo_snapshot: psgi_yield Eric Wong
2023-10-23 8:48 ` [PATCH 12/18] viewvcs: psgi_yield Eric Wong
2023-10-23 8:48 ` [PATCH 13/18] www_altid: switch to psgi_yield Eric Wong
2023-10-23 8:48 ` [PATCH 14/18] cgit: " Eric Wong
2023-10-23 8:48 ` [PATCH 15/18] www_coderepo: psgi_yield Eric Wong
2023-10-23 8:48 ` [PATCH 16/18] drop psgi_return, httpd/async and GetlineBody Eric Wong
2023-10-23 8:48 ` [PATCH 17/18] qspawn: use WwwStatic for fallbacks and error code Eric Wong
2023-10-23 8:48 ` [PATCH 18/18] qspawn: simplify argument passing Eric Wong
-- strict thread matches above, loose matches on Subject: below --
2023-10-19 12:40 [PATCH 01/18] limiter: split out from qspawn Eric Wong
2023-10-19 12:40 ` [PATCH 11/18] repo_snapshot: psgi_yield Eric Wong
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).