From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 8F8C21F51B for ; Thu, 19 Oct 2023 01:15:36 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1697678136; bh=3UFxxMMyoilygHrwlK0tt6AhXjYvTRZrp1b6wzXUexw=; h=From:To:Subject:Date:In-Reply-To:References:From; b=DcTf4peeeC85En/Yi+Yd4Gt4FE3t7urgXHncQy7rjBV5bj3lyyFFX9p6unnL/r4f9 NqpcKPSDQcnn4NLwV8O6kNUF7W9en0qpNmPZRtzyq0HsiQcrTMrnNwywPjQUzMRrs4 GZJQmYbK3L7i5zaFxSRM6pqQICNjF3Jq7MJywjqM= From: Eric Wong To: spew@80x24.org Subject: [PATCH 8/8] qspawn: introduce psgi_yield API Date: Thu, 19 Oct 2023 01:15:35 +0000 Message-ID: <20231019011535.1895489-8-e@80x24.org> In-Reply-To: <20231019011535.1895489-1-e@80x24.org> References: <20231019011535.1895489-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This may eventually replace psgi_return and HTTPD/Async entirely --- lib/PublicInbox/GitHTTPBackend.pm | 4 +- lib/PublicInbox/InputPipe.pm | 11 ++-- lib/PublicInbox/LEI.pm | 2 +- lib/PublicInbox/Qspawn.pm | 97 ++++++++++++++++++++++++++++++- 4 files changed, 105 insertions(+), 9 deletions(-) diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index edbc0157..d7e0bced 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -79,7 +79,7 @@ sub serve_dumb { PublicInbox::WwwStatic::response($env, $h, $path, $type); } -sub git_parse_hdr { # {parse_hdr} for Qspawn +sub ghb_parse_hdr { # header parser for Qspawn my ($r, $bref, @dumb_args) = @_; my $res = parse_cgi_headers($r, $bref) or return; # incomplete $res->[0] == 403 ? 
serve_dumb(@dumb_args) : $res; @@ -106,7 +106,7 @@ sub serve_smart { $env{PATH_TRANSLATED} = "$git->{git_dir}/$path"; my $rdr = input_prepare($env) or return r(500); my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr); - $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path); + $qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path); } sub input_prepare { diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm index b38d8270..614360c2 100644 --- a/lib/PublicInbox/InputPipe.pm +++ b/lib/PublicInbox/InputPipe.pm @@ -39,14 +39,15 @@ sub consume { if ($@) { # regular file (but not w/ select|IO::Poll backends) $self->{-need_rq} = 1; $self->requeue; - } elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes + } elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF + } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes $in->blocking(0); } elsif (-t $in) { # isatty(3) can't use `_' stat cache unblock_tty($self); } } -sub close { +sub close { # idempotent my ($self) = @_; if (my $t = delete($self->{restore_termios})) { my $fd = fileno($self->{sock} // return); @@ -60,16 +61,16 @@ sub event_step { my $r = sysread($self->{sock} // return, my $rbuf, 65536); eval { if ($r) { - $self->{cb}->(@{$self->{args}}, $rbuf); + $self->{cb}->($self, @{$self->{args}}, $rbuf); $self->requeue if $self->{-need_rq}; } elsif (defined($r)) { # EOF - $self->{cb}->(@{$self->{args}}, ''); + $self->{cb}->($self, @{$self->{args}}, ''); $self->close } elsif ($!{EAGAIN}) { # rely on EPOLLIN } elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty $self->requeue if $self->{-need_rq}; } else { # another error - $self->{cb}->(@{$self->{args}}, undef); + $self->{cb}->($self, @{$self->{args}}, undef); $self->close; } }; diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 3ccdd4f7..c1e965c8 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1566,7 +1566,7 @@ sub request_umask { } 
sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin - my ($lei, $cb) = @_; # $_[-1] = $rbuf + my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf $_[1] // return $lei->fail("error reading stdin: $!"); $lei->{stdin_buf} .= $_[-1]; do_env($lei, $cb) if $_[-1] eq ''; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 9a7e8734..c598e863 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -31,6 +31,9 @@ use PublicInbox::GzipFilter; use Scalar::Util qw(blessed); use PublicInbox::Limiter; use PublicInbox::Aspawn qw(run_await); +use PublicInbox::Syscall qw(EPOLLIN); +use PublicInbox::InputPipe; +use Carp qw(confess); # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers use Errno qw(EAGAIN EINTR); @@ -61,7 +64,7 @@ sub _do_spawn { if ($start_cb) { eval { # popen_rd may die on EMFILE, ENFILE $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o, - \&waitpid_err, $self); + \&waitpid_err, $self, \%o); $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM }; } else { @@ -126,6 +129,13 @@ sub wait_await { # run_await cb waitpid_err($pid, $self, $opt); } +sub yield_chunk { # $_[-1] is sysread buffer (or undef) + my ($self, $ipipe) = @_; + my $qfh = $self->{qfh}; # PublicInbox::HTTP::(Chunked|Identity) + ($_[-1] // '') eq '' ? $qfh->close : $qfh->write($_[-1]); + delete($self->{rpipe}) if ($_[-1] // '') eq ''; # all done +} + sub finish ($;$) { my ($self, $err) = @_; $self->{_err} //= $err; # only for $@ @@ -201,6 +211,33 @@ EOM $ret; } +sub yield_pass { + my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe + my $env = $self->{psgi_env}; + my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb'); + if (ref($res) eq 'CODE') { # chain another command + $ipipe->close; + $res->($wcb); + $self->{passed} = 1; + return; # all done + } + confess("BUG: $res unhandled") if ref($res) ne 'ARRAY'; + + my $filter = blessed($res->[2]) && $res->[2]->can('attach') ? 
+ pop(@$res) : delete($env->{'qspawn.filter'}); + $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env); + + if (scalar(@$res) == 3) { # done early + $ipipe->close; + return $wcb->($res); # all done + } + + scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res"); + my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity) + $qfh = $filter->attach($qfh) if $filter; + $self->{qfh} = $qfh; # keep $ipipe open +} + sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb} my ($self) = @_; my $r = rd_hdr($self) or return; # incomplete @@ -257,6 +294,46 @@ sub psgi_return_start { # may run later, much later... } } +sub _ipipe_cb { # InputPipe callback + my ($ipipe, $self) = @_; # $_[-1] rbuf + return yield_chunk($self, $ipipe, $_[-1]) if $self->{qfh}; # stream body + + if (!defined($_[-1])) { + warn "error reading header: $!"; + } elsif ($_[-1] eq '') { + warn <<EOM; +EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI}) +EOM + } else { # attempt to parse headers + my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}}; + $$bref .= $_[-1]; + my $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) }; + if ($ret) { # success + my $qfh = yield_pass($self, $ipipe, $ret); + delete $self->{yield_parse_hdr}; + $qfh->write($$bref) if $qfh; + return; + } + return unless $@; # incomplete, not an error + warn "parse_hdr ($ph_cb): $@"; + } + delete $self->{yield_parse_hdr}; + yield_pass($self, $ipipe, [ 500, [], [ "Internal error\n" ] ]) +} + +sub _yield_start { # may run later, much later... + my ($self) = @_; + my $async = !!$self->{psgi_env}->{'pi-httpd.async'}; + my $rpipe = $self->{rpipe}; + if ($async) { + require PublicInbox::ProcessIONBF; + PublicInbox::ProcessIONBF->replace($rpipe); + } + my $ipipe = PublicInbox::InputPipe::consume($rpipe, \&_ipipe_cb, $self); + return if $async; + $ipipe->event_step while $ipipe->{sock}; +} + # Used for streaming the stdout of one process as a PSGI response. # # $env is the PSGI env. 
@@ -302,4 +379,22 @@ sub psgi_return { } } +sub psgi_yield { + my ($self, $env, $limiter, @parse_hdr_arg)= @_; + $self->{psgi_env} = $env; + $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ]; + $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32); + + # the caller already captured the PSGI write callback from + # the PSGI server, so we can call ->start, here: + $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub { + # the caller will return this sub to the PSGI server, so + # it can set the response callback (that is, for + # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback), + # but other HTTP servers are supported: + $env->{'qspawn.wcb'} = $_[0]; + start($self, $limiter, \&_yield_start); + } +} + 1;