From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 6D6B51F888 for ; Mon, 23 Oct 2023 08:48:40 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1698050920; bh=FGp07RpWWI9UflAujDZWBu8xgarnqhlbjnbiYuNw41I=; h=From:To:Subject:Date:In-Reply-To:References:From; b=HRGFnhXmhPMGmz9xE50RZ/cchCGf9QJFULzvfQfzSqxU1drzt8sGYn7AwxBnnqIh7 S9VvAv1RDN7BwVB9oLOnDV0uFCVgAxuTEDMe5u3ruV6D36kYoNaL9URxr6Lq00SIyR QsN2inzia3VMfGmabTpROEfjFzdHVaZBZaEBVkfo= From: Eric Wong To: spew@80x24.org Subject: [PATCH 16/18] drop psgi_return, httpd/async and GetlineBody Date: Mon, 23 Oct 2023 08:48:35 +0000 Message-ID: <20231023084837.2804687-16-e@80x24.org> In-Reply-To: <20231023084837.2804687-1-e@80x24.org> References: <20231023084837.2804687-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Now that psgi_yield is used everywhere, the more complex psgi_return and its helper bits can be removed. We'll also fix some outdated comments now that everything using psgi_return switched to psgi_yield. GetlineResponse replaces GetlineBody and does a better job of isolating generic PSGI-only code. 
--- MANIFEST | 2 - lib/PublicInbox/GetlineBody.pm | 46 ------------ lib/PublicInbox/GitHTTPBackend.pm | 6 +- lib/PublicInbox/GzipFilter.pm | 2 +- lib/PublicInbox/HTTPD.pm | 5 +- lib/PublicInbox/HTTPD/Async.pm | 101 ------------------------- lib/PublicInbox/Qspawn.pm | 121 +----------------------------- lib/PublicInbox/RepoAtom.pm | 2 +- lib/PublicInbox/WwwCoderepo.pm | 2 +- t/httpd-corner.psgi | 14 ++-- t/httpd-corner.t | 12 +-- 11 files changed, 19 insertions(+), 294 deletions(-) delete mode 100644 lib/PublicInbox/GetlineBody.pm delete mode 100644 lib/PublicInbox/HTTPD/Async.pm diff --git a/MANIFEST b/MANIFEST index 420b40a1..3df48667 100644 --- a/MANIFEST +++ b/MANIFEST @@ -203,7 +203,6 @@ lib/PublicInbox/Filter/SubjectTag.pm lib/PublicInbox/Filter/Vger.pm lib/PublicInbox/Gcf2.pm lib/PublicInbox/Gcf2Client.pm -lib/PublicInbox/GetlineBody.pm lib/PublicInbox/GetlineResponse.pm lib/PublicInbox/Git.pm lib/PublicInbox/GitAsyncCat.pm @@ -212,7 +211,6 @@ lib/PublicInbox/GitHTTPBackend.pm lib/PublicInbox/GzipFilter.pm lib/PublicInbox/HTTP.pm lib/PublicInbox/HTTPD.pm -lib/PublicInbox/HTTPD/Async.pm lib/PublicInbox/HlMod.pm lib/PublicInbox/Hval.pm lib/PublicInbox/IMAP.pm diff --git a/lib/PublicInbox/GetlineBody.pm b/lib/PublicInbox/GetlineBody.pm deleted file mode 100644 index 0e781224..00000000 --- a/lib/PublicInbox/GetlineBody.pm +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright (C) 2016-2021 all contributors -# License: AGPL-3.0+ - -# Wrap a pipe or file for PSGI streaming response bodies and calls the -# end callback when the object goes out-of-scope. -# This depends on rpipe being _blocking_ on getline. 
-# -# This is only used by generic PSGI servers and not public-inbox-httpd -package PublicInbox::GetlineBody; -use strict; -use warnings; - -sub new { - my ($class, $rpipe, $end, $end_arg, $buf, $filter) = @_; - bless { - rpipe => $rpipe, - end => $end, - end_arg => $end_arg, - initial_buf => $buf, - filter => $filter, - }, $class; -} - -# close should always be called after getline returns undef, -# but a client aborting a connection can ruin our day; so lets -# hope our underlying PSGI server does not leak references, here. -sub DESTROY { $_[0]->close } - -sub getline { - my ($self) = @_; - my $rpipe = $self->{rpipe} or return; # EOF was set on previous call - my $buf = delete($self->{initial_buf}) // $rpipe->getline; - delete($self->{rpipe}) unless defined $buf; # set EOF for next call - if (my $filter = $self->{filter}) { - $buf = $filter->translate($buf); - } - $buf; -} - -sub close { - my ($self) = @_; - my ($end, $end_arg) = delete @$self{qw(end end_arg)}; - $end->($end_arg) if $end; -} - -1; diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index d7e0bced..7228555b 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -145,16 +145,12 @@ sub parse_cgi_headers { # {parse_hdr} for Qspawn } } - # fallback to WwwCoderepo if cgit 404s. Duplicating $ctx prevents - # ->finalize from the current Qspawn from using qspawn.wcb. 
- # This makes qspawn skip ->async_pass and causes - # PublicInbox::HTTPD::Async::event_step to close shortly after + # fallback to WwwCoderepo if cgit 404s if ($code == 404 && $ctx->{www} && !$ctx->{_coderepo_tried}++) { my $wcb = delete $ctx->{env}->{'qspawn.wcb'}; $ctx->{env}->{'plack.skip-deflater'} = 1; # prevent 2x gzip $ctx->{env}->{'qspawn.fallback'} = $code; my $res = $ctx->{www}->coderepo->srv($ctx); - # for ->psgi_return_init_cb $ctx->{env}->{'qspawn.wcb'} = $wcb; $res; # CODE or ARRAY ref } else { diff --git a/lib/PublicInbox/GzipFilter.pm b/lib/PublicInbox/GzipFilter.pm index d6ecd5ba..fc471ea2 100644 --- a/lib/PublicInbox/GzipFilter.pm +++ b/lib/PublicInbox/GzipFilter.pm @@ -93,7 +93,7 @@ sub gone { # what: search/over/mm undef; } -# for GetlineBody (via Qspawn) when NOT using $env->{'pi-httpd.async'} +# for GetlineResponse (via Qspawn) when NOT using $env->{'pi-httpd.async'} # Also used for ->getline callbacks sub translate { my $self = shift; # $_[1] => input diff --git a/lib/PublicInbox/HTTPD.pm b/lib/PublicInbox/HTTPD.pm index bae7281b..6a6347d8 100644 --- a/lib/PublicInbox/HTTPD.pm +++ b/lib/PublicInbox/HTTPD.pm @@ -9,9 +9,6 @@ use strict; use Plack::Util (); use Plack::Builder; use PublicInbox::HTTP; -use PublicInbox::HTTPD::Async; - -sub pi_httpd_async { PublicInbox::HTTPD::Async->new(@_) } # we have a different env for ever listener socket for # SERVER_NAME, SERVER_PORT and psgi.url_scheme @@ -45,7 +42,7 @@ sub env_for ($$$) { # this to limit git-http-backend(1) parallelism. 
# We also check for the truthiness of this to # detect when to use async paths for slow blobs - 'pi-httpd.async' => \&pi_httpd_async, + 'pi-httpd.async' => 1, 'pi-httpd.app' => $self->{app}, 'pi-httpd.warn_cb' => $self->{warn_cb}, } diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm deleted file mode 100644 index 2e4d8baa..00000000 --- a/lib/PublicInbox/HTTPD/Async.pm +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright (C) all contributors -# License: AGPL-3.0+ -# -# XXX This is a totally unstable API for public-inbox internal use only -# This is exposed via the 'pi-httpd.async' key in the PSGI env hash. -# The name of this key is not even stable! -# Currently intended for use with read-only pipes with expensive -# processes such as git-http-backend(1), cgit(1) -# -# fields: -# http: PublicInbox::HTTP ref -# fh: PublicInbox::HTTP::{Identity,Chunked} ref (can ->write + ->close) -# cb: initial read callback -# arg: arg for {cb} -# end_obj: CODE or object which responds to ->event_step when ->close is called -package PublicInbox::HTTPD::Async; -use v5.12; -use parent qw(PublicInbox::DS); -use Errno qw(EAGAIN); -use PublicInbox::Syscall qw(EPOLLIN); -use PublicInbox::ProcessIONBF; - -# This is called via: $env->{'pi-httpd.async'}->() -# $io is a read-only pipe ($rpipe) for now, but may be a -# bidirectional socket in the future. 
-sub new { - my ($class, $io, $cb, $arg, $end_obj) = @_; - my $self = bless { - cb => $cb, # initial read callback - arg => $arg, # arg for $cb - end_obj => $end_obj, # like END{}, can ->event_step - }, $class; - PublicInbox::ProcessIONBF->replace($io); - $self->SUPER::new($io, EPOLLIN); -} - -sub event_step { - my ($self) = @_; - if (defined $self->{cb}) { - # this may call async_pass when headers are done - $self->{cb}->($self->{arg}); - } elsif (my $sock = $self->{sock}) { - # $http may be undef if discarding body output from cgit on 404 - my $http = $self->{http} or return $self->close; - # $self->{sock} is a read pipe for git-http-backend or cgit - # and 65536 is the default Linux pipe size - my $r = sysread($sock, my $buf, 65536); - if ($r) { - $self->{ofh}->write($buf); # may call $http->close - # let other clients get some work done, too - return if $http->{sock}; # !closed - - # else: fall through to close below... - } elsif (!defined $r && $! == EAGAIN) { - return; # EPOLLIN means we'll be notified - } - - # Done! Error handling will happen in $self->{ofh}->close - # called by end_obj->event_step handler - delete $http->{forward}; - $self->close; # queues end_obj->event_step to be called - } # else { # we may've been requeued but closed by $http -} - -# once this is called, all data we read is passed to the -# to the PublicInbox::HTTP instance ($http) via $ofh->write -# $ofh is typically PublicInbox::HTTP::{Chunked,Identity}, but -# may be PublicInbox::GzipFilter or $PublicInbox::Qspawn::qx_fh -sub async_pass { - my ($self, $http, $ofh, $bref) = @_; - delete @$self{qw(cb arg)}; - # In case the client HTTP connection ($http) dies, it - # will automatically close this ($self) object. - $http->{forward} = $self; - - # write anything we overread when we were reading headers. - # This is typically PublicInbox:HTTP::{chunked,identity}_wcb, - # but may be PublicInbox::GzipFilter::write. 
PSGI requires - # *_wcb methods respond to ->write (and ->close), not ->print - $ofh->write($$bref); - - $self->{http} = $http; - $self->{ofh} = $ofh; -} - -# may be called as $forward->close in PublicInbox::HTTP or EOF (event_step) -sub close { - my $self = $_[0]; - $self->SUPER::close; # DS::close - delete @$self{qw(cb arg)}; - - # we defer this to the next timer loop since close is deferred - if (my $end_obj = delete $self->{end_obj}) { - # this calls $end_obj->event_step - # (likely PublicInbox::Qspawn::event_step, - # NOT PublicInbox::HTTPD::Async::event_step) - PublicInbox::DS::requeue($end_obj); - } -} - -1; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 203d8f41..a6e1d58b 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -176,48 +176,6 @@ sub psgi_qx { start($self, $limiter, undef); } -# this is called on pipe EOF to reap the process, may be called -# via PublicInbox::DS event loop OR via GetlineBody for generic -# PSGI servers. -sub event_step { - my ($self) = @_; - finish($self); - my $fh = delete $self->{qfh}; - $fh->close if $fh; # async-only (psgi_return) -} - -sub rd_hdr ($) { - my ($self) = @_; - # typically used for reading CGI headers - # We also need to check EINTR for generic PSGI servers. - my ($ret, $total_rd); - my ($bref, $ph_cb, @ph_arg) = ($self->{hdr_buf}, @{$self->{parse_hdr}}); - until (defined($ret)) { - my $r = sysread($self->{rpipe}, $$bref, 4096, length($$bref)); - if (defined($r)) { - $total_rd += $r; - eval { $ret = $ph_cb->($total_rd, $bref, @ph_arg) }; - if ($@) { - warn "parse_hdr: $@"; - $ret = [ 500, [], [ "Internal error\n" ] ]; - } elsif (!defined($ret) && !$r) { - warn <{cmd}} ($self->{psgi_env}->{REQUEST_URI}) -EOM - $ret = [ 500, [], [ "Internal error\n" ] ]; - } - } else { - # caller should notify us when it's ready: - return if $! == EAGAIN; - next if $! 
== EINTR; # immediate retry - warn "error reading header: $!"; - $ret = [ 500, [], [ "Internal error\n" ] ]; - } - } - delete $self->{parse_hdr}; # done parsing headers - $ret; -} - sub yield_pass { my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe my $env = $self->{psgi_env}; @@ -251,62 +209,6 @@ sub yield_pass { $self->{qfh} = $qfh; # keep $ipipe open } -sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb} - my ($self) = @_; - my $r = rd_hdr($self) or return; # incomplete - my $env = $self->{psgi_env}; - my $filter; - - # this is for RepoAtom since that can fire after parse_cgi_headers - if (ref($r) eq 'ARRAY' && blessed($r->[2]) && $r->[2]->can('attach')) { - $filter = pop @$r; - } - $filter //= delete($env->{'qspawn.filter'}) // (ref($r) eq 'ARRAY' ? - PublicInbox::GzipFilter::qsp_maybe($r->[1], $env) : undef); - - my $wcb = delete $env->{'qspawn.wcb'}; - my $async = delete $self->{async}; # PublicInbox::HTTPD::Async - if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error - if ($async) { # calls rpipe->close && ->event_step - $async->close; # PublicInbox::HTTPD::Async::close - } else { # generic PSGI, use PublicInbox::ProcessIO::CLOSE - delete($self->{rpipe})->close; - event_step($self); - } - if (ref($r) eq 'ARRAY') { # error - $wcb->($r) - } elsif (ref($r) eq 'CODE') { # chain another command - $r->($wcb); - $self->{passed} = 1; - } - # else do nothing - } elsif ($async) { - # done reading headers, handoff to read body - my $fh = $wcb->($r); # scalar @$r == 2 - $fh = $filter->attach($fh) if $filter; - $self->{qfh} = $fh; - $async->async_pass($env->{'psgix.io'}, $fh, - delete($self->{hdr_buf})); - } else { # for synchronous PSGI servers - require PublicInbox::GetlineBody; - my $buf = delete $self->{hdr_buf}; - $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, - \&event_step, $self, $$buf, $filter); - $wcb->($r); - } -} - -sub psgi_return_start { # may run later, much later... 
- my ($self) = @_; - if (my $cb = $self->{psgi_env}->{'pi-httpd.async'}) { - # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj) - $self->{async} = $cb->($self->{rpipe}, - \&psgi_return_init_cb, $self, $self); - } else { # generic PSGI - psgi_return_init_cb($self) while $self->{parse_hdr}; - } -} - sub r500 () { [ 500, [], [ "Internal error\n" ] ] } sub parse_hdr_done ($$) { @@ -363,7 +265,7 @@ sub _yield_start { # may run later, much later... # $env->{'qspawn.wcb'} - the write callback from the PSGI server # optional, use this if you've already # captured it elsewhere. If not given, -# psgi_return will return an anonymous +# psgi_yield will return an anonymous # sub for the PSGI server to call # # $env->{'qspawn.filter'} - filter object, responds to ->attach for @@ -379,27 +281,6 @@ sub _yield_start { # may run later, much later... # body will be streamed, later, via writes (push-based) to # psgix.io. 3-element arrays means the body is available # immediately (or streamed via ->getline (pull-based)). 
-sub psgi_return { - my ($self, $env, $limiter, @parse_hdr_arg)= @_; - $self->{psgi_env} = $env; - $self->{hdr_buf} = \(my $hdr_buf = ''); - $self->{parse_hdr} = \@parse_hdr_arg; - $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32); - - # the caller already captured the PSGI write callback from - # the PSGI server, so we can call ->start, here: - $env->{'qspawn.wcb'} and - return start($self, $limiter, \&psgi_return_start); - - # the caller will return this sub to the PSGI server, so - # it can set the response callback (that is, for - # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback), - # but other HTTP servers are supported: - sub { - $env->{'qspawn.wcb'} = $_[0]; - start($self, $limiter, \&psgi_return_start); - } -} sub psgi_yield { my ($self, $env, $limiter, @parse_hdr_arg)= @_; diff --git a/lib/PublicInbox/RepoAtom.pm b/lib/PublicInbox/RepoAtom.pm index b7179511..c1649d0a 100644 --- a/lib/PublicInbox/RepoAtom.pm +++ b/lib/PublicInbox/RepoAtom.pm @@ -40,7 +40,7 @@ EOM # called by GzipFilter->close sub zflush { $_[0]->SUPER::zflush('') } -# called by GzipFilter->write or GetlineBody->getline +# called by GzipFilter->write or GetlineResponse->getline sub translate { my $self = shift; my $rec = $_[0] // return $self->zflush; # getline diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm index 6e19fc02..0eb4a2d6 100644 --- a/lib/PublicInbox/WwwCoderepo.pm +++ b/lib/PublicInbox/WwwCoderepo.pm @@ -230,7 +230,7 @@ sub summary ($$) { # called by GzipFilter->close after translate sub zflush { $_[0]->SUPER::zflush('', $_[0]->_html_end) } -# called by GzipFilter->write or GetlineBody->getline +# called by GzipFilter->write or GetlineResponse->getline sub translate { my $ctx = shift; my $rec = $_[0] // return zflush($ctx); # getline diff --git a/t/httpd-corner.psgi b/t/httpd-corner.psgi index 1e96d7b1..e29fd87b 100644 --- a/t/httpd-corner.psgi +++ b/t/httpd-corner.psgi @@ -92,34 +92,34 @@ my $app = sub { my $rdr = { 2 => 
fileno($null) }; my $cmd = [qw(dd if=/dev/zero count=30 bs=1024k)]; my $qsp = PublicInbox::Qspawn->new($cmd, undef, $rdr); - return $qsp->psgi_return($env, undef, sub { + return $qsp->psgi_yield($env, undef, sub { my ($r, $bref) = @_; # make $rd_hdr retry sysread + $parse_hdr in Qspawn: return until length($$bref) > 8000; close $null; [ 200, [ qw(Content-Type application/octet-stream) ]]; }); - } elsif ($path eq '/psgi-return-gzip') { + } elsif ($path eq '/psgi-yield-gzip') { require PublicInbox::Qspawn; require PublicInbox::GzipFilter; my $cmd = [qw(echo hello world)]; my $qsp = PublicInbox::Qspawn->new($cmd); $env->{'qspawn.filter'} = PublicInbox::GzipFilter->new; - return $qsp->psgi_return($env, undef, sub { + return $qsp->psgi_yield($env, undef, sub { [ 200, [ qw(Content-Type application/octet-stream)]] }); - } elsif ($path eq '/psgi-return-compressible') { + } elsif ($path eq '/psgi-yield-compressible') { require PublicInbox::Qspawn; my $cmd = [qw(echo goodbye world)]; my $qsp = PublicInbox::Qspawn->new($cmd); - return $qsp->psgi_return($env, undef, sub { + return $qsp->psgi_yield($env, undef, sub { [200, [qw(Content-Type text/plain)]] }); - } elsif ($path eq '/psgi-return-enoent') { + } elsif ($path eq '/psgi-yield-enoent') { require PublicInbox::Qspawn; my $cmd = [ 'this-better-not-exist-in-PATH'.rand ]; my $qsp = PublicInbox::Qspawn->new($cmd); - return $qsp->psgi_return($env, undef, sub { + return $qsp->psgi_yield($env, undef, sub { [ 200, [ qw(Content-Type application/octet-stream)]] }); } elsif ($path eq '/pid') { diff --git a/t/httpd-corner.t b/t/httpd-corner.t index aab3635c..2d2d1061 100644 --- a/t/httpd-corner.t +++ b/t/httpd-corner.t @@ -374,13 +374,13 @@ SKIP: { is($non_zero, 0, 'read all zeros'); require_mods(@zmods, 4); - my $buf = xqx([$curl, '-gsS', "$base/psgi-return-gzip"]); + my $buf = xqx([$curl, '-gsS', "$base/psgi-yield-gzip"]); is($?, 0, 'curl succesful'); IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out)); is($out, "hello world\n"); my 
$curl_rdr = { 2 => \(my $curl_err = '') }; $buf = xqx([$curl, qw(-gsSv --compressed), - "$base/psgi-return-compressible"], undef, $curl_rdr); + "$base/psgi-yield-compressible"], undef, $curl_rdr); is($?, 0, 'curl --compressed successful'); is($buf, "goodbye world\n", 'gzipped response as expected'); like($curl_err, qr/\bContent-Encoding: gzip\b/, @@ -388,8 +388,8 @@ SKIP: { } { - my $conn = conn_for($sock, 'psgi_return ENOENT'); - print $conn "GET /psgi-return-enoent HTTP/1.1\r\n\r\n" or die; + my $conn = conn_for($sock, 'psgi_yield ENOENT'); + print $conn "GET /psgi-yield-enoent HTTP/1.1\r\n\r\n" or die; my $buf = ''; sysread($conn, $buf, 16384, length($buf)) until $buf =~ /\r\n\r\n/; like($buf, qr!HTTP/1\.[01] 500\b!, 'got 500 error on ENOENT'); @@ -678,13 +678,13 @@ SKIP: { my $app = require $psgi; test_psgi($app, sub { my ($cb) = @_; - my $req = GET('http://example.com/psgi-return-gzip'); + my $req = GET('http://example.com/psgi-yield-gzip'); my $res = $cb->($req); my $buf = $res->content; IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out)); is($out, "hello world\n", 'got expected output'); - $req = GET('http://example.com/psgi-return-enoent'); + $req = GET('http://example.com/psgi-yield-enoent'); $res = $cb->($req); is($res->code, 500, 'got error on ENOENT'); seek($tmperr, 0, SEEK_SET) or die;