From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id B0FB31F87E for ; Thu, 19 Oct 2023 12:40:21 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1697719221; bh=XBqP0YvnAxnD9+LnnhG8u3iqDwjEYdMyxoMITtf0W/s=; h=From:To:Subject:Date:In-Reply-To:References:From; b=YL+kwqOl1s/6U/jSKCpZoB+vKVauWLKQks/RpiUS+bCB2ag+nDAKuaTwC69GdYqq7 EgIbAYxQJaQk8VUJ3M+ng1bQwH7yVB300RKYhSTYigAMAWEAC+E5fvmKQ3/7R4g/5k UauhOWXnxuS9Y0pBDJqh5uSMUb/5pV8DA7+G4yxg= From: Eric Wong To: spew@80x24.org Subject: [PATCH 17/18] drop psgi_return and httpd/async entirely Date: Thu, 19 Oct 2023 12:40:17 +0000 Message-ID: <20231019124018.2109632-17-e@80x24.org> In-Reply-To: <20231019124018.2109632-1-e@80x24.org> References: <20231019124018.2109632-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: --- MANIFEST | 1 - lib/PublicInbox/HTTPD.pm | 5 +- lib/PublicInbox/HTTPD/Async.pm | 101 --------------------------- lib/PublicInbox/Qspawn.pm | 121 +-------------------------------- t/httpd-corner.psgi | 14 ++-- t/httpd-corner.t | 12 ++-- 6 files changed, 15 insertions(+), 239 deletions(-) delete mode 100644 lib/PublicInbox/HTTPD/Async.pm diff --git a/MANIFEST b/MANIFEST index f087621c..bac28d62 100644 --- a/MANIFEST +++ b/MANIFEST @@ -211,7 +211,6 @@ lib/PublicInbox/GitHTTPBackend.pm lib/PublicInbox/GzipFilter.pm lib/PublicInbox/HTTP.pm lib/PublicInbox/HTTPD.pm -lib/PublicInbox/HTTPD/Async.pm lib/PublicInbox/HlMod.pm lib/PublicInbox/Hval.pm lib/PublicInbox/IMAP.pm diff --git a/lib/PublicInbox/HTTPD.pm b/lib/PublicInbox/HTTPD.pm index bae7281b..6a6347d8 100644 --- a/lib/PublicInbox/HTTPD.pm +++ b/lib/PublicInbox/HTTPD.pm @@ -9,9 +9,6 @@ use strict; use Plack::Util (); use Plack::Builder; use PublicInbox::HTTP; -use PublicInbox::HTTPD::Async; - -sub pi_httpd_async { PublicInbox::HTTPD::Async->new(@_) } # we have a different env for ever listener socket for # SERVER_NAME, SERVER_PORT and psgi.url_scheme @@ -45,7 +42,7 @@ sub env_for ($$$) { # this to limit git-http-backend(1) parallelism. # We also check for the truthiness of this to # detect when to use async paths for slow blobs - 'pi-httpd.async' => \&pi_httpd_async, + 'pi-httpd.async' => 1, 'pi-httpd.app' => $self->{app}, 'pi-httpd.warn_cb' => $self->{warn_cb}, } diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm deleted file mode 100644 index 2e4d8baa..00000000 --- a/lib/PublicInbox/HTTPD/Async.pm +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright (C) all contributors -# License: AGPL-3.0+ -# -# XXX This is a totally unstable API for public-inbox internal use only -# This is exposed via the 'pi-httpd.async' key in the PSGI env hash. -# The name of this key is not even stable! -# Currently intended for use with read-only pipes with expensive -# processes such as git-http-backend(1), cgit(1) -# -# fields: -# http: PublicInbox::HTTP ref -# fh: PublicInbox::HTTP::{Identity,Chunked} ref (can ->write + ->close) -# cb: initial read callback -# arg: arg for {cb} -# end_obj: CODE or object which responds to ->event_step when ->close is called -package PublicInbox::HTTPD::Async; -use v5.12; -use parent qw(PublicInbox::DS); -use Errno qw(EAGAIN); -use PublicInbox::Syscall qw(EPOLLIN); -use PublicInbox::ProcessIONBF; - -# This is called via: $env->{'pi-httpd.async'}->() -# $io is a read-only pipe ($rpipe) for now, but may be a -# bidirectional socket in the future. -sub new { - my ($class, $io, $cb, $arg, $end_obj) = @_; - my $self = bless { - cb => $cb, # initial read callback - arg => $arg, # arg for $cb - end_obj => $end_obj, # like END{}, can ->event_step - }, $class; - PublicInbox::ProcessIONBF->replace($io); - $self->SUPER::new($io, EPOLLIN); -} - -sub event_step { - my ($self) = @_; - if (defined $self->{cb}) { - # this may call async_pass when headers are done - $self->{cb}->($self->{arg}); - } elsif (my $sock = $self->{sock}) { - # $http may be undef if discarding body output from cgit on 404 - my $http = $self->{http} or return $self->close; - # $self->{sock} is a read pipe for git-http-backend or cgit - # and 65536 is the default Linux pipe size - my $r = sysread($sock, my $buf, 65536); - if ($r) { - $self->{ofh}->write($buf); # may call $http->close - # let other clients get some work done, too - return if $http->{sock}; # !closed - - # else: fall through to close below... - } elsif (!defined $r && $! == EAGAIN) { - return; # EPOLLIN means we'll be notified - } - - # Done! Error handling will happen in $self->{ofh}->close - # called by end_obj->event_step handler - delete $http->{forward}; - $self->close; # queues end_obj->event_step to be called - } # else { # we may've been requeued but closed by $http -} - -# once this is called, all data we read is passed to the -# to the PublicInbox::HTTP instance ($http) via $ofh->write -# $ofh is typically PublicInbox::HTTP::{Chunked,Identity}, but -# may be PublicInbox::GzipFilter or $PublicInbox::Qspawn::qx_fh -sub async_pass { - my ($self, $http, $ofh, $bref) = @_; - delete @$self{qw(cb arg)}; - # In case the client HTTP connection ($http) dies, it - # will automatically close this ($self) object. - $http->{forward} = $self; - - # write anything we overread when we were reading headers. - # This is typically PublicInbox:HTTP::{chunked,identity}_wcb, - # but may be PublicInbox::GzipFilter::write. PSGI requires - # *_wcb methods respond to ->write (and ->close), not ->print - $ofh->write($$bref); - - $self->{http} = $http; - $self->{ofh} = $ofh; -} - -# may be called as $forward->close in PublicInbox::HTTP or EOF (event_step) -sub close { - my $self = $_[0]; - $self->SUPER::close; # DS::close - delete @$self{qw(cb arg)}; - - # we defer this to the next timer loop since close is deferred - if (my $end_obj = delete $self->{end_obj}) { - # this calls $end_obj->event_step - # (likely PublicInbox::Qspawn::event_step, - # NOT PublicInbox::HTTPD::Async::event_step) - PublicInbox::DS::requeue($end_obj); - } -} - -1; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 9ac9aec1..9f8f9a99 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -174,48 +174,6 @@ sub psgi_qx { start($self, $limiter, undef); } -# this is called on pipe EOF to reap the process, may be called -# via PublicInbox::DS event loop OR via GetlineBody for generic -# PSGI servers. -sub event_step { - my ($self) = @_; - finish($self); - my $fh = delete $self->{qfh}; - $fh->close if $fh; # async-only (psgi_return) -} - -sub rd_hdr ($) { - my ($self) = @_; - # typically used for reading CGI headers - # We also need to check EINTR for generic PSGI servers. - my ($ret, $total_rd); - my ($bref, $ph_cb, @ph_arg) = ($self->{hdr_buf}, @{$self->{parse_hdr}}); - until (defined($ret)) { - my $r = sysread($self->{rpipe}, $$bref, 4096, length($$bref)); - if (defined($r)) { - $total_rd += $r; - eval { $ret = $ph_cb->($total_rd, $bref, @ph_arg) }; - if ($@) { - warn "parse_hdr: $@"; - $ret = [ 500, [], [ "Internal error\n" ] ]; - } elsif (!defined($ret) && !$r) { - warn <{cmd}} ($self->{psgi_env}->{REQUEST_URI}) -EOM - $ret = [ 500, [], [ "Internal error\n" ] ]; - } - } else { - # caller should notify us when it's ready: - return if $! == EAGAIN; - next if $! == EINTR; # immediate retry - warn "error reading header: $!"; - $ret = [ 500, [], [ "Internal error\n" ] ]; - } - } - delete $self->{parse_hdr}; # done parsing headers - $ret; -} - sub yield_pass { my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe my $env = $self->{psgi_env}; @@ -246,62 +204,6 @@ sub yield_pass { $self->{qfh} = $qfh; # keep $ipipe open } -sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb} - my ($self) = @_; - my $r = rd_hdr($self) or return; # incomplete - my $env = $self->{psgi_env}; - my $filter; - - # this is for RepoAtom since that can fire after parse_cgi_headers - if (ref($r) eq 'ARRAY' && blessed($r->[2]) && $r->[2]->can('attach')) { - $filter = pop @$r; - } - $filter //= delete($env->{'qspawn.filter'}) // (ref($r) eq 'ARRAY' ? - PublicInbox::GzipFilter::qsp_maybe($r->[1], $env) : undef); - - my $wcb = delete $env->{'qspawn.wcb'}; - my $async = delete $self->{async}; # PublicInbox::HTTPD::Async - if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error - if ($async) { # calls rpipe->close && ->event_step - $async->close; # PublicInbox::HTTPD::Async::close - } else { # generic PSGI, use PublicInbox::ProcessIO::CLOSE - delete($self->{rpipe})->close; - event_step($self); - } - if (ref($r) eq 'ARRAY') { # error - $wcb->($r) - } elsif (ref($r) eq 'CODE') { # chain another command - $r->($wcb); - $self->{passed} = 1; - } - # else do nothing - } elsif ($async) { - # done reading headers, handoff to read body - my $fh = $wcb->($r); # scalar @$r == 2 - $fh = $filter->attach($fh) if $filter; - $self->{qfh} = $fh; - $async->async_pass($env->{'psgix.io'}, $fh, - delete($self->{hdr_buf})); - } else { # for synchronous PSGI servers - require PublicInbox::GetlineBody; - my $buf = delete $self->{hdr_buf}; - $r->[2] = PublicInbox::GetlineBody->new($self->{rpipe}, - \&event_step, $self, $$buf, $filter); - $wcb->($r); - } -} - -sub psgi_return_start { # may run later, much later... - my ($self) = @_; - if (my $cb = $self->{psgi_env}->{'pi-httpd.async'}) { - # PublicInbox::HTTPD::Async->new(rpipe, $cb, $cb_arg, $end_obj) - $self->{async} = $cb->($self->{rpipe}, - \&psgi_return_init_cb, $self, $self); - } else { # generic PSGI - psgi_return_init_cb($self) while $self->{parse_hdr}; - } -} - sub _ipipe_cb { # InputPipe callback my ($ipipe, $self) = @_; # $_[-1] rbuf return yield_chunk($self, $ipipe, $_[-1]) if $self->{qfh}; # stream body @@ -354,7 +256,7 @@ sub _yield_start { # may run later, much later... # $env->{'qspawn.wcb'} - the write callback from the PSGI server # optional, use this if you've already # captured it elsewhere. If not given, -# psgi_return will return an anonymous +# psgi_yield will return an anonymous # sub for the PSGI server to call # # $env->{'qspawn.filter'} - filter object, responds to ->attach for @@ -370,27 +272,6 @@ sub _yield_start { # may run later, much later... # body will be streamed, later, via writes (push-based) to # psgix.io. 3-element arrays means the body is available # immediately (or streamed via ->getline (pull-based)). -sub psgi_return { - my ($self, $env, $limiter, @parse_hdr_arg)= @_; - $self->{psgi_env} = $env; - $self->{hdr_buf} = \(my $hdr_buf = ''); - $self->{parse_hdr} = \@parse_hdr_arg; - $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32); - - # the caller already captured the PSGI write callback from - # the PSGI server, so we can call ->start, here: - $env->{'qspawn.wcb'} and - return start($self, $limiter, \&psgi_return_start); - - # the caller will return this sub to the PSGI server, so - # it can set the response callback (that is, for - # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback), - # but other HTTP servers are supported: - sub { - $env->{'qspawn.wcb'} = $_[0]; - start($self, $limiter, \&psgi_return_start); - } -} sub psgi_yield { my ($self, $env, $limiter, @parse_hdr_arg)= @_; diff --git a/t/httpd-corner.psgi b/t/httpd-corner.psgi index 1e96d7b1..e29fd87b 100644 --- a/t/httpd-corner.psgi +++ b/t/httpd-corner.psgi @@ -92,34 +92,34 @@ my $app = sub { my $rdr = { 2 => fileno($null) }; my $cmd = [qw(dd if=/dev/zero count=30 bs=1024k)]; my $qsp = PublicInbox::Qspawn->new($cmd, undef, $rdr); - return $qsp->psgi_return($env, undef, sub { + return $qsp->psgi_yield($env, undef, sub { my ($r, $bref) = @_; # make $rd_hdr retry sysread + $parse_hdr in Qspawn: return until length($$bref) > 8000; close $null; [ 200, [ qw(Content-Type application/octet-stream) ]]; }); - } elsif ($path eq '/psgi-return-gzip') { + } elsif ($path eq '/psgi-yield-gzip') { require PublicInbox::Qspawn; require PublicInbox::GzipFilter; my $cmd = [qw(echo hello world)]; my $qsp = PublicInbox::Qspawn->new($cmd); $env->{'qspawn.filter'} = PublicInbox::GzipFilter->new; - return $qsp->psgi_return($env, undef, sub { + return $qsp->psgi_yield($env, undef, sub { [ 200, [ qw(Content-Type application/octet-stream)]] }); - } elsif ($path eq '/psgi-return-compressible') { + } elsif ($path eq '/psgi-yield-compressible') { require PublicInbox::Qspawn; my $cmd = [qw(echo goodbye world)]; my $qsp = PublicInbox::Qspawn->new($cmd); - return $qsp->psgi_return($env, undef, sub { + return $qsp->psgi_yield($env, undef, sub { [200, [qw(Content-Type text/plain)]] }); - } elsif ($path eq '/psgi-return-enoent') { + } elsif ($path eq '/psgi-yield-enoent') { require PublicInbox::Qspawn; my $cmd = [ 'this-better-not-exist-in-PATH'.rand ]; my $qsp = PublicInbox::Qspawn->new($cmd); - return $qsp->psgi_return($env, undef, sub { + return $qsp->psgi_yield($env, undef, sub { [ 200, [ qw(Content-Type application/octet-stream)]] }); } elsif ($path eq '/pid') { diff --git a/t/httpd-corner.t b/t/httpd-corner.t index aab3635c..2d2d1061 100644 --- a/t/httpd-corner.t +++ b/t/httpd-corner.t @@ -374,13 +374,13 @@ SKIP: { is($non_zero, 0, 'read all zeros'); require_mods(@zmods, 4); - my $buf = xqx([$curl, '-gsS', "$base/psgi-return-gzip"]); + my $buf = xqx([$curl, '-gsS', "$base/psgi-yield-gzip"]); is($?, 0, 'curl succesful'); IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out)); is($out, "hello world\n"); my $curl_rdr = { 2 => \(my $curl_err = '') }; $buf = xqx([$curl, qw(-gsSv --compressed), - "$base/psgi-return-compressible"], undef, $curl_rdr); + "$base/psgi-yield-compressible"], undef, $curl_rdr); is($?, 0, 'curl --compressed successful'); is($buf, "goodbye world\n", 'gzipped response as expected'); like($curl_err, qr/\bContent-Encoding: gzip\b/, @@ -388,8 +388,8 @@ SKIP: { } { - my $conn = conn_for($sock, 'psgi_return ENOENT'); - print $conn "GET /psgi-return-enoent HTTP/1.1\r\n\r\n" or die; + my $conn = conn_for($sock, 'psgi_yield ENOENT'); + print $conn "GET /psgi-yield-enoent HTTP/1.1\r\n\r\n" or die; my $buf = ''; sysread($conn, $buf, 16384, length($buf)) until $buf =~ /\r\n\r\n/; like($buf, qr!HTTP/1\.[01] 500\b!, 'got 500 error on ENOENT'); @@ -678,13 +678,13 @@ SKIP: { my $app = require $psgi; test_psgi($app, sub { my ($cb) = @_; - my $req = GET('http://example.com/psgi-return-gzip'); + my $req = GET('http://example.com/psgi-yield-gzip'); my $res = $cb->($req); my $buf = $res->content; IO::Uncompress::Gunzip::gunzip(\$buf => \(my $out)); is($out, "hello world\n", 'got expected output'); - $req = GET('http://example.com/psgi-return-enoent'); + $req = GET('http://example.com/psgi-yield-enoent'); $res = $cb->($req); is($res->code, 500, 'got error on ENOENT'); seek($tmperr, 0, SEEK_SET) or die;