dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 09/18] qspawn: introduce psgi_yield API
Date: Thu, 19 Oct 2023 12:40:09 +0000	[thread overview]
Message-ID: <20231019124018.2109632-9-e@80x24.org> (raw)
In-Reply-To: <20231019124018.2109632-1-e@80x24.org>

This will replace psgi_return and HTTPD/Async entirely
---
 lib/PublicInbox/GitHTTPBackend.pm |   4 +-
 lib/PublicInbox/HTTP.pm           |   8 ++-
 lib/PublicInbox/InputPipe.pm      |  12 ++--
 lib/PublicInbox/LEI.pm            |   2 +-
 lib/PublicInbox/Qspawn.pm         | 110 +++++++++++++++++++++++++++++-
 5 files changed, 124 insertions(+), 12 deletions(-)

diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index edbc0157..d7e0bced 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -79,7 +79,7 @@ sub serve_dumb {
 	PublicInbox::WwwStatic::response($env, $h, $path, $type);
 }
 
-sub git_parse_hdr { # {parse_hdr} for Qspawn
+sub ghb_parse_hdr { # header parser for Qspawn
 	my ($r, $bref, @dumb_args) = @_;
 	my $res = parse_cgi_headers($r, $bref) or return; # incomplete
 	$res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
@@ -106,7 +106,7 @@ sub serve_smart {
 	$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
 	my $rdr = input_prepare($env) or return r(500);
 	my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
-	$qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
+	$qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path);
 }
 
 sub input_prepare {
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index ca162939..edc88fe8 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -455,11 +455,12 @@ sub next_step {
 # They may be exposed to the PSGI application when the PSGI app
 # returns a CODE ref for "push"-based responses
 package PublicInbox::HTTP::Chunked;
-use strict;
+use v5.12;
 
 sub write {
 	# ([$http], $buf) = @_;
-	PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1])
+	PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1]);
+	$_[0]->[0]->{sock} ? length($_[1]) : undef; # undef once {sock} is gone
 }
 
 sub close {
@@ -468,12 +469,13 @@ sub close {
 }
 
 package PublicInbox::HTTP::Identity;
-use strict;
+use v5.12;
 our @ISA = qw(PublicInbox::HTTP::Chunked);
 
 sub write {
 	# ([$http], $buf) = @_;
 	PublicInbox::HTTP::identity_write($_[0]->[0], $_[1]);
+	$_[0]->[0]->{sock} ? length($_[1]) : undef; # undef once {sock} is gone
 }
 
 1;
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index b38d8270..f4d57e7d 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -39,14 +39,16 @@ sub consume {
 	if ($@) { # regular file (but not w/ select|IO::Poll backends)
 		$self->{-need_rq} = 1;
 		$self->requeue;
-	} elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+	} elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
+	} elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
 		$in->blocking(0);
 	} elsif (-t $in) { # isatty(3) can't use `_' stat cache
 		unblock_tty($self);
 	}
+	$self;
 }
 
-sub close {
+sub close { # idempotent
 	my ($self) = @_;
 	if (my $t = delete($self->{restore_termios})) {
 		my $fd = fileno($self->{sock} // return);
@@ -60,16 +62,16 @@ sub event_step {
 	my $r = sysread($self->{sock} // return, my $rbuf, 65536);
 	eval {
 		if ($r) {
-			$self->{cb}->(@{$self->{args}}, $rbuf);
+			$self->{cb}->($self, @{$self->{args}}, $rbuf);
 			$self->requeue if $self->{-need_rq};
 		} elsif (defined($r)) { # EOF
-			$self->{cb}->(@{$self->{args}}, '');
+			$self->{cb}->($self, @{$self->{args}}, '');
 			$self->close
 		} elsif ($!{EAGAIN}) { # rely on EPOLLIN
 		} elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
 			$self->requeue if $self->{-need_rq};
 		} else { # another error
-			$self->{cb}->(@{$self->{args}}, undef);
+			$self->{cb}->($self, @{$self->{args}}, undef);
 			$self->close;
 		}
 	};
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 56e4c001..7bc7b2dc 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1567,7 +1567,7 @@ sub request_umask {
 }
 
 sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
-	my ($lei, $cb) = @_; # $_[-1] = $rbuf
+	my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
 	$_[1] // return $lei->fail("error reading stdin: $!");
 	$lei->{stdin_buf} .= $_[-1];
 	do_env($lei, $cb) if $_[-1] eq '';
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 9a7e8734..9ac9aec1 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -31,6 +31,9 @@ use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
 use PublicInbox::Limiter;
 use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(confess);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -61,7 +64,7 @@ sub _do_spawn {
 	if ($start_cb) {
 		eval { # popen_rd may die on EMFILE, ENFILE
 			$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
-						\&waitpid_err, $self);
+						\&waitpid_err, $self, \%o);
 			$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
 		};
 	} else {
@@ -126,6 +129,18 @@ sub wait_await { # run_await cb
 	waitpid_err($pid, $self, $opt);
 }
 
+sub yield_chunk { # $_[-1] is sysread buffer (or undef), written to {qfh}
+	my ($self, $ipipe) = @_; # $ipipe = InputPipe
+	if (($_[-1] // '') eq '') { # all done (EOF or error)
+		delete $self->{rpipe}; # drop our ref to the popen_rd handle
+		$self->{qfh}->close;
+	} elsif (defined($self->{qfh}->write($_[-1]))) { # OK, keep going
+	} else { # ->write gave undef: HTTP client gone
+		delete $self->{rpipe};
+		$ipipe->close; # stop consuming command output
+	}
+}
+
 sub finish ($;$) {
 	my ($self, $err) = @_;
 	$self->{_err} //= $err; # only for $@
@@ -201,6 +216,36 @@ EOM
 	$ret;
 }
 
+sub yield_pass { # pass parsed response ($res) to the PSGI write callback
+	my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+	my $env = $self->{psgi_env};
+	my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
+	if (ref($res) eq 'CODE') { # chain another command
+		delete $self->{rpipe};
+		$ipipe->close;
+		$res->($wcb); # $res is given the write callback, not $self
+		$self->{passed} = 1; # NOTE(review): reader of {passed} not in view
+		return; # all done
+	}
+	confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
+
+	my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+			pop(@$res) : delete($env->{'qspawn.filter'});
+	$filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
+
+	if (scalar(@$res) == 3) { # done early (likely error or static file)
+		delete $self->{rpipe};
+		$ipipe->close;
+		$wcb->($res); # all done
+		return;
+	}
+	scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+	# streaming response
+	my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+	$qfh = $filter->attach($qfh) if $filter;
+	$self->{qfh} = $qfh; # keep $ipipe open; _ipipe_cb tests this retval
+}
+
 sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
 	my ($self) = @_;
 	my $r = rd_hdr($self) or return; # incomplete
@@ -257,6 +302,51 @@ sub psgi_return_start { # may run later, much later...
 	}
 }
 
+sub _ipipe_cb { # InputPipe callback: parse header first, then stream body
+	my ($ipipe, $self) = @_; # $_[-1] rbuf
+	return yield_chunk($self, $ipipe, $_[-1]) if $self->{qfh}; # stream body
+
+	if (!defined($_[-1])) { # InputPipe passes undef on read error
+		warn "error reading header: $!";
+	} else { # attempt to parse headers
+		my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+		$$bref .= $_[-1];
+		my $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
+		if ($ret) { # success
+			delete $self->{yield_parse_hdr};
+			yield_pass($self, $ipipe, $ret) and # true if streaming
+					$$bref ne '' and
+					yield_chunk($self, $ipipe, $$bref);
+			return; # may yield_chunk in future calls
+		} elsif ($@) { # parser died
+			warn "parse_hdr ($ph_cb): $@";
+		} elsif ($_[-1] eq '') {
+			warn <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+		} else { # incomplete, not an error
+			return;
+		}
+	}
+	delete $self->{yield_parse_hdr}; # read error, parser died, or EOF
+	yield_pass($self, $ipipe, [ 500, [], [ "Internal error\n" ] ])
+}
+
+sub _yield_start { # psgi_yield start callback; may run later, much later...
+	my ($self) = @_;
+	my $async = !!$self->{psgi_env}->{'pi-httpd.async'};
+	my $rpipe = $self->{rpipe};
+	if ($async) { # NOTE(review): presumably only set by public-inbox-httpd
+		require PublicInbox::ProcessIONBF;
+		PublicInbox::ProcessIONBF->replace($rpipe);
+	}
+	my $ipipe = PublicInbox::InputPipe::consume($rpipe, \&_ipipe_cb, $self);
+	return if $async; # _ipipe_cb runs from later ->event_step calls
+	$ipipe->event_step while $ipipe->{sock}; # sync: loop until {sock} is gone
+	delete $self->{rpipe};
+	close $rpipe; # triggers waitpid_err ASAP
+}
+
 # Used for streaming the stdout of one process as a PSGI response.
 #
 # $env is the PSGI env.
@@ -302,4 +392,22 @@ sub psgi_return {
 	}
 }
 
+sub psgi_yield { # intended to replace psgi_return
+	my ($self, $env, $limiter, @parse_hdr_arg)= @_; # ($cb, @cb_args)
+	$self->{psgi_env} = $env;
+	$self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
+	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
+
+	# the caller already captured the PSGI write callback from
+	# the PSGI server, so we can call ->start, here:
+	$env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+		# the caller will return this sub to the PSGI server, so
+		# it can set the response callback (that is, for
+		# PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+		# but other HTTP servers are supported:
+		$env->{'qspawn.wcb'} = $_[0];
+		start($self, $limiter, \&_yield_start);
+	}
+}
+
 1;

  parent reply	other threads:[~2023-10-19 12:40 UTC|newest]

Thread overview: 18+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-10-19 12:40 [PATCH 01/18] limiter: split out from qspawn Eric Wong
2023-10-19 12:40 ` [PATCH 02/18] spawn: support synchronous run_qx Eric Wong
2023-10-19 12:40 ` [PATCH 03/18] psgi_qx: use a temporary file rather than pipe Eric Wong
2023-10-19 12:40 ` [PATCH 04/18] www_coderepo: capture uses a flattened list Eric Wong
2023-10-19 12:40 ` [PATCH 05/18] qspawn: psgi_return allows list for callback args Eric Wong
2023-10-19 12:40 ` [PATCH 06/18] qspawn: drop unused err arg for ->event_step Eric Wong
2023-10-19 12:40 ` [PATCH 07/18] httpd/async: require IO arg Eric Wong
2023-10-19 12:40 ` [PATCH 08/18] tests: move reset Eric Wong
2023-10-19 12:40 ` Eric Wong [this message]
2023-10-19 12:40 ` [PATCH 10/18] repo_atom: switch to psgi_yield Eric Wong
2023-10-19 12:40 ` [PATCH 11/18] repo_snapshot: psgi_yield Eric Wong
2023-10-19 12:40 ` [PATCH 12/18] viewvcs: psgi_yield Eric Wong
2023-10-19 12:40 ` [PATCH 13/18] www_altid: psgi_yield Eric Wong
2023-10-19 12:40 ` [PATCH 14/18] cgit: psgi_yield Eric Wong
2023-10-19 12:40 ` [PATCH 15/18] www_coderepo: psgi_yield Eric Wong
2023-10-19 12:40 ` [PATCH 16/18] githttpbackend: fix outdated comments Eric Wong
2023-10-19 12:40 ` [PATCH 17/18] drop psgi_return and httpd/async entirely Eric Wong
2023-10-19 12:40 ` [PATCH 18/18] kill getlinebody Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20231019124018.2109632-9-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

  Be sure your reply has a Subject: header at the top and a blank line
  before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).