* [PATCH] git: hoist out read buffering to ProcessIORBF
@ 2023-10-26 1:58 Eric Wong
0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2023-10-26 1:58 UTC (permalink / raw)
To: spew
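This moves the non-blocking read buffering (the caller-managed rbuf
used by my_read/my_readline in Git.pm) into a tied-handle class, so
callers can use plain readline/read and leftover bytes live in the
handle itself.

Maybe we can use it in other places, maybe not.  I'm not
bothering to support `$/ = undef' or `$/ = \$integer' cases
for now since (AFAIK) we won't need them.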
---
MANIFEST | 1 +
lib/PublicInbox/Gcf2Client.pm | 5 +--
lib/PublicInbox/Git.pm | 75 +++++++--------------------------
lib/PublicInbox/ProcessIO.pm | 5 ++-
lib/PublicInbox/ProcessIORBF.pm | 55 ++++++++++++++++++++++++
5 files changed, 77 insertions(+), 64 deletions(-)
create mode 100644 lib/PublicInbox/ProcessIORBF.pm
diff --git a/MANIFEST b/MANIFEST
index 3df48667..fc1a122b 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -321,6 +321,7 @@ lib/PublicInbox/POP3D.pm
lib/PublicInbox/PktOp.pm
lib/PublicInbox/ProcessIO.pm
lib/PublicInbox/ProcessIONBF.pm
+lib/PublicInbox/ProcessIORBF.pm
lib/PublicInbox/Qspawn.pm
lib/PublicInbox/Reply.pm
lib/PublicInbox/RepoAtom.pm
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index f63a0335..1215b75b 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -10,7 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
use PublicInbox::Spawn qw(spawn);
use Socket qw(AF_UNIX SOCK_STREAM);
use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::ProcessIO;
+use PublicInbox::ProcessIORBF;
use autodie qw(socketpair);
# fields:
@@ -28,12 +28,11 @@ sub new {
# ensure the child process has the same @INC we do:
my $env = { PERL5LIB => join(':', @INC) };
socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0);
- $s1->blocking(0);
$opt->{0} = $opt->{1} = $s2;
my $cmd = [$^X, $^W ? ('-w') : (),
qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
my $pid = spawn($cmd, $env, $opt);
- my $sock = PublicInbox::ProcessIO->maybe_new($pid, $s1);
+ my $sock = PublicInbox::ProcessIORBF->maybe_new($pid, $s1);
$self->{inflight} = [];
$self->{epwatch} = \undef; # for Git->cleanup
$self->SUPER::new($sock, EPOLLIN);
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 9c26d8bf..b0214f0c 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -14,21 +14,19 @@ use autodie qw(socketpair read);
use POSIX ();
use Socket qw(AF_UNIX SOCK_STREAM);
use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-use Errno qw(EINTR EAGAIN);
+use Errno qw(EAGAIN);
use File::Glob qw(bsd_glob GLOB_NOSORT);
use File::Spec ();
use Time::HiRes qw(stat);
use PublicInbox::Spawn qw(spawn popen_rd which);
-use PublicInbox::ProcessIONBF;
+use PublicInbox::ProcessIORBF;
use PublicInbox::Tmpfile;
-use IO::Poll qw(POLLIN);
use Carp qw(croak carp);
use PublicInbox::SHA qw(sha_all);
our %HEXLEN2SHA = (40 => 1, 64 => 256);
our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64);
our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN read_all);
our $in_cleanup;
-our $RDTIMEO = 60_000; # milliseconds
our $async_warn; # true in read-only daemons
# committerdate:unix is git 2.9.4+ (2017-05-05), so using raw instead
@@ -165,46 +163,7 @@ sub _sock_cmd {
$self->fail("tmpfile($id): $!");
}
my $pid = spawn(\@cmd, undef, $opt);
- $self->{sock} = PublicInbox::ProcessIONBF->new($pid, $s1);
-}
-
-sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
-
-sub my_read ($$$) {
- my ($fh, $rbuf, $len) = @_;
- my $left = $len - length($$rbuf);
- my $r;
- while ($left > 0) {
- $r = sysread($fh, $$rbuf, $left, length($$rbuf));
- if ($r) {
- $left -= $r;
- } elsif (defined($r)) { # EOF
- return 0;
- } else {
- next if ($! == EAGAIN and poll_in($fh));
- next if $! == EINTR; # may be set by sysread or poll_in
- return; # unrecoverable error
- }
- }
- my $no_pad = substr($$rbuf, 0, $len, '');
- \$no_pad;
-}
-
-sub my_readline ($$) {
- my ($fh, $rbuf) = @_;
- while (1) {
- if ((my $n = index($$rbuf, "\n")) >= 0) {
- return substr($$rbuf, 0, $n + 1, '');
- }
- my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next;
-
- # return whatever's left on EOF
- return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r);
-
- next if ($! == EAGAIN and poll_in($fh));
- next if $! == EINTR; # may be set by sysread or poll_in
- return; # unrecoverable error
- }
+ $self->{sock} = PublicInbox::ProcessIORBF->new($pid, $s1);
}
sub cat_async_retry ($$) {
@@ -238,18 +197,16 @@ sub cat_async_step ($$) {
my ($self, $inflight) = @_;
die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
- my $rbuf = delete($self->{rbuf}) // \(my $new = '');
my ($bref, $oid, $type, $size);
- my $head = my_readline($self->{sock}, $rbuf);
+ my $head = readline($self->{sock});
my $cmd = ref($req) ? $$req : $req;
# ->fail may be called via Gcf2Client.pm
my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info ';
if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) {
($oid, $type, $size) = ($1, $2, $3 + 0);
unless ($info) { # --batch-command
- $bref = my_read($self->{sock}, $rbuf, $size + 1) or
- $self->fail(defined($bref) ?
- 'read EOF' : "read: $!");
+ $bref = eval { \read_all($self->{sock}, $size + 1) };
+ $self->fail("$oid: $@") if $@;
chop($$bref) eq "\n" or
$self->fail('LF missing after blob');
}
@@ -272,7 +229,6 @@ sub cat_async_step ($$) {
my $err = $! ? " ($!)" : '';
$self->fail("bad result from async cat-file: $head$err");
}
- $self->{rbuf} = $rbuf if $$rbuf ne '';
splice(@$inflight, 0, 3); # don't retry $cb on ->fail
eval { $cb->($bref, $oid, $type, $size, $arg) };
async_err($self, $req, $oid, $@, $info ? 'check' : 'cat') if $@;
@@ -315,17 +271,15 @@ sub check_async_step ($$) {
my ($ck, $inflight) = @_;
die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
- my $rbuf = delete($ck->{rbuf}) // \(my $new = '');
- chomp(my $line = my_readline($ck->{sock}, $rbuf));
+ chomp(my $line = readline($ck->{sock}));
my ($hex, $type, $size) = split(/ /, $line);
# git <2.21 would show `dangling' (2.21+ shows `ambiguous')
# https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
if ($hex eq 'dangling') {
- my $ret = my_read($ck->{sock}, $rbuf, $type + 1);
- $ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
+ eval { read_all($ck->{sock}, $type + 1) };
+ $ck->fail("$hex: $@") if $@;
}
- $ck->{rbuf} = $rbuf if $$rbuf ne '';
splice(@$inflight, 0, 3); # don't retry $cb on ->fail
eval { $cb->(undef, $hex, $type, $size, $arg) };
async_err($ck, $req, $hex, $@, 'check') if $@;
@@ -560,7 +514,7 @@ sub read_all ($;$$) {
my ($fh, $len, $bref) = @_;
$bref //= \(my $buf);
my $r = read($fh, $$bref, $len //= -s $fh);
- croak("$fh read ($r != $len)") if $len != $r;
+ croak("$fh read ($r != $len) (\$!=$!)") if $len != $r;
$$bref;
}
@@ -638,7 +592,7 @@ sub cleanup_if_unlinked {
my $ret = 0;
for my $obj ($self, ($self->{ck} // ())) {
my $sock = $obj->{sock} // next;
- my PublicInbox::ProcessIONBF $p = tied *$sock; # ProcessIONBF
+ my PublicInbox::ProcessIORBF $p = tied *$sock;
my $pid = $p->{pid} // next;
open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
while (<$fh>) {
@@ -659,10 +613,11 @@ sub event_step {
my $inflight = $self->{inflight};
if ($inflight && @$inflight) {
$self->cat_async_step($inflight);
- return $self->close unless $self->{sock};
+ my $sock = $self->{sock} or return $self->close;
# don't loop here to keep things fair, but we must requeue
# if there's already-read data in rbuf
- $self->requeue if exists($self->{rbuf});
+ my PublicInbox::ProcessIORBF $rbf = tied *$sock;
+ $self->requeue if exists($rbf->{rbuf});
}
}
@@ -686,7 +641,7 @@ sub close {
warn "E: (in abort) $req: $@" if $@;
}
}
- delete @$self{qw(-bc err_c inflight rbuf)};
+ delete @$self{qw(-bc err_c inflight)};
delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
}
diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm
index ea5d3e6c..4f4aaa80 100644
--- a/lib/PublicInbox/ProcessIO.pm
+++ b/lib/PublicInbox/ProcessIO.pm
@@ -2,7 +2,10 @@
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# a tied handle for auto reaping of children tied to a pipe or socket,
-# see perltie(1) for details.
+# see perltie(1) for details.  This class relies on perlio for buffered
+# reads; subclass PublicInbox::ProcessIONBF does unbuffered reads, and
+# subclass PublicInbox::ProcessIORBF buffers reads in our own Perl code
+# (not perlio) so the buffer state is visible to event loops.
package PublicInbox::ProcessIO;
use v5.12;
use PublicInbox::DS qw(awaitpid);
diff --git a/lib/PublicInbox/ProcessIORBF.pm b/lib/PublicInbox/ProcessIORBF.pm
new file mode 100644
index 00000000..2369e668
--- /dev/null
+++ b/lib/PublicInbox/ProcessIORBF.pm
@@ -0,0 +1,79 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Synchronous buffering reads for non-blocking IO since perlio doesn't
+# expose PerlIO_get_cnt to determine if we can rely on epoll/kevent.
+# Bytes read beyond what a caller asked for stay in $self->{rbuf}
+# (which only exists while non-empty), so event loop users can test
+# `exists $self->{rbuf}' to decide whether to requeue themselves
+# rather than wait for another readiness notification.
+package PublicInbox::ProcessIORBF;
+use v5.12;
+use parent qw(PublicInbox::ProcessIONBF);
+use IO::Poll qw(POLLIN);
+use Errno qw(EINTR EAGAIN);
+use bytes; # byte semantics for length/substr/index (import args are ignored)
+
+# Called after sysread returned undef.  Returns true if the read should
+# be retried: on EAGAIN it waits in poll(2) for POLLIN, on EINTR (from
+# sysread or poll) it returns true so the caller retries.  Any other
+# error is treated as unrecoverable and left in $! for the caller.
+# NOTE(review): poll(2) gets no timeout (-1), so this may block forever;
+# the Git.pm code this replaces gave up after $RDTIMEO (60s) -- confirm
+# dropping the timeout is intended.
+sub retry_read ($) {
+	($! == EAGAIN and
+		IO::Poll::_poll(-1, fileno($_[0]->{fh}), my $ev = POLLIN)) ||
+		($! == EINTR); # may be set by _poll (or sysread)
+}
+
+# READLINE for `readline($fh)' and `<$fh>' in scalar context.  Only
+# $/ eq "\n" is supported (no `$/ = undef', `$/ = \integer', nor list
+# context) since we only use this for git cat-file (and maybe fast-import).
+# Returns the next line including its "\n", whatever unterminated data
+# remains at EOF or on an unrecoverable error, or undef if none remains
+# (like CORE::readline, so `while (<$fh>)' terminates).
+sub READLINE {
+	my $rbuf = delete($_[0]->{rbuf}) // '';
+	while (1) {
+		my $n = index($rbuf, "\n");
+		if ($n >= 0) {
+			my $ret = substr($rbuf, 0, $n + 1, '');
+			$_[0]->{rbuf} = $rbuf if $rbuf ne '';
+			return $ret;
+		}
+		$n = sysread($_[0]->{fh}, $rbuf, 65536, length($rbuf)) and next;
+		# EOF (defined $n) or unrecoverable error (retry_read false):
+		# hand back everything left, or undef if the buffer is empty
+		if (defined($n) || !retry_read($_[0])) {
+			return $rbuf eq '' ? undef : $rbuf;
+		}
+	}
+}
+
+# READ for `read($fh, $buf, $len)' and `sysread': stores up to $len
+# bytes in $buf and returns how many were stored.  The count is short
+# only at EOF (0 if nothing was left) or when an unrecoverable error
+# follows partial data; an error with nothing buffered sets $buf to
+# undef and returns undef.  OFFSET is not supported: die loudly rather
+# than silently overwriting the start of $buf.
+sub READ { # ($self, $buf, $len [, $offset])
+	die 'BUG: READ offset unsupported' if $_[3];
+	my $rbuf = delete($_[0]->{rbuf}) // '';
+	my $left = $_[2] - length($rbuf);
+	while ($left > 0) {
+		my $r = sysread($_[0]->{fh}, $rbuf, $left, length($rbuf));
+		if ($r) {
+			$left -= $r;
+		} elsif (defined($r)) {
+			last; # EOF
+		} elsif (!retry_read($_[0])) { # unrecoverable error
+			last if $rbuf ne ''; # return whatever's left
+			return ($_[1] = undef);
+		} # else: loop and retry
+	}
+	$_[1] = substr($rbuf, 0, $_[2], '');
+	$_[0]->{rbuf} = $rbuf if $rbuf ne '';
+	length($_[1]);
+}
+
+1;
^ permalink raw reply related [flat|nested] only message in thread
only message in thread, other threads:[~2023-10-26 1:58 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-26 1:58 [PATCH] git: hoist out read buffering to ProcessIORBF Eric Wong
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).