From 9b068be88b252355c1c62f804d4f081e9a20570d Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 30 Sep 2023 15:20:39 +0000 Subject: git: use Unix stream sockets for `cat-file --batch-*' The benefit of 1MB potential pipe buffer size (on Linux) doesn't seem noticeable when reading from git (unlike when writing to v2 shards), so Unix stream sockets seem fine, here. This allows us to simplify our process management by using the same socket FD for reads and writes and enables us to use our ProcessPipe class for reaping (as we can do with Gcf2Client). Gcf2Client no longer relies on PublicInbox::DS for write buffering, and instead just waits for requests to complete once the number of inflight requests hits the MAX_INFLIGHT threshold as we do with PublicInbox::Git. We reuse the existing MAX_INFLIGHT limit (18) that was determined by the minimum allowed PIPE_BUF (512). (AFAIK) Unix stream sockets have no analogy to PIPE_BUF, but all *BSDs and Linux I've checked have default SO_RCVBUF and SO_SNDBUF values larger than the previously-required PIPE_BUF size of 512 bytes. --- lib/PublicInbox/GitAsyncCat.pm | 98 ++---------------------------------------- 1 file changed, 4 insertions(+), 94 deletions(-) (limited to 'lib/PublicInbox/GitAsyncCat.pm') diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm index 671654b5..71ee1147 100644 --- a/lib/PublicInbox/GitAsyncCat.pm +++ b/lib/PublicInbox/GitAsyncCat.pm @@ -1,70 +1,12 @@ # Copyright (C) all contributors # License: AGPL-3.0+ -# -# internal class used by PublicInbox::Git + PublicInbox::DS -# This parses the output pipe of "git cat-file --batch" package PublicInbox::GitAsyncCat; use v5.12; -use parent qw(PublicInbox::DS Exporter); -use PublicInbox::DS qw(awaitpid); -use POSIX qw(WNOHANG); -use PublicInbox::Syscall qw(EPOLLIN EPOLLET); +use parent qw(Exporter); our @EXPORT = qw(ibx_async_cat ibx_async_prefetch async_check); -use PublicInbox::Git (); our $GCF2C; # singleton PublicInbox::Gcf2Client -# close w/o aborting another git process -sub vanish { - delete $_[0]->{git}; - $_[0]->close; -} - -sub close { - my ($self) = @_; - if (my $git = delete $self->{git}) { - $git->async_abort; - } - $self->SUPER::close; # PublicInbox::DS::close -} - -sub aclose { - my (undef, $self, $f) = @_; # ignore PID ($_[0]) - if (my $g = $self->{git}) { - return vanish($self) if ($g->{$f} // 0) != ($self->{sock} // 1); - } - $self->close; -} - -sub event_step { - my ($self) = @_; - my $git = $self->{git} or return; - return vanish($self) if ($git->{in} // 0) != ($self->{sock} // 1); - my $inflight = $git->{inflight}; - if ($inflight && @$inflight) { - $git->cat_async_step($inflight); - - # child death? - if (($git->{in} // 0) != ($self->{sock} // 1)) { - vanish($self); - } elsif (@$inflight || exists $git->{rbuf}) { - # ok, more to do, requeue for fairness - $self->requeue; - } - } -} - -sub watch_cat { - my ($git) = @_; - $git->{async_cat} //= do { - my $self = bless { git => $git }, __PACKAGE__; - $git->{in}->blocking(0); - $self->SUPER::new($git->{in}, EPOLLIN|EPOLLET); - awaitpid($git->{pid}, \&aclose, $self, 'in'); - \undef; # this is a true ref() - }; -} - sub ibx_async_cat ($$$$) { my ($ibx, $oid, $cb, $arg) = @_; my $git = $ibx->{git} // $ibx->git; @@ -80,7 +22,7 @@ sub ibx_async_cat ($$$$) { \undef; } else { # read-only end of git-cat-file pipe $git->cat_async($oid, $cb, $arg); - watch_cat($git); + $git->watch_async; } } @@ -88,14 +30,7 @@ sub async_check ($$$$) { my ($ibx, $oidish, $cb, $arg) = @_; # $ibx may be $ctx my $git = $ibx->{git} // $ibx->git; $git->check_async($oidish, $cb, $arg); - return watch_cat($git) if $git->{-bc}; # --batch-command - $git->{async_chk} //= do { - my $self = bless { git => $git }, 'PublicInbox::GitAsyncCheck'; - $git->{in_c}->blocking(0); - $self->SUPER::new($git->{in_c}, EPOLLIN|EPOLLET); - awaitpid($git->{pid_c}, \&aclose, $self, 'in_c'); - \undef; # this is a true ref() - }; + ($git->{ck} // $git)->watch_async; } # this is safe to call inside $cb, but not guaranteed to enqueue @@ -109,35 +44,10 @@ sub ibx_async_prefetch { $oid .= " $git->{git_dir}\n"; return $GCF2C->gcf2_async(\$oid, $cb, $arg); # true } - } elsif ($git->{async_cat}) { + } elsif ($git->{epwatch}) { return $git->async_prefetch($oid, $cb, $arg); } undef; } 1; -package PublicInbox::GitAsyncCheck; -use v5.12; -our @ISA = qw(PublicInbox::GitAsyncCat); -use POSIX qw(WNOHANG); -use PublicInbox::Syscall qw(EPOLLIN EPOLLET); - -sub event_step { - my ($self) = @_; - my $git = $self->{git} or return; - return $self->vanish if ($git->{in_c} // 0) != ($self->{sock} // 1); - my $inflight = $git->{inflight_c}; - if ($inflight && @$inflight) { - $git->check_async_step($inflight); - - # child death? - if (($git->{in_c} // 0) != ($self->{sock} // 1)) { - $self->vanish; - } elsif (@$inflight || exists $git->{rbuf_c}) { - # ok, more to do, requeue for fairness - $self->requeue; - } - } -} - -1; -- cgit v1.2.3-24-ge0c7