From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 0C1DE1F47D for ; Fri, 6 Oct 2023 09:07:27 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1696583247; bh=bSmAa5Gm+cioStNEonKvnf1TJ6rikfzWjLu0T02jv2I=; h=From:To:Subject:Date:In-Reply-To:References:From; b=Z+pH1DjVNQW3Q+VditbHE0JNPeBs8cOsPwLiOoSFI0AB5PZWvhqWcV/1I6a6ZffzU 0x4QEDQrOmoUGxHSfBmdI5FNEbkFA+xmRdZf4WKvMfKT5K9N67T0LrtQkW/1kwvebl Dhg10GmMSldZPtJS8d2oRBPLDcMKsC+ubROCYKGc= From: Eric Wong To: spew@80x24.org Subject: [PATCH 3/6] process_pipe2 + import Date: Fri, 6 Oct 2023 09:07:23 +0000 Message-ID: <20231006090726.3936839-3-e@80x24.org> In-Reply-To: <20231006090726.3936839-1-e@80x24.org> References: <20231006090726.3936839-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: --- MANIFEST | 1 + lib/PublicInbox/Import.pm | 42 +++++++++++++-------------------- lib/PublicInbox/ProcessPipe.pm | 16 ++++++++----- lib/PublicInbox/ProcessPipe2.pm | 29 +++++++++++++++++++++++ 4 files changed, 57 insertions(+), 31 deletions(-) create mode 100644 lib/PublicInbox/ProcessPipe2.pm diff --git a/MANIFEST b/MANIFEST index 4693cbe0..2b180f72 100644 --- a/MANIFEST +++ b/MANIFEST @@ -319,6 +319,7 @@ lib/PublicInbox/POP3.pm lib/PublicInbox/POP3D.pm lib/PublicInbox/PktOp.pm lib/PublicInbox/ProcessPipe.pm +lib/PublicInbox/ProcessPipe2.pm lib/PublicInbox/Qspawn.pm lib/PublicInbox/Reply.pm lib/PublicInbox/RepoAtom.pm diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 59462e9a..2386983a 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -6,9 +6,8 @@ # and 
public-inbox-watch. Not the WWW or NNTP code which only # requires read-only access. package PublicInbox::Import; -use strict; -use parent qw(PublicInbox::Lock); -use v5.10.1; +use v5.12; +use parent qw(PublicInbox::Lock PublicInbox::ProcessPipe2); use PublicInbox::Spawn qw(run_die popen_rd); use PublicInbox::MID qw(mids mid2path); use PublicInbox::Address; @@ -56,10 +55,7 @@ sub new { sub gfi_start { my ($self) = @_; - return ($self->{in}, $self->{out}) if $self->{in}; - - my ($in_r, $out_r, $out_w); - pipe($out_r, $out_w) or die "pipe failed: $!"; + return @$self{qw(pp2_rd pp2_wr)} if $self->{pp2_rd}; $self->lock_acquire; eval { @@ -72,18 +68,15 @@ sub gfi_start { die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?; $self->{-tree} = { map { $_ => 1 } split(/\0/, $t) }; } - $in_r = $self->{in} = $git->popen(qw(fast-import - --quiet --done --date-format=raw), - undef, { 0 => $out_r }); - $out_w->autoflush(1); - $self->{out} = $out_w; + $self->popen_rw(['git', "--git-dir=$git->{git_dir}", + qw(fast-import --quiet --done --date-format=raw)]); $self->{nchg} = 0; }; if ($@) { $self->lock_release; die $@; } - ($in_r, $out_w); + @$self{qw(pp2_rd pp2_wr)}; } sub wfail () { die "write to fast-import failed: $!" 
} @@ -157,16 +150,16 @@ sub check_remove_v1 { sub checkpoint { my ($self) = @_; - return unless $self->{in}; - print { $self->{out} } "checkpoint\n" or wfail; + return unless $self->{pp2_rd}; + print { $self->{pp2_wr} } "checkpoint\n" or wfail; undef; } sub progress { my ($self, $msg) = @_; - return unless $self->{in}; - print { $self->{out} } "progress $msg\n" or wfail; - readline($self->{in}) eq "progress $msg\n" or die + return unless $self->{pp2_rd}; + print { $self->{pp2_wr} } "progress $msg\n" or wfail; + readline($self->{pp2_rd}) eq "progress $msg\n" or die "progress $msg not received\n"; undef; } @@ -216,7 +209,7 @@ sub barrier { # used for v2 sub get_mark { my ($self, $mark) = @_; - die "not active\n" unless $self->{in}; + die "not active\n" unless $self->{pp2_rd}; my ($r, $w) = $self->gfi_start; print $w "get-mark $mark\n" or wfail; my $oid = <$r> // die "get-mark failed, need git 2.6.0+\n"; @@ -486,15 +479,14 @@ EOM } # true if locked and active -sub active { !!$_[0]->{out} } +sub active { !!$_[0]->{pp2_wr} } sub done { my ($self) = @_; - my $w = delete $self->{out} or return; + $self->{pp2_wr} or return; eval { - my $r = delete $self->{in} or die 'BUG: missing {in} when done'; - print $w "done\n" or wfail; - close $r or die "fast-import failed: $?"; # ProcessPipe::CLOSE + print { $self->{pp2_wr} } "done\n" or wfail; + $self->close_wait or die "fast-import failed: $?"; }; my $wait_err = $@; my $nchg = delete $self->{nchg}; @@ -509,7 +501,7 @@ sub done { sub atfork_child { my ($self) = @_; - foreach my $f (qw(in out)) { + for my $f (qw(pp2_rd pp2_wr)) { next unless defined($self->{$f}); close $self->{$f} or die "failed to close import[$f]: $!\n"; } diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm index ba2c1ecb..12e03cc5 100644 --- a/lib/PublicInbox/ProcessPipe.pm +++ b/lib/PublicInbox/ProcessPipe.pm @@ -1,10 +1,9 @@ # Copyright (C) all contributors # License: AGPL-3.0+ -# a tied handle for auto reaping of children tied to a
read-only pipe, see perltie(1) -# DO NOT use this as-is for bidirectional pipes/sockets (e.g. in PublicInbox::Git), -# both ends of the pipe must be at the same level of the Perl object hierarchy -# to ensure orderly destruction. +# a tied handle for auto reaping of children tied to a read-only pipe, +# see perltie(1). Use ProcessPipe2 for bidirectional pipes/sockets +# for proper refcount and destruction ordering but no tie support package PublicInbox::ProcessPipe; use v5.12; use PublicInbox::DS qw(awaitpid); @@ -57,8 +56,13 @@ sub FILENO { fileno($_[0]->{fh}) } sub _close ($;$) { my ($self, $wait) = @_; - my ($fh, $pid) = delete(@$self{qw(fh pid)}); - my $ret = (defined($fh) && $wait) ? close($fh) : ($fh = ''); + my ($fh, $pid, $rd, $wr) = delete(@$self{qw(fh pid pp2_rd pp2_wr)}); + my $ret = (defined($fh) && $wait) ? close($fh) : do { + $fh = ''; + my $n = ((defined($rd) && $wait) ? close($rd) : ($rd = 1)). + ((defined($wr) && $wait) ? close($wr) : ($wr = 1)); + $n eq '11' ? 1 : ''; + }; return $ret unless defined($pid) && $self->{ppid} == $$; if ($wait) { # caller cares about the exit status: # synchronous wait via defined(wantarray) on awaitpid: diff --git a/lib/PublicInbox/ProcessPipe2.pm b/lib/PublicInbox/ProcessPipe2.pm new file mode 100644 index 00000000..2eddf7ff --- /dev/null +++ b/lib/PublicInbox/ProcessPipe2.pm @@ -0,0 +1,29 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# non-tied version of ProcessPipe for "bidirectional" pipes used by +# `cat-file --batch-*', fast-import, etc.
+package PublicInbox::ProcessPipe2; +use v5.12; +use parent qw(PublicInbox::ProcessPipe); +use autodie qw(pipe); +use PublicInbox::Spawn qw(spawn); +use PublicInbox::DS qw(awaitpid); + +sub popen_rw { + my ($self, $cmd, $env, $opt) = @_; + pipe($self->{pp2_rd}, local $opt->{1}); + pipe(local $opt->{0}, $self->{pp2_wr}); + $self->{pp2_wr}->autoflush(1); + $self->{ppid} = $$; + $self->{pp_chld_err} = \(my $err); + my $pid = $self->{pid} = spawn($cmd, $env, $opt); + awaitpid($pid, $self->can('waitcb'), \$err, @{$opt->{cb_arg} // []}); + undef; +} + +sub close_wait { $_[0]->_close(1) } + +# DESTROY is inherited from ProcessPipe + +1;