From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 78D471F565 for ; Sat, 30 Sep 2023 14:53:02 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1696085582; bh=fgu9oDcv6fzATwbbVuME4KLB83Y1PYN+6ZNUlOPC2iQ=; h=From:To:Subject:Date:In-Reply-To:References:From; b=PTS3okyVQ8AsLoR3IUwQjV9uwDO92tMSbdIkVrzSlGVpzxZZBCtl/UGi3xZgxk9Ha wq9/izyPTFn6QYN4NAjTB5JTfO2LlBAcnm4vf3I2WhOGbH1MmFSD0im7OyKGSARRYn Ue6PBQ7Wiq+OAzXG4LB9K53CRdI3dGwiQkJReYTM= From: Eric Wong To: spew@80x24.org Subject: [PATCH 3/3] git: use Unix stream sockets for `cat-file --batch-*' Date: Sat, 30 Sep 2023 14:53:01 +0000 Message-ID: <20230930145302.1642664-3-e@80x24.org> In-Reply-To: <20230930145302.1642664-1-e@80x24.org> References: <20230930145302.1642664-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: The benefit of 1MB potential pipe buffer size (on Linux) doesn't seem noticeable when reading from git (unlike when writing to v2 shards), so Unix stream sockets seem fine, here. This allows us to simplify our process management by using the same socket FD for reads and writes and enables us to use our ProcessPipe class for reaping (as we can do with Gcf2Client). Gcf2Client no longer relies on PublicInbox::DS for write buffering, and instead just waits for requests to complete once the number of inflight requests hits the MAX_INFLIGHT threshold as we do with PublicInbox::Git. We reuse the existing MAX_INFLIGHT limit (18) that was determined by the minimum allowed PIPE_BUF (512). 
(AFAIK) Unix stream sockets have no analogue to PIPE_BUF, but all *BSDs and Linux I've checked have default SO_RCVBUF and SO_SNDBUF values larger than the previously-required PIPE_BUF size of 512 bytes. --- lib/PublicInbox/Gcf2Client.pm | 57 ++------ lib/PublicInbox/Git.pm | 254 +++++++++++++++++---------------- lib/PublicInbox/GitAsyncCat.pm | 98 +------------ lib/PublicInbox/LeiToMail.pm | 2 +- lib/PublicInbox/ProcessPipe.pm | 4 +- lib/PublicInbox/ViewVCS.pm | 2 +- 6 files changed, 153 insertions(+), 264 deletions(-) diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm index a49e2aad..8f442787 100644 --- a/lib/PublicInbox/Gcf2Client.pm +++ b/lib/PublicInbox/Gcf2Client.pm @@ -3,14 +3,15 @@ # connects public-inbox processes to PublicInbox::Gcf2::loop() package PublicInbox::Gcf2Client; -use strict; +use v5.12; use parent qw(PublicInbox::DS); use PublicInbox::Git; use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available use PublicInbox::Spawn qw(spawn); use Socket qw(AF_UNIX SOCK_STREAM); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); -use PublicInbox::DS qw(awaitpid); +use PublicInbox::ProcessPipe; + # fields: # sock => socket to Gcf2::loop # The rest of these fields are compatible with what PublicInbox::Git @@ -21,66 +22,36 @@ use PublicInbox::DS qw(awaitpid); # inflight => array (see PublicInbox::Git) # rbuf => scalarref, may be non-existent or empty sub new { - my ($rdr) = @_; + my ($opt) = @_; my $self = bless {}, __PACKAGE__; # ensure the child process has the same @INC we do: my $env = { PERL5LIB => join(':', @INC) }; my ($s1, $s2); socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!"; - $rdr //= {}; - $rdr->{0} = $rdr->{1} = $s2; - my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]]; - $self->{'pid.owner'} = $$; - awaitpid($self->{pid} = spawn($cmd, $env, $rdr), undef); $s1->blocking(0); + $opt->{0} = $opt->{1} = $s2; + my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]]; +
my $pid = spawn($cmd, $env, $opt); + my $sock = PublicInbox::ProcessPipe->maybe_new($pid, $s1); $self->{inflight} = []; - $self->{in} = $s1; - $self->SUPER::new($s1, EPOLLIN|EPOLLET); -} - -sub fail { - my $self = shift; - $self->close; # PublicInbox::DS::close - PublicInbox::Git::fail($self, @_); + $self->{epwatch} = \undef; # for Git->cleanup + $self->SUPER::new($sock, EPOLLIN|EPOLLET); } sub gcf2_async ($$$;$) { my ($self, $req, $cb, $arg) = @_; my $inflight = $self->{inflight} or return $self->close; - - # {wbuf} is rare, I hope: - cat_async_step($self, $inflight) if $self->{wbuf}; - - $self->fail("gcf2c write: $!") if !$self->write($req) && !$self->{sock}; + PublicInbox::Git::write_all($self, $$req, \&cat_async_step, $inflight); push @$inflight, $req, $cb, $arg; } # ensure PublicInbox::Git::cat_async_step never calls cat_async_retry sub alternates_changed {} -# DS::event_loop will call this -sub event_step { - my ($self) = @_; - $self->flush_write; - $self->close if !$self->{in} || !$self->{sock}; # process died - my $inflight = $self->{inflight}; - if ($inflight && @$inflight) { - cat_async_step($self, $inflight); - return $self->close unless $self->{in}; # process died - - # ok, more to do, requeue for fairness - $self->requeue if @$inflight || exists($self->{rbuf}); - } -} - -sub DESTROY { - my ($self) = @_; - delete $self->{sock}; # if outside event_loop - PublicInbox::Git::DESTROY($self); -} - no warnings 'once'; -*cat_async_step = \&PublicInbox::Git::cat_async_step; +*cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step +*event_step = \&PublicInbox::Git::event_step; +*DESTROY = \&PublicInbox::Git::DESTROY; 1; diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 8ac40d2b..3062f293 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -9,23 +9,23 @@ package PublicInbox::Git; use strict; use v5.10.1; -use parent qw(Exporter); +use parent qw(Exporter PublicInbox::DS); use POSIX (); -use IO::Handle; # ->autoflush 
+use IO::Handle; # ->blocking +use Socket qw(AF_UNIX SOCK_STREAM); +use PublicInbox::Syscall qw(EPOLLIN EPOLLET); use Errno qw(EINTR EAGAIN); use File::Glob qw(bsd_glob GLOB_NOSORT); use File::Spec (); use Time::HiRes qw(stat); -use PublicInbox::Spawn qw(popen_rd which); +use PublicInbox::Spawn qw(spawn popen_rd which); use PublicInbox::Tmpfile; use IO::Poll qw(POLLIN); use Carp qw(croak carp); use PublicInbox::SHA (); -use PublicInbox::DS qw(awaitpid); our %HEXLEN2SHA = (40 => 1, 64 => 256); our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64); our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN); -our $PIPE_BUFSIZ = 65536; # Linux default our $in_cleanup; our $RDTIMEO = 60_000; # milliseconds our $async_warn; # true in read-only daemons @@ -34,11 +34,8 @@ our $async_warn; # true in read-only daemons my @MODIFIED_DATE = qw[for-each-ref --sort=-committerdate --format=%(committerdate:raw) --count=1]; -# 512: POSIX PIPE_BUF minimum (see pipe(7)) -# 65: SHA-256 hex size + "\n" in preparation for git using non-SHA1 -# 3: @$inflight is flattened [ $OID, $cb, $arg ] use constant { - MAX_INFLIGHT => int(512 / (65 + length('contents '))) * 3, + MAX_INFLIGHT => 18, # arbitrary, formerly based on PIPE_BUF BATCH_CMD_VER => v2.36.0, # git 2.36+ }; @@ -65,7 +62,7 @@ sub check_git_exe () { if ($st ne $EXE_ST) { my $rd = popen_rd([ $GIT_EXE, '--version' ]); my $v = readline($rd); - close($rd) or die "$GIT_EXE --version: $?"; + CORE::close($rd) or die "$GIT_EXE --version: $?"; $v =~ /\b([0-9]+(?:\.[0-9]+){2})/ or die "$GIT_EXE --version output: $v # unparseable"; $GIT_VER = eval("v$1") // die "BUG: bad vstring: $1 ($v)"; @@ -144,17 +141,16 @@ sub last_check_err { $buf; } -sub _bidi_pipe { - my ($self, $batch, $in, $out, $pid, $err) = @_; - if (defined $self->{$pid}) { - Carp::cluck("BUG: self->{$pid} exists unexpectedly"); - return; - } - pipe(my ($out_r, $out_w)) or $self->fail("pipe failed: $!"); - my $rdr = { 0 => $out_r, pgid => 0 }; +sub _sock_cmd { + my ($self, 
$batch, $err_c) = @_; + $self->{sock} and Carp::confess('BUG: {sock} exists'); + my ($s1, $s2); + socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!"; + $s1->blocking(0); + my $opt = { pgid => 0, 0 => $s2, 1 => $s2 }; my $gd = $self->{git_dir}; if ($gd =~ s!/([^/]+/[^/]+)\z!/!) { - $rdr->{-C} = $gd; + $opt->{-C} = $gd; $gd = $1; } @@ -163,23 +159,13 @@ sub _bidi_pipe { my $abbr = $GIT_VER lt v2.31.0 ? 40 : 'no'; my @cmd = ($GIT_EXE, "--git-dir=$gd", '-c', "core.abbrev=$abbr", 'cat-file', "--$batch"); - if ($err) { + if ($err_c) { my $id = "git.$self->{git_dir}.$batch.err"; - $self->{$err} = $rdr->{2} = tmpfile($id, undef, 1) or + $self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or $self->fail("tmpfile($id): $!"); } - # see lib/PublicInbox/ProcessPipe.pm for why we don't use that here - my ($in_r, $p) = popen_rd(\@cmd, undef, $rdr); - awaitpid($self->{$pid} = $p, undef); - $self->{"$pid.owner"} = $$; - $out_w->autoflush(1); - if ($^O eq 'linux') { # 1031: F_SETPIPE_SZ - fcntl($out_w, 1031, 4096); - fcntl($in_r, 1031, 4096) if $batch eq 'batch-check'; - } - $out_w->blocking(0); - $self->{$out} = $out_w; - $self->{$in} = $in_r; + my $pid = spawn(\@cmd, undef, $opt); + $self->{sock} = PublicInbox::ProcessPipe->maybe_new($pid, $s1); } sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) } @@ -189,7 +175,7 @@ sub my_read ($$$) { my $left = $len - length($$rbuf); my $r; while ($left > 0) { - $r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf)); + $r = sysread($fh, $$rbuf, $left, length($$rbuf)); if ($r) { $left -= $r; } elsif (defined($r)) { # EOF @@ -210,8 +196,7 @@ sub my_readline ($$) { if ((my $n = index($$rbuf, "\n")) >= 0) { return substr($$rbuf, 0, $n + 1, ''); } - my $r = sysread($fh, $$rbuf, $PIPE_BUFSIZ, length($$rbuf)) - and next; + my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next; # return whatever's left on EOF return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r); @@ -229,11 +214,10 @@ sub 
cat_async_retry ($$) { # here to prevent cleanup() from waiting: delete $self->{inflight}; cleanup($self); - batch_prepare($self, my $new_inflight = []); + my $new_inflight = batch_prepare($self); while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) { - write_all($self, $self->{out}, $oid."\n", - \&cat_async_step, $new_inflight); + write_all($self, $oid."\n", \&cat_async_step, $new_inflight); $oid = \$oid if !@$new_inflight; # to indicate oid retried push @$new_inflight, $oid, $cb, $arg; } @@ -246,7 +230,7 @@ sub async_prefetch { my $inflight = $self->{inflight} or return; return if @$inflight; substr($oid, 0, 0) = 'contents ' if $self->{-bc}; - write_all($self, $self->{out}, "$oid\n", \&cat_async_step, $inflight); + write_all($self, "$oid\n", \&cat_async_step, $inflight); push(@$inflight, $oid, $cb, $arg); } @@ -256,14 +240,14 @@ sub cat_async_step ($$) { my ($req, $cb, $arg) = @$inflight[0, 1, 2]; my $rbuf = delete($self->{rbuf}) // \(my $new = ''); my ($bref, $oid, $type, $size); - my $head = my_readline($self->{in}, $rbuf); + my $head = my_readline($self->{sock}, $rbuf); my $cmd = ref($req) ? $$req : $req; # ->fail may be called via Gcf2Client.pm my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info '; if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) { ($oid, $type, $size) = ($1, $2, $3 + 0); unless ($info) { # --batch-command - $bref = my_read($self->{in}, $rbuf, $size + 1) or + $bref = my_read($self->{sock}, $rbuf, $size + 1) or $self->fail(defined($bref) ? 
'read EOF' : "read: $!"); chop($$bref) eq "\n" or @@ -302,16 +286,16 @@ sub cat_async_wait ($) { } } -sub batch_prepare ($$) { - my ($self, $inflight) = @_; +sub batch_prepare ($) { + my ($self) = @_; check_git_exe(); if ($GIT_VER ge BATCH_CMD_VER) { - _bidi_pipe($self, qw(batch-command in out pid err_c)); $self->{-bc} = 1; + _sock_cmd($self, 'batch-command', 1); } else { - _bidi_pipe($self, qw(batch in out pid)); + _sock_cmd($self, 'batch'); } - $self->{inflight} = $inflight; + $self->{inflight} = []; } sub _cat_file_cb { @@ -328,52 +312,59 @@ sub cat_file { } sub check_async_step ($$) { - my ($self, $inflight_c) = @_; - die 'BUG: inflight empty or odd' if scalar(@$inflight_c) < 3; - my ($req, $cb, $arg) = @$inflight_c[0, 1, 2]; - my $rbuf = delete($self->{rbuf_c}) // \(my $new = ''); - chomp(my $line = my_readline($self->{in_c}, $rbuf)); + my ($ck, $inflight) = @_; + die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; + my ($req, $cb, $arg) = @$inflight[0, 1, 2]; + my $rbuf = delete($ck->{rbuf}) // \(my $new = ''); + chomp(my $line = my_readline($ck->{sock}, $rbuf)); my ($hex, $type, $size) = split(/ /, $line); # git <2.21 would show `dangling' (2.21+ shows `ambiguous') # https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/ if ($hex eq 'dangling') { - my $ret = my_read($self->{in_c}, $rbuf, $type + 1); - $self->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret; + my $ret = my_read($ck->{sock}, $rbuf, $type + 1); + $ck->fail(defined($ret) ? 
'read EOF' : "read: $!") if !$ret; } - $self->{rbuf_c} = $rbuf if $$rbuf ne ''; - splice(@$inflight_c, 0, 3); # don't retry $cb on ->fail + $ck->{rbuf} = $rbuf if $$rbuf ne ''; + splice(@$inflight, 0, 3); # don't retry $cb on ->fail eval { $cb->(undef, $hex, $type, $size, $arg) }; - async_err($self, $req, $hex, $@, 'check') if $@; + async_err($ck, $req, $hex, $@, 'check') if $@; } sub check_async_wait ($) { my ($self) = @_; return cat_async_wait($self) if $self->{-bc}; - my $inflight_c = $self->{inflight_c} or return; - check_async_step($self, $inflight_c) while (scalar(@$inflight_c)); + my $ck = $self->{ck} or return; + my $inflight = $ck->{inflight} or return; + check_async_step($ck, $inflight) while (scalar(@$inflight)); +} + +# git <2.36 +sub ck { + $_[0]->{ck} //= bless { git_dir => $_[0]->{git_dir} }, + 'PublicInbox::GitCheck'; } sub check_async_begin ($) { my ($self) = @_; - die 'BUG: already in async check' if $self->{inflight_c}; cleanup($self) if alternates_changed($self); check_git_exe(); if ($GIT_VER ge BATCH_CMD_VER) { - _bidi_pipe($self, qw(batch-command in out pid err_c)); $self->{-bc} = 1; - $self->{inflight} = []; + _sock_cmd($self, 'batch-command', 1); } else { - _bidi_pipe($self, qw(batch-check in_c out_c pid_c err_c)); - $self->{inflight_c} = []; + _sock_cmd($self = ck($self), 'batch-check', 1); } + $self->{inflight} = []; } sub write_all { - my ($self, $out, $buf, $read_step, $inflight) = @_; + my ($self, $buf, $read_step, $inflight) = @_; + $self->{sock} // Carp::confess 'BUG: no {sock}'; + Carp::confess('BUG: not an array') if ref($inflight) ne 'ARRAY'; $read_step->($self, $inflight) while @$inflight >= MAX_INFLIGHT; do { - my $w = syswrite($out, $buf); + my $w = syswrite($self->{sock}, $buf); if (defined $w) { return if $w == length($buf); substr($buf, 0, $w, ''); # sv_chop @@ -386,16 +377,17 @@ sub write_all { sub check_async ($$$$) { my ($self, $oid, $cb, $arg) = @_; - my $inflight = $self->{-bc} ? 
- ($self->{inflight} // cat_async_begin($self)) : - ($self->{inflight_c} // check_async_begin($self)); - if ($self->{-bc}) { + my $inflight; + if ($self->{-bc}) { # likely as time goes on +batch_command: + $inflight = $self->{inflight} // cat_async_begin($self); substr($oid, 0, 0) = 'info '; - write_all($self, $self->{out}, "$oid\n", - \&cat_async_step, $inflight); - } else { - write_all($self, $self->{out_c}, "$oid\n", - \&check_async_step, $inflight); + write_all($self, "$oid\n", \&cat_async_step, $inflight); + } else { # accounts for git upgrades while we're running: + my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin + $inflight = $ck->{inflight} // check_async_begin($self); + goto batch_command if $self->{-bc}; + write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight); } push(@$inflight, $oid, $cb, $arg); } @@ -418,39 +410,9 @@ sub check { ($hex, $type, $size); } -sub _destroy { - my ($self, $pid, @rest) = @_; # rest = rbuf, in, out, err - my ($p) = delete @$self{($pid, @rest)}; - - # GitAsyncCat::event_step may delete {$pid} - awaitpid($p) if defined($p) && $$ == $self->{"$pid.owner"}; -} - -sub async_abort ($) { - my ($self) = @_; - while (scalar(@{$self->{inflight_c} // []}) || - scalar(@{$self->{inflight} // []})) { - for my $c ('', '_c') { - my $q = $self->{"inflight$c"} or next; - while (@$q) { - my ($req, $cb, $arg) = splice(@$q, 0, 3); - $req = $$req if ref($req); - $self->{-bc} and - $req =~ s/\A(?:contents|info) //; - $req =~ s/ .*//; # drop git_dir for Gcf2Client - eval { $cb->(undef, $req, undef, undef, $arg) }; - warn "E: (in abort) $req: $@" if $@; - } - delete $self->{"inflight$c"}; - delete $self->{"rbuf$c"}; - } - } - cleanup($self); -} - -sub fail { # may be augmented in subclasses +sub fail { my ($self, $msg) = @_; - async_abort($self); + $self->close; croak(ref($self) . ' ' . ($self->{git_dir} // '') . 
": $msg"); } @@ -475,12 +437,12 @@ sub qx { my $fh = popen(@_); if (wantarray) { my @ret = <$fh>; - close $fh; # caller should check $? + CORE::close $fh; # caller should check $? @ret; } else { local $/; my $ret = <$fh>; - close $fh; # caller should check $? + CORE::close $fh; # caller should check $? $ret; } } @@ -492,12 +454,16 @@ sub date_parse { } $self->qx('rev-parse', map { "--since=$_" } @_); } +sub _active ($) { + scalar(@{$_[0]->{inflight} // []}) || + ($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []})) +} + # check_async and cat_async may trigger the other, so ensure they're # both completely done by using this: sub async_wait_all ($) { my ($self) = @_; - while (scalar(@{$self->{inflight_c} // []}) || - scalar(@{$self->{inflight} // []})) { + while (_active($self)) { check_async_wait($self); cat_async_wait($self); } @@ -506,14 +472,10 @@ sub async_wait_all ($) { # returns true if there are pending "git cat-file" processes sub cleanup { my ($self, $lazy) = @_; - return 1 if $lazy && (scalar(@{$self->{inflight_c} // []}) || - scalar(@{$self->{inflight} // []})); + return 1 if $lazy && _active($self); local $in_cleanup = 1; - delete @$self{qw(async_cat async_chk)}; async_wait_all($self); - delete @$self{qw(inflight inflight_c -bc)}; - _destroy($self, qw(pid rbuf in out err_c)); - _destroy($self, qw(pid_c rbuf_c in_c out_c err_c)); + $_->close for ($self, (delete($self->{ck}) // ())); undef; } @@ -530,7 +492,7 @@ sub packed_bytes { $n } -sub DESTROY { cleanup(@_) } +sub DESTROY { cleanup($_[0]) } sub local_nick ($) { # don't show full FS path, basename should be OK: @@ -571,14 +533,14 @@ sub cat_async_begin { my ($self) = @_; cleanup($self) if $self->alternates_changed; die 'BUG: already in async' if $self->{inflight}; - batch_prepare($self, []); + batch_prepare($self); } sub cat_async ($$$;$) { my ($self, $oid, $cb, $arg) = @_; my $inflight = $self->{inflight} // cat_async_begin($self); substr($oid, 0, 0) = 'contents ' if $self->{-bc}; - 
write_all($self, $self->{out}, $oid."\n", \&cat_async_step, $inflight); + write_all($self, $oid."\n", \&cat_async_step, $inflight); push(@$inflight, $oid, $cb, $arg); } @@ -648,7 +610,7 @@ sub manifest_entry { } my $dig = PublicInbox::SHA->new(1); while (read($sr, $buf, 65536)) { $dig->add($buf) } - close $sr or return; # empty, uninitialized git repo + CORE::close $sr or return; # empty, uninitialized git repo $ent->{fingerprint} = $dig->hexdigest; $ent->{modified} = modified(undef, $mod); chomp($buf = <$own> // ''); @@ -664,8 +626,10 @@ sub cleanup_if_unlinked { # Linux-specific /proc/$PID/maps access # TODO: support this inside git.git my $ret = 0; - for my $fld (qw(pid pid_c)) { - my $pid = $self->{$fld} // next; + for my $obj ($self, ($self->{ck} // ())) { + my $sock = $obj->{sock} // next; + my PublicInbox::ProcessPipe $pp = tied *$sock; # ProcessPipe + my $pid = $pp->{pid} // next; open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1); while (<$fh>) { # n.b. we do not restart for unlinked multi-pack-index @@ -679,6 +643,50 @@ sub cleanup_if_unlinked { $ret; } +sub event_step { + my ($self) = @_; + $self->close if !$self->{sock}; # process died while requeued + my $inflight = $self->{inflight}; + if ($inflight && @$inflight) { + $self->cat_async_step($inflight); + return $self->close unless $self->{sock}; + # more to do? 
requeue for fairness: + $self->requeue if @$inflight || exists($self->{rbuf}); + } +} + +# idempotently registers with DS epoll/kqueue/select/poll +sub watch_async ($) { + $_[0]->{epwatch} //= do { + $_[0]->SUPER::new($_[0]->{sock}, EPOLLIN|EPOLLET); + \undef; + } +} + +sub close { + my ($self) = @_; + if (my $q = $self->{inflight}) { # abort inflight requests + while (@$q) { + my ($req, $cb, $arg) = splice(@$q, 0, 3); + $req = $$req if ref($req); + $self->{-bc} and $req =~ s/\A(?:contents|info) //; + $req =~ s/ .*//; # drop git_dir for Gcf2Client + eval { $cb->(undef, $req, undef, undef, $arg) }; + warn "E: (in abort) $req: $@" if $@; + } + } + delete @$self{qw(-bc err_c inflight rbuf)}; + delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock}); +} + +package PublicInbox::GitCheck; # only for git <2.36 +use v5.12; +our @ISA = qw(PublicInbox::Git); +no warnings 'once'; + +# for event_step +*cat_async_step = \&PublicInbox::Git::check_async_step; + 1; __END__ =pod diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm index 671654b5..71ee1147 100644 --- a/lib/PublicInbox/GitAsyncCat.pm +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -1,70 +1,12 @@ # Copyright (C) all contributors # License: AGPL-3.0+ -# -# internal class used by PublicInbox::Git + PublicInbox::DS -# This parses the output pipe of "git cat-file --batch" package PublicInbox::GitAsyncCat; use v5.12; -use parent qw(PublicInbox::DS Exporter); -use PublicInbox::DS qw(awaitpid); -use POSIX qw(WNOHANG); -use PublicInbox::Syscall qw(EPOLLIN EPOLLET); +use parent qw(Exporter); our @EXPORT = qw(ibx_async_cat ibx_async_prefetch async_check); -use PublicInbox::Git (); our $GCF2C; # singleton PublicInbox::Gcf2Client -# close w/o aborting another git process -sub vanish { - delete $_[0]->{git}; - $_[0]->close; -} - -sub close { - my ($self) = @_; - if (my $git = delete $self->{git}) { - $git->async_abort; - } - $self->SUPER::close; # PublicInbox::DS::close -} - -sub aclose { - my 
(undef, $self, $f) = @_; # ignore PID ($_[0]) - if (my $g = $self->{git}) { - return vanish($self) if ($g->{$f} // 0) != ($self->{sock} // 1); - } - $self->close; -} - -sub event_step { - my ($self) = @_; - my $git = $self->{git} or return; - return vanish($self) if ($git->{in} // 0) != ($self->{sock} // 1); - my $inflight = $git->{inflight}; - if ($inflight && @$inflight) { - $git->cat_async_step($inflight); - - # child death? - if (($git->{in} // 0) != ($self->{sock} // 1)) { - vanish($self); - } elsif (@$inflight || exists $git->{rbuf}) { - # ok, more to do, requeue for fairness - $self->requeue; - } - } -} - -sub watch_cat { - my ($git) = @_; - $git->{async_cat} //= do { - my $self = bless { git => $git }, __PACKAGE__; - $git->{in}->blocking(0); - $self->SUPER::new($git->{in}, EPOLLIN|EPOLLET); - awaitpid($git->{pid}, \&aclose, $self, 'in'); - \undef; # this is a true ref() - }; -} - sub ibx_async_cat ($$$$) { my ($ibx, $oid, $cb, $arg) = @_; my $git = $ibx->{git} // $ibx->git; @@ -80,7 +22,7 @@ sub ibx_async_cat ($$$$) { \undef; } else { # read-only end of git-cat-file pipe $git->cat_async($oid, $cb, $arg); - watch_cat($git); + $git->watch_async; } } @@ -88,14 +30,7 @@ sub async_check ($$$$) { my ($ibx, $oidish, $cb, $arg) = @_; # $ibx may be $ctx my $git = $ibx->{git} // $ibx->git; $git->check_async($oidish, $cb, $arg); - return watch_cat($git) if $git->{-bc}; # --batch-command - $git->{async_chk} //= do { - my $self = bless { git => $git }, 'PublicInbox::GitAsyncCheck'; - $git->{in_c}->blocking(0); - $self->SUPER::new($git->{in_c}, EPOLLIN|EPOLLET); - awaitpid($git->{pid_c}, \&aclose, $self, 'in_c'); - \undef; # this is a true ref() - }; + ($git->{ck} // $git)->watch_async; } # this is safe to call inside $cb, but not guaranteed to enqueue @@ -109,35 +44,10 @@ sub ibx_async_prefetch { $oid .= " $git->{git_dir}\n"; return $GCF2C->gcf2_async(\$oid, $cb, $arg); # true } - } elsif ($git->{async_cat}) { + } elsif ($git->{epwatch}) { return 
$git->async_prefetch($oid, $cb, $arg); } undef; } 1; -package PublicInbox::GitAsyncCheck; -use v5.12; -our @ISA = qw(PublicInbox::GitAsyncCat); -use POSIX qw(WNOHANG); -use PublicInbox::Syscall qw(EPOLLIN EPOLLET); - -sub event_step { - my ($self) = @_; - my $git = $self->{git} or return; - return $self->vanish if ($git->{in_c} // 0) != ($self->{sock} // 1); - my $inflight = $git->{inflight_c}; - if ($inflight && @$inflight) { - $git->check_async_step($inflight); - - # child death? - if (($git->{in_c} // 0) != ($self->{sock} // 1)) { - $self->vanish; - } elsif (@$inflight || exists $git->{rbuf_c}) { - # ok, more to do, requeue for fairness - $self->requeue; - } - } -} - -1; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 2dddf00b..98d0ac19 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -133,7 +133,7 @@ sub eml2mboxcl2 { sub git_to_mail { # git->cat_async callback my ($bref, $oid, $type, $size, $smsg) = @_; my $self = delete $smsg->{l2m} // die "BUG: no l2m"; - $type // return; # called by git->async_abort + $type // return; # called by PublicInbox::Git::close eval { if ($type eq 'missing' && ($bref = $self->{-lms_rw}->local_blob($oid, 1))) { diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessPipe.pm index c82ba0f8..ba75541f 100644 --- a/lib/PublicInbox/ProcessPipe.pm +++ b/lib/PublicInbox/ProcessPipe.pm @@ -1,8 +1,8 @@ # Copyright (C) all contributors # License: AGPL-3.0+ -# a tied handle for auto reaping of children tied to a read-only pipe, -# see perltie(1). Use ProcessPipe2 for bidirectional pipes/sockets +# a tied handle for auto reaping of children tied to a pipe or socket. +# See perltie(1). Use ProcessPipe2 for bidirectional pipes. 
# for proper refcount and destruction ordering but no tie support package PublicInbox::ProcessPipe; use v5.12; diff --git a/lib/PublicInbox/ViewVCS.pm b/lib/PublicInbox/ViewVCS.pm index 5529ed5b..f80fb4cb 100644 --- a/lib/PublicInbox/ViewVCS.pm +++ b/lib/PublicInbox/ViewVCS.pm @@ -133,7 +133,7 @@ sub do_cat_async { # favor git(1) over Gcf2 (libgit2) for SHA-256 support $ctx->{git}->cat_async($_, $cb, $ctx) for @req; if ($ctx->{env}->{'pi-httpd.async'}) { - PublicInbox::GitAsyncCat::watch_cat($ctx->{git}); + $ctx->{git}->watch_async; } else { # synchronous, generic PSGI $ctx->{git}->cat_async_wait; }