From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 8/8] qspawn: introduce psgi_yield API
Date: Thu, 19 Oct 2023 01:15:35 +0000 [thread overview]
Message-ID: <20231019011535.1895489-8-e@80x24.org> (raw)
In-Reply-To: <20231019011535.1895489-1-e@80x24.org>
This may eventually replace psgi_return and HTTPD/Async entirely.
---
lib/PublicInbox/GitHTTPBackend.pm | 4 +-
lib/PublicInbox/InputPipe.pm | 11 ++--
lib/PublicInbox/LEI.pm | 2 +-
lib/PublicInbox/Qspawn.pm | 97 ++++++++++++++++++++++++++++++-
4 files changed, 105 insertions(+), 9 deletions(-)
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index edbc0157..d7e0bced 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -79,7 +79,7 @@ sub serve_dumb {
PublicInbox::WwwStatic::response($env, $h, $path, $type);
}
-sub git_parse_hdr { # {parse_hdr} for Qspawn
+sub ghb_parse_hdr { # header parser for Qspawn
my ($r, $bref, @dumb_args) = @_;
my $res = parse_cgi_headers($r, $bref) or return; # incomplete
$res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
@@ -106,7 +106,7 @@ sub serve_smart {
$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
my $rdr = input_prepare($env) or return r(500);
my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
- $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
+ $qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path);
}
sub input_prepare {
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index b38d8270..614360c2 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -39,14 +39,15 @@ sub consume {
if ($@) { # regular file (but not w/ select|IO::Poll backends)
$self->{-need_rq} = 1;
$self->requeue;
- } elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+ } elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
+ } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
$in->blocking(0);
} elsif (-t $in) { # isatty(3) can't use `_' stat cache
unblock_tty($self);
}
}
-sub close {
+sub close { # idempotent
my ($self) = @_;
if (my $t = delete($self->{restore_termios})) {
my $fd = fileno($self->{sock} // return);
@@ -60,16 +61,16 @@ sub event_step {
my $r = sysread($self->{sock} // return, my $rbuf, 65536);
eval {
if ($r) {
- $self->{cb}->(@{$self->{args}}, $rbuf);
+ $self->{cb}->($self, @{$self->{args}}, $rbuf);
$self->requeue if $self->{-need_rq};
} elsif (defined($r)) { # EOF
- $self->{cb}->(@{$self->{args}}, '');
+ $self->{cb}->($self, @{$self->{args}}, '');
$self->close
} elsif ($!{EAGAIN}) { # rely on EPOLLIN
} elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
$self->requeue if $self->{-need_rq};
} else { # another error
- $self->{cb}->(@{$self->{args}}, undef);
+ $self->{cb}->($self, @{$self->{args}}, undef);
$self->close;
}
};
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 3ccdd4f7..c1e965c8 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1566,7 +1566,7 @@ sub request_umask {
}
sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
- my ($lei, $cb) = @_; # $_[-1] = $rbuf
+ my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
$_[1] // return $lei->fail("error reading stdin: $!");
$lei->{stdin_buf} .= $_[-1];
do_env($lei, $cb) if $_[-1] eq '';
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 9a7e8734..c598e863 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -31,6 +31,9 @@ use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(confess);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
@@ -61,7 +64,7 @@ sub _do_spawn {
if ($start_cb) {
eval { # popen_rd may die on EMFILE, ENFILE
$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self);
+ \&waitpid_err, $self, \%o);
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
} else {
@@ -126,6 +129,13 @@ sub wait_await { # run_await cb
waitpid_err($pid, $self, $opt);
}
+sub yield_chunk { # forward one read to {qfh}; $_[-1] is sysread buffer (or undef)
+ my ($self, $ipipe) = @_; # $ipipe is accepted but not used in this sub
+ my $qfh = $self->{qfh}; # PublicInbox::HTTP::(Chunked|Identity)
+ ($_[-1] // '') eq '' ? $qfh->close : $qfh->write($_[-1]); # close on '' or undef, else write; NOTE(review): undef (read error) is not logged
+ delete($self->{rpipe}) if ($_[-1] // '') eq ''; # all done: drop our {rpipe} reference
+}
+
sub finish ($;$) {
my ($self, $err) = @_;
$self->{_err} //= $err; # only for $@
@@ -201,6 +211,33 @@ EOM
$ret;
}
+sub yield_pass { # hand a header-parser result ($res) to the PSGI write callback
+ my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+ my $env = $self->{psgi_env};
+ my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb'); # removed from $env on use; dies if absent
+ if (ref($res) eq 'CODE') { # chain another command
+ $ipipe->close;
+ $res->($wcb); # the coderef receives the write callback
+ $self->{passed} = 1; # NOTE(review): no reader of {passed} is visible in this patch
+ return; # all done
+ }
+ confess("BUG: $res unhandled") if ref($res) ne 'ARRAY'; # only CODE and ARRAY results are handled
+
+ my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+ pop(@$res) : delete($env->{'qspawn.filter'}); # a blessed 3rd element with ->attach is popped and used as the filter
+ $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env); # else fall back to whatever qsp_maybe returns
+
+ if (scalar(@$res) == 3) { # done early
+ $ipipe->close;
+ return $wcb->($res); # all done; NOTE(review): the write callback's return value is passed up to the caller
+ }
+
+ scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+ my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+ $qfh = $filter->attach($qfh) if $filter;
+ $self->{qfh} = $qfh; # keep $ipipe open; this assignment's value is also the return value
+}
+
sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
my ($self) = @_;
my $r = rd_hdr($self) or return; # incomplete
@@ -257,6 +294,46 @@ sub psgi_return_start { # may run later, much later...
}
}
+sub _ipipe_cb { # InputPipe callback: parse headers first, then stream the body
+ my ($ipipe, $self) = @_; # $_[-1] rbuf
+ return yield_chunk($self, $ipipe, $_[-1]) if $self->{qfh}; # stream body
+
+ if (!defined($_[-1])) { # undef buffer: read failed before headers were parsed
+ warn "error reading header: $!";
+ } elsif ($_[-1] eq '') { # empty buffer: EOF before headers were parsed
+ warn <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+ } else { # attempt to parse headers
+ my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}}; # [ buffer ref, parser cb, parser args ]
+ $$bref .= $_[-1]; # accumulate across reads
+ my $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) }; # parser gets (bytes just read, buffer ref, args)
+ if ($ret) { # success
+ my $qfh = yield_pass($self, $ipipe, $ret);
+ delete $self->{yield_parse_hdr};
+ $qfh->write($$bref) if $qfh; # send what is left in the buffer; assumes the parser consumed the header - TODO confirm
+ return;
+ }
+ return unless $@; # incomplete, not an error
+ warn "parse_hdr ($ph_cb): $@";
+ }
+ delete $self->{yield_parse_hdr}; # failure path (read error, early EOF, or parser died)
+ yield_pass($self, $ipipe, [ 500, [], [ "Internal error\n" ] ])
+}
+
+sub _yield_start { # may run later, much later...
+ my ($self) = @_;
+ my $async = !!$self->{psgi_env}->{'pi-httpd.async'}; # boolean: PSGI env carries 'pi-httpd.async'
+ my $rpipe = $self->{rpipe};
+ if ($async) {
+ require PublicInbox::ProcessIONBF; # loaded only when needed
+ PublicInbox::ProcessIONBF->replace($rpipe); # NOTE(review): effect on $rpipe is not visible here - confirm
+ }
+ my $ipipe = PublicInbox::InputPipe::consume($rpipe, \&_ipipe_cb, $self); # _ipipe_cb is called as ($ipipe, $self, $rbuf)
+ return if $async; # presumably the event loop drives further reads - TODO confirm
+ $ipipe->event_step while $ipipe->{sock}; # otherwise step here until {sock} is false
+}
+
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
@@ -302,4 +379,22 @@ sub psgi_return {
}
}
+sub psgi_yield { # set up header parsing state, then start via _yield_start
+ my ($self, $env, $limiter, @parse_hdr_arg)= @_; # @parse_hdr_arg = (parser cb, parser args...)
+ $self->{psgi_env} = $env;
+ $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ]; # [ ref to empty buffer, parser cb, parser args ]
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32); # shared default limiter, created on first use
+
+ # the caller already captured the PSGI write callback from
+ # the PSGI server, so we can call ->start, here:
+ $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+ # the caller will return this sub to the PSGI server, so
+ # it can set the response callback (that is, for
+ # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+ # but other HTTP servers are supported:
+ $env->{'qspawn.wcb'} = $_[0]; # store the server-supplied callback for yield_pass
+ start($self, $limiter, \&_yield_start);
+ }
+}
+
1;
prev parent reply other threads:[~2023-10-19 1:15 UTC|newest]
Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-10-19 1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
2023-10-19 1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
2023-10-19 1:15 ` [PATCH 3/8] psgi_qx: use a temporary file rather than pipe Eric Wong
2023-10-19 1:15 ` [PATCH 4/8] www_coderepo: capture uses a flattened list Eric Wong
2023-10-19 1:15 ` [PATCH 5/8] qspawn: psgi_return allows list for callback args Eric Wong
2023-10-19 1:15 ` [PATCH 6/8] qspawn: drop unused err arg for ->event_step Eric Wong
2023-10-19 1:15 ` [PATCH 7/8] httpd/async: require IO arg Eric Wong
2023-10-19 1:15 ` Eric Wong [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20231019011535.1895489-8-e@80x24.org \
--to=e@80x24.org \
--cc=spew@80x24.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).