dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH 1/8] limiter: split out from qspawn
@ 2023-10-19  1:15 Eric Wong
  2023-10-19  1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
                   ` (6 more replies)
  0 siblings, 7 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19  1:15 UTC (permalink / raw)
  To: spew

Moving Limiter into its own module is slightly better
organization, especially since `publicinboxLimiter' has its
own user-facing config section and knobs.  I may also use it
in LeiMirror and CodeSearchIdx for process management.
---
 MANIFEST                          |  1 +
 lib/PublicInbox/Config.pm         |  4 +--
 lib/PublicInbox/GitHTTPBackend.pm |  3 +-
 lib/PublicInbox/Inbox.pm          |  4 +--
 lib/PublicInbox/Limiter.pm        | 47 +++++++++++++++++++++++++++++++
 lib/PublicInbox/MailDiff.pm       |  1 +
 lib/PublicInbox/Qspawn.pm         | 47 ++-----------------------------
 t/qspawn.t                        |  3 +-
 8 files changed, 60 insertions(+), 50 deletions(-)
 create mode 100644 lib/PublicInbox/Limiter.pm

diff --git a/MANIFEST b/MANIFEST
index 791d91a7..dcce801c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -287,6 +287,7 @@ lib/PublicInbox/LeiUp.pm
 lib/PublicInbox/LeiViewText.pm
 lib/PublicInbox/LeiWatch.pm
 lib/PublicInbox/LeiXSearch.pm
+lib/PublicInbox/Limiter.pm
 lib/PublicInbox/Linkify.pm
 lib/PublicInbox/Listener.pm
 lib/PublicInbox/Lock.pm
diff --git a/lib/PublicInbox/Config.pm b/lib/PublicInbox/Config.pm
index 15e0872e..d156b2d3 100644
--- a/lib/PublicInbox/Config.pm
+++ b/lib/PublicInbox/Config.pm
@@ -124,9 +124,9 @@ sub lookup_newsgroup {
 sub limiter {
 	my ($self, $name) = @_;
 	$self->{-limiters}->{$name} //= do {
-		require PublicInbox::Qspawn;
+		require PublicInbox::Limiter;
 		my $max = $self->{"publicinboxlimiter.$name.max"} || 1;
-		my $limiter = PublicInbox::Qspawn::Limiter->new($max);
+		my $limiter = PublicInbox::Limiter->new($max);
 		$limiter->setup_rlimit($name, $self);
 		$limiter;
 	};
diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index 74432429..d69f5f8b 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -9,13 +9,14 @@ use v5.10.1;
 use Fcntl qw(:seek);
 use IO::Handle; # ->flush
 use HTTP::Date qw(time2str);
+use PublicInbox::Limiter;
 use PublicInbox::Qspawn;
 use PublicInbox::Tmpfile;
 use PublicInbox::WwwStatic qw(r @NO_CACHE);
 use Carp ();
 
 # 32 is same as the git-daemon connection limit
-my $default_limiter = PublicInbox::Qspawn::Limiter->new(32);
+my $default_limiter = PublicInbox::Limiter->new(32);
 
 # n.b. serving "description" and "cloneurl" should be innocuous enough to
 # not cause problems.  serving "config" might...
diff --git a/lib/PublicInbox/Inbox.pm b/lib/PublicInbox/Inbox.pm
index 9afbb478..3dad7004 100644
--- a/lib/PublicInbox/Inbox.pm
+++ b/lib/PublicInbox/Inbox.pm
@@ -55,8 +55,8 @@ sub _set_limiter ($$$) {
 		my $val = $self->{$mkey} or return;
 		my $lim;
 		if ($val =~ /\A[0-9]+\z/) {
-			require PublicInbox::Qspawn;
-			$lim = PublicInbox::Qspawn::Limiter->new($val);
+			require PublicInbox::Limiter;
+			$lim = PublicInbox::Limiter->new($val);
 		} elsif ($val =~ /\A[a-z][a-z0-9]*\z/) {
 			$lim = $pi_cfg->limiter($val);
 			warn "$mkey limiter=$val not found\n" if !$lim;
diff --git a/lib/PublicInbox/Limiter.pm b/lib/PublicInbox/Limiter.pm
new file mode 100644
index 00000000..48a2b6a3
--- /dev/null
+++ b/lib/PublicInbox/Limiter.pm
@@ -0,0 +1,47 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+# Concurrency limit (max/running/run_queue) + RLIMIT_* for spawned cmds
+package PublicInbox::Limiter;
+use v5.12;
+use PublicInbox::Spawn;
+
+sub new {
+	my ($class, $max) = @_;
+	bless {
+		# 32 is same as the git-daemon connection limit
+		max => $max || 32,
+		running => 0,
+		run_queue => [],
+		# RLIMIT_CPU => undef,
+		# RLIMIT_DATA => undef,
+		# RLIMIT_CORE => undef,
+	}, $class;
+}
+
+sub setup_rlimit { # reads publicinboxlimiter.$name.rlimit{cpu,core,data}
+	my ($self, $name, $cfg) = @_;
+	for my $rlim (@PublicInbox::Spawn::RLIMITS) {
+		my $k = lc($rlim);
+		$k =~ tr/_//d;
+		$k = "publicinboxlimiter.$name.$k";
+		my $v = $cfg->{$k} // next;
+		my @rlimit = split(/\s*,\s*/, $v); # "SOFT,HARD" or "BOTH"
+		push @rlimit, $rlimit[0] if scalar(@rlimit) == 1; # hard = soft
+		if (scalar(@rlimit) != 2) { # skip, don't pass junk to spawn()
+			warn "could not parse $k: $v\n";
+			next;
+		}
+		eval { require BSD::Resource };
+		if ($@) {
+			warn "BSD::Resource missing for $rlim";
+			next;
+		}
+		for my $i (0..$#rlimit) { # "INFINITY" => RLIM_INFINITY
+			next if $rlimit[$i] ne 'INFINITY';
+			$rlimit[$i] = BSD::Resource::RLIM_INFINITY();
+		}
+		$self->{$rlim} = \@rlimit;
+	}
+}
+
+1;
diff --git a/lib/PublicInbox/MailDiff.pm b/lib/PublicInbox/MailDiff.pm
index 994c7851..c3ce9365 100644
--- a/lib/PublicInbox/MailDiff.pm
+++ b/lib/PublicInbox/MailDiff.pm
@@ -8,6 +8,7 @@ use PublicInbox::MsgIter qw(msg_part_text);
 use PublicInbox::ViewDiff qw(flush_diff);
 use PublicInbox::GitAsyncCat;
 use PublicInbox::ContentDigestDbg;
+use PublicInbox::Qspawn;
 
 sub write_part { # Eml->each_part callback
 	my ($ary, $self) = @_;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0e52617c..a4d78e49 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -29,6 +29,7 @@ use v5.12;
 use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
+use PublicInbox::Limiter;
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -183,7 +184,7 @@ sub psgi_qx {
 	$self->{qx_arg} = $qx_arg;
 	$self->{qx_fh} = $qx_fh;
 	$self->{qx_buf} = \$qx_buf;
-	$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
 	start($self, $limiter, \&psgi_qx_start);
 }
 
@@ -317,7 +318,7 @@ sub psgi_return {
 	$self->{psgi_env} = $env;
 	$self->{hdr_buf} = \(my $hdr_buf = '');
 	$self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
-	$limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32);
+	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
 
 	# the caller already captured the PSGI write callback from
 	# the PSGI server, so we can call ->start, here:
@@ -334,46 +335,4 @@ sub psgi_return {
 	}
 }
 
-package PublicInbox::Qspawn::Limiter;
-use v5.12;
-
-sub new {
-	my ($class, $max) = @_;
-	bless {
-		# 32 is same as the git-daemon connection limit
-		max => $max || 32,
-		running => 0,
-		run_queue => [],
-		# RLIMIT_CPU => undef,
-		# RLIMIT_DATA => undef,
-		# RLIMIT_CORE => undef,
-	}, $class;
-}
-
-sub setup_rlimit {
-	my ($self, $name, $cfg) = @_;
-	foreach my $rlim (@PublicInbox::Spawn::RLIMITS) {
-		my $k = lc($rlim);
-		$k =~ tr/_//d;
-		$k = "publicinboxlimiter.$name.$k";
-		defined(my $v = $cfg->{$k}) or next;
-		my @rlimit = split(/\s*,\s*/, $v);
-		if (scalar(@rlimit) == 1) {
-			push @rlimit, $rlimit[0];
-		} elsif (scalar(@rlimit) != 2) {
-			warn "could not parse $k: $v\n";
-		}
-		eval { require BSD::Resource };
-		if ($@) {
-			warn "BSD::Resource missing for $rlim";
-			next;
-		}
-		foreach my $i (0..$#rlimit) {
-			next if $rlimit[$i] ne 'INFINITY';
-			$rlimit[$i] = BSD::Resource::RLIM_INFINITY();
-		}
-		$self->{$rlim} = \@rlimit;
-	}
-}
-
 1;
diff --git a/t/qspawn.t b/t/qspawn.t
index 224e20db..507f86a5 100644
--- a/t/qspawn.t
+++ b/t/qspawn.t
@@ -3,6 +3,7 @@
 use v5.12;
 use Test::More;
 use_ok 'PublicInbox::Qspawn';
+use_ok 'PublicInbox::Limiter';
 
 {
 	my $cmd = [qw(sh -c), 'echo >&2 err; echo out'];
@@ -23,7 +24,7 @@ sub finish_err ($) {
 	$qsp->{qsp_err} && ${$qsp->{qsp_err}};
 }
 
-my $limiter = PublicInbox::Qspawn::Limiter->new(1);
+my $limiter = PublicInbox::Limiter->new(1);
 {
 	my $x = PublicInbox::Qspawn->new([qw(true)]);
 	$x->{qsp_err} = \(my $err = '');

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 2/8] spawn: support synchronous run_qx
  2023-10-19  1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
@ 2023-10-19  1:15 ` Eric Wong
  2023-10-19  1:15 ` [PATCH 3/8] psgi_qx: use a temporary file rather than pipe Eric Wong
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19  1:15 UTC (permalink / raw)
  To: spew

This is similar to `backtick` but supports all our existing spawn
functionality (chdir, env, rlimit, redirects, etc.).  It also
supports SCALAR ref redirects for std{in,out,err}, like
run_script in our test suite does.

These SCALAR redirects use the :utf8 layer by default, which
should probably be fine for our callers.
---
 lib/PublicInbox/Git.pm       |  6 ++++
 lib/PublicInbox/SearchIdx.pm | 19 ++++------
 lib/PublicInbox/Spawn.pm     | 69 ++++++++++++++++++++++++++----------
 t/spawn.t                    | 13 ++++++-
 4 files changed, 76 insertions(+), 31 deletions(-)

diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index a460d155..476dcf30 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -69,6 +69,7 @@ sub check_git_exe () {
 		$GIT_VER = eval("v$1") // die "BUG: bad vstring: $1 ($v)";
 		$EXE_ST = $st;
 	}
+	$GIT_EXE;
 }
 
 sub git_version {
@@ -422,6 +423,11 @@ sub async_err ($$$$$) {
 	$async_warn ? carp($msg) : $self->fail($msg);
 }
 
+sub cmd {
+	my $self = shift;
+	[ $GIT_EXE // check_git_exe(), "--git-dir=$self->{git_dir}", @_ ]
+}
+
 # $git->popen(qw(show f00)); # or
 # $git->popen(qw(show f00), { GIT_CONFIG => ... }, { 2 => ... });
 sub popen {
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 8a571cfb..3c64c715 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -22,7 +22,7 @@ use POSIX qw(strftime);
 use Fcntl qw(SEEK_SET);
 use Time::Local qw(timegm);
 use PublicInbox::OverIdx;
-use PublicInbox::Spawn qw(run_wait);
+use PublicInbox::Spawn qw(run_wait run_qx);
 use PublicInbox::Git qw(git_unquote);
 use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
 use PublicInbox::Address;
@@ -351,23 +351,18 @@ sub index_diff ($$$) {
 }
 
 sub patch_id {
-	my ($self) = @_; # $_[1] is the diff (may be huge)
-	open(my $fh, '+>:utf8', undef) or die "open: $!";
-	open(my $eh, '+>', undef) or die "open: $!";
-	$fh->autoflush(1);
-	print $fh $_[1] or die "print: $!";
-	sysseek($fh, 0, SEEK_SET) or die "sysseek: $!";
-	my $id = ($self->{ibx} // $self->{eidx} // $self)->git->qx(
-			[qw(patch-id --stable)], {}, { 0 => $fh, 2 => $eh });
-	seek($eh, 0, SEEK_SET) or die "seek: $!";
-	while (<$eh>) { warn $_ }
+	my ($self, $sref) = @_;
+	my $git = ($self->{ibx} // $self->{eidx} // $self)->git;
+	my $opt = { 0 => $sref, 2 => \(my $err) };
+	my $id = run_qx($git->cmd(qw(patch-id --stable)), undef, $opt);
+	warn $err if $err;
 	$id =~ /\A([a-f0-9]{40,})/ ? $1 : undef;
 }
 
 sub index_body_text {
 	my ($self, $doc, $sref) = @_;
 	if ($$sref =~ /^(?:diff|---|\+\+\+) /ms) {
-		my $id = patch_id($self, $$sref);
+		my $id = patch_id($self, $sref);
 		$doc->add_term('XDFID'.$id) if defined($id);
 	}
 
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 106f5e01..1fa7a41f 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -22,8 +22,9 @@ use Fcntl qw(SEEK_SET);
 use IO::Handle ();
 use Carp qw(croak);
 use PublicInbox::ProcessIO;
-our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait);
+our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait run_qx);
 our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
+use autodie qw(open pipe read seek sysseek truncate);
 
 BEGIN {
 	my $all_libc = <<'ALL_LIBC'; # all *nix systems we support
@@ -290,7 +291,6 @@ ALL_LIBC
 	undef $all_libc unless -d $inline_dir;
 	if (defined $all_libc) {
 		local $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
-		use autodie;
 		# CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
 		my $lk = PublicInbox::Lock->new($inline_dir.
 						'/.public-inbox.lock');
@@ -301,7 +301,7 @@ ALL_LIBC
 		open STDERR, '>&', $fh;
 		STDERR->autoflush(1);
 		STDOUT->autoflush(1);
-		CORE::eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
+		eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
 		my $err = $@;
 		open(STDERR, '>&', $olderr);
 		open(STDOUT, '>&', $oldout);
@@ -332,26 +332,34 @@ sub which ($) {
 }
 
 sub spawn ($;$$) {
-	my ($cmd, $env, $opts) = @_;
+	my ($cmd, $env, $opt) = @_;
 	my $f = which($cmd->[0]) // die "$cmd->[0]: command not found\n";
-	my @env;
+	my (@env, @rdr);
 	my %env = (%ENV, $env ? %$env : ());
 	while (my ($k, $v) = each %env) {
 		push @env, "$k=$v" if defined($v);
 	}
-	my $redir = [];
 	for my $child_fd (0..2) {
-		my $parent_fd = $opts->{$child_fd};
-		if (defined($parent_fd) && $parent_fd !~ /\A[0-9]+\z/) {
-			my $fd = fileno($parent_fd) //
-					die "$parent_fd not an IO GLOB? $!";
-			$parent_fd = $fd;
+		my $pfd = $opt->{$child_fd};
+		if ('SCALAR' eq ref($pfd)) {
+			open my $fh, '+>:utf8', undef;
+			$opt->{"fh.$child_fd"} = $fh;
+			if ($child_fd == 0) {
+				print $fh $$pfd or die "print: $!"; # not autodie'd
+				$fh->flush or die "flush: $!";
+				sysseek($fh, 0, SEEK_SET);
+			}
+			$pfd = fileno($fh);
+		} elsif (defined($pfd) && $pfd !~ /\A[0-9]+\z/) {
+			my $fd = fileno($pfd) //
+					die "$pfd not an IO GLOB? $!";
+			$pfd = $fd;
 		}
-		$redir->[$child_fd] = $parent_fd // $child_fd;
+		$rdr[$child_fd] = $pfd // $child_fd;
 	}
 	my $rlim = [];
 	foreach my $l (@RLIMITS) {
-		my $v = $opts->{$l} // next;
+		my $v = $opt->{$l} // next;
 		my $r = eval "require BSD::Resource; BSD::Resource::$l();";
 		unless (defined $r) {
 			warn "$l undefined by BSD::Resource: $@\n";
@@ -359,31 +367,41 @@ sub spawn ($;$$) {
 		}
 		push @$rlim, $r, @$v;
 	}
-	my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
-	my $pgid = $opts->{pgid} // -1;
-	my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd, $pgid);
+	my $cd = $opt->{'-C'} // ''; # undef => NULL mapping doesn't work?
+	my $pgid = $opt->{pgid} // -1;
+	my $pid = pi_fork_exec(\@rdr, $f, $cmd, \@env, $rlim, $cd, $pgid);
 	die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
 	$pid;
 }
 
 sub popen_rd {
 	my ($cmd, $env, $opt, @cb_arg) = @_;
-	pipe(my $r, local $opt->{1}) or die "pipe: $!\n";
+	pipe(my $r, local $opt->{1});
 	my $pid = spawn($cmd, $env, $opt);
 	PublicInbox::ProcessIO->maybe_new($pid, $r, @cb_arg);
 }
 
 sub popen_wr {
 	my ($cmd, $env, $opt, @cb_arg) = @_;
-	pipe(local $opt->{0}, my $w) or die "pipe: $!\n";
+	pipe(local $opt->{0}, my $w);
 	$w->autoflush(1);
 	my $pid = spawn($cmd, $env, $opt);
 	PublicInbox::ProcessIO->maybe_new($pid, $w, @cb_arg)
 }
 
+sub read_out_err ($) {
+	my ($opt) = @_;
+	for my $fd (1, 2) { # read stdout/stderr
+		my $fh = delete($opt->{"fh.$fd"}) // next;
+		seek($fh, 0, SEEK_SET);
+		read($fh, ${$opt->{$fd}}, -s $fh, length(${$opt->{$fd}} // ''));
+	}
+}
+
 sub run_wait ($;$$) {
 	my ($cmd, $env, $opt) = @_;
 	waitpid(spawn($cmd, $env, $opt), 0);
+	read_out_err($opt);
 	$?
 }
 
@@ -392,4 +410,19 @@ sub run_die ($;$$) {
 	run_wait($cmd, $env, $rdr) and croak "E: @$cmd failed: \$?=$?";
 }
 
+sub run_qx {
+	my ($cmd, $env, $opt) = @_;
+	my $fh = popen_rd($cmd, $env, $opt);
+	my @ret;
+	if (wantarray) {
+		@ret = <$fh>;
+	} else {
+		local $/;
+		$ret[0] = <$fh>;
+	}
+	close $fh; # caller should check $?
+	read_out_err($opt);
+	wantarray ? @ret : $ret[0];
+}
+
 1;
diff --git a/t/spawn.t b/t/spawn.t
index 1af66bda..4b3baae4 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -3,7 +3,7 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use v5.12;
 use Test::More;
-use PublicInbox::Spawn qw(which spawn popen_rd);
+use PublicInbox::Spawn qw(which spawn popen_rd run_qx);
 require PublicInbox::Sigfd;
 require PublicInbox::DS;
 
@@ -19,6 +19,17 @@ require PublicInbox::DS;
 	is($?, 0, 'true exited successfully');
 }
 
+{
+	my $opt = { 0 => \'in', 2 => \(my $e) };
+	my $out = run_qx(['sh', '-c', 'echo e >&2; cat'], undef, $opt);
+	is($e, "e\n", 'captured stderr');
+	is($out, 'in', 'stdin read and stdout captured');
+	$opt->{0} = \"IN\n3\nLINES";
+	my @out = run_qx(['sh', '-c', 'echo E >&2; cat'], undef, $opt);
+	is($e, "e\nE\n", 'captured stderr appended to string');
+	is_deeply(\@out, [ "IN\n", "3\n", 'LINES' ], 'stdout array');
+}
+
 SKIP: {
 	my $pid = spawn(['true'], undef, { pgid => 0 });
 	ok($pid, 'spawned process with new pgid');

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 3/8] psgi_qx: use a temporary file rather than pipe
  2023-10-19  1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
  2023-10-19  1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
@ 2023-10-19  1:15 ` Eric Wong
  2023-10-19  1:15 ` [PATCH 4/8] www_coderepo: capture uses a flattened list Eric Wong
                   ` (4 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19  1:15 UTC (permalink / raw)
  To: spew

A pipe requires more context switches, plus extra code to deal
with the unpredictable ordering of pipe EOF vs waitpid.  Instead,
use the new spawn/Aspawn features to slurp output into a string
automatically.
---
 MANIFEST                         |  1 +
 lib/PublicInbox/Aspawn.pm        | 30 ++++++++++
 lib/PublicInbox/CodeSearchIdx.pm |  1 +
 lib/PublicInbox/Qspawn.pm        | 95 +++++++++++---------------------
 4 files changed, 65 insertions(+), 62 deletions(-)
 create mode 100644 lib/PublicInbox/Aspawn.pm

diff --git a/MANIFEST b/MANIFEST
index dcce801c..f087621c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -161,6 +161,7 @@ lib/PublicInbox/AddressPP.pm
 lib/PublicInbox/Admin.pm
 lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
+lib/PublicInbox/Aspawn.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CidxComm.pm
diff --git a/lib/PublicInbox/Aspawn.pm b/lib/PublicInbox/Aspawn.pm
new file mode 100644
index 00000000..2423ac31
--- /dev/null
+++ b/lib/PublicInbox/Aspawn.pm
@@ -0,0 +1,30 @@
+package PublicInbox::Aspawn;
+use v5.12;
+use parent qw(Exporter);
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::Spawn qw(spawn);
+our @EXPORT_OK = qw(run_await);
+
+sub _await_cb { # awaitpid cb
+	my ($pid, $cmd, $env, $opt, $cb, @args) = @_;
+	PublicInbox::Spawn::read_out_err($opt);
+	if ($? && !$opt->{quiet}) {
+		my ($status, $sig) = ($? >> 8, $? & 127);
+		my $msg = '';
+		$msg .= " (-C=$opt->{-C})" if defined $opt->{-C};
+		$msg .= " status=$status" if $status;
+		$msg .= " signal=$sig" if $sig;
+		warn "E: @$cmd", $msg, "\n";
+	}
+	$cb->($pid, $cmd, $env, $opt, @args) if $cb;
+}
+
+sub run_await {
+	my ($cmd, $env, $opt, $cb, @args) = @_;
+	$opt->{1} //= \(my $out);
+	my $pid = spawn($cmd, $env, $opt);
+	awaitpid($pid, \&_await_cb, $cmd, $env, $opt, $cb, @args);
+	awaitpid($pid); # synchronous for non-$in_loop
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 36d00aea..b9b72ff4 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -55,6 +55,7 @@ use PublicInbox::CidxLogP;
 use PublicInbox::CidxComm;
 use PublicInbox::Git qw(%OFMT2HEXLEN);
 use PublicInbox::Compat qw(uniqstr);
+use PublicInbox::Aspawn qw(run_await);
 use Carp ();
 use autodie qw(pipe open seek sysseek send);
 our (
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index a4d78e49..59d5ed40 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -30,6 +30,7 @@ use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
 use PublicInbox::Limiter;
+use PublicInbox::Aspawn qw(run_await);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -48,29 +49,30 @@ sub new {
 
 sub _do_spawn {
 	my ($self, $start_cb, $limiter) = @_;
-	my $err;
 	my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
 	my %o = %{$opt || {}};
 	$self->{limiter} = $limiter;
-	foreach my $k (@PublicInbox::Spawn::RLIMITS) {
-		if (defined(my $rlimit = $limiter->{$k})) {
-			$o{$k} = $rlimit;
-		}
+	for my $k (@PublicInbox::Spawn::RLIMITS) {
+		$o{$k} = $limiter->{$k} // next;
 	}
 	$self->{cmd} = $cmd;
 	$self->{-quiet} = 1 if $o{quiet};
-	eval {
-		# popen_rd may die on EMFILE, ENFILE
-		$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
-					\&waitpid_err, $self);
-		$limiter->{running}++;
-		$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
-	};
+	$limiter->{running}++;
+	if ($start_cb) {
+		eval { # popen_rd may die on EMFILE, ENFILE
+			$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
+						\&waitpid_err, $self);
+			$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
+		};
+	} else {
+		eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+		warn "E: $@" if $@;
+	}
 	finish($self, $@) if $@;
 }
 
-sub finalize ($) {
-	my ($self) = @_;
+sub finalize ($;$) {
+	my ($self, $opt) = @_;
 
 	# process is done, spawn whatever's in the queue
 	my $limiter = delete $self->{limiter} or return;
@@ -89,10 +91,10 @@ sub finalize ($) {
 		warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
 	}
 
-	my ($env, $qx_cb, $qx_arg, $qx_buf) =
-		delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
-	if ($qx_cb) {
-		eval { $qx_cb->($qx_buf, $qx_arg) };
+	my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+	if ($qx_cb_arg) {
+		my $cb = shift @$qx_cb_arg;
+		eval { $cb->($opt->{1}, @$qx_cb_arg) };
 		return unless $@;
 		warn "E: $@"; # hope qspawn.wcb can handle it
 	}
@@ -108,15 +110,20 @@ sub finalize ($) {
 sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
 
 sub waitpid_err { # callback for awaitpid
-	my (undef, $self) = @_; # $_[0]: pid
+	my (undef, $self, $opt) = @_; # $_[0]: pid
 	$self->{_err} = ''; # for defined check in ->finish
-	if ($?) {
+	if ($?) { # FIXME: redundant
 		my $status = $? >> 8;
 		my $sig = $? & 127;
 		$self->{_err} .= "exit status=$status";
 		$self->{_err} .= " signal=$sig" if $sig;
 	}
-	finalize($self) if !$self->{rpipe};
+	finalize($self, $opt) if !$self->{rpipe};
+}
+
+sub wait_await { # run_await cb
+	my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
+	waitpid_err($pid, $self, $opt);
 }
 
 sub finish ($;$) {
@@ -140,52 +147,16 @@ sub start ($$$) {
 	}
 }
 
-sub psgi_qx_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
-	my ($self) = @_;
-	my ($r, $buf);
-reread:
-	$r = sysread($self->{rpipe}, $buf, 65536);
-	if (!defined($r)) {
-		return if $! == EAGAIN; # try again when notified
-		goto reread if $! == EINTR;
-		event_step($self, $!);
-	} elsif (my $as = delete $self->{async}) { # PublicInbox::HTTPD::Async
-		$as->async_pass($self->{psgi_env}->{'psgix.io'},
-				$self->{qx_fh}, \$buf);
-	} elsif ($r) { # generic PSGI:
-		print { $self->{qx_fh} } $buf;
-	} else { # EOF
-		event_step($self, undef);
-	}
-}
-
-sub psgi_qx_start {
-	my ($self) = @_;
-	if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
-		# PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
-		$self->{async} = $async->($self->{rpipe},
-					\&psgi_qx_init_cb, $self, $self);
-		# init_cb will call ->async_pass or ->close
-	} else { # generic PSGI
-		psgi_qx_init_cb($self) while $self->{qx_fh};
-	}
-}
-
-# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
+# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
 # and safe to slurp.
 sub psgi_qx {
-	my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+	my ($self, $env, $limiter, @qx_cb_arg) = @_;
 	$self->{psgi_env} = $env;
-	my $qx_buf = '';
-	open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
-	$self->{qx_cb} = $qx_cb;
-	$self->{qx_arg} = $qx_arg;
-	$self->{qx_fh} = $qx_fh;
-	$self->{qx_buf} = \$qx_buf;
+	$self->{qx_cb_arg} = \@qx_cb_arg;
 	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
-	start($self, $limiter, \&psgi_qx_start);
+	start($self, $limiter, undef);
 }
 
 # this is called on pipe EOF to reap the process, may be called
@@ -195,7 +166,7 @@ sub event_step {
 	my ($self, $err) = @_; # $err: $!
 	warn "psgi_{return,qx} $err" if defined($err);
 	finish($self);
-	my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)});
+	my $fh = delete $self->{qfh};
 	$fh->close if $fh; # async-only (psgi_return)
 }
 

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 4/8] www_coderepo: capture uses a flattened list
  2023-10-19  1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
  2023-10-19  1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
  2023-10-19  1:15 ` [PATCH 3/8] psgi_qx: use a temporary file rather than pipe Eric Wong
@ 2023-10-19  1:15 ` Eric Wong
  2023-10-19  1:15 ` [PATCH 5/8] qspawn: psgi_return allows list for callback args Eric Wong
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19  1:15 UTC (permalink / raw)
  To: spew

We no longer need to wrap multiple arguments in a nested
arrayref to pass them to the psgi_qx callback, since psgi_qx
now accepts a flat list of callback arguments.
---
 lib/PublicInbox/WwwCoderepo.pm | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/WwwCoderepo.pm b/lib/PublicInbox/WwwCoderepo.pm
index e8c340b5..68c4c86d 100644
--- a/lib/PublicInbox/WwwCoderepo.pm
+++ b/lib/PublicInbox/WwwCoderepo.pm
@@ -178,9 +178,8 @@ EOM
 }
 
 sub capture { # psgi_qx callback to capture git-for-each-ref
-	my ($bref, $arg) = @_; # arg = [ctx, key, OnDestroy(summary_END)]
-	utf8_maybe($$bref);
-	$arg->[0]->{qx_res}->{$arg->[1]} = $$bref;
+	my ($bref, $ctx, $key) = @_; #  $_[3] = OnDestroy(summary_END)
+	$ctx->{qx_res}->{$key} = $$bref;
 	# summary_END may be called via OnDestroy $arg->[2]
 }
 
@@ -220,8 +219,7 @@ sub summary ($$) {
 		my ($k, $cmd) = @$_;
 		my $qsp = PublicInbox::Qspawn->new($cmd, \%env, \%opt);
 		$qsp->{qsp_err} = $qsp_err;
-		$qsp->psgi_qx($ctx->{env}, undef, \&capture,
-				[$ctx, $k, $END]);
+		$qsp->psgi_qx($ctx->{env}, undef, \&capture, $ctx, $k, $END);
 	}
 	$tip //= 'HEAD';
 	my @try = ("$tip:README", "$tip:README.md"); # TODO: configurable

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 5/8] qspawn: psgi_return allows list for callback args
  2023-10-19  1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
                   ` (2 preceding siblings ...)
  2023-10-19  1:15 ` [PATCH 4/8] www_coderepo: capture uses a flattened list Eric Wong
@ 2023-10-19  1:15 ` Eric Wong
  2023-10-19  1:15 ` [PATCH 6/8] qspawn: drop unused err arg for ->event_step Eric Wong
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19  1:15 UTC (permalink / raw)
  To: spew

This slightly simplifies our GitHTTPBackend wrapper.
While we're at it, use shorter variable names in rd_hdr to
avoid wrapping some lines.
---
 lib/PublicInbox/GitHTTPBackend.pm |  6 +++---
 lib/PublicInbox/Qspawn.pm         | 19 ++++++++-----------
 2 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index d69f5f8b..edbc0157 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -80,9 +80,9 @@ sub serve_dumb {
 }
 
 sub git_parse_hdr { # {parse_hdr} for Qspawn
-	my ($r, $bref, $dumb_args) = @_;
+	my ($r, $bref, @dumb_args) = @_;
 	my $res = parse_cgi_headers($r, $bref) or return; # incomplete
-	$res->[0] == 403 ? serve_dumb(@$dumb_args) : $res;
+	$res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
 }
 
 # returns undef if 403 so it falls back to dumb HTTP
@@ -106,7 +106,7 @@ sub serve_smart {
 	$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
 	my $rdr = input_prepare($env) or return r(500);
 	my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
-	$qsp->psgi_return($env, $limiter, \&git_parse_hdr, [$env, $git, $path]);
+	$qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
 }
 
 sub input_prepare {
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 59d5ed40..0f900691 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -174,16 +174,13 @@ sub rd_hdr ($) {
 	my ($self) = @_;
 	# typically used for reading CGI headers
 	# We also need to check EINTR for generic PSGI servers.
-	my $ret;
-	my $total_rd = 0;
-	my $hdr_buf = $self->{hdr_buf};
-	my ($ph_cb, $ph_arg) = @{$self->{parse_hdr}};
+	my ($ret, $total_rd);
+	my ($bref, $ph_cb, @ph_arg) = ($self->{hdr_buf}, @{$self->{parse_hdr}});
 	until (defined($ret)) {
-		my $r = sysread($self->{rpipe}, $$hdr_buf, 4096,
-				length($$hdr_buf));
+		my $r = sysread($self->{rpipe}, $$bref, 4096, length($$bref));
 		if (defined($r)) {
 			$total_rd += $r;
-			eval { $ret = $ph_cb->($total_rd, $hdr_buf, $ph_arg) };
+			eval { $ret = $ph_cb->($total_rd, $bref, @ph_arg) };
 			if ($@) {
 				warn "parse_hdr: $@";
 				$ret = [ 500, [], [ "Internal error\n" ] ];
@@ -207,7 +204,7 @@ EOM
 
 sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
 	my ($self) = @_;
-	my $r = rd_hdr($self) or return;
+	my $r = rd_hdr($self) or return; # incomplete
 	my $env = $self->{psgi_env};
 	my $filter;
 
@@ -277,7 +274,7 @@ sub psgi_return_start { # may run later, much later...
 #
 # $limiter - the Limiter object to use (uses the def_limiter if not given)
 #
-# $parse_hdr - Initial read function; often for parsing CGI header output.
+# @parse_hdr_arg - Initial read cb+args; often for parsing CGI header output.
 #              It will be given the return value of sysread from the pipe
 #              and a string ref of the current buffer.  Returns an arrayref
 #              for PSGI responses.  2-element arrays in PSGI mean the
@@ -285,10 +282,10 @@ sub psgi_return_start { # may run later, much later...
 #              psgix.io.  3-element arrays means the body is available
 #              immediately (or streamed via ->getline (pull-based)).
 sub psgi_return {
-	my ($self, $env, $limiter, $parse_hdr, $hdr_arg) = @_;
+	my ($self, $env, $limiter, @parse_hdr_arg) = @_;
 	$self->{psgi_env} = $env;
 	$self->{hdr_buf} = \(my $hdr_buf = '');
-	$self->{parse_hdr} = [ $parse_hdr, $hdr_arg ];
+	$self->{parse_hdr} = \@parse_hdr_arg;
 	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
 
 	# the caller already captured the PSGI write callback from

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 6/8] qspawn: drop unused err arg for ->event_step
  2023-10-19  1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
                   ` (3 preceding siblings ...)
  2023-10-19  1:15 ` [PATCH 5/8] qspawn: psgi_return allows list for callback args Eric Wong
@ 2023-10-19  1:15 ` Eric Wong
  2023-10-19  1:15 ` [PATCH 7/8] httpd/async: require IO arg Eric Wong
  2023-10-19  1:15 ` [PATCH 8/8] qspawn: introduce psgi_yield API Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19  1:15 UTC (permalink / raw)
  To: spew

It's no longer needed since psgi_qx no longer uses a pipe.
---
 lib/PublicInbox/Qspawn.pm | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 0f900691..9a7e8734 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -163,8 +163,7 @@ sub psgi_qx {
 # via PublicInbox::DS event loop OR via GetlineBody for generic
 # PSGI servers.
 sub event_step {
-	my ($self, $err) = @_; # $err: $!
-	warn "psgi_{return,qx} $err" if defined($err);
+	my ($self) = @_;
 	finish($self);
 	my $fh = delete $self->{qfh};
 	$fh->close if $fh; # async-only (psgi_return)

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 7/8] httpd/async: require IO arg
  2023-10-19  1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
                   ` (4 preceding siblings ...)
  2023-10-19  1:15 ` [PATCH 6/8] qspawn: drop unused err arg for ->event_step Eric Wong
@ 2023-10-19  1:15 ` Eric Wong
  2023-10-19  1:15 ` [PATCH 8/8] qspawn: introduce psgi_yield API Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19  1:15 UTC (permalink / raw)
  To: spew

Callers that want to requeue can call PublicInbox::DS::requeue
directly instead of going through the convoluted undef-$io
argument handling in PublicInbox::HTTPD::Async->new.
---
 lib/PublicInbox/HTTPD/Async.pm |  8 --------
 lib/PublicInbox/MailDiff.pm    |  7 +++----
 lib/PublicInbox/SolverGit.pm   | 10 +++-------
 3 files changed, 6 insertions(+), 19 deletions(-)

diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b73d0c4b..2e4d8baa 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -25,14 +25,6 @@ use PublicInbox::ProcessIONBF;
 # bidirectional socket in the future.
 sub new {
 	my ($class, $io, $cb, $arg, $end_obj) = @_;
-
-	# no $io? call $cb at the top of the next event loop to
-	# avoid recursion:
-	unless (defined($io)) {
-		PublicInbox::DS::requeue($cb ? $cb : $arg);
-		die '$end_obj unsupported w/o $io' if $end_obj;
-		return;
-	}
 	my $self = bless {
 		cb => $cb, # initial read callback
 		arg => $arg, # arg for $cb
diff --git a/lib/PublicInbox/MailDiff.pm b/lib/PublicInbox/MailDiff.pm
index c3ce9365..908f223c 100644
--- a/lib/PublicInbox/MailDiff.pm
+++ b/lib/PublicInbox/MailDiff.pm
@@ -59,8 +59,7 @@ sub next_smsg ($) {
 		$ctx->write($ctx->_html_end);
 		return $ctx->close;
 	}
-	my $async = $self->{ctx}->{env}->{'pi-httpd.async'};
-	$async->(undef, undef, $self) if $async # PublicInbox::HTTPD::Async->new
+	PublicInbox::DS::requeue($self) if $ctx->{env}->{'pi-httpd.async'};
 }
 
 sub emit_msg_diff {
@@ -125,8 +124,8 @@ sub event_step {
 
 sub begin_mail_diff {
 	my ($self) = @_;
-	if (my $async = $self->{ctx}->{env}->{'pi-httpd.async'}) {
-		$async->(undef, undef, $self); # PublicInbox::HTTPD::Async->new
+	if ($self->{ctx}->{env}->{'pi-httpd.async'}) {
+		PublicInbox::DS::requeue($self);
 	} else {
 		event_step($self) while $self->{smsg};
 	}
diff --git a/lib/PublicInbox/SolverGit.pm b/lib/PublicInbox/SolverGit.pm
index 5f317f51..23d4d3d1 100644
--- a/lib/PublicInbox/SolverGit.pm
+++ b/lib/PublicInbox/SolverGit.pm
@@ -386,12 +386,9 @@ sub event_step ($) {
 }
 
 sub next_step ($) {
-	my ($self) = @_;
 	# if outside of public-inbox-httpd, caller is expected to be
 	# looping event_step, anyways
-	my $async = $self->{psgi_env}->{'pi-httpd.async'} or return;
-	# PublicInbox::HTTPD::Async->new
-	$async->(undef, undef, $self);
+	PublicInbox::DS::requeue($_[0]) if $_[0]->{psgi_env}->{'pi-httpd.async'}
 }
 
 sub mark_found ($$$) {
@@ -690,9 +687,8 @@ sub solve ($$$$$) {
 	$self->{found} = {}; # { abbr => [ ::Git, oid, type, size, $di ] }
 
 	dbg($self, "solving $oid_want ...");
-	if (my $async = $env->{'pi-httpd.async'}) {
-		# PublicInbox::HTTPD::Async->new
-		$async->(undef, undef, $self);
+	if ($env->{'pi-httpd.async'}) {
+		PublicInbox::DS::requeue($self);
 	} else {
 		event_step($self) while $self->{user_cb};
 	}

^ permalink raw reply related	[flat|nested] 8+ messages in thread

* [PATCH 8/8] qspawn: introduce psgi_yield API
  2023-10-19  1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
                   ` (5 preceding siblings ...)
  2023-10-19  1:15 ` [PATCH 7/8] httpd/async: require IO arg Eric Wong
@ 2023-10-19  1:15 ` Eric Wong
  6 siblings, 0 replies; 8+ messages in thread
From: Eric Wong @ 2023-10-19  1:15 UTC (permalink / raw)
  To: spew

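This may eventually replace psgi_return and HTTPD/Async entirely.
psgi_yield reads the child's stdout via PublicInbox::InputPipe.
InputPipe callbacks now receive the InputPipe object as their
first argument, so the existing lei --stdin callback is updated
accordingly.  GitHTTPBackend is the first user.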
---
 lib/PublicInbox/GitHTTPBackend.pm |  4 +-
 lib/PublicInbox/InputPipe.pm      | 11 ++--
 lib/PublicInbox/LEI.pm            |  2 +-
 lib/PublicInbox/Qspawn.pm         | 97 ++++++++++++++++++++++++++++++-
 4 files changed, 105 insertions(+), 9 deletions(-)

diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm
index edbc0157..d7e0bced 100644
--- a/lib/PublicInbox/GitHTTPBackend.pm
+++ b/lib/PublicInbox/GitHTTPBackend.pm
@@ -79,7 +79,7 @@ sub serve_dumb {
 	PublicInbox::WwwStatic::response($env, $h, $path, $type);
 }
 
-sub git_parse_hdr { # {parse_hdr} for Qspawn
+sub ghb_parse_hdr { # header parser for Qspawn
 	my ($r, $bref, @dumb_args) = @_;
 	my $res = parse_cgi_headers($r, $bref) or return; # incomplete
 	$res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
@@ -106,7 +106,7 @@ sub serve_smart {
 	$env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
 	my $rdr = input_prepare($env) or return r(500);
 	my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
-	$qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
+	$qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path);
 }
 
 sub input_prepare {
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index b38d8270..614360c2 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -39,14 +39,15 @@ sub consume {
 	if ($@) { # regular file (but not w/ select|IO::Poll backends)
 		$self->{-need_rq} = 1;
 		$self->requeue;
-	} elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+	} elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
+	} elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
 		$in->blocking(0);
 	} elsif (-t $in) { # isatty(3) can't use `_' stat cache
 		unblock_tty($self);
 	}
 }
 
-sub close {
+sub close { # idempotent
 	my ($self) = @_;
 	if (my $t = delete($self->{restore_termios})) {
 		my $fd = fileno($self->{sock} // return);
@@ -60,16 +61,16 @@ sub event_step {
 	my $r = sysread($self->{sock} // return, my $rbuf, 65536);
 	eval {
 		if ($r) {
-			$self->{cb}->(@{$self->{args}}, $rbuf);
+			$self->{cb}->($self, @{$self->{args}}, $rbuf);
 			$self->requeue if $self->{-need_rq};
 		} elsif (defined($r)) { # EOF
-			$self->{cb}->(@{$self->{args}}, '');
+			$self->{cb}->($self, @{$self->{args}}, '');
 			$self->close
 		} elsif ($!{EAGAIN}) { # rely on EPOLLIN
 		} elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
 			$self->requeue if $self->{-need_rq};
 		} else { # another error
-			$self->{cb}->(@{$self->{args}}, undef);
+			$self->{cb}->($self, @{$self->{args}}, undef);
 			$self->close;
 		}
 	};
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 3ccdd4f7..c1e965c8 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1566,7 +1566,7 @@ sub request_umask {
 }
 
 sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
-	my ($lei, $cb) = @_; # $_[-1] = $rbuf
-	$_[1] // return $lei->fail("error reading stdin: $!");
+	my (undef, $lei, $cb) = @_; # $_[0] = InputPipe, $_[-1] = $rbuf|undef
+	$_[-1] // return $lei->fail("error reading stdin: $!");
 	$lei->{stdin_buf} .= $_[-1];
 	do_env($lei, $cb) if $_[-1] eq '';
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 9a7e8734..c598e863 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -31,6 +31,9 @@ use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
 use PublicInbox::Limiter;
 use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(confess);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -61,7 +64,7 @@ sub _do_spawn {
 	if ($start_cb) {
 		eval { # popen_rd may die on EMFILE, ENFILE
 			$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
-						\&waitpid_err, $self);
+						\&waitpid_err, $self, \%o);
 			$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
 		};
 	} else {
@@ -126,6 +129,13 @@ sub wait_await { # run_await cb
 	waitpid_err($pid, $self, $opt);
 }
 
+sub yield_chunk { # $_[-1] is sysread buffer (or undef)
+	my ($self, $ipipe) = @_;
+	my $qfh = $self->{qfh}; # PublicInbox::HTTP::(Chunked|Identity)
+	($_[-1] // '') eq '' ? $qfh->close : $qfh->write($_[-1]);
+	delete($self->{rpipe}) if ($_[-1] // '') eq ''; # all done
+}
+
 sub finish ($;$) {
 	my ($self, $err) = @_;
 	$self->{_err} //= $err; # only for $@
@@ -201,6 +211,33 @@ EOM
 	$ret;
 }
 
+sub yield_pass { # returns $qfh if the body is to be streamed, else nothing
+	my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+	my $env = $self->{psgi_env};
+	my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
+	if (ref($res) eq 'CODE') { # chain another command
+		$ipipe->close;
+		$res->($wcb);
+		$self->{passed} = 1;
+		return; # all done
+	}
+	confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
+
+	my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+			pop(@$res) : delete($env->{'qspawn.filter'});
+	$filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
+
+	if (scalar(@$res) == 3) { # done early
+		$ipipe->close;
+		$wcb->($res); # no writer is returned for a 3-element response
+		return; # all done, caller must not ->write
+	}
+	scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+	my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+	$qfh = $filter->attach($qfh) if $filter;
+	$self->{qfh} = $qfh; # keep $ipipe open
+}
+
 sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
 	my ($self) = @_;
 	my $r = rd_hdr($self) or return; # incomplete
@@ -257,6 +294,46 @@ sub psgi_return_start { # may run later, much later...
 	}
 }
 
+sub _ipipe_cb { # InputPipe callback
+	my ($ipipe, $self) = @_; # $_[-1] rbuf
+	return yield_chunk($self, $ipipe, $_[-1]) if $self->{qfh}; # stream body
+
+	if (!defined($_[-1])) {
+		warn "error reading header: $!";
+	} elsif ($_[-1] eq '') {
+		warn <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+	} else { # attempt to parse headers
+		my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+		$$bref .= $_[-1];
+		my $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
+		if ($ret) { # success
+			my $qfh = yield_pass($self, $ipipe, $ret);
+			delete $self->{yield_parse_hdr};
+			$qfh->write($$bref) if $qfh;
+			return;
+		}
+		return unless $@; # incomplete, not an error
+		warn "parse_hdr ($ph_cb): $@";
+	}
+	delete $self->{yield_parse_hdr};
+	yield_pass($self, $ipipe, [ 500, [], [ "Internal error\n" ] ])
+}
+
+sub _yield_start { # may run later, much later...
+	my ($self) = @_;
+	my $async = !!$self->{psgi_env}->{'pi-httpd.async'};
+	my $rpipe = $self->{rpipe};
+	if ($async) {
+		require PublicInbox::ProcessIONBF;
+		PublicInbox::ProcessIONBF->replace($rpipe);
+	}
+	my $ipipe = PublicInbox::InputPipe::consume($rpipe, \&_ipipe_cb, $self);
+	return if $async;
+	$ipipe->event_step while $ipipe->{sock};
+}
+
 # Used for streaming the stdout of one process as a PSGI response.
 #
 # $env is the PSGI env.
@@ -302,4 +379,22 @@ sub psgi_return {
 	}
 }
 
+sub psgi_yield {
+	my ($self, $env, $limiter, @parse_hdr_arg)= @_;
+	$self->{psgi_env} = $env;
+	$self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
+	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
+
+	# the caller already captured the PSGI write callback from
+	# the PSGI server, so we can call ->start, here:
+	$env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+		# the caller will return this sub to the PSGI server, so
+		# it can set the response callback (that is, for
+		# PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+		# but other HTTP servers are supported:
+		$env->{'qspawn.wcb'} = $_[0];
+		start($self, $limiter, \&_yield_start);
+	}
+}
+
 1;

^ permalink raw reply related	[flat|nested] 8+ messages in thread

end of thread, other threads:[~2023-10-19  1:15 UTC | newest]

Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-19  1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
2023-10-19  1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
2023-10-19  1:15 ` [PATCH 3/8] psgi_qx: use a temporary file rather than pipe Eric Wong
2023-10-19  1:15 ` [PATCH 4/8] www_coderepo: capture uses a flattened list Eric Wong
2023-10-19  1:15 ` [PATCH 5/8] qspawn: psgi_return allows list for callback args Eric Wong
2023-10-19  1:15 ` [PATCH 6/8] qspawn: drop unused err arg for ->event_step Eric Wong
2023-10-19  1:15 ` [PATCH 7/8] httpd/async: require IO arg Eric Wong
2023-10-19  1:15 ` [PATCH 8/8] qspawn: introduce psgi_yield API Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).