From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 4877E1F69D for ; Mon, 23 Oct 2023 08:48:39 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1698050919; bh=SC4cktHe2mloilQ8z/EVMYtboC+IqSNGOBMvTKO3dpc=; h=From:To:Subject:Date:In-Reply-To:References:From; b=qgCrZc37zgctjLcWropZ4gj4nhj4uCEv1dlf7bO/abNX18CVVxj5EJBRMCx1kTXlR yVmdk87/43ztrwLjd0KHgDI4uAOqUs+wduftYdlVNlo+JYS6sy6yD/K1Dh3JJY3unM ULt8XSJ2BF6VbPKWarHAwoYij/Q9nK20TNxXQaP0= From: Eric Wong To: spew@80x24.org Subject: [PATCH 09/18] qspawn: introduce new psgi_yield API Date: Mon, 23 Oct 2023 08:48:28 +0000 Message-ID: <20231023084837.2804687-9-e@80x24.org> In-Reply-To: <20231023084837.2804687-1-e@80x24.org> References: <20231023084837.2804687-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This is intended to replace psgi_return and HTTPD/Async entirely, hopefully making our code less convoluted while maintaining the ability to handle slow clients on memory-constrained systems This was made possible by the philosophy shift in commit 21a539a2df0c (httpd/async: switch to buffering-as-fast-as-possible, 2019-06-28). We'll still support generic PSGI via the `pull' model with a GetlineResponse class which is similar to the old GetlineBody. --- MANIFEST | 1 + lib/PublicInbox/GetlineResponse.pm | 40 ++++++++++ lib/PublicInbox/GitHTTPBackend.pm | 4 +- lib/PublicInbox/GzipFilter.pm | 3 +- lib/PublicInbox/HTTP.pm | 8 +- lib/PublicInbox/InputPipe.pm | 12 +-- lib/PublicInbox/LEI.pm | 2 +- lib/PublicInbox/Qspawn.pm | 119 ++++++++++++++++++++++++++++- 8 files changed, 176 insertions(+), 13 deletions(-) create mode 100644 lib/PublicInbox/GetlineResponse.pm diff --git a/MANIFEST b/MANIFEST index f087621c..420b40a1 100644 --- a/MANIFEST +++ b/MANIFEST @@ -204,6 +204,7 @@ lib/PublicInbox/Filter/Vger.pm lib/PublicInbox/Gcf2.pm lib/PublicInbox/Gcf2Client.pm lib/PublicInbox/GetlineBody.pm +lib/PublicInbox/GetlineResponse.pm lib/PublicInbox/Git.pm lib/PublicInbox/GitAsyncCat.pm lib/PublicInbox/GitCredential.pm diff --git a/lib/PublicInbox/GetlineResponse.pm b/lib/PublicInbox/GetlineResponse.pm new file mode 100644 index 00000000..290cce74 --- /dev/null +++ b/lib/PublicInbox/GetlineResponse.pm @@ -0,0 +1,40 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# For generic PSGI servers (not public-inbox-httpd/netd) which assumes their +# getline response bodies can be backpressure-aware for slow clients +# This depends on rpipe being _blocking_ on getline. +package PublicInbox::GetlineResponse; +use v5.12; + +sub response { + my ($qsp) = @_; + my ($res, $rbuf); + do { # read header synchronously + sysread($qsp->{rpipe}, $rbuf, 65536); + $res = $qsp->parse_hdr_done($rbuf); # fills $bref + } until defined($res); + my ($wcb, $filter) = $qsp->yield_pass(undef, $res) or return; + my $self = $res->[2] = bless { + qsp => $qsp, + filter => $filter, + }, __PACKAGE__; + my ($bref) = @{delete $qsp->{yield_parse_hdr}}; + $self->{rbuf} = $$bref if $$bref ne ''; + $wcb->($res); +} + +sub getline { + my ($self) = @_; + my $rpipe = $self->{qsp}->{rpipe} // do { + delete($self->{qsp})->finish; + return; # EOF was set on previous call + }; + my $buf = delete($self->{rbuf}) // $rpipe->getline; + $buf // delete($self->{qsp}->{rpipe}); # set EOF for next call + $self->{filter} ? $self->{filter}->translate($buf) : $buf; +} + +sub close {} + +1; diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index edbc0157..d7e0bced 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -79,7 +79,7 @@ sub serve_dumb { PublicInbox::WwwStatic::response($env, $h, $path, $type); } -sub git_parse_hdr { # {parse_hdr} for Qspawn +sub ghb_parse_hdr { # header parser for Qspawn my ($r, $bref, @dumb_args) = @_; my $res = parse_cgi_headers($r, $bref) or return; # incomplete $res->[0] == 403 ? serve_dumb(@dumb_args) : $res; @@ -106,7 +106,7 @@ sub serve_smart { $env{PATH_TRANSLATED} = "$git->{git_dir}/$path"; my $rdr = input_prepare($env) or return r(500); my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr); - $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path); + $qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path); } sub input_prepare { diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm index db8e8397..d6ecd5ba 100644 --- a/lib/PublicInbox/GzipFilter.pm +++ b/lib/PublicInbox/GzipFilter.pm @@ -123,9 +123,10 @@ sub http_out ($) { }; } +# returns undef if HTTP client disconnected, may return 0 +# because ->translate can return '' sub write { my $self = shift; - # my $ret = bytes::length($_[1]); # XXX does anybody care? http_out($self)->write($self->translate(@_)); } diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index ca162939..edc88fe8 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -455,11 +455,12 @@ sub next_step { # They may be exposed to the PSGI application when the PSGI app # returns a CODE ref for "push"-based responses package PublicInbox::HTTP::Chunked; -use strict; +use v5.12; sub write { # ([$http], $buf) = @_; - PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1]) + PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1]); + $_[0]->[0]->{sock} ? length($_[1]) : undef; } sub close { @@ -468,12 +469,13 @@ sub close { } package PublicInbox::HTTP::Identity; -use strict; +use v5.12; our @ISA = qw(PublicInbox::HTTP::Chunked); sub write { # ([$http], $buf) = @_; PublicInbox::HTTP::identity_write($_[0]->[0], $_[1]); + $_[0]->[0]->{sock} ? length($_[1]) : undef; } 1; diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm index b38d8270..f4d57e7d 100644 --- a/lib/PublicInbox/InputPipe.pm +++ b/lib/PublicInbox/InputPipe.pm @@ -39,14 +39,16 @@ sub consume { if ($@) { # regular file (but not w/ select|IO::Poll backends) $self->{-need_rq} = 1; $self->requeue; - } elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes + } elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF + } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes $in->blocking(0); } elsif (-t $in) { # isatty(3) can't use `_' stat cache unblock_tty($self); } + $self; } -sub close { +sub close { # idempotent my ($self) = @_; if (my $t = delete($self->{restore_termios})) { my $fd = fileno($self->{sock} // return); @@ -60,16 +62,16 @@ sub event_step { my $r = sysread($self->{sock} // return, my $rbuf, 65536); eval { if ($r) { - $self->{cb}->(@{$self->{args}}, $rbuf); + $self->{cb}->($self, @{$self->{args}}, $rbuf); $self->requeue if $self->{-need_rq}; } elsif (defined($r)) { # EOF - $self->{cb}->(@{$self->{args}}, ''); + $self->{cb}->($self, @{$self->{args}}, ''); $self->close } elsif ($!{EAGAIN}) { # rely on EPOLLIN } elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty $self->requeue if $self->{-need_rq}; } else { # another error - $self->{cb}->(@{$self->{args}}, undef); + $self->{cb}->($self, @{$self->{args}}, undef); $self->close; } }; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 56e4c001..7bc7b2dc 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1567,7 +1567,7 @@ sub request_umask { } sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin - my ($lei, $cb) = @_; # $_[-1] = $rbuf + my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf $_[1] // return $lei->fail("error reading stdin: $!"); $lei->{stdin_buf} .= $_[-1]; do_env($lei, $cb) if $_[-1] eq ''; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 9a7e8734..203d8f41 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -31,6 +31,9 @@ use PublicInbox::GzipFilter; use Scalar::Util qw(blessed); use PublicInbox::Limiter; use PublicInbox::Aspawn qw(run_await); +use PublicInbox::Syscall qw(EPOLLIN); +use PublicInbox::InputPipe; +use Carp qw(carp confess); # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers use Errno qw(EAGAIN EINTR); @@ -61,7 +64,7 @@ sub _do_spawn { if ($start_cb) { eval { # popen_rd may die on EMFILE, ENFILE $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o, - \&waitpid_err, $self); + \&waitpid_err, $self, \%o); $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM }; } else { @@ -126,6 +129,20 @@ sub wait_await { # run_await cb waitpid_err($pid, $self, $opt); } +sub yield_chunk { # $_[-1] is sysread buffer (or undef) + my ($self, $ipipe) = @_; + if (!defined($_[-1])) { + warn "error reading body: $!"; + } elsif ($_[-1] eq '') { # normal EOF + $self->finish; + $self->{qfh}->close; + } elsif (defined($self->{qfh}->write($_[-1]))) { + return; # continue while HTTP client is reading our writes + } # else { # HTTP client disconnected + delete $self->{rpipe}; + $ipipe->close; +} + sub finish ($;$) { my ($self, $err) = @_; $self->{_err} //= $err; # only for $@ @@ -201,6 +218,39 @@ EOM $ret; } +sub yield_pass { + my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe + my $env = $self->{psgi_env}; + my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb'); + if (ref($res) eq 'CODE') { # chain another command + delete $self->{rpipe}; + $ipipe->close if $ipipe; + $res->($wcb); + $self->{passed} = 1; + return; # all done + } + confess("BUG: $res unhandled") if ref($res) ne 'ARRAY'; + + my $filter = blessed($res->[2]) && $res->[2]->can('attach') ? + pop(@$res) : delete($env->{'qspawn.filter'}); + $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env); + + if (scalar(@$res) == 3) { # done early (likely error or static file) + delete $self->{rpipe}; + $ipipe->close if $ipipe; + $wcb->($res); # all done + return; + } + scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res"); + return ($wcb, $filter) if !$ipipe; # generic PSGI + # streaming response + my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity) + $qfh = $filter->attach($qfh) if $filter; + my ($bref) = @{delete $self->{yield_parse_hdr}}; + $qfh->write($$bref) if $$bref ne ''; + $self->{qfh} = $qfh; # keep $ipipe open +} + sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb} my ($self) = @_; my $r = rd_hdr($self) or return; # incomplete @@ -257,6 +307,55 @@ sub psgi_return_start { # may run later, much later... } } +sub r500 () { [ 500, [], [ "Internal error\n" ] ] } + +sub parse_hdr_done ($$) { + my ($self) = @_; + my $ret; + if (defined $_[-1]) { + my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}}; + $$bref .= $_[-1]; + $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) }; + if ($@) { + carp "parse_hdr (@{$self->{cmd}}): $@\n"; + $ret = r500(); + } elsif (!$ret && $_[-1] eq '') { + carp <{cmd}} ($self->{psgi_env}->{REQUEST_URI}) +EOM + $ret = r500(); + } + } else { + carp <{cmd}} ($self->{psgi_env}->{REQUEST_URI}) +EOM + $ret = r500(); + } + $ret; # undef if headers incomplete +} + +sub ipipe_cb { # InputPipe callback + my ($ipipe, $self) = @_; # $_[-1] rbuf + if ($self->{qfh}) { # already streaming + yield_chunk($self, $ipipe, $_[-1]); + } elsif (my $res = parse_hdr_done($self, $_[-1])) { + yield_pass($self, $ipipe, $res); + } # else: headers incomplete, keep reading +} + +sub _yield_start { # may run later, much later... + my ($self) = @_; + if ($self->{psgi_env}->{'pi-httpd.async'}) { + require PublicInbox::ProcessIONBF; + my $rpipe = $self->{rpipe}; + PublicInbox::ProcessIONBF->replace($rpipe); + PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self); + } else { + require PublicInbox::GetlineResponse; + PublicInbox::GetlineResponse::response($self); + } +} + # Used for streaming the stdout of one process as a PSGI response. # # $env is the PSGI env. @@ -302,4 +401,22 @@ sub psgi_return { } } +sub psgi_yield { + my ($self, $env, $limiter, @parse_hdr_arg)= @_; + $self->{psgi_env} = $env; + $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ]; + $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32); + + # the caller already captured the PSGI write callback from + # the PSGI server, so we can call ->start, here: + $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub { + # the caller will return this sub to the PSGI server, so + # it can set the response callback (that is, for + # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback), + # but other HTTP servers are supported: + $env->{'qspawn.wcb'} = $_[0]; + start($self, $limiter, \&_yield_start); + } +} + 1;