dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH] replace ProcessIO with untied PublicInbox::IO
@ 2023-10-28  6:40 Eric Wong
  0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2023-10-28  6:40 UTC (permalink / raw)
  To: spew

This fixes several problems with the use of tie for filehandles:

* no way to make fcntl, stat, etc. calls directly on the tied handle,
  forcing callers to use the `tied' builtin to access the underlying
  IO::Handle

* needing separate classes to handle blocking and non-blocking I/O

As a result, Git->cleanup_if_unlinked, InputPipe->consume,
and Qspawn->_yield_start have fewer bizarre bits, and we
can call `$io->blocking(0)' directly instead of
`(tied *$io)->{fh}->blocking(0)'.

Having a PublicInbox::IO class will also allow us to support
custom read buffering whose current state can be inspected.
---
 MANIFEST                        |  3 +-
 lib/PublicInbox/Gcf2Client.pm   |  7 ++-
 lib/PublicInbox/Git.pm          |  8 ++--
 lib/PublicInbox/IO.pm           | 54 ++++++++++++++++++++++++
 lib/PublicInbox/Import.pm       |  4 +-
 lib/PublicInbox/InputPipe.pm    |  1 -
 lib/PublicInbox/LeiToMail.pm    |  5 +--
 lib/PublicInbox/ProcessIO.pm    | 75 ---------------------------------
 lib/PublicInbox/ProcessIONBF.pm | 25 -----------
 lib/PublicInbox/Qspawn.pm       |  5 +--
 lib/PublicInbox/Spawn.pm        |  6 +--
 t/spawn.t                       | 26 ++++++------
 xt/check-run.t                  |  2 +-
 13 files changed, 85 insertions(+), 136 deletions(-)
 create mode 100644 lib/PublicInbox/IO.pm
 delete mode 100644 lib/PublicInbox/ProcessIO.pm
 delete mode 100644 lib/PublicInbox/ProcessIONBF.pm

diff --git a/MANIFEST b/MANIFEST
index 3df48667..479c09de 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -218,6 +218,7 @@ lib/PublicInbox/IMAPClient.pm
 lib/PublicInbox/IMAPD.pm
 lib/PublicInbox/IMAPTracker.pm
 lib/PublicInbox/IMAPsearchqp.pm
+lib/PublicInbox/IO.pm
 lib/PublicInbox/IPC.pm
 lib/PublicInbox/IdxStack.pm
 lib/PublicInbox/Import.pm
@@ -319,8 +320,6 @@ lib/PublicInbox/OverIdx.pm
 lib/PublicInbox/POP3.pm
 lib/PublicInbox/POP3D.pm
 lib/PublicInbox/PktOp.pm
-lib/PublicInbox/ProcessIO.pm
-lib/PublicInbox/ProcessIONBF.pm
 lib/PublicInbox/Qspawn.pm
 lib/PublicInbox/Reply.pm
 lib/PublicInbox/RepoAtom.pm
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index f63a0335..5220c474 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -10,7 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::ProcessIO;
+use PublicInbox::IO;
 use autodie qw(socketpair);
 
 # fields:
@@ -32,11 +32,10 @@ sub new  {
 	$opt->{0} = $opt->{1} = $s2;
 	my $cmd = [$^X, $^W ? ('-w') : (),
 			qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
-	my $pid = spawn($cmd, $env, $opt);
-	my $sock = PublicInbox::ProcessIO->maybe_new($pid, $s1);
+	PublicInbox::IO::attach_pid($s1, spawn($cmd, $env, $opt));
 	$self->{inflight} = [];
 	$self->{epwatch} = \undef; # for Git->cleanup
-	$self->SUPER::new($sock, EPOLLIN);
+	$self->SUPER::new($s1, EPOLLIN);
 }
 
 sub gcf2_async ($$$;$) {
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 1f8ce1f0..629468b0 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -18,7 +18,7 @@ use Errno qw(EINTR EAGAIN);
 use File::Glob qw(bsd_glob GLOB_NOSORT);
 use File::Spec ();
 use PublicInbox::Spawn qw(spawn popen_rd run_qx which);
-use PublicInbox::ProcessIONBF;
+use PublicInbox::IO;
 use PublicInbox::Tmpfile;
 use IO::Poll qw(POLLIN);
 use Carp qw(croak carp);
@@ -146,6 +146,7 @@ sub _sock_cmd {
 	my ($self, $batch, $err_c) = @_;
 	$self->{sock} and Carp::confess('BUG: {sock} exists');
 	socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0);
+	$s1->blocking(0);
 	my $opt = { pgid => 0, 0 => $s2, 1 => $s2 };
 	my $gd = $self->{git_dir};
 	if ($gd =~ s!/([^/]+/[^/]+)\z!/!) {
@@ -164,7 +165,7 @@ sub _sock_cmd {
 						$self->fail("tmpfile($id): $!");
 	}
 	my $pid = spawn(\@cmd, undef, $opt);
-	$self->{sock} = PublicInbox::ProcessIONBF->new($pid, $s1);
+	$self->{sock} = PublicInbox::IO::attach_pid($s1, $pid);
 }
 
 sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
@@ -637,8 +638,7 @@ sub cleanup_if_unlinked {
 	my $ret = 0;
 	for my $obj ($self, ($self->{ck} // ())) {
 		my $sock = $obj->{sock} // next;
-		my PublicInbox::ProcessIONBF $p = tied *$sock; # ProcessIONBF
-		my $pid = $p->{pid} // next;
+		my $pid = $sock->attached_pid // next;
 		open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
 		while (<$fh>) {
 			# n.b. we do not restart for unlinked multi-pack-index
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
new file mode 100644
index 00000000..63850a52
--- /dev/null
+++ b/lib/PublicInbox/IO.pm
@@ -0,0 +1,73 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# supports reaping of children attached to a pipe or socket
+package PublicInbox::IO;
+use v5.12;
+use parent qw(IO::Handle);
+use PublicInbox::DS qw(awaitpid);
+
+# TODO: this can probably be the new home for read_all, try_cat
+# and maybe even buffered read/readline...
+
+# awaitpid callback: store the wait status ($?) in the scalar shared
+# via pi_io_reap, then run the caller's optional callback
+sub waitcb {
+	my ($pid, $errref, $cb, @args) = @_;
+	$$errref = $?; # read by ->close and ->DESTROY
+	$cb->($pid, @args) if $cb;
+}
+
+# bless $io into this class and register $pid with awaitpid so the
+# child is reaped when $io is closed or destroyed.  @cb_arg is an
+# optional ($cb, @args) list; waitcb calls $cb->($pid, @args).
+# Returns $io.
+sub attach_pid ($$;@) {
+	my ($io, $pid, @cb_arg) = @_;
+	bless $io, __PACKAGE__;
+	# we share $err (and not $self) with awaitpid to avoid a ref cycle
+	# pi_io_reap: [ PID allowed to reap, child PID, \wait status ]
+	${*$io}{pi_io_reap} = [ $$, $pid, \(my $err) ];
+	awaitpid($pid, \&waitcb, \$err, @cb_arg);
+	$io;
+}
+
+# PID of the attached child, or undef once nothing is attached
+sub attached_pid {
+	my ($io) = @_;
+	${${*$io}{pi_io_reap} // []}[1];
+}
+
+# caller cares about error result if they call close explicitly, so
+# wait synchronously unless waitcb already stored the exit status.
+# Returns false with $? set if the child failed or couldn't be reaped.
+sub close {
+	my ($io) = @_;
+	my $ret = $io->SUPER::close;
+	my $reap = delete ${*$io}{pi_io_reap};
+	return $ret unless $reap && $reap->[0] == $$;
+	# scalar context makes awaitpid wait synchronously (sets [2])
+	${$reap->[2]} // (my $w = awaitpid($reap->[1]));
+	# [2] stays undef if waitpid failed (e.g. ECHILD); report -1 as
+	# waitpid does rather than claiming the child succeeded
+	$? = ${$reap->[2]} // -1;
+	$? ? '' : $ret;
+}
+
+# implicit close: the caller won't check the exit status, so let
+# awaitpid (void context) reap the child whenever it can.  Destructors
+# run at arbitrary times, so don't clobber the caller's $! or $?
+sub DESTROY {
+	my ($io) = @_;
+	my $reap = delete ${*$io}{pi_io_reap};
+	if ($reap && $reap->[0] == $$) {
+		local ($!, $?);
+		$io->SUPER::close;
+		# don't wait twice: waitpid on an already-reaped PID fails
+		# (ECHILD) or could reap an unrelated child reusing the PID
+		${$reap->[2]} // awaitpid($reap->[1]);
+	}
+	$io->SUPER::DESTROY;
+}
+
+1;
diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index e12a56e8..dfba34b9 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -16,7 +16,7 @@ use PublicInbox::MsgTime qw(msg_datestamp);
 use PublicInbox::ContentHash qw(content_digest);
 use PublicInbox::MDA;
 use PublicInbox::Eml;
-use PublicInbox::ProcessIO;
+use PublicInbox::IO;
 use POSIX qw(strftime);
 use autodie qw(read close socketpair);
 use Carp qw(croak);
@@ -77,7 +77,7 @@ sub gfi_start {
 				--quiet --done --date-format=raw) ];
 		my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 });
 		$self->{nchg} = 0;
-		$self->{io} = PublicInbox::ProcessIO->maybe_new($pid, $io);
+		$self->{io} = PublicInbox::IO::attach_pid($io, $pid);
 	};
 	if ($@) {
 		$self->lock_release;
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index f4d57e7d..232f20e8 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -39,7 +39,6 @@ sub consume {
 	if ($@) { # regular file (but not w/ select|IO::Poll backends)
 		$self->{-need_rq} = 1;
 		$self->requeue;
-	} elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
 	} elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
 		$in->blocking(0);
 	} elsif (-t $in) { # isatty(3) can't use `_' stat cache
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index ead60b38..b07c2c90 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -7,7 +7,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
-use PublicInbox::ProcessIO;
+use PublicInbox::IO;
 use PublicInbox::Spawn qw(spawn);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
@@ -160,8 +160,7 @@ sub _post_augment_mbox { # open a compressor process from top-level lei-daemon
 	my $cmd = PublicInbox::MboxReader::zsfx2cmd($zsfx, undef, $lei);
 	my ($r, $w) = @{delete $lei->{zpipe}};
 	my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 };
-	my $pid = spawn($cmd, undef, $rdr);
-	$lei->{1} = PublicInbox::ProcessIO->maybe_new($pid, $w,
+	$lei->{1} = PublicInbox::IO::attach_pid($w, spawn($cmd, undef, $rdr),
 				\&reap_compress, $lei, $cmd, $lei->{1});
 }
 
diff --git a/lib/PublicInbox/ProcessIO.pm b/lib/PublicInbox/ProcessIO.pm
deleted file mode 100644
index ea5d3e6c..00000000
--- a/lib/PublicInbox/ProcessIO.pm
+++ /dev/null
@@ -1,75 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# a tied handle for auto reaping of children tied to a pipe or socket,
-# see perltie(1) for details.
-package PublicInbox::ProcessIO;
-use v5.12;
-use PublicInbox::DS qw(awaitpid);
-use Symbol qw(gensym);
-use bytes qw(length);
-
-sub maybe_new {
-	my ($cls, $pid, $fh, @cb_arg) = @_;
-	return ($fh, $pid) if wantarray;
-	my $s = gensym;
-	tie *$s, $cls, $pid, $fh, @cb_arg;
-	$s;
-}
-
-sub waitcb { # awaitpid callback
-	my ($pid, $err_ref, $cb, @args) = @_;
-	$$err_ref = $?; # sets >{pp_chld_err} for _close
-	$cb->($pid, @args) if $cb;
-}
-
-sub TIEHANDLE {
-	my ($cls, $pid, $fh, @cb_arg) = @_;
-	my $self = bless { pid => $pid, fh => $fh, ppid => $$ }, $cls;
-	# we share $err (and not $self) with awaitpid to avoid a ref cycle
-	$self->{pp_chld_err} = \(my $err);
-	awaitpid($pid, \&waitcb, \$err, @cb_arg);
-	$self;
-}
-
-# for IO::Uncompress::Gunzip and PublicInbox::LeiRediff
-sub BINMODE { @_ == 1 ? binmode($_[0]->{fh}) : binmode($_[0]->{fh}, $_[1]) }
-
-sub READ { read($_[0]->{fh}, $_[1], $_[2], $_[3] || 0) }
-
-sub READLINE { readline($_[0]->{fh}) }
-
-sub WRITE { syswrite($_[0]->{fh}, $_[1], $_[2] // length($_[1]), $_[3] // 0) }
-
-sub PRINT { print { $_[0]->{fh} } @_[1..$#_] }
-
-sub FILENO { fileno($_[0]->{fh}) }
-
-sub _close ($;$) {
-	my ($self, $wait) = @_;
-	my ($fh, $pid) = delete(@$self{qw(fh pid)});
-	my $ret = (defined($fh) && $wait) ? close($fh) : ($fh = '');
-	return $ret unless defined($pid) && $self->{ppid} == $$;
-	if ($wait) { # caller cares about the exit status:
-		# synchronous wait via defined(wantarray) on awaitpid:
-		defined(${$self->{pp_chld_err}}) or $wait = awaitpid($pid);
-		($? = ${$self->{pp_chld_err}}) and $ret = '';
-	} else {
-		awaitpid($pid); # depends on $in_loop or not
-	}
-	$ret;
-}
-
-# if caller uses close(), assume they want to check $? immediately so
-# we'll waitpid() synchronously.  n.b. wantarray doesn't seem to
-# propagate `undef' down to tied methods, otherwise I'd rely on that.
-sub CLOSE { _close($_[0], 1) }
-
-# if relying on DESTROY, assume the caller doesn't care about $? and
-# we can let the event loop call waitpid() whenever it gets SIGCHLD
-sub DESTROY {
-	_close($_[0]);
-	undef;
-}
-
-1;
diff --git a/lib/PublicInbox/ProcessIONBF.pm b/lib/PublicInbox/ProcessIONBF.pm
deleted file mode 100644
index 490e200a..00000000
--- a/lib/PublicInbox/ProcessIONBF.pm
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# used to support unbuffered partial reads
-package PublicInbox::ProcessIONBF;
-use v5.12;
-use parent qw(PublicInbox::ProcessIO);
-use IO::Handle; # ->blocking
-
-sub new {
-	my ($cls, $pid, $fh, @cb_arg) = @_;
-	$fh->blocking(0) // die "$fh->blocking(0): $!";
-	my $io = $cls->SUPER::maybe_new($pid, $fh, @cb_arg);
-}
-
-sub replace {
-	my ($cls, $orig) = @_;
-	my $pio = tied *$orig; # ProcessIO
-	$pio->{fh}->blocking(0) // die "$pio->{fh}->blocking(0): $!";
-	bless $pio, $cls;
-}
-
-sub READ { sysread($_[0]->{fh}, $_[1], $_[2], $_[3] // 0) }
-
-1;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index a03e1b01..0bf857c6 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -149,7 +149,7 @@ sub finish ($;$) {
 
 	# we can safely finalize if pipe was closed before, or if
 	# {_err} is defined by waitpid_err.  Deleting {rpipe} will
-	# trigger PublicInbox::ProcessIO::DESTROY -> waitpid_err,
+	# trigger PublicInbox::IO::DESTROY -> waitpid_err,
 	# but it may not fire right away if inside the event loop.
 	my $closed_before = !delete($self->{rpipe});
 	finalize($self) if $closed_before || defined($self->{_err});
@@ -244,9 +244,8 @@ sub ipipe_cb { # InputPipe callback
 sub _yield_start { # may run later, much later...
 	my ($self) = @_;
 	if ($self->{psgi_env}->{'pi-httpd.async'}) {
-		require PublicInbox::ProcessIONBF;
 		my $rpipe = $self->{rpipe};
-		PublicInbox::ProcessIONBF->replace($rpipe);
+		$rpipe->blocking(0);
 		PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
 	} else {
 		require PublicInbox::GetlineResponse;
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 8e9bc231..7dd47846 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -19,7 +19,7 @@ use PublicInbox::Lock;
 use Fcntl qw(SEEK_SET);
 use IO::Handle ();
 use Carp qw(croak);
-use PublicInbox::ProcessIO;
+use PublicInbox::IO;
 our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait run_qx);
 our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
 use autodie qw(open pipe read seek sysseek truncate);
@@ -377,7 +377,7 @@ sub popen_rd {
 	my ($cmd, $env, $opt, @cb_arg) = @_;
 	pipe(my $r, local $opt->{1});
 	my $pid = spawn($cmd, $env, $opt);
-	PublicInbox::ProcessIO->maybe_new($pid, $r, @cb_arg);
+	wantarray ? ($r, $pid) : PublicInbox::IO::attach_pid($r, $pid, @cb_arg)
 }
 
 sub popen_wr {
@@ -385,7 +385,7 @@ sub popen_wr {
 	pipe(local $opt->{0}, my $w);
 	$w->autoflush(1);
 	my $pid = spawn($cmd, $env, $opt);
-	PublicInbox::ProcessIO->maybe_new($pid, $w, @cb_arg)
+	wantarray ? ($w, $pid) : PublicInbox::IO::attach_pid($w, $pid, @cb_arg)
 }
 
 sub read_out_err ($) {
diff --git a/t/spawn.t b/t/spawn.t
index 938a2e5e..3479b6b3 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -107,34 +107,35 @@ EOF
 
 {
 	my $fh = popen_rd([qw(echo hello)]);
-	ok(fileno($fh) >= 0, 'tied fileno works');
+	ok(fileno($fh) >= 0, 'fileno works');
 	my $l = <$fh>;
-	is($l, "hello\n", 'tied readline works');
+	is($l, "hello\n", 'readline works');
 	$l = <$fh>;
-	ok(!$l, 'tied readline works for EOF');
+	ok(!$l, 'readline works for EOF');
 }
 
 {
 	my $fh = popen_rd([qw(printf foo\nbar)]);
-	ok(fileno($fh) >= 0, 'tied fileno works');
-	my $tfh = (tied *$fh)->{fh};
-	is($tfh->blocking(0), 1, '->blocking was true');
-	is($tfh->blocking, 0, '->blocking is false');
-	is($tfh->blocking(1), 0, '->blocking was true');
-	is($tfh->blocking, 1, '->blocking is true');
+	ok(fileno($fh) >= 0, 'fileno works');
+	is($fh->blocking(0), 1, '->blocking was true');
+	is($fh->blocking, 0, '->blocking is false');
+	is($fh->blocking(1), 0, '->blocking was true');
+	is($fh->blocking, 1, '->blocking is true');
 	my @line = <$fh>;
 	is_deeply(\@line, [ "foo\n", 'bar' ], 'wantarray works on readline');
 }
 
 {
 	my $fh = popen_rd([qw(echo hello)]);
+	like($fh->attached_pid, qr/\A[0-9]+\z/, 'have a PID');
 	my $buf;
 	is(sysread($fh, $buf, 6), 6, 'sysread got 6 bytes');
-	is($buf, "hello\n", 'tied gets works');
+	is($buf, "hello\n", 'sysread works');
 	is(sysread($fh, $buf, 6), 0, 'sysread got EOF');
 	$? = 1;
 	ok($fh->close, 'close succeeds');
 	is($?, 0, '$? set properly');
+	is($fh->attached_pid, undef, 'attached_pid cleared after close');
 }
 
 {
@@ -160,8 +161,8 @@ EOF
 	$fh = popen_rd(['true'], undef, undef, sub { @c = caller });
 	undef $fh; # ->DESTROY
 	ok(scalar(@c), 'callback fired by ->DESTROY');
-	ok(grep(!m[/PublicInbox/ProcessIO\.pm\z], @c),
-		'callback not invoked by ProcessIO');
+	ok(grep(!m[/PublicInbox/IO\.pm\z], @c),
+		'callback not invoked by PublicInbox::IO');
 }
 
 { # children don't wait on siblings
@@ -170,7 +171,6 @@ EOF
 	my @arg;
 	my $fh = popen_rd(['cat'], undef, { 0 => $r },
 			sub { @arg = @_; warn "x=$$\n" }, 'hi');
-	my $pp = tied *$fh;
 	my $pid = fork // BAIL_OUT $!;
 	local $SIG{__WARN__} = sub { _exit(1) };
 	if ($pid == 0) {
diff --git a/xt/check-run.t b/xt/check-run.t
index cda839fe..d12b925d 100755
--- a/xt/check-run.t
+++ b/xt/check-run.t
@@ -14,7 +14,7 @@ use v5.12;
 use IO::Handle; # ->autoflush
 use PublicInbox::TestCommon;
 use PublicInbox::Spawn;
-use PublicInbox::DS; # already loaded by Spawn via ProcessIO
+use PublicInbox::DS; # already loaded by Spawn via PublicInbox::IO
 use Getopt::Long qw(:config gnu_getopt no_ignore_case auto_abbrev);
 use Errno qw(EINTR);
 use Fcntl qw(:seek);

^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2023-10-28  6:40 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-28  6:40 [PATCH] replace ProcessIO with untied PublicInbox::IO Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).