dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 3/8] psgi_qx: use a temporary file rather than pipe
Date: Thu, 19 Oct 2023 01:15:30 +0000	[thread overview]
Message-ID: <20231019011535.1895489-3-e@80x24.org> (raw)
In-Reply-To: <20231019011535.1895489-1-e@80x24.org>

A pipe requires more context switches, plus extra code to deal
with the unpredictable ordering of pipe EOF vs waitpid.  So just
use the new spawn/aspawn features, which automatically handle
slurping output into a string.
---
 MANIFEST                         |  1 +
 lib/PublicInbox/Aspawn.pm        | 30 ++++++++++
 lib/PublicInbox/CodeSearchIdx.pm |  1 +
 lib/PublicInbox/Qspawn.pm        | 95 +++++++++++---------------------
 4 files changed, 65 insertions(+), 62 deletions(-)
 create mode 100644 lib/PublicInbox/Aspawn.pm

diff --git a/MANIFEST b/MANIFEST
index dcce801c..f087621c 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -161,6 +161,7 @@ lib/PublicInbox/AddressPP.pm
 lib/PublicInbox/Admin.pm
 lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
+lib/PublicInbox/Aspawn.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CidxComm.pm
diff --git a/lib/PublicInbox/Aspawn.pm b/lib/PublicInbox/Aspawn.pm
new file mode 100644
index 00000000..2423ac31
--- /dev/null
+++ b/lib/PublicInbox/Aspawn.pm
@@ -0,0 +1,30 @@
+package PublicInbox::Aspawn; # spawn commands and reap them via DS::awaitpid
+use v5.12;
+use parent qw(Exporter);
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::Spawn qw(spawn);
+our @EXPORT_OK = qw(run_await);
+
+sub _await_cb { # awaitpid cb: read_out_err, warn on failure, then chain to $cb
+	my ($pid, $cmd, $env, $opt, $cb, @args) = @_;
+	PublicInbox::Spawn::read_out_err($opt); # presumably slurps redirected {1}/{2}; verify in Spawn
+	if ($? && !$opt->{quiet}) { # $?: child wait status (assumed set by awaitpid)
+		my ($status, $sig) = ($? >> 8, $? & 127); # exit code, signal number
+		my $msg = '';
+		$msg .= " (-C=$opt->{-C})" if defined $opt->{-C};
+		$msg .= " status=$status" if $status;
+		$msg .= " signal=$sig" if $sig;
+		warn "E: @$cmd", $msg, "\n";
+	}
+	$cb->($pid, $cmd, $env, $opt, @args) if $cb; # $cb is optional
+}
+
+sub run_await { # spawn @$cmd; _await_cb runs (and chains to $cb) once reaped
+	my ($cmd, $env, $opt, $cb, @args) = @_;
+	$opt->{1} //= \(my $out); # default: capture stdout into a scalar ref stored in $opt
+	my $pid = spawn($cmd, $env, $opt);
+	awaitpid($pid, \&_await_cb, $cmd, $env, $opt, $cb, @args);
+	awaitpid($pid); # synchronous for non-$in_loop (relies on DS::awaitpid internals; verify)
+}
+
+1;
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 36d00aea..b9b72ff4 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -55,6 +55,7 @@ use PublicInbox::CidxLogP;
 use PublicInbox::CidxComm;
 use PublicInbox::Git qw(%OFMT2HEXLEN);
 use PublicInbox::Compat qw(uniqstr);
+use PublicInbox::Aspawn qw(run_await);
 use Carp ();
 use autodie qw(pipe open seek sysseek send);
 our (
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index a4d78e49..59d5ed40 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -30,6 +30,7 @@ use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
 use PublicInbox::Limiter;
+use PublicInbox::Aspawn qw(run_await);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -48,29 +49,30 @@ sub new {
 
 sub _do_spawn {
 	my ($self, $start_cb, $limiter) = @_;
-	my $err;
 	my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
 	my %o = %{$opt || {}};
 	$self->{limiter} = $limiter;
-	foreach my $k (@PublicInbox::Spawn::RLIMITS) {
-		if (defined(my $rlimit = $limiter->{$k})) {
-			$o{$k} = $rlimit;
-		}
+	for my $k (@PublicInbox::Spawn::RLIMITS) { # copy the limiter's rlimits into %o
+		$o{$k} = $limiter->{$k} // next; # undef: skip, leaving $o{$k} as-is
 	}
 	$self->{cmd} = $cmd;
 	$self->{-quiet} = 1 if $o{quiet};
-	eval {
-		# popen_rd may die on EMFILE, ENFILE
-		$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
-					\&waitpid_err, $self);
-		$limiter->{running}++;
-		$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
-	};
+	$limiter->{running}++; # counted before spawning, even if the spawn dies
+	if ($start_cb) { # pipe path: popen_rd sets ->{rpipe} before $start_cb runs
+		eval { # popen_rd may die on EMFILE, ENFILE
+			$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
+						\&waitpid_err, $self);
+			$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
+		};
+	} else { # no $start_cb (psgi_qx gives start() undef): run_await captures output
+		eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+		warn "E: $@" if $@; # NOTE(review): finish() below may report $@ again
+	}
 	finish($self, $@) if $@;
 }
 
-sub finalize ($) {
-	my ($self) = @_;
+sub finalize ($;$) {
+	my ($self, $opt) = @_;
 
 	# process is done, spawn whatever's in the queue
 	my $limiter = delete $self->{limiter} or return;
@@ -89,10 +91,10 @@ sub finalize ($) {
 		warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
 	}
 
-	my ($env, $qx_cb, $qx_arg, $qx_buf) =
-		delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
-	if ($qx_cb) {
-		eval { $qx_cb->($qx_buf, $qx_arg) };
+	my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+	if ($qx_cb_arg) {
+		my $cb = shift @$qx_cb_arg;
+		eval { $cb->($opt->{1}, @$qx_cb_arg) };
 		return unless $@;
 		warn "E: $@"; # hope qspawn.wcb can handle it
 	}
@@ -108,15 +110,20 @@ sub finalize ($) {
 sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
 
 sub waitpid_err { # callback for awaitpid
-	my (undef, $self) = @_; # $_[0]: pid
+	my (undef, $self, $opt) = @_; # $_[0]: pid; $opt is only passed by wait_await
 	$self->{_err} = ''; # for defined check in ->finish
-	if ($?) {
+	if ($?) { # FIXME: redundant; Aspawn::_await_cb also decodes $? on the run_await path
 		my $status = $? >> 8;
 		my $sig = $? & 127;
 		$self->{_err} .= "exit status=$status";
 		$self->{_err} .= " signal=$sig" if $sig;
 	}
-	finalize($self) if !$self->{rpipe};
+	finalize($self, $opt) if !$self->{rpipe}; # finalize hands $opt->{1} to the psgi_qx cb
+}
+
+sub wait_await { # run_await cb: adapts run_await's cb args for waitpid_err
+	my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
+	waitpid_err($pid, $self, $opt);
 }
 
 sub finish ($;$) {
@@ -140,52 +147,16 @@ sub start ($$$) {
 	}
 }
 
-sub psgi_qx_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
-	my ($self) = @_;
-	my ($r, $buf);
-reread:
-	$r = sysread($self->{rpipe}, $buf, 65536);
-	if (!defined($r)) {
-		return if $! == EAGAIN; # try again when notified
-		goto reread if $! == EINTR;
-		event_step($self, $!);
-	} elsif (my $as = delete $self->{async}) { # PublicInbox::HTTPD::Async
-		$as->async_pass($self->{psgi_env}->{'psgix.io'},
-				$self->{qx_fh}, \$buf);
-	} elsif ($r) { # generic PSGI:
-		print { $self->{qx_fh} } $buf;
-	} else { # EOF
-		event_step($self, undef);
-	}
-}
-
-sub psgi_qx_start {
-	my ($self) = @_;
-	if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
-		# PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
-		$self->{async} = $async->($self->{rpipe},
-					\&psgi_qx_init_cb, $self, $self);
-		# init_cb will call ->async_pass or ->close
-	} else { # generic PSGI
-		psgi_qx_init_cb($self) while $self->{qx_fh};
-	}
-}
-
-# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
+# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb_arg[0] with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
 # and safe to slurp.
 sub psgi_qx {
-	my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+	my ($self, $env, $limiter, @qx_cb_arg) = @_; # @qx_cb_arg: ($cb, @args)
 	$self->{psgi_env} = $env;
-	my $qx_buf = '';
-	open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
-	$self->{qx_cb} = $qx_cb;
-	$self->{qx_arg} = $qx_arg;
-	$self->{qx_fh} = $qx_fh;
-	$self->{qx_buf} = \$qx_buf;
+	$self->{qx_cb_arg} = \@qx_cb_arg; # finalize shifts $cb off, calls $cb->($opt->{1}, @args)
 	$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
-	start($self, $limiter, \&psgi_qx_start);
+	start($self, $limiter, undef); # undef start_cb: presumably _do_spawn's run_await path
 }
 
 # this is called on pipe EOF to reap the process, may be called
@@ -195,7 +166,7 @@ sub event_step {
 	my ($self, $err) = @_; # $err: $!
 	warn "psgi_{return,qx} $err" if defined($err);
 	finish($self);
-	my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)});
+	my $fh = delete $self->{qfh};
 	$fh->close if $fh; # async-only (psgi_return)
 }
 

  parent reply	other threads:[~2023-10-19  1:15 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-10-19  1:15 [PATCH 1/8] limiter: split out from qspawn Eric Wong
2023-10-19  1:15 ` [PATCH 2/8] spawn: support synchronous run_qx Eric Wong
2023-10-19  1:15 ` Eric Wong [this message]
2023-10-19  1:15 ` [PATCH 4/8] www_coderepo: capture uses a flattened list Eric Wong
2023-10-19  1:15 ` [PATCH 5/8] qspawn: psgi_return allows list for callback args Eric Wong
2023-10-19  1:15 ` [PATCH 6/8] qspawn: drop unused err arg for ->event_step Eric Wong
2023-10-19  1:15 ` [PATCH 7/8] httpd/async: require IO arg Eric Wong
2023-10-19  1:15 ` [PATCH 8/8] qspawn: introduce psgi_yield API Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20231019011535.1895489-3-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).