about summary refs log tree commit homepage
path: root/lib/PublicInbox/Gcf2Client.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-09-30 15:20:39 +0000
committerEric Wong <e@80x24.org>2023-10-01 07:05:23 +0000
commit9b068be88b252355c1c62f804d4f081e9a20570d (patch)
tree4cf3ab9eebd8361bd9a3857c4d307783997be63c /lib/PublicInbox/Gcf2Client.pm
parentbf929e8ddfb9359a97cd8be3d3017c038564d52d (diff)
downloadpublic-inbox-9b068be88b252355c1c62f804d4f081e9a20570d.tar.gz
The benefit of 1MB potential pipe buffer size (on Linux) doesn't
seem noticeable when reading from git (unlike when writing to v2
shards), so Unix stream sockets seem fine, here.

This allows us to simplify our process management by using the
same socket FD for reads and writes and enables us to use our
ProcessPipe class for reaping (as we can do with Gcf2Client).

Gcf2Client no longer relies on PublicInbox::DS for write
buffering, and instead just waits for requests to complete
once the number of inflight requests hits the MAX_INFLIGHT
threshold as we do with PublicInbox::Git.

We reuse the existing MAX_INFLIGHT limit (18) that was
determined by the minimum allowed PIPE_BUF (512).  (AFAIK) Unix
stream sockets have no analogy to PIPE_BUF, but all *BSDs and
Linux I've checked have default SO_RCVBUF and SO_SNDBUF values
larger than the previously-required PIPE_BUF size of 512 bytes.
Diffstat (limited to 'lib/PublicInbox/Gcf2Client.pm')
-rw-r--r--lib/PublicInbox/Gcf2Client.pm57
1 files changed, 14 insertions, 43 deletions
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index a49e2aad..8f442787 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -3,14 +3,15 @@
 
 # connects public-inbox processes to PublicInbox::Gcf2::loop()
 package PublicInbox::Gcf2Client;
-use strict;
+use v5.12;
 use parent qw(PublicInbox::DS);
 use PublicInbox::Git;
 use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-use PublicInbox::DS qw(awaitpid);
+use PublicInbox::ProcessPipe;
+
 # fields:
 #        sock => socket to Gcf2::loop
 # The rest of these fields are compatible with what PublicInbox::Git
@@ -21,66 +22,36 @@ use PublicInbox::DS qw(awaitpid);
 #        inflight => array (see PublicInbox::Git)
 #        rbuf => scalarref, may be non-existent or empty
 sub new  {
-        my ($rdr) = @_;
+        my ($opt) = @_;
         my $self = bless {}, __PACKAGE__;
         # ensure the child process has the same @INC we do:
         my $env = { PERL5LIB => join(':', @INC) };
         my ($s1, $s2);
         socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!";
-        $rdr //= {};
-        $rdr->{0} = $rdr->{1} = $s2;
-        my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
-        $self->{'pid.owner'} = $$;
-        awaitpid($self->{pid} = spawn($cmd, $env, $rdr), undef);
         $s1->blocking(0);
+        $opt->{0} = $opt->{1} = $s2;
+        my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
+        my $pid = spawn($cmd, $env, $opt);
+        my $sock = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
         $self->{inflight} = [];
-        $self->{in} = $s1;
-        $self->SUPER::new($s1, EPOLLIN|EPOLLET);
-}
-
-sub fail {
-        my $self = shift;
-        $self->close; # PublicInbox::DS::close
-        PublicInbox::Git::fail($self, @_);
+        $self->{epwatch} = \undef; # for Git->cleanup
+        $self->SUPER::new($sock, EPOLLIN|EPOLLET);
 }
 
 sub gcf2_async ($$$;$) {
         my ($self, $req, $cb, $arg) = @_;
         my $inflight = $self->{inflight} or return $self->close;
-
-        # {wbuf} is rare, I hope:
-        cat_async_step($self, $inflight) if $self->{wbuf};
-
-        $self->fail("gcf2c write: $!") if !$self->write($req) && !$self->{sock};
+        PublicInbox::Git::write_all($self, $$req, \&cat_async_step, $inflight);
         push @$inflight, $req, $cb, $arg;
 }
 
 # ensure PublicInbox::Git::cat_async_step never calls cat_async_retry
 sub alternates_changed {}
 
-# DS::event_loop will call this
-sub event_step {
-        my ($self) = @_;
-        $self->flush_write;
-        $self->close if !$self->{in} || !$self->{sock}; # process died
-        my $inflight = $self->{inflight};
-        if ($inflight && @$inflight) {
-                cat_async_step($self, $inflight);
-                return $self->close unless $self->{in}; # process died
-
-                # ok, more to do, requeue for fairness
-                $self->requeue if @$inflight || exists($self->{rbuf});
-        }
-}
-
-sub DESTROY {
-        my ($self) = @_;
-        delete $self->{sock}; # if outside event_loop
-        PublicInbox::Git::DESTROY($self);
-}
-
 no warnings 'once';
 
-*cat_async_step = \&PublicInbox::Git::cat_async_step;
+*cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step
+*event_step = \&PublicInbox::Git::event_step;
+*DESTROY = \&PublicInbox::Git::DESTROY;
 
 1;