dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH 1/3] process_pipe2 + import
@ 2023-09-30 14:52 Eric Wong
  2023-09-30 14:53 ` [PATCH 2/3] git: decouple cat_async_retry from POSIX pipe semantics Eric Wong
  2023-09-30 14:53 ` [PATCH 3/3] git: use Unix stream sockets for `cat-file --batch-*' Eric Wong
  0 siblings, 2 replies; 3+ messages in thread
From: Eric Wong @ 2023-09-30 14:52 UTC (permalink / raw)
  To: spew

---
 MANIFEST                        |  1 +
 lib/PublicInbox/Import.pm       | 21 +++++++--------------
 lib/PublicInbox/ProcessPipe.pm  | 15 +++++++++------
 lib/PublicInbox/ProcessPipe2.pm | 29 +++++++++++++++++++++++++++++
 4 files changed, 46 insertions(+), 20 deletions(-)
 create mode 100644 lib/PublicInbox/ProcessPipe2.pm

diff --git a/MANIFEST b/MANIFEST
index 4693cbe0..2b180f72 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -319,6 +319,7 @@ lib/PublicInbox/POP3.pm
 lib/PublicInbox/POP3D.pm
 lib/PublicInbox/PktOp.pm
 lib/PublicInbox/ProcessPipe.pm
+lib/PublicInbox/ProcessPipe2.pm
 lib/PublicInbox/Qspawn.pm
 lib/PublicInbox/Reply.pm
 lib/PublicInbox/RepoAtom.pm
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 59462e9a..c63b369f 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -7,7 +7,7 @@
 # requires read-only access.
 package PublicInbox::Import;
 use strict;
-use parent qw(PublicInbox::Lock);
+use parent qw(PublicInbox::Lock PublicInbox::ProcessPipe2);
 use v5.10.1;
 use PublicInbox::Spawn qw(run_die popen_rd);
 use PublicInbox::MID qw(mids mid2path);
@@ -58,9 +58,6 @@ sub gfi_start {
 
 	return ($self->{in}, $self->{out}) if $self->{in};
 
-	my ($in_r, $out_r, $out_w);
-	pipe($out_r, $out_w) or die "pipe failed: $!";
-
 	$self->lock_acquire;
 	eval {
 		my ($git, $ref) = @$self{qw(git ref)};
@@ -72,18 +69,15 @@ sub gfi_start {
 			die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?;
 			$self->{-tree} = { map { $_ => 1 } split(/\0/, $t) };
 		}
-		$in_r = $self->{in} = $git->popen(qw(fast-import
-					--quiet --done --date-format=raw),
-					undef, { 0 => $out_r });
-		$out_w->autoflush(1);
-		$self->{out} = $out_w;
+		$self->popen_rw(['git', "--git-dir=$git->{git_dir}",
+			qw(fast-import --quiet --done --date-format=raw)]);
 		$self->{nchg} = 0;
 	};
 	if ($@) {
 		$self->lock_release;
 		die $@;
 	}
-	($in_r, $out_w);
+	@$self{qw(in out)};
 }
 
 sub wfail () { die "write to fast-import failed: $!" }
@@ -490,11 +484,10 @@ sub active { !!$_[0]->{out} }
 
 sub done {
 	my ($self) = @_;
-	my $w = delete $self->{out} or return;
+	$self->{out} or return;
 	eval {
-		my $r = delete $self->{in} or die 'BUG: missing {in} when done';
-		print $w "done\n" or wfail;
-		close $r or die "fast-import failed: $?"; # ProcessPipe::CLOSE
+		print { $self->{out} } "done\n" or wfail;
+		$self->close_wait or die "fast-import failed: $?";
 	};
 	my $wait_err = $@;
 	my $nchg = delete $self->{nchg};
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index 16971801..c82ba0f8 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -1,10 +1,9 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# a tied handle for auto reaping of children tied to a read-only pipe, see perltie(1)
-# DO NOT use this as-is for bidirectional pipes/sockets (e.g. in PublicInbox::Git),
-# both ends of the pipe must be at the same level of the Perl object hierarchy
-# to ensure orderly destruction.
+# a tied handle for auto reaping of children tied to a read-only pipe,
+# see perltie(1).  Use ProcessPipe2 for bidirectional pipes/sockets
+# for proper refcount and destruction ordering but no tie support
 package PublicInbox::ProcessPipe;
 use v5.12;
 use PublicInbox::DS qw(awaitpid);
@@ -57,8 +56,12 @@ sub FILENO { fileno($_[0]->{fh}) }
 
 sub _close ($;$) {
 	my ($self, $wait) = @_;
-	my ($fh, $pid) = delete(@$self{qw(fh pid)});
-	my $ret = defined($fh) ? close($fh) : '';
+	my ($fh, $pid, $in, $out) = delete(@$self{qw(fh pid in out)});
+	my $ret = defined($fh) ? close($fh) : do { # in/out => ProcessPipe2
+		my $n = (defined($out) ? close($out) : 1).
+			(defined($in) ? close($in) : 1);
+		$n eq '11' ? 1 : '';
+	};
 	return $ret unless defined($pid) && $self->{ppid} == $$;
 	if ($wait) { # caller cares about the exit status:
 		# synchronous wait via defined(wantarray) on awaitpid:
diff --git a/lib/PublicInbox/ProcessPipe2.pm b/lib/PublicInbox/ProcessPipe2.pm
new file mode 100644
index 00000000..d89a2b65
--- /dev/null
+++ b/lib/PublicInbox/ProcessPipe2.pm
@@ -0,0 +1,29 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# non-tied version of ProcessPipe for "bidirectional" pipes used by
+# `cat-file --batch-*', fast-import, etc.
+package PublicInbox::ProcessPipe2;
+use v5.12;
+use parent qw(PublicInbox::ProcessPipe);
+use autodie qw(pipe);
+use PublicInbox::Spawn qw(spawn);
+use PublicInbox::DS qw(awaitpid);
+
+sub popen_rw {
+	my ($self, $cmd, $env, $opt) = @_;
+	pipe($self->{in}, local $opt->{1});
+	pipe(local $opt->{0}, $self->{out});
+	$self->{out}->autoflush(1);
+	$self->{ppid} = $$;
+	$self->{pp_chld_err} = \(my $err);
+	my $pid = $self->{pid} = spawn($cmd, $env, $opt);
+	awaitpid($pid, $self->can('waitcb'), \$err, @{$opt->{cb_arg} // []});
+	undef;
+}
+
+sub close_wait { $_[0]->_close(1) }
+
+# DESTROY is inherited from ProcessPipe
+
+1;

^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [PATCH 2/3] git: decouple cat_async_retry from POSIX pipe semantics
  2023-09-30 14:52 [PATCH 1/3] process_pipe2 + import Eric Wong
@ 2023-09-30 14:53 ` Eric Wong
  2023-09-30 14:53 ` [PATCH 3/3] git: use Unix stream sockets for `cat-file --batch-*' Eric Wong
  1 sibling, 0 replies; 3+ messages in thread
From: Eric Wong @ 2023-09-30 14:53 UTC (permalink / raw)
  To: spew

While pipes guarantee writes of <= 512 bytes to be atomic,
Unix stream sockets (or TCP sockets) have no such guarantees.
Removing the pipe assumption will make it possible for us to
switch to bidirectional Unix stream sockets and save FDs with
`git cat-file' processes as we have with Gcf2Client.  The
performance benefit of larger pipe buffers over stream sockets
isn't as relevant when interacting with git as it is with
SearchIdx shards.
---
 lib/PublicInbox/Git.pm | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)

diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index eb88aa48..8ac40d2b 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -223,25 +223,21 @@ sub my_readline ($$) {
 }
 
 sub cat_async_retry ($$) {
-	my ($self, $inflight) = @_;
+	my ($self, $old_inflight) = @_;
 
 	# {inflight} may be non-existent, but if it isn't we delete it
 	# here to prevent cleanup() from waiting:
 	delete $self->{inflight};
 	cleanup($self);
+	batch_prepare($self, my $new_inflight = []);
 
-	batch_prepare($self, $inflight);
-	my $buf = '';
-	for (my $i = 0; $i < @$inflight; $i += 3) {
-		$buf .= "$inflight->[$i]\n";
+	while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
+		write_all($self, $self->{out}, $oid."\n",
+				\&cat_async_step, $new_inflight);
+		$oid = \$oid if !@$new_inflight; # to indicate oid retried
+		push @$new_inflight, $oid, $cb, $arg;
 	}
-	$self->{out}->blocking(1); # brand new pipe, should never block
-	print { $self->{out} } $buf or $self->fail("write error: $!");
-	$self->{out}->blocking(0);
-	my $req = shift @$inflight;
-	unshift(@$inflight, \$req); # \$ref to indicate retried
-
-	cat_async_step($self, $inflight); # take one step
+	cat_async_step($self, $new_inflight); # take one step
 }
 
 # returns true if prefetch is successful

^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [PATCH 3/3] git: use Unix stream sockets for `cat-file --batch-*'
  2023-09-30 14:52 [PATCH 1/3] process_pipe2 + import Eric Wong
  2023-09-30 14:53 ` [PATCH 2/3] git: decouple cat_async_retry from POSIX pipe semantics Eric Wong
@ 2023-09-30 14:53 ` Eric Wong
  1 sibling, 0 replies; 3+ messages in thread
From: Eric Wong @ 2023-09-30 14:53 UTC (permalink / raw)
  To: spew

The benefit of 1MB potential pipe buffer size (on Linux) doesn't
seem noticeable when reading from git (unlike when writing to v2
shards), so Unix stream sockets seem fine, here.

This allows us to simplify our process management by using the
same socket FD for reads and writes and enables us to use our
ProcessPipe class for reaping (as we can do with Gcf2Client).

Gcf2Client no longer relies on PublicInbox::DS for write
buffering, and instead just waits for requests to complete
once the number of inflight requests hits the MAX_INFLIGHT
threshold as we do with PublicInbox::Git.

We reuse the existing MAX_INFLIGHT limit (18) that was
determined by the minimum allowed PIPE_BUF (512).  (AFAIK) Unix
stream sockets have no analogue to PIPE_BUF, but all *BSDs and
Linux I've checked have default SO_RCVBUF and SO_SNDBUF values
larger than the previously-required PIPE_BUF size of 512 bytes.
---
 lib/PublicInbox/Gcf2Client.pm  |  57 ++------
 lib/PublicInbox/Git.pm         | 254 +++++++++++++++++----------------
 lib/PublicInbox/GitAsyncCat.pm |  98 +------------
 lib/PublicInbox/LeiToMail.pm   |   2 +-
 lib/PublicInbox/ProcessPipe.pm |   4 +-
 lib/PublicInbox/ViewVCS.pm     |   2 +-
 6 files changed, 153 insertions(+), 264 deletions(-)

diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index a49e2aad..8f442787 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -3,14 +3,15 @@
 
 # connects public-inbox processes to PublicInbox::Gcf2::loop()
 package PublicInbox::Gcf2Client;
-use strict;
+use v5.12;
 use parent qw(PublicInbox::DS);
 use PublicInbox::Git;
 use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-use PublicInbox::DS qw(awaitpid);
+use PublicInbox::ProcessPipe;
+
 # fields:
 #	sock => socket to Gcf2::loop
 # The rest of these fields are compatible with what PublicInbox::Git
@@ -21,66 +22,38 @@ use PublicInbox::DS qw(awaitpid);
 #	inflight => array (see PublicInbox::Git)
 #	rbuf => scalarref, may be non-existent or empty
 sub new  {
-	my ($rdr) = @_;
+	my ($opt) = @_;
 	my $self = bless {}, __PACKAGE__;
 	# ensure the child process has the same @INC we do:
 	my $env = { PERL5LIB => join(':', @INC) };
 	my ($s1, $s2);
 	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!";
-	$rdr //= {};
-	$rdr->{0} = $rdr->{1} = $s2;
-	my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
-	$self->{'pid.owner'} = $$;
-	awaitpid($self->{pid} = spawn($cmd, $env, $rdr), undef);
 	$s1->blocking(0);
+	$opt->{0} = $opt->{1} = $s2;
+	my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
+	my $pid = spawn($cmd, $env, $opt);
+	my $sock = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
 	$self->{inflight} = [];
-	$self->{in} = $s1;
-	$self->SUPER::new($s1, EPOLLIN|EPOLLET);
-}
-
-sub fail {
-	my $self = shift;
-	$self->close; # PublicInbox::DS::close
-	PublicInbox::Git::fail($self, @_);
+	$self->{epwatch} = \undef; # for Git->cleanup
+	$self->SUPER::new($sock, EPOLLIN|EPOLLET);
 }

 sub gcf2_async ($$$;$) {
 	my ($self, $req, $cb, $arg) = @_;
 	my $inflight = $self->{inflight} or return $self->close;
-
-	# {wbuf} is rare, I hope:
-	cat_async_step($self, $inflight) if $self->{wbuf};
-
-	$self->fail("gcf2c write: $!") if !$self->write($req) && !$self->{sock};
+	PublicInbox::Git::write_all($self, $$req, \&cat_async_step, $inflight);
 	push @$inflight, $req, $cb, $arg;
 }

 # ensure PublicInbox::Git::cat_async_step never calls cat_async_retry
 sub alternates_changed {}

-# DS::event_loop will call this
-sub event_step {
-	my ($self) = @_;
-	$self->flush_write;
-	$self->close if !$self->{in} || !$self->{sock}; # process died
-	my $inflight = $self->{inflight};
-	if ($inflight && @$inflight) {
-		cat_async_step($self, $inflight);
-		return $self->close unless $self->{in}; # process died
-
-		# ok, more to do, requeue for fairness
-		$self->requeue if @$inflight || exists($self->{rbuf});
-	}
-}
-
-sub DESTROY {
-	my ($self) = @_;
-	delete $self->{sock}; # if outside event_loop
-	PublicInbox::Git::DESTROY($self);
-}
-
 no warnings 'once';

-*cat_async_step = \&PublicInbox::Git::cat_async_step;
+*cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step
+*event_step = \&PublicInbox::Git::event_step;
+*fail = \&PublicInbox::Git::fail; # ->fail is called by cat_async_step
+*close = \&PublicInbox::Git::close; # aborts {inflight}, then DS::close
+*DESTROY = \&PublicInbox::Git::DESTROY;

 1;
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 8ac40d2b..3062f293 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -9,23 +9,23 @@
 package PublicInbox::Git;
 use strict;
 use v5.10.1;
-use parent qw(Exporter);
+use parent qw(Exporter PublicInbox::DS);
 use POSIX ();
-use IO::Handle; # ->autoflush
+use IO::Handle; # ->blocking
+use Socket qw(AF_UNIX SOCK_STREAM);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 use Errno qw(EINTR EAGAIN);
 use File::Glob qw(bsd_glob GLOB_NOSORT);
 use File::Spec ();
 use Time::HiRes qw(stat);
-use PublicInbox::Spawn qw(popen_rd which);
+use PublicInbox::Spawn qw(spawn popen_rd which);
 use PublicInbox::Tmpfile;
 use IO::Poll qw(POLLIN);
 use Carp qw(croak carp);
 use PublicInbox::SHA ();
-use PublicInbox::DS qw(awaitpid);
 our %HEXLEN2SHA = (40 => 1, 64 => 256);
 our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64);
 our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN);
-our $PIPE_BUFSIZ = 65536; # Linux default
 our $in_cleanup;
 our $RDTIMEO = 60_000; # milliseconds
 our $async_warn; # true in read-only daemons
@@ -34,11 +34,8 @@ our $async_warn; # true in read-only daemons
 my @MODIFIED_DATE = qw[for-each-ref --sort=-committerdate
 			--format=%(committerdate:raw) --count=1];
 
-# 512: POSIX PIPE_BUF minimum (see pipe(7))
-# 65: SHA-256 hex size + "\n" in preparation for git using non-SHA1
-# 3: @$inflight is flattened [ $OID, $cb, $arg ]
 use constant {
-	MAX_INFLIGHT => int(512 / (65 + length('contents '))) * 3,
+	MAX_INFLIGHT => 18, # arbitrary, formerly based on PIPE_BUF
 	BATCH_CMD_VER => v2.36.0, # git 2.36+
 };
 
@@ -65,7 +62,7 @@ sub check_git_exe () {
 	if ($st ne $EXE_ST) {
 		my $rd = popen_rd([ $GIT_EXE, '--version' ]);
 		my $v = readline($rd);
-		close($rd) or die "$GIT_EXE --version: $?";
+		CORE::close($rd) or die "$GIT_EXE --version: $?";
 		$v =~ /\b([0-9]+(?:\.[0-9]+){2})/ or die
 			"$GIT_EXE --version output: $v # unparseable";
 		$GIT_VER = eval("v$1") // die "BUG: bad vstring: $1 ($v)";
@@ -144,17 +141,16 @@ sub last_check_err {
 	$buf;
 }
 
-sub _bidi_pipe {
-	my ($self, $batch, $in, $out, $pid, $err) = @_;
-	if (defined $self->{$pid}) {
-		Carp::cluck("BUG: self->{$pid} exists unexpectedly");
-		return;
-	}
-	pipe(my ($out_r, $out_w)) or $self->fail("pipe failed: $!");
-	my $rdr = { 0 => $out_r, pgid => 0 };
+sub _sock_cmd {
+	my ($self, $batch, $err_c) = @_;
+	$self->{sock} and Carp::confess('BUG: {sock} exists');
+	my ($s1, $s2);
+	socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!";
+	$s1->blocking(0);
+	my $opt = { pgid => 0, 0 => $s2, 1 => $s2 };
 	my $gd = $self->{git_dir};
 	if ($gd =~ s!/([^/]+/[^/]+)\z!/!) {
-		$rdr->{-C} = $gd;
+		$opt->{-C} = $gd;
 		$gd = $1;
 	}
 
@@ -163,23 +159,13 @@ sub _bidi_pipe {
 	my $abbr = $GIT_VER lt v2.31.0 ? 40 : 'no';
 	my @cmd = ($GIT_EXE, "--git-dir=$gd", '-c', "core.abbrev=$abbr",
 			'cat-file', "--$batch");
-	if ($err) {
+	if ($err_c) {
 		my $id = "git.$self->{git_dir}.$batch.err";
-		$self->{$err} = $rdr->{2} = tmpfile($id, undef, 1) or
+		$self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or
 						$self->fail("tmpfile($id): $!");
 	}
-	# see lib/PublicInbox/ProcessPipe.pm for why we don't use that here
-	my ($in_r, $p) = popen_rd(\@cmd, undef, $rdr);
-	awaitpid($self->{$pid} = $p, undef);
-	$self->{"$pid.owner"} = $$;
-	$out_w->autoflush(1);
-	if ($^O eq 'linux') { # 1031: F_SETPIPE_SZ
-		fcntl($out_w, 1031, 4096);
-		fcntl($in_r, 1031, 4096) if $batch eq 'batch-check';
-	}
-	$out_w->blocking(0);
-	$self->{$out} = $out_w;
-	$self->{$in} = $in_r;
+	my $pid = spawn(\@cmd, undef, $opt);
+	$self->{sock} = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
 }
 
 sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
@@ -189,7 +175,7 @@ sub my_read ($$$) {
 	my $left = $len - length($$rbuf);
 	my $r;
 	while ($left > 0) {
-		$r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf));
+		$r = sysread($fh, $$rbuf, $left, length($$rbuf));
 		if ($r) {
 			$left -= $r;
 		} elsif (defined($r)) { # EOF
@@ -210,8 +196,7 @@ sub my_readline ($$) {
 		if ((my $n = index($$rbuf, "\n")) >= 0) {
 			return substr($$rbuf, 0, $n + 1, '');
 		}
-		my $r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf))
-								and next;
+		my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next;
 
 		# return whatever's left on EOF
 		return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r);
@@ -229,11 +214,10 @@ sub cat_async_retry ($$) {
 	# here to prevent cleanup() from waiting:
 	delete $self->{inflight};
 	cleanup($self);
-	batch_prepare($self, my $new_inflight = []);
+	my $new_inflight = batch_prepare($self);
 
 	while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
-		write_all($self, $self->{out}, $oid."\n",
-				\&cat_async_step, $new_inflight);
+		write_all($self, $oid."\n", \&cat_async_step, $new_inflight);
 		$oid = \$oid if !@$new_inflight; # to indicate oid retried
 		push @$new_inflight, $oid, $cb, $arg;
 	}
@@ -246,7 +230,7 @@ sub async_prefetch {
 	my $inflight = $self->{inflight} or return;
 	return if @$inflight;
 	substr($oid, 0, 0) = 'contents ' if $self->{-bc};
-	write_all($self, $self->{out}, "$oid\n", \&cat_async_step, $inflight);
+	write_all($self, "$oid\n", \&cat_async_step, $inflight);
 	push(@$inflight, $oid, $cb, $arg);
 }
 
@@ -256,14 +240,14 @@ sub cat_async_step ($$) {
 	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
 	my $rbuf = delete($self->{rbuf}) // \(my $new = '');
 	my ($bref, $oid, $type, $size);
-	my $head = my_readline($self->{in}, $rbuf);
+	my $head = my_readline($self->{sock}, $rbuf);
 	my $cmd = ref($req) ? $$req : $req;
 	# ->fail may be called via Gcf2Client.pm
 	my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info ';
 	if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) {
 		($oid, $type, $size) = ($1, $2, $3 + 0);
 		unless ($info) { # --batch-command
-			$bref = my_read($self->{in}, $rbuf, $size + 1) or
+			$bref = my_read($self->{sock}, $rbuf, $size + 1) or
 				$self->fail(defined($bref) ?
 						'read EOF' : "read: $!");
 			chop($$bref) eq "\n" or
@@ -302,16 +286,16 @@ sub cat_async_wait ($) {
 	}
 }
 
-sub batch_prepare ($$) {
-	my ($self, $inflight) = @_;
+sub batch_prepare ($) {
+	my ($self) = @_;
 	check_git_exe();
 	if ($GIT_VER ge BATCH_CMD_VER) {
-		_bidi_pipe($self, qw(batch-command in out pid err_c));
 		$self->{-bc} = 1;
+		_sock_cmd($self, 'batch-command', 1);
 	} else {
-		_bidi_pipe($self, qw(batch in out pid));
+		_sock_cmd($self, 'batch');
 	}
-	$self->{inflight} = $inflight;
+	$self->{inflight} = [];
 }
 
 sub _cat_file_cb {
@@ -328,52 +312,59 @@ sub cat_file {
 }
 
 sub check_async_step ($$) {
-	my ($self, $inflight_c) = @_;
-	die 'BUG: inflight empty or odd' if scalar(@$inflight_c) < 3;
-	my ($req, $cb, $arg) = @$inflight_c[0, 1, 2];
-	my $rbuf = delete($self->{rbuf_c}) // \(my $new = '');
-	chomp(my $line = my_readline($self->{in_c}, $rbuf));
+	my ($ck, $inflight) = @_;
+	die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
+	my $rbuf = delete($ck->{rbuf}) // \(my $new = '');
+	chomp(my $line = my_readline($ck->{sock}, $rbuf));
 	my ($hex, $type, $size) = split(/ /, $line);
 
 	# git <2.21 would show `dangling' (2.21+ shows `ambiguous')
 	# https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
 	if ($hex eq 'dangling') {
-		my $ret = my_read($self->{in_c}, $rbuf, $type + 1);
-		$self->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
+		my $ret = my_read($ck->{sock}, $rbuf, $type + 1);
+		$ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
 	}
-	$self->{rbuf_c} = $rbuf if $$rbuf ne '';
-	splice(@$inflight_c, 0, 3); # don't retry $cb on ->fail
+	$ck->{rbuf} = $rbuf if $$rbuf ne '';
+	splice(@$inflight, 0, 3); # don't retry $cb on ->fail
 	eval { $cb->(undef, $hex, $type, $size, $arg) };
-	async_err($self, $req, $hex, $@, 'check') if $@;
+	async_err($ck, $req, $hex, $@, 'check') if $@;
 }
 
 sub check_async_wait ($) {
 	my ($self) = @_;
 	return cat_async_wait($self) if $self->{-bc};
-	my $inflight_c = $self->{inflight_c} or return;
-	check_async_step($self, $inflight_c) while (scalar(@$inflight_c));
+	my $ck = $self->{ck} or return;
+	my $inflight = $ck->{inflight} or return;
+	check_async_step($ck, $inflight) while (scalar(@$inflight));
+}
+
+# git <2.36
+sub ck {
+	$_[0]->{ck} //= bless { git_dir => $_[0]->{git_dir} },
+				'PublicInbox::GitCheck';
 }
 
 sub check_async_begin ($) {
 	my ($self) = @_;
-	die 'BUG: already in async check' if $self->{inflight_c};
 	cleanup($self) if alternates_changed($self);
 	check_git_exe();
 	if ($GIT_VER ge BATCH_CMD_VER) {
-		_bidi_pipe($self, qw(batch-command in out pid err_c));
 		$self->{-bc} = 1;
-		$self->{inflight} = [];
+		_sock_cmd($self, 'batch-command', 1);
 	} else {
-		_bidi_pipe($self, qw(batch-check in_c out_c pid_c err_c));
-		$self->{inflight_c} = [];
+		_sock_cmd($self = ck($self), 'batch-check', 1);
 	}
+	$self->{inflight} = [];
 }
 
 sub write_all {
-	my ($self, $out, $buf, $read_step, $inflight) = @_;
+	my ($self, $buf, $read_step, $inflight) = @_;
+	$self->{sock} // Carp::confess 'BUG: no {sock}';
+	Carp::confess('BUG: not an array') if ref($inflight) ne 'ARRAY';
 	$read_step->($self, $inflight) while @$inflight >= MAX_INFLIGHT;
 	do {
-		my $w = syswrite($out, $buf);
+		my $w = syswrite($self->{sock}, $buf);
 		if (defined $w) {
 			return if $w == length($buf);
 			substr($buf, 0, $w, ''); # sv_chop
@@ -386,16 +377,17 @@ sub write_all {
 
 sub check_async ($$$$) {
 	my ($self, $oid, $cb, $arg) = @_;
-	my $inflight = $self->{-bc} ?
-			($self->{inflight} // cat_async_begin($self)) :
-			($self->{inflight_c} // check_async_begin($self));
-	if ($self->{-bc}) {
+	my $inflight;
+	if ($self->{-bc}) { # likely as time goes on
+batch_command:
+		$inflight = $self->{inflight} // cat_async_begin($self);
 		substr($oid, 0, 0) = 'info ';
-		write_all($self, $self->{out}, "$oid\n",
-				\&cat_async_step, $inflight);
-	} else {
-		write_all($self, $self->{out_c}, "$oid\n",
-				\&check_async_step, $inflight);
+		write_all($self, "$oid\n", \&cat_async_step, $inflight);
+	} else { # accounts for git upgrades while we're running:
+		my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin
+		$inflight = $ck->{inflight} // check_async_begin($self);
+		goto batch_command if $self->{-bc};
+		write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight);
 	}
 	push(@$inflight, $oid, $cb, $arg);
 }
@@ -418,39 +410,9 @@ sub check {
 	($hex, $type, $size);
 }
 
-sub _destroy {
-	my ($self, $pid, @rest) = @_; # rest = rbuf, in, out, err
-	my ($p) = delete @$self{($pid, @rest)};
-
-	# GitAsyncCat::event_step may delete {$pid}
-	awaitpid($p) if defined($p) && $$ == $self->{"$pid.owner"};
-}
-
-sub async_abort ($) {
-	my ($self) = @_;
-	while (scalar(@{$self->{inflight_c} // []}) ||
-			scalar(@{$self->{inflight} // []})) {
-		for my $c ('', '_c') {
-			my $q = $self->{"inflight$c"} or next;
-			while (@$q) {
-				my ($req, $cb, $arg) = splice(@$q, 0, 3);
-				$req = $$req if ref($req);
-				$self->{-bc} and
-					$req =~ s/\A(?:contents|info) //;
-				$req =~ s/ .*//; # drop git_dir for Gcf2Client
-				eval { $cb->(undef, $req, undef, undef, $arg) };
-				warn "E: (in abort) $req: $@" if $@;
-			}
-			delete $self->{"inflight$c"};
-			delete $self->{"rbuf$c"};
-		}
-	}
-	cleanup($self);
-}
-
-sub fail { # may be augmented in subclasses
+sub fail {
 	my ($self, $msg) = @_;
-	async_abort($self);
+	$self->close;
 	croak(ref($self) . ' ' . ($self->{git_dir} // '') . ": $msg");
 }
 
@@ -475,12 +437,12 @@ sub qx {
 	my $fh = popen(@_);
 	if (wantarray) {
 		my @ret = <$fh>;
-		close $fh; # caller should check $?
+		CORE::close $fh; # caller should check $?
 		@ret;
 	} else {
 		local $/;
 		my $ret = <$fh>;
-		close $fh; # caller should check $?
+		CORE::close $fh; # caller should check $?
 		$ret;
 	}
 }
@@ -492,12 +454,16 @@ sub date_parse {
 	} $self->qx('rev-parse', map { "--since=$_" } @_);
 }
 
+sub _active ($) {
+	scalar(@{$_[0]->{inflight} // []}) ||
+		($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []}))
+}
+
 # check_async and cat_async may trigger the other, so ensure they're
 # both completely done by using this:
 sub async_wait_all ($) {
 	my ($self) = @_;
-	while (scalar(@{$self->{inflight_c} // []}) ||
-			scalar(@{$self->{inflight} // []})) {
+	while (_active($self)) {
 		check_async_wait($self);
 		cat_async_wait($self);
 	}
@@ -506,14 +472,10 @@ sub async_wait_all ($) {
 # returns true if there are pending "git cat-file" processes
 sub cleanup {
 	my ($self, $lazy) = @_;
-	return 1 if $lazy && (scalar(@{$self->{inflight_c} // []}) ||
-				scalar(@{$self->{inflight} // []}));
+	return 1 if $lazy && _active($self);
 	local $in_cleanup = 1;
-	delete @$self{qw(async_cat async_chk)};
 	async_wait_all($self);
-	delete @$self{qw(inflight inflight_c -bc)};
-	_destroy($self, qw(pid rbuf in out err_c));
-	_destroy($self, qw(pid_c rbuf_c in_c out_c err_c));
+	$_->close for ($self, (delete($self->{ck}) // ()));
 	undef;
 }
 
@@ -530,7 +492,7 @@ sub packed_bytes {
 	$n
 }
 
-sub DESTROY { cleanup(@_) }
+sub DESTROY { cleanup($_[0]) }
 
 sub local_nick ($) {
 	# don't show full FS path, basename should be OK:
@@ -571,14 +533,14 @@ sub cat_async_begin {
 	my ($self) = @_;
 	cleanup($self) if $self->alternates_changed;
 	die 'BUG: already in async' if $self->{inflight};
-	batch_prepare($self, []);
+	batch_prepare($self);
 }
 
 sub cat_async ($$$;$) {
 	my ($self, $oid, $cb, $arg) = @_;
 	my $inflight = $self->{inflight} // cat_async_begin($self);
 	substr($oid, 0, 0) = 'contents ' if $self->{-bc};
-	write_all($self, $self->{out}, $oid."\n", \&cat_async_step, $inflight);
+	write_all($self, $oid."\n", \&cat_async_step, $inflight);
 	push(@$inflight, $oid, $cb, $arg);
 }
 
@@ -648,7 +610,7 @@ sub manifest_entry {
 	}
 	my $dig = PublicInbox::SHA->new(1);
 	while (read($sr, $buf, 65536)) { $dig->add($buf) }
-	close $sr or return; # empty, uninitialized git repo
+	CORE::close $sr or return; # empty, uninitialized git repo
 	$ent->{fingerprint} = $dig->hexdigest;
 	$ent->{modified} = modified(undef, $mod);
 	chomp($buf = <$own> // '');
@@ -664,8 +626,10 @@ sub cleanup_if_unlinked {
 	# Linux-specific /proc/$PID/maps access
 	# TODO: support this inside git.git
 	my $ret = 0;
-	for my $fld (qw(pid pid_c)) {
-		my $pid = $self->{$fld} // next;
+	for my $obj ($self, ($self->{ck} // ())) {
+		my $sock = $obj->{sock} // next;
+		my PublicInbox::ProcessPipe $pp = tied *$sock; # ProcessPipe
+		my $pid = $pp->{pid} // next;
 		open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
 		while (<$fh>) {
 			# n.b. we do not restart for unlinked multi-pack-index
@@ -679,6 +643,50 @@ sub cleanup_if_unlinked {
 	$ret;
 }
 
+sub event_step {
+	my ($self) = @_;
+	$self->close if !$self->{sock}; # process died while requeued
+	my $inflight = $self->{inflight};
+	if ($inflight && @$inflight) {
+		$self->cat_async_step($inflight);
+		return $self->close unless $self->{sock};
+		# more to do? requeue for fairness:
+		$self->requeue if @$inflight || exists($self->{rbuf});
+	}
+}
+
+# idempotently registers with DS epoll/kqueue/select/poll
+sub watch_async ($) {
+	$_[0]->{epwatch} //= do {
+		$_[0]->SUPER::new($_[0]->{sock}, EPOLLIN|EPOLLET);
+		\undef;
+	}
+}
+
+sub close {
+	my ($self) = @_;
+	if (my $q = $self->{inflight}) { # abort inflight requests
+		while (@$q) {
+			my ($req, $cb, $arg) = splice(@$q, 0, 3);
+			$req = $$req if ref($req);
+			$self->{-bc} and $req =~ s/\A(?:contents|info) //;
+			$req =~ s/ .*//; # drop git_dir for Gcf2Client
+			eval { $cb->(undef, $req, undef, undef, $arg) };
+			warn "E: (in abort) $req: $@" if $@;
+		}
+	}
+	delete @$self{qw(-bc err_c inflight rbuf)};
+	delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
+}
+
+package PublicInbox::GitCheck; # only for git <2.36
+use v5.12;
+our @ISA = qw(PublicInbox::Git);
+no warnings 'once';
+
+# for event_step
+*cat_async_step = \&PublicInbox::Git::check_async_step;
+
 1;
 __END__
 =pod
diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm
index 671654b5..71ee1147 100644
--- a/lib/PublicInbox/GitAsyncCat.pm
+++ b/lib/PublicInbox/GitAsyncCat.pm
@@ -1,70 +1,12 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-#
-# internal class used by PublicInbox::Git + PublicInbox::DS
-# This parses the output pipe of "git cat-file --batch"
 package PublicInbox::GitAsyncCat;
 use v5.12;
-use parent qw(PublicInbox::DS Exporter);
-use PublicInbox::DS qw(awaitpid);
-use POSIX qw(WNOHANG);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+use parent qw(Exporter);
 our @EXPORT = qw(ibx_async_cat ibx_async_prefetch async_check);
-use PublicInbox::Git ();
 
 our $GCF2C; # singleton PublicInbox::Gcf2Client
 
-# close w/o aborting another git process
-sub vanish {
-	delete $_[0]->{git};
-	$_[0]->close;
-}
-
-sub close {
-	my ($self) = @_;
-	if (my $git = delete $self->{git}) {
-		$git->async_abort;
-	}
-	$self->SUPER::close; # PublicInbox::DS::close
-}
-
-sub aclose {
-	my (undef, $self, $f) = @_; # ignore PID ($_[0])
-	if (my $g = $self->{git}) {
-		return vanish($self) if ($g->{$f} // 0) != ($self->{sock} // 1);
-	}
-	$self->close;
-}
-
-sub event_step {
-	my ($self) = @_;
-	my $git = $self->{git} or return;
-	return vanish($self) if ($git->{in} // 0) != ($self->{sock} // 1);
-	my $inflight = $git->{inflight};
-	if ($inflight && @$inflight) {
-		$git->cat_async_step($inflight);
-
-		# child death?
-		if (($git->{in} // 0) != ($self->{sock} // 1)) {
-			vanish($self);
-		} elsif (@$inflight || exists $git->{rbuf}) {
-			# ok, more to do, requeue for fairness
-			$self->requeue;
-		}
-	}
-}
-
-sub watch_cat {
-	my ($git) = @_;
-	$git->{async_cat} //= do {
-		my $self = bless { git => $git }, __PACKAGE__;
-		$git->{in}->blocking(0);
-		$self->SUPER::new($git->{in}, EPOLLIN|EPOLLET);
-		awaitpid($git->{pid}, \&aclose, $self, 'in');
-		\undef; # this is a true ref()
-	};
-}
-
 sub ibx_async_cat ($$$$) {
 	my ($ibx, $oid, $cb, $arg) = @_;
 	my $git = $ibx->{git} // $ibx->git;
@@ -80,7 +22,7 @@ sub ibx_async_cat ($$$$) {
 		\undef;
 	} else { # read-only end of git-cat-file pipe
 		$git->cat_async($oid, $cb, $arg);
-		watch_cat($git);
+		$git->watch_async;
 	}
 }
 
@@ -88,14 +30,7 @@ sub async_check ($$$$) {
 	my ($ibx, $oidish, $cb, $arg) = @_; # $ibx may be $ctx
 	my $git = $ibx->{git} // $ibx->git;
 	$git->check_async($oidish, $cb, $arg);
-	return watch_cat($git) if $git->{-bc}; # --batch-command
-	$git->{async_chk} //= do {
-		my $self = bless { git => $git }, 'PublicInbox::GitAsyncCheck';
-		$git->{in_c}->blocking(0);
-		$self->SUPER::new($git->{in_c}, EPOLLIN|EPOLLET);
-		awaitpid($git->{pid_c}, \&aclose, $self, 'in_c');
-		\undef; # this is a true ref()
-	};
+	($git->{ck} // $git)->watch_async;
 }
 
 # this is safe to call inside $cb, but not guaranteed to enqueue
@@ -109,35 +44,10 @@ sub ibx_async_prefetch {
 			$oid .= " $git->{git_dir}\n";
 			return $GCF2C->gcf2_async(\$oid, $cb, $arg); # true
 		}
-	} elsif ($git->{async_cat}) {
+	} elsif ($git->{epwatch}) {
 		return $git->async_prefetch($oid, $cb, $arg);
 	}
 	undef;
 }
 
 1;
-package PublicInbox::GitAsyncCheck;
-use v5.12;
-our @ISA = qw(PublicInbox::GitAsyncCat);
-use POSIX qw(WNOHANG);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-
-sub event_step {
-	my ($self) = @_;
-	my $git = $self->{git} or return;
-	return $self->vanish if ($git->{in_c} // 0) != ($self->{sock} // 1);
-	my $inflight = $git->{inflight_c};
-	if ($inflight && @$inflight) {
-		$git->check_async_step($inflight);
-
-		# child death?
-		if (($git->{in_c} // 0) != ($self->{sock} // 1)) {
-			$self->vanish;
-		} elsif (@$inflight || exists $git->{rbuf_c}) {
-			# ok, more to do, requeue for fairness
-			$self->requeue;
-		}
-	}
-}
-
-1;
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 2dddf00b..98d0ac19 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -133,7 +133,7 @@ sub eml2mboxcl2 {
 sub git_to_mail { # git->cat_async callback
 	my ($bref, $oid, $type, $size, $smsg) = @_;
 	my $self = delete $smsg->{l2m} // die "BUG: no l2m";
-	$type // return; # called by git->async_abort
+	$type // return; # called by PublicInbox::Git::close
 	eval {
 		if ($type eq 'missing' &&
 			  ($bref = $self->{-lms_rw}->local_blob($oid, 1))) {
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index c82ba0f8..ba75541f 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -1,8 +1,8 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# a tied handle for auto reaping of children tied to a read-only pipe,
-# see perltie(1).  Use ProcessPipe2 for bidirectional pipes/sockets
+# a tied handle for auto reaping of children tied to a pipe or socket.
+# See perltie(1).  Use ProcessPipe2 for bidirectional pipes
 # for proper refcount and destruction ordering but no tie support
 package PublicInbox::ProcessPipe;
 use v5.12;
diff --git a/lib/PublicInbox/ViewVCS.pm b/lib/PublicInbox/ViewVCS.pm
index 5529ed5b..f80fb4cb 100644
--- a/lib/PublicInbox/ViewVCS.pm
+++ b/lib/PublicInbox/ViewVCS.pm
@@ -133,7 +133,7 @@ sub do_cat_async {
 	# favor git(1) over Gcf2 (libgit2) for SHA-256 support
 	$ctx->{git}->cat_async($_, $cb, $ctx) for @req;
 	if ($ctx->{env}->{'pi-httpd.async'}) {
-		PublicInbox::GitAsyncCat::watch_cat($ctx->{git});
+		$ctx->{git}->watch_async;
 	} else { # synchronous, generic PSGI
 		$ctx->{git}->cat_async_wait;
 	}

^ permalink raw reply related	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2023-09-30 14:53 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-09-30 14:52 [PATCH 1/3] process_pipe2 + import Eric Wong
2023-09-30 14:53 ` [PATCH 2/3] git: decouple cat_async_retry from POSIX pipe semantics Eric Wong
2023-09-30 14:53 ` [PATCH 3/3] git: use Unix stream sockets for `cat-file --batch-*' Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).