dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH] git: hoist out read buffering to ProcessIORBF
@ 2023-10-26  1:58 Eric Wong
  0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2023-10-26  1:58 UTC (permalink / raw)
  To: spew

Maybe we can use it in other places, maybe not.  I'm not
bothering to support `$/ = undef' or `$/ = \$integer' cases
for now since (AFAIK) we won't need them.
---
 MANIFEST                        |  1 +
 lib/PublicInbox/Gcf2Client.pm   |  5 +--
 lib/PublicInbox/Git.pm          | 75 +++++++--------------------------
 lib/PublicInbox/ProcessIO.pm    |  5 ++-
 lib/PublicInbox/ProcessIORBF.pm | 55 ++++++++++++++++++++++++
 5 files changed, 77 insertions(+), 64 deletions(-)
 create mode 100644 lib/PublicInbox/ProcessIORBF.pm

diff --git a/MANIFEST b/MANIFEST
index 3df48667..fc1a122b 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -321,6 +321,7 @@ lib/PublicInbox/POP3D.pm
 lib/PublicInbox/PktOp.pm
 lib/PublicInbox/ProcessIO.pm
 lib/PublicInbox/ProcessIONBF.pm
+lib/PublicInbox/ProcessIORBF.pm
 lib/PublicInbox/Qspawn.pm
 lib/PublicInbox/Reply.pm
 lib/PublicInbox/RepoAtom.pm
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index f63a0335..1215b75b 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -10,7 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::ProcessIO;
+use PublicInbox::ProcessIORBF;
 use autodie qw(socketpair);
 
 # fields:
@@ -28,12 +28,11 @@ sub new  {
 	# ensure the child process has the same @INC we do:
 	my $env = { PERL5LIB => join(':', @INC) };
 	socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0);
-	$s1->blocking(0);
 	$opt->{0} = $opt->{1} = $s2;
 	my $cmd = [$^X, $^W ? ('-w') : (),
 			qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
 	my $pid = spawn($cmd, $env, $opt);
-	my $sock = PublicInbox::ProcessIO->maybe_new($pid, $s1);
+	my $sock = PublicInbox::ProcessIORBF->maybe_new($pid, $s1);
 	$self->{inflight} = [];
 	$self->{epwatch} = \undef; # for Git->cleanup
 	$self->SUPER::new($sock, EPOLLIN);
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 9c26d8bf..b0214f0c 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -14,21 +14,19 @@ use autodie qw(socketpair read);
 use POSIX ();
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
-use Errno qw(EINTR EAGAIN);
+use Errno qw(EAGAIN);
 use File::Glob qw(bsd_glob GLOB_NOSORT);
 use File::Spec ();
 use Time::HiRes qw(stat);
 use PublicInbox::Spawn qw(spawn popen_rd which);
-use PublicInbox::ProcessIONBF;
+use PublicInbox::ProcessIORBF;
 use PublicInbox::Tmpfile;
-use IO::Poll qw(POLLIN);
 use Carp qw(croak carp);
 use PublicInbox::SHA qw(sha_all);
 our %HEXLEN2SHA = (40 => 1, 64 => 256);
 our %OFMT2HEXLEN = (sha1 => 40, sha256 => 64);
 our @EXPORT_OK = qw(git_unquote git_quote %HEXLEN2SHA %OFMT2HEXLEN read_all);
 our $in_cleanup;
-our $RDTIMEO = 60_000; # milliseconds
 our $async_warn; # true in read-only daemons
 
 # committerdate:unix is git 2.9.4+ (2017-05-05), so using raw instead
@@ -165,46 +163,7 @@ sub _sock_cmd {
 						$self->fail("tmpfile($id): $!");
 	}
 	my $pid = spawn(\@cmd, undef, $opt);
-	$self->{sock} = PublicInbox::ProcessIONBF->new($pid, $s1);
-}
-
-sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
-
-sub my_read ($$$) {
-	my ($fh, $rbuf, $len) = @_;
-	my $left = $len - length($$rbuf);
-	my $r;
-	while ($left > 0) {
-		$r = sysread($fh, $$rbuf, $left, length($$rbuf));
-		if ($r) {
-			$left -= $r;
-		} elsif (defined($r)) { # EOF
-			return 0;
-		} else {
-			next if ($! == EAGAIN and poll_in($fh));
-			next if $! == EINTR; # may be set by sysread or poll_in
-			return; # unrecoverable error
-		}
-	}
-	my $no_pad = substr($$rbuf, 0, $len, '');
-	\$no_pad;
-}
-
-sub my_readline ($$) {
-	my ($fh, $rbuf) = @_;
-	while (1) {
-		if ((my $n = index($$rbuf, "\n")) >= 0) {
-			return substr($$rbuf, 0, $n + 1, '');
-		}
-		my $r = sysread($fh, $$rbuf, 65536, length($$rbuf)) and next;
-
-		# return whatever's left on EOF
-		return substr($$rbuf, 0, length($$rbuf)+1, '') if defined($r);
-
-		next if ($! == EAGAIN and poll_in($fh));
-		next if $! == EINTR; # may be set by sysread or poll_in
-		return; # unrecoverable error
-	}
+	$self->{sock} = PublicInbox::ProcessIORBF->new($pid, $s1);
 }
 
 sub cat_async_retry ($$) {
@@ -238,18 +197,16 @@ sub cat_async_step ($$) {
 	my ($self, $inflight) = @_;
 	die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
 	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
-	my $rbuf = delete($self->{rbuf}) // \(my $new = '');
 	my ($bref, $oid, $type, $size);
-	my $head = my_readline($self->{sock}, $rbuf);
+	my $head = readline($self->{sock});
 	my $cmd = ref($req) ? $$req : $req;
 	# ->fail may be called via Gcf2Client.pm
 	my $info = $self->{-bc} && substr($cmd, 0, 5) eq 'info ';
 	if ($head =~ /^([0-9a-f]{40,}) (\S+) ([0-9]+)$/) {
 		($oid, $type, $size) = ($1, $2, $3 + 0);
 		unless ($info) { # --batch-command
-			$bref = my_read($self->{sock}, $rbuf, $size + 1) or
-				$self->fail(defined($bref) ?
-						'read EOF' : "read: $!");
+			$bref = eval { \read_all($self->{sock}, $size + 1) };
+			$self->fail("$oid: $@") if $@;
 			chop($$bref) eq "\n" or
 					$self->fail('LF missing after blob');
 		}
@@ -272,7 +229,6 @@ sub cat_async_step ($$) {
 		my $err = $! ? " ($!)" : '';
 		$self->fail("bad result from async cat-file: $head$err");
 	}
-	$self->{rbuf} = $rbuf if $$rbuf ne '';
 	splice(@$inflight, 0, 3); # don't retry $cb on ->fail
 	eval { $cb->($bref, $oid, $type, $size, $arg) };
 	async_err($self, $req, $oid, $@, $info ? 'check' : 'cat') if $@;
@@ -315,17 +271,15 @@ sub check_async_step ($$) {
 	my ($ck, $inflight) = @_;
 	die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
 	my ($req, $cb, $arg) = @$inflight[0, 1, 2];
-	my $rbuf = delete($ck->{rbuf}) // \(my $new = '');
-	chomp(my $line = my_readline($ck->{sock}, $rbuf));
+	chomp(my $line = readline($ck->{sock}));
 	my ($hex, $type, $size) = split(/ /, $line);
 
 	# git <2.21 would show `dangling' (2.21+ shows `ambiguous')
 	# https://public-inbox.org/git/20190118033845.s2vlrb3wd3m2jfzu@dcvr/T/
 	if ($hex eq 'dangling') {
-		my $ret = my_read($ck->{sock}, $rbuf, $type + 1);
-		$ck->fail(defined($ret) ? 'read EOF' : "read: $!") if !$ret;
+		eval { read_all($ck->{sock}, $type + 1) };
+		$ck->fail("$hex: $@") if $@;
 	}
-	$ck->{rbuf} = $rbuf if $$rbuf ne '';
 	splice(@$inflight, 0, 3); # don't retry $cb on ->fail
 	eval { $cb->(undef, $hex, $type, $size, $arg) };
 	async_err($ck, $req, $hex, $@, 'check') if $@;
@@ -560,7 +514,7 @@ sub read_all ($;$$) {
 	my ($fh, $len, $bref) = @_;
 	$bref //= \(my $buf);
 	my $r = read($fh, $$bref, $len //= -s $fh);
-	croak("$fh read ($r != $len)") if $len != $r;
+	croak("$fh read ($r != $len) (\$!=$!)") if $len != $r;
 	$$bref;
 }
 
@@ -638,7 +592,7 @@ sub cleanup_if_unlinked {
 	my $ret = 0;
 	for my $obj ($self, ($self->{ck} // ())) {
 		my $sock = $obj->{sock} // next;
-		my PublicInbox::ProcessIONBF $p = tied *$sock; # ProcessIONBF
+		my PublicInbox::ProcessIORBF $p = tied *$sock;
 		my $pid = $p->{pid} // next;
 		open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
 		while (<$fh>) {
@@ -659,10 +613,11 @@ sub event_step {
 	my $inflight = $self->{inflight};
 	if ($inflight && @$inflight) {
 		$self->cat_async_step($inflight);
-		return $self->close unless $self->{sock};
+		my $sock = $self->{sock} or return $self->close;
 		# don't loop here to keep things fair, but we must requeue
 		# if there's already-read data in rbuf
-		$self->requeue if exists($self->{rbuf});
+		my PublicInbox::ProcessIORBF $rbf = tied *$sock;
+		$self->requeue if exists($rbf->{rbuf});
 	}
 }
 
@@ -686,7 +641,7 @@ sub close {
 			warn "E: (in abort) $req: $@" if $@;
 		}
 	}
-	delete @$self{qw(-bc err_c inflight rbuf)};
+	delete @$self{qw(-bc err_c inflight)};
 	delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
 }
 
diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm
index ea5d3e6c..4f4aaa80 100644
--- a/lib/PublicInbox/ProcessIO.pm
+++ b/lib/PublicInbox/ProcessIO.pm
@@ -2,7 +2,10 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # a tied handle for auto reaping of children tied to a pipe or socket,
-# see perltie(1) for details.
+# see perltie(1) for details.  This uses perlio (Perl internals) for
+# buffered reads; subclass PublicInbox::ProcessIONBF uses unbuffered
+# reads, while subclass PublicInbox::ProcessIORBF buffers reads in our
+# own Perl code, not via perlio, allowing for use in event loops.
 package PublicInbox::ProcessIO;
 use v5.12;
 use PublicInbox::DS qw(awaitpid);
diff --git a/lib/PublicInbox/ProcessIORBF.pm b/lib/PublicInbox/ProcessIORBF.pm
new file mode 100644
index 00000000..2369e668
--- /dev/null
+++ b/lib/PublicInbox/ProcessIORBF.pm
@@ -0,0 +1,55 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Synchronous buffering reads for non-blocking IO since perlio doesn't
+# expose PerlIO_get_cnt to determine if we can rely on epoll/kevent.
+package PublicInbox::ProcessIORBF;
+use v5.12;
+use parent qw(PublicInbox::ProcessIONBF);
+use IO::Poll qw(POLLIN);
+use Errno qw(EINTR EAGAIN);
+use bytes qw(length substr);
+
+sub retry_read ($) { # true if the caller should retry its sysread
+	($! == EAGAIN and # no data yet: poll (-1 timeout) until readable
+		IO::Poll::_poll(-1, fileno($_[0]->{fh}), my $ev = POLLIN)) ||
+	($! == EINTR); # may be set by _poll (or sysread)
+}
+
+# this only supports $/ when it's "\n"; we don't bother with \integer and
+# undef cases since we only use this for git cat-file (and maybe fast-import)
+sub READLINE { # tied-handle hook: one line, or whatever is left at EOF
+	my $rbuf = delete($_[0]->{rbuf}) // ''; # take leftover bytes, if any
+	while (1) {
+		my $n = index($rbuf, "\n");
+		if ($n >= 0) { # got a complete line
+			my $ret = substr($rbuf, 0, $n + 1, '');
+			$_[0]->{rbuf} = $rbuf if $rbuf ne ''; # save rest
+			return $ret;
+		}
+		$n = sysread($_[0]->{fh}, $rbuf, 65536, length($rbuf)) and next;
+		return $rbuf if defined($n); # EOF, everything ('' if none left)
+		return ($rbuf eq '' ? undef : $rbuf) if !retry_read($_[0]);
+	}
+}
+
+sub READ { # ($self, $buf, $len) offset is not supported by us
+	my $rbuf = delete($_[0]->{rbuf}) // ''; # take leftover bytes, if any
+	my $left = $_[2] - length($rbuf); # bytes still needed from fh
+	while ($left > 0) {
+		my $r = sysread($_[0]->{fh}, $rbuf, $left, length($rbuf));
+		if ($r) {
+			$left -= $r # partial read, keep going
+		} elsif (defined($r)) {
+			last # EOF
+		} elsif (!retry_read($_[0])) { # unrecoverable error
+			last if $rbuf ne ''; # return whatever's left
+			return ($_[1] = undef); # nothing buffered: report failure
+		} # else: loop and retry
+	}
+	$_[1] = substr($rbuf, 0, $_[2], ''); # at most $len bytes to caller
+	$_[0]->{rbuf} = $rbuf if $rbuf ne ''; # keep any excess for later
+	length($_[1]); # bytes returned (short on EOF or error)
+}
+
+1;

^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2023-10-26  1:58 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-26  1:58 [PATCH] git: hoist out read buffering to ProcessIORBF Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).