about summary refs log tree commit homepage
path: root/lib/PublicInbox/GitAsyncCat.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-09-30 15:20:39 +0000
committerEric Wong <e@80x24.org>2023-10-01 07:05:23 +0000
commit9b068be88b252355c1c62f804d4f081e9a20570d (patch)
tree4cf3ab9eebd8361bd9a3857c4d307783997be63c /lib/PublicInbox/GitAsyncCat.pm
parentbf929e8ddfb9359a97cd8be3d3017c038564d52d (diff)
downloadpublic-inbox-9b068be88b252355c1c62f804d4f081e9a20570d.tar.gz
The benefit of 1MB potential pipe buffer size (on Linux) doesn't
seem noticeable when reading from git (unlike when writing to v2
shards), so Unix stream sockets seem fine, here.

This allows us to simplify our process management by using the
same socket FD for reads and writes and enables us to use our
ProcessPipe class for reaping (as we can do with Gcf2Client).

Gcf2Client no longer relies on PublicInbox::DS for write
buffering, and instead just waits for requests to complete
once the number of inflight requests hits the MAX_INFLIGHT
threshold as we do with PublicInbox::Git.

We reuse the existing MAX_INFLIGHT limit (18) that was
determined by the minimum allowed PIPE_BUF (512).  (AFAIK) Unix
stream sockets have no analogy to PIPE_BUF, but all *BSDs and
Linux I've checked have default SO_RCVBUF and SO_SNDBUF values
larger than the previously-required PIPE_BUF size of 512 bytes.
Diffstat (limited to 'lib/PublicInbox/GitAsyncCat.pm')
-rw-r--r--lib/PublicInbox/GitAsyncCat.pm98
1 files changed, 4 insertions, 94 deletions
diff --git a/lib/PublicInbox/GitAsyncCat.pm b/lib/PublicInbox/GitAsyncCat.pm
index 671654b5..71ee1147 100644
--- a/lib/PublicInbox/GitAsyncCat.pm
+++ b/lib/PublicInbox/GitAsyncCat.pm
@@ -1,70 +1,12 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-#
-# internal class used by PublicInbox::Git + PublicInbox::DS
-# This parses the output pipe of "git cat-file --batch"
 package PublicInbox::GitAsyncCat;
 use v5.12;
-use parent qw(PublicInbox::DS Exporter);
-use PublicInbox::DS qw(awaitpid);
-use POSIX qw(WNOHANG);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+use parent qw(Exporter);
 our @EXPORT = qw(ibx_async_cat ibx_async_prefetch async_check);
-use PublicInbox::Git ();
 
 our $GCF2C; # singleton PublicInbox::Gcf2Client
 
-# close w/o aborting another git process
-sub vanish {
-        delete $_[0]->{git};
-        $_[0]->close;
-}
-
-sub close {
-        my ($self) = @_;
-        if (my $git = delete $self->{git}) {
-                $git->async_abort;
-        }
-        $self->SUPER::close; # PublicInbox::DS::close
-}
-
-sub aclose {
-        my (undef, $self, $f) = @_; # ignore PID ($_[0])
-        if (my $g = $self->{git}) {
-                return vanish($self) if ($g->{$f} // 0) != ($self->{sock} // 1);
-        }
-        $self->close;
-}
-
-sub event_step {
-        my ($self) = @_;
-        my $git = $self->{git} or return;
-        return vanish($self) if ($git->{in} // 0) != ($self->{sock} // 1);
-        my $inflight = $git->{inflight};
-        if ($inflight && @$inflight) {
-                $git->cat_async_step($inflight);
-
-                # child death?
-                if (($git->{in} // 0) != ($self->{sock} // 1)) {
-                        vanish($self);
-                } elsif (@$inflight || exists $git->{rbuf}) {
-                        # ok, more to do, requeue for fairness
-                        $self->requeue;
-                }
-        }
-}
-
-sub watch_cat {
-        my ($git) = @_;
-        $git->{async_cat} //= do {
-                my $self = bless { git => $git }, __PACKAGE__;
-                $git->{in}->blocking(0);
-                $self->SUPER::new($git->{in}, EPOLLIN|EPOLLET);
-                awaitpid($git->{pid}, \&aclose, $self, 'in');
-                \undef; # this is a true ref()
-        };
-}
-
 sub ibx_async_cat ($$$$) {
         my ($ibx, $oid, $cb, $arg) = @_;
         my $git = $ibx->{git} // $ibx->git;
@@ -80,7 +22,7 @@ sub ibx_async_cat ($$$$) {
                 \undef;
         } else { # read-only end of git-cat-file pipe
                 $git->cat_async($oid, $cb, $arg);
-                watch_cat($git);
+                $git->watch_async;
         }
 }
 
@@ -88,14 +30,7 @@ sub async_check ($$$$) {
         my ($ibx, $oidish, $cb, $arg) = @_; # $ibx may be $ctx
         my $git = $ibx->{git} // $ibx->git;
         $git->check_async($oidish, $cb, $arg);
-        return watch_cat($git) if $git->{-bc}; # --batch-command
-        $git->{async_chk} //= do {
-                my $self = bless { git => $git }, 'PublicInbox::GitAsyncCheck';
-                $git->{in_c}->blocking(0);
-                $self->SUPER::new($git->{in_c}, EPOLLIN|EPOLLET);
-                awaitpid($git->{pid_c}, \&aclose, $self, 'in_c');
-                \undef; # this is a true ref()
-        };
+        ($git->{ck} // $git)->watch_async;
 }
 
 # this is safe to call inside $cb, but not guaranteed to enqueue
@@ -109,35 +44,10 @@ sub ibx_async_prefetch {
                         $oid .= " $git->{git_dir}\n";
                         return $GCF2C->gcf2_async(\$oid, $cb, $arg); # true
                 }
-        } elsif ($git->{async_cat}) {
+        } elsif ($git->{epwatch}) {
                 return $git->async_prefetch($oid, $cb, $arg);
         }
         undef;
 }
 
 1;
-package PublicInbox::GitAsyncCheck;
-use v5.12;
-our @ISA = qw(PublicInbox::GitAsyncCat);
-use POSIX qw(WNOHANG);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-
-sub event_step {
-        my ($self) = @_;
-        my $git = $self->{git} or return;
-        return $self->vanish if ($git->{in_c} // 0) != ($self->{sock} // 1);
-        my $inflight = $git->{inflight_c};
-        if ($inflight && @$inflight) {
-                $git->check_async_step($inflight);
-
-                # child death?
-                if (($git->{in_c} // 0) != ($self->{sock} // 1)) {
-                        $self->vanish;
-                } elsif (@$inflight || exists $git->{rbuf_c}) {
-                        # ok, more to do, requeue for fairness
-                        $self->requeue;
-                }
-        }
-}
-
-1;