diff options
author | Eric Wong <e@80x24.org> | 2023-09-30 15:20:39 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2023-10-01 07:05:23 +0000 |
commit | 9b068be88b252355c1c62f804d4f081e9a20570d (patch) | |
tree | 4cf3ab9eebd8361bd9a3857c4d307783997be63c /lib/PublicInbox/Gcf2Client.pm | |
parent | bf929e8ddfb9359a97cd8be3d3017c038564d52d (diff) | |
download | public-inbox-9b068be88b252355c1c62f804d4f081e9a20570d.tar.gz |
The benefit of 1MB potential pipe buffer size (on Linux) doesn't seem noticeable when reading from git (unlike when writing to v2 shards), so Unix stream sockets seem fine, here. This allows us to simplify our process management by using the same socket FD for reads and writes and enables us to use our ProcessPipe class for reaping (as we can do with Gcf2Client). Gcf2Client no longer relies on PublicInbox::DS for write buffering, and instead just waits for requests to complete once the number of inflight requests hits the MAX_INFLIGHT threshold as we do with PublicInbox::Git. We reuse the existing MAX_INFLIGHT limit (18) that was determined by the minimum allowed PIPE_BUF (512). (AFAIK) Unix stream sockets have no analogy to PIPE_BUF, but all *BSDs and Linux I've checked have default SO_RCVBUF and SO_SNDBUF values larger than the previously-required PIPE_BUF size of 512 bytes.
Diffstat (limited to 'lib/PublicInbox/Gcf2Client.pm')
-rw-r--r-- | lib/PublicInbox/Gcf2Client.pm | 57 |
1 files changed, 14 insertions, 43 deletions
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm index a49e2aad..8f442787 100644 --- a/lib/PublicInbox/Gcf2Client.pm +++ b/lib/PublicInbox/Gcf2Client.pm @@ -3,14 +3,15 @@ # connects public-inbox processes to PublicInbox::Gcf2::loop() package PublicInbox::Gcf2Client; -use strict; +use v5.12; use parent qw(PublicInbox::DS); use PublicInbox::Git; use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available use PublicInbox::Spawn qw(spawn); use Socket qw(AF_UNIX SOCK_STREAM); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); -use PublicInbox::DS qw(awaitpid); +use PublicInbox::ProcessPipe; + # fields: # sock => socket to Gcf2::loop # The rest of these fields are compatible with what PublicInbox::Git @@ -21,66 +22,36 @@ use PublicInbox::DS qw(awaitpid); # inflight => array (see PublicInbox::Git) # rbuf => scalarref, may be non-existent or empty sub new { - my ($rdr) = @_; + my ($opt) = @_; my $self = bless {}, __PACKAGE__; # ensure the child process has the same @INC we do: my $env = { PERL5LIB => join(':', @INC) }; my ($s1, $s2); socketpair($s1, $s2, AF_UNIX, SOCK_STREAM, 0) or die "socketpair $!"; - $rdr //= {}; - $rdr->{0} = $rdr->{1} = $s2; - my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]]; - $self->{'pid.owner'} = $$; - awaitpid($self->{pid} = spawn($cmd, $env, $rdr), undef); $s1->blocking(0); + $opt->{0} = $opt->{1} = $s2; + my $cmd = [$^X, qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]]; + my $pid = spawn($cmd, $env, $opt); + my $sock = PublicInbox::ProcessPipe->maybe_new($pid, $s1); $self->{inflight} = []; - $self->{in} = $s1; - $self->SUPER::new($s1, EPOLLIN|EPOLLET); -} - -sub fail { - my $self = shift; - $self->close; # PublicInbox::DS::close - PublicInbox::Git::fail($self, @_); + $self->{epwatch} = \undef; # for Git->cleanup + $self->SUPER::new($sock, EPOLLIN|EPOLLET); } sub gcf2_async ($$$;$) { my ($self, $req, $cb, $arg) = @_; my $inflight = $self->{inflight} or return $self->close; - - # {wbuf} is rare, I hope: - cat_async_step($self, $inflight) if $self->{wbuf}; - - $self->fail("gcf2c write: $!") if !$self->write($req) && !$self->{sock}; + PublicInbox::Git::write_all($self, $$req, \&cat_async_step, $inflight); push @$inflight, $req, $cb, $arg; } # ensure PublicInbox::Git::cat_async_step never calls cat_async_retry sub alternates_changed {} -# DS::event_loop will call this -sub event_step { - my ($self) = @_; - $self->flush_write; - $self->close if !$self->{in} || !$self->{sock}; # process died - my $inflight = $self->{inflight}; - if ($inflight && @$inflight) { - cat_async_step($self, $inflight); - return $self->close unless $self->{in}; # process died - - # ok, more to do, requeue for fairness - $self->requeue if @$inflight || exists($self->{rbuf}); - } -} - -sub DESTROY { - my ($self) = @_; - delete $self->{sock}; # if outside event_loop - PublicInbox::Git::DESTROY($self); -} - no warnings 'once'; -*cat_async_step = \&PublicInbox::Git::cat_async_step; +*cat_async_step = \&PublicInbox::Git::cat_async_step; # for event_step +*event_step = \&PublicInbox::Git::event_step; +*DESTROY = \&PublicInbox::Git::DESTROY; 1; |