From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id D37251F406 for ; Thu, 26 Oct 2023 01:58:59 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1698285539; bh=WZ21Z9ukE6CAt3CLHWKOSMYCFCnoWa8r8G3V8hHMY4U=; h=From:To:Subject:Date:From; b=Q0EUMMTFvG1RxNoVCFtVnNTK1DHnp2HnILEtRfliaqj7OyUXOgFt2zmvFlcoJTEv6 TZGs6GalGmkElKaIiquRJPkM6tE8yF0CVrjT2Qnc5uGdxI/boJgxUbB3KOj8XvPO5G NDKV0cxoA5k50V9EC6oTq6dcQM9nSNjo5z5zlQok= From: Eric Wong To: spew@80x24.org Subject: [PATCH] git: hoist out read buffering to ProcessIORBF Date: Thu, 26 Oct 2023 01:58:59 +0000 Message-ID: <20231026015859.3429794-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Maybe we can use it in other places, maybe not. I'm not bothering to support `$/ = undef' or `$/ = \$integer' cases for now since (AFAIK) we won't need them. --- MANIFEST | 1 + lib/PublicInbox/Gcf2Client.pm | 5 +-- lib/PublicInbox/Git.pm | 75 +++++++-------------------------- lib/PublicInbox/ProcessIO.pm | 5 ++- lib/PublicInbox/ProcessIORBF.pm | 55 ++++++++++++++++++++++++ 5 files changed, 77 insertions(+), 64 deletions(-) create mode 100644 lib/PublicInbox/ProcessIORBF.pm diff --git a/MANIFEST b/MANIFEST index 3df48667..fc1a122b 100644 --- a/MANIFEST +++ b/MANIFEST @@ -321,6 +321,7 @@ lib/PublicInbox/POP3D.pm lib/PublicInbox/PktOp.pm lib/PublicInbox/ProcessIO.pm lib/PublicInbox/ProcessIONBF.pm +lib/PublicInbox/ProcessIORBF.pm lib/PublicInbox/Qspawn.pm lib/PublicInbox/Reply.pm lib/PublicInbox/RepoAtom.pm diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm index f63a0335..1215b75b 100644 --- a/lib/PublicInbox/Gcf2Client.pm +++ b/lib/PublicInbox/Gcf2Client.pm @@ -10,7 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available use PublicInbox::Spawn qw(spawn); use Socket qw(AF_UNIX SOCK_STREAM); use PublicInbox::Syscall qw(EPOLLIN); -use PublicInbox::ProcessIO; +use PublicInbox::ProcessIORBF; use autodie qw(socketpair); # fields: @@ -28,12 +28,11 @@ sub new { # ensure the child process has the same @INC we do: my $env = { PERL5LIB => join(':', @INC) }; socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0); - $s1->blocking(0); $opt->{0} = $opt->{1} = $s2; my $cmd = [$^X, $^W ? ('-w') : (), qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]]; my $pid = spawn($cmd, $env, $opt); - my $sock = PublicInbox::ProcessIO->maybe_new($pid, $s1); + my $sock = PublicInbox::ProcessIORBF->maybe_new($pid, $s1); $self->{inflight} = []; $self->{epwatch} = \undef; # for Git->cleanup $self->SUPER::new($sock, EPOLLIN); diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 9c26d8bf..b0214f0c 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -14,21 +14,19 @@ use autodie qw(socketpair read); use POSIX (); use Socket qw(AF_UNIX SOCK_STREAM); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); -use Errno qw(EINTR EAGAIN); +use Errno qw(EAGAIN); use File::Glob qw(bsd_glob GLOB_NOSORT); use File::Spec (); use Time::HiRes qw(stat); use PublicInbox::Spawn qw(spawn popen_rd which); -use PublicInbox::ProcessIONBF; +use PublicInbox::ProcessIORBF; use PublicInbox::Tmpfile; -use IO::Poll qw(POLLIN); use Carp qw(croak carp); use PublicInbox::SHA qw(sha_all); our %HEXLEN2SHA = (40 => 1, 64 => 256); our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64); our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN read_all); our $in_cleanup; -our $RDTIMEO = 60_000; # milliseconds our $async_warn; # true in read-only daemons # committerdate:unix is git 2.9.4+ (2017-05-05), so using raw instead @@ -165,46 +163,7 @@ sub _sock_cmd { $self->fail("tmpfile($id): $!"); } my $pid = spawn(\@cmd, undef, $opt); - $self->{sock} = PublicInbox::ProcessIONBF->new($pid, $s1); -} - -sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) } - -sub my_read ($$$) { - my ($fh, $rbuf, $len) = @_; - my $left = $len - length($$rbuf); - my $r; - while ($left > 0) { - $r = sysread($fh, $$rbuf, $left, length($$rbuf)); - if ($r) { - $left -= $r; - } elsif (defined($r)) { # EOF - return 0; - } else { - next if ($! == EAGAIN and poll_in($fh)); - next if $! == EINTR; # may be set by sysread or poll_in - return; # unrecoverable error - } - } - my $no_pad = substr($$rbuf, 0, $len, ''); - \$no_pad; -} - -sub my_readline ($$) { - my ($fh, $rbuf) = @_; - while (1) { - if ((my $n = index($$rbuf, "\n")) >= 0) { - return substr($$rbuf, 0, $n + 1, ''); - } - my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next; - - # return whatever's left on EOF - return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r); - - next if ($! == EAGAIN and poll_in($fh)); - next if $! == EINTR; # may be set by sysread or poll_in - return; # unrecoverable error - } + $self->{sock} = PublicInbox::ProcessIORBF->new($pid, $s1); } sub cat_async_retry ($$) { @@ -238,18 +197,16 @@ sub cat_async_step ($$) { my ($self, $inflight) = @_; die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; my ($req, $cb, $arg) = @$inflight[0, 1, 2]; - my $rbuf = delete($self->{rbuf}) // \(my $new = ''); my ($bref, $oid, $type, $size); - my $head = my_readline($self->{sock}, $rbuf); + my $head = readline($self->{sock}); my $cmd = ref($req) ? $$req : $req; # ->fail may be called via Gcf2Client.pm my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info '; if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) { ($oid, $type, $size) = ($1, $2, $3 + 0); unless ($info) { # --batch-command - $bref = my_read($self->{sock}, $rbuf, $size + 1) or - $self->fail(defined($bref) ? - 'read EOF' : "read: $!"); + $bref = eval { \read_all($self->{sock}, $size + 1) }; + $self->fail("$oid: $@") if $@; chop($$bref) eq "\n" or $self->fail('LF missing after blob'); } @@ -272,7 +229,6 @@ sub cat_async_step ($$) { my $err = $! ? " ($!)" : ''; $self->fail("bad result from async cat-file: $head$err"); } - $self->{rbuf} = $rbuf if $$rbuf ne ''; splice(@$inflight, 0, 3); # don't retry $cb on ->fail eval { $cb->($bref, $oid, $type, $size, $arg) }; async_err($self, $req, $oid, $@, $info ? 'check' : 'cat') if $@; @@ -315,17 +271,15 @@ sub check_async_step ($$) { my ($ck, $inflight) = @_; die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3; my ($req, $cb, $arg) = @$inflight[0, 1, 2]; - my $rbuf = delete($ck->{rbuf}) // \(my $new = ''); - chomp(my $line = my_readline($ck->{sock}, $rbuf)); + chomp(my $line = readline($ck->{sock})); my ($hex, $type, $size) = split(/ /, $line); # git <2.21 would show `dangling' (2.21+ shows `ambiguous') # https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/ if ($hex eq 'dangling') { - my $ret = my_read($ck->{sock}, $rbuf, $type + 1); - $ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret; + eval { read_all($ck->{sock}, $type + 1) }; + $ck->fail("$hex: $@") if $@; } - $ck->{rbuf} = $rbuf if $$rbuf ne ''; splice(@$inflight, 0, 3); # don't retry $cb on ->fail eval { $cb->(undef, $hex, $type, $size, $arg) }; async_err($ck, $req, $hex, $@, 'check') if $@; @@ -560,7 +514,7 @@ sub read_all ($;$$) { my ($fh, $len, $bref) = @_; $bref //= \(my $buf); my $r = read($fh, $$bref, $len //= -s $fh); - croak("$fh read ($r != $len)") if $len != $r; + croak("$fh read ($r != $len) (\$!=$!)") if $len != $r; $$bref; } @@ -638,7 +592,7 @@ sub cleanup_if_unlinked { my $ret = 0; for my $obj ($self, ($self->{ck} // ())) { my $sock = $obj->{sock} // next; - my PublicInbox::ProcessIONBF $p = tied *$sock; # ProcessIONBF + my PublicInbox::ProcessIORBF $p = tied *$sock; my $pid = $p->{pid} // next; open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1); while (<$fh>) { @@ -659,10 +613,11 @@ sub event_step { my $inflight = $self->{inflight}; if ($inflight && @$inflight) { $self->cat_async_step($inflight); - return $self->close unless $self->{sock}; + my $sock = $self->{sock} or return $self->close; # don't loop here to keep things fair, but we must requeue # if there's already-read data in rbuf - $self->requeue if exists($self->{rbuf}); + my PublicInbox::ProcessIORBF $rbf = tied *$sock; + $self->requeue if exists($rbf->{rbuf}); } } @@ -686,7 +641,7 @@ sub close { warn "E: (in abort) $req: $@" if $@; } } - delete @$self{qw(-bc err_c inflight rbuf)}; + delete @$self{qw(-bc err_c inflight)}; delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock}); } diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm index ea5d3e6c..4f4aaa80 100644 --- a/lib/PublicInbox/ProcessIO.pm +++ b/lib/PublicInbox/ProcessIO.pm @@ -2,7 +2,10 @@ # License: AGPL-3.0+ # a tied handle for auto reaping of children tied to a pipe or socket, -# see perltie(1) for details. +# see perltie(1) for details. This uses perlio (Perl internals) for +# buffered reads, subclass PublicInbox::ProcessIONBF uses unbuffered +# reads, while subclass PublicInbox::ProcessIORBF buffers reads in our +# own Perl code, not via perlio, allowing for use in event loops package PublicInbox::ProcessIO; use v5.12; use PublicInbox::DS qw(awaitpid); diff --git a/lib/PublicInbox/ProcessIORBF.pm b/lib/PublicInbox/ProcessIORBF.pm new file mode 100644 index 00000000..2369e668 --- /dev/null +++ b/lib/PublicInbox/ProcessIORBF.pm @@ -0,0 +1,55 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# Synchronous buffering reads for non-blocking IO since perlio doesn't +# expose PerlIO_get_cnt to determine if we can rely on epoll/kevent. +package PublicInbox::ProcessIORBF; +use v5.12; +use parent qw(PublicInbox::ProcessIONBF); +use IO::Poll qw(POLLIN); +use Errno qw(EINTR EAGAIN); +use bytes qw(length substr); + +sub retry_read ($) { + ($! == EAGAIN and + IO::Poll::_poll(-1, fileno($_[0]->{fh}), my $ev = POLLIN)) || + ($! == EINTR); # may be set by _poll (or sysread) +} + +# this only supports $/ when it's "\n", don't bother with \integer and +# undef cases since we only use this for git cat-file (and maybe fast-import) +sub READLINE { + my $rbuf = delete($_[0]->{rbuf}) // ''; + while (1) { + my $n = index($rbuf, "\n"); + if ($n >= 0) { + my $ret = substr($rbuf, 0, $n + 1, ''); + $_[0]->{rbuf} = $rbuf if $rbuf ne ''; + return $ret; + } + $n = sysread($_[0]->{fh}, $rbuf, 65536, length($rbuf)) and next; + return $rbuf if defined($n); # EOF, everything + return ($rbuf eq '' ? undef : $rbuf) if !retry_read($_[0]); + } +} + +sub READ { # ($self, $buf, $len) offset is not supported by us + my $rbuf = delete($_[0]->{rbuf}) // ''; + my $left = $_[2] - length($rbuf); + while ($left > 0) { + my $r = sysread($_[0]->{fh}, $rbuf, $left, length($rbuf)); + if ($r) { + $left -= $r + } elsif (defined($r)) { + last # EOF + } elsif (!retry_read($_[0])) { # unrecoverable error + last if $rbuf ne ''; # return whatever's left + return ($_[1] = undef); + } # else: loop and retry + } + $_[1] = substr($rbuf, 0, $_[2], ''); + $_[0]->{rbuf} = $rbuf if $rbuf ne ''; + length($_[1]); +} + +1;