dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 3/6] process_pipe2 + import
Date: Fri,  6 Oct 2023 09:07:23 +0000	[thread overview]
Message-ID: <20231006090726.3936839-3-e@80x24.org> (raw)
In-Reply-To: <20231006090726.3936839-1-e@80x24.org>

---
 MANIFEST                        |  1 +
 lib/PublicInbox/Import.pm       | 42 +++++++++++++--------------------
 lib/PublicInbox/ProcessPipe.pm  | 16 ++++++++-----
 lib/PublicInbox/ProcessPipe2.pm | 29 +++++++++++++++++++++++
 4 files changed, 57 insertions(+), 31 deletions(-)
 create mode 100644 lib/PublicInbox/ProcessPipe2.pm

diff --git a/MANIFEST b/MANIFEST
index 4693cbe0..2b180f72 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -319,6 +319,7 @@ lib/PublicInbox/POP3.pm
 lib/PublicInbox/POP3D.pm
 lib/PublicInbox/PktOp.pm
 lib/PublicInbox/ProcessPipe.pm
+lib/PublicInbox/ProcessPipe2.pm
 lib/PublicInbox/Qspawn.pm
 lib/PublicInbox/Reply.pm
 lib/PublicInbox/RepoAtom.pm
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 59462e9a..2386983a 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -6,9 +6,8 @@
 # and public-inbox-watch. Not the WWW or NNTP code which only
 # requires read-only access.
 package PublicInbox::Import;
-use strict;
-use parent qw(PublicInbox::Lock);
-use v5.10.1;
+use v5.12;
+use parent qw(PublicInbox::Lock PublicInbox::ProcessPipe2);
 use PublicInbox::Spawn qw(run_die popen_rd);
 use PublicInbox::MID qw(mids mid2path);
 use PublicInbox::Address;
@@ -56,10 +55,7 @@ sub new {
 sub gfi_start {
 	my ($self) = @_;
 
-	return ($self->{in}, $self->{out}) if $self->{in};
-
-	my ($in_r, $out_r, $out_w);
-	pipe($out_r, $out_w) or die "pipe failed: $!";
+	return @$self{qw(pp2_rd pp2_wr)} if $self->{pp2_rd}; # reuse running fast-import
 
 	$self->lock_acquire;
 	eval {
@@ -72,18 +68,15 @@ sub gfi_start {
 			die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?;
 			$self->{-tree} = { map { $_ => 1 } split(/\0/, $t) };
 		}
-		$in_r = $self->{in} = $git->popen(qw(fast-import
-					--quiet --done --date-format=raw),
-					undef, { 0 => $out_r });
-		$out_w->autoflush(1);
-		$self->{out} = $out_w;
+		$self->popen_rw(['git', "--git-dir=$git->{git_dir}",
+			qw(fast-import --quiet --done --date-format=raw)]); # sets {pp2_rd}/{pp2_wr}
 		$self->{nchg} = 0;
 	};
 	if ($@) {
 		$self->lock_release;
 		die $@;
 	}
-	($in_r, $out_w);
+	@$self{qw(pp2_rd pp2_wr)}; # (read, write) handles to fast-import
 }
 
 sub wfail () { die "write to fast-import failed: $!" }
@@ -157,16 +150,16 @@ sub check_remove_v1 {
 
 sub checkpoint {
 	my ($self) = @_;
-	return unless $self->{in};
-	print { $self->{out} } "checkpoint\n" or wfail;
+	return unless $self->{pp2_rd}; # fast-import not started
+	print { $self->{pp2_wr} } "checkpoint\n" or wfail;
 	undef;
 }
 
 sub progress {
 	my ($self, $msg) = @_;
-	return unless $self->{in};
-	print { $self->{out} } "progress $msg\n" or wfail;
-	readline($self->{in}) eq "progress $msg\n" or die
+	return unless $self->{pp2_rd}; # fast-import not started
+	print { $self->{pp2_wr} } "progress $msg\n" or wfail;
+	readline($self->{pp2_rd}) eq "progress $msg\n" or die # block until echoed back
 		"progress $msg not received\n";
 	undef;
 }
@@ -216,7 +209,7 @@ sub barrier {
 # used for v2
 sub get_mark {
 	my ($self, $mark) = @_;
-	die "not active\n" unless $self->{in};
+	die "not active\n" unless $self->{pp2_rd}; # set via gfi_start -> popen_rw
 	my ($r, $w) = $self->gfi_start;
 	print $w "get-mark $mark\n" or wfail;
 	my $oid = <$r> // die "get-mark failed, need git 2.6.0+\n";
@@ -486,15 +479,14 @@ EOM
 }
 
 # true if locked and active
-sub active { !!$_[0]->{out} }
+sub active { !!$_[0]->{pp2_wr} } # {pp2_wr} is deleted by close_wait
 
 sub done {
 	my ($self) = @_;
-	my $w = delete $self->{out} or return;
+	$self->{pp2_wr} or return; # NOTE(review): not cleared if the print below fails
 	eval {
-		my $r = delete $self->{in} or die 'BUG: missing {in} when done';
-		print $w "done\n" or wfail;
-		close $r or die "fast-import failed: $?"; # ProcessPipe::CLOSE
+		print { $self->{pp2_wr} } "done\n" or wfail;
+		$self->close_wait or die "fast-import failed: $?"; # closes pipes, waits on child
 	};
 	my $wait_err = $@;
 	my $nchg = delete $self->{nchg};
@@ -509,7 +501,7 @@ sub done {
 
 sub atfork_child {
 	my ($self) = @_;
-	foreach my $f (qw(in out)) {
+	for my $f (qw(pp2_rd pp2_wr)) { # keys set by ProcessPipe2::popen_rw
 		next unless defined($self->{$f});
 		close $self->{$f} or die "failed to close import[$f]: $!\n";
 	}
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm
index ba2c1ecb..12e03cc5 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessPipe.pm
@@ -1,10 +1,9 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# a tied handle for auto reaping of children tied to a read-only pipe, see perltie(1)
-# DO NOT use this as-is for bidirectional pipes/sockets (e.g. in PublicInbox::Git),
-# both ends of the pipe must be at the same level of the Perl object hierarchy
-# to ensure orderly destruction.
+# a tied handle for auto-reaping children tied to a read-only pipe,
+# see perltie(1).  Use ProcessPipe2 for bidirectional pipes/sockets:
+# it gets refcounting and destruction order right, but has no tie support
 package PublicInbox::ProcessPipe;
 use v5.12;
 use PublicInbox::DS qw(awaitpid);
@@ -57,8 +56,13 @@ sub FILENO { fileno($_[0]->{fh}) }
 
 sub _close ($;$) {
 	my ($self, $wait) = @_;
-	my ($fh, $pid) = delete(@$self{qw(fh pid)});
-	my $ret = (defined($fh) && $wait) ? close($fh) : ($fh = '');
+	my ($fh, $pid, $rd, $wr) = delete(@$self{qw(fh pid pp2_rd pp2_wr)});
+	my $ret = (defined($fh) && $wait) ? close($fh) : do {
+		$fh = '';
+		my $n = ((defined($rd) && $wait) ? close($rd) : ($rd = 1)). # unset or not waiting: drop ref, count as OK
+			((defined($wr) && $wait) ? close($wr) : ($wr = 1));
+		$n eq '11' ? 1 : ''; # true iff both ends closed OK
+	};
 	return $ret unless defined($pid) && $self->{ppid} == $$;
 	if ($wait) { # caller cares about the exit status:
 		# synchronous wait via defined(wantarray) on awaitpid:
diff --git a/lib/PublicInbox/ProcessPipe2.pm b/lib/PublicInbox/ProcessPipe2.pm
new file mode 100644
index 00000000..2eddf7ff
--- /dev/null
+++ b/lib/PublicInbox/ProcessPipe2.pm
@@ -0,0 +1,29 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# non-tied version of ProcessPipe for "bidirectional" pipes (a pair of
+# unidirectional pipes) used by `cat-file --batch-*', fast-import, etc.
+package PublicInbox::ProcessPipe2;
+use v5.12;
+use parent qw(PublicInbox::ProcessPipe);
+use autodie qw(pipe);
+use PublicInbox::Spawn qw(spawn);
+use PublicInbox::DS qw(awaitpid);
+
+# spawn $cmd with its stdout readable via {pp2_rd} and its stdin
+# writable (autoflushed) via {pp2_wr}; $self is untouched if spawn dies
+sub popen_rw {
+	my ($self, $cmd, $env, $opt) = @_;
+	pipe(my $rd, local $opt->{1});
+	pipe(local $opt->{0}, my $wr);
+	$wr->autoflush(1);
+	my $pid = spawn($cmd, $env, $opt); # may die, leaving no stale pipes
+	@$self{qw(ppid pid pp2_rd pp2_wr)} = ($$, $pid, $rd, $wr);
+	$self->{pp_chld_err} = \(my $err); # shared w/ waitcb via awaitpid
+	awaitpid($pid, $self->can('waitcb'), \$err, @{$opt->{cb_arg} // []});
+	undef;
+}
+
+sub close_wait { $_[0]->_close(1) } # closes both pipes, waits on child
+
+1; # DESTROY is inherited from ProcessPipe

  parent reply	other threads:[~2023-10-06  9:07 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-10-06  9:07 [PATCH 1/6] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
2023-10-06  9:07 ` [PATCH 2/6] ipc: use autodie for most syscalls Eric Wong
2023-10-06  9:07 ` Eric Wong [this message]
2023-10-06  9:07 ` [PATCH 4/6] import: use autodie, rely on PerlIO for retries Eric Wong
2023-10-06  9:07 ` [PATCH 5/6] makefile: check-run skips makefile checks Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20231006090726.3936839-3-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).