dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH 1/4] ipc: require fork+SOCK_SEQPACKET for wq_* functions
@ 2023-10-06 10:27 Eric Wong
  2023-10-06 10:27 ` [PATCH 2/4] ipc: use autodie for most syscalls Eric Wong
                   ` (2 more replies)
  0 siblings, 3 replies; 4+ messages in thread
From: Eric Wong @ 2023-10-06 10:27 UTC (permalink / raw)
  To: spew

None of the lei internals works properly without forking and
sockets, and keeping the non-forking fallback increases the
potential to accidentally call subs in the wrong process during
the teardown phase.

We'll still support ipc_do without forking for now, since
forking doesn't benefit small indexing runs.
---
 lib/PublicInbox/IPC.pm | 43 ++++++++++++++++--------------------------
 t/ipc.t                | 19 ++++++++-----------
 2 files changed, 24 insertions(+), 38 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 839281b2..4309d672 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -256,16 +256,12 @@ sub do_sock_stream { # via wq_io_do, for big requests
 
 sub wq_broadcast {
 	my ($self, $sub, @args) = @_;
-	if (my $wkr = $self->{-wq_workers}) {
-		my $buf = ipc_freeze([$sub, @args]);
-		for my $bcast1 (values %$wkr) {
-			my $sock = $bcast1 // $self->{-wq_s1} // next;
-			send($sock, $buf, 0) // croak "send: $!";
-			# XXX shouldn't have to deal with EMSGSIZE here...
-		}
-	} else {
-		eval { $self->$sub(@args) };
-		warn "wq_broadcast: $@" if $@;
+	my $wkr = $self->{-wq_workers} or Carp::confess('no -wq_workers');
+	my $buf = ipc_freeze([$sub, @args]);
+	for my $bcast1 (values %$wkr) {
+		my $sock = $bcast1 // $self->{-wq_s1} // next;
+		send($sock, $buf, 0) // croak "send: $!";
+		# XXX shouldn't have to deal with EMSGSIZE here...
 	}
 }
 
@@ -291,24 +287,17 @@ sub stream_in_full ($$$) {
 
 sub wq_io_do { # always async
 	my ($self, $sub, $ios, @args) = @_;
-	if (my $s1 = $self->{-wq_s1}) { # run in worker
-		my $fds = [ map { fileno($_) } @$ios ];
-		my $buf = ipc_freeze([$sub, @args]);
-		if (length($buf) > $MY_MAX_ARG_STRLEN) {
-			stream_in_full($s1, $fds, $buf);
-		} else {
-			my $n = $send_cmd->($s1, $fds, $buf, 0);
-			return if defined($n); # likely
-			$!{ETOOMANYREFS} and
-				croak "sendmsg: $! (check RLIMIT_NOFILE)";
-			$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
-				croak("sendmsg: $!");
-		}
+	my $s1 = $self->{-wq_s1} or Carp::confess('no -wq_s1');
+	my $fds = [ map { fileno($_) } @$ios ];
+	my $buf = ipc_freeze([$sub, @args]);
+	if (length($buf) > $MY_MAX_ARG_STRLEN) {
+		stream_in_full($s1, $fds, $buf);
 	} else {
-		@$self{0..$#$ios} = @$ios;
-		eval { $self->$sub(@args) };
-		warn "wq_io_do: $@" if $@;
-		delete @$self{0..$#$ios}; # don't close
+		my $n = $send_cmd->($s1, $fds, $buf, 0);
+		return if defined($n); # likely
+		$!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)";
+		$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+			croak("sendmsg: $!");
 	}
 }
 
diff --git a/t/ipc.t b/t/ipc.t
index 7bdf2218..519ef089 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -1,9 +1,7 @@
 #!perl -w
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use v5.10.1;
-use Test::More;
+use v5.12;
 use PublicInbox::TestCommon;
 use Fcntl qw(SEEK_SET);
 use PublicInbox::SHA qw(sha1_hex);
@@ -108,7 +106,9 @@ open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!";
 my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!";
 close $agpl or BAIL_OUT "close: $!";
 
-for my $t ('local', 'worker', 'worker again') {
+for my $t ('worker', 'worker again') {
+	my $ppid = $ipc->wq_workers_start('wq', 1);
+	push(@ppids, $ppid);
 	$ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
 	my $i = 0;
 	for my $fh ($ra, $rb, $rc) {
@@ -132,14 +132,12 @@ for my $t ('local', 'worker', 'worker again') {
 		$exp = sha1_hex($bigger)."\n";
 		is(readline($rb), $exp, "SHA WQWorker limit ($t)");
 	}
-	my $ppid = $ipc->wq_workers_start('wq', 1);
-	push(@ppids, $ppid);
 }
 
 # wq_io_do works across fork (siblings can feed)
 SKIP: {
 	skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
-	is_deeply(\@ppids, [$$, undef, undef],
+	is_xdeeply(\@ppids, [$$, undef],
 		'parent pid returned in wq_workers_start');
 	my $pid = fork // BAIL_OUT $!;
 	if ($pid == 0) {
@@ -173,10 +171,9 @@ SKIP: {
 	skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
 	seek($warn, 0, SEEK_SET) or BAIL_OUT;
 	my @warn = <$warn>;
-	is(scalar(@warn), 3, 'warned 3 times');
-	like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do');
-	like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
-	is($warn[2], $warn[1], 'worker did not die');
+	is(scalar(@warn), 2, 'warned 3 times');
+	like($warn[0], qr/ wq_worker: /, '2nd warned from wq_worker');
+	is($warn[0], $warn[1], 'worker did not die');
 
 	$SIG{__WARN__} = 'DEFAULT';
 	is($ipc->wq_workers_start('wq', 2), $$, 'workers started again');

^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH 2/4] ipc: use autodie for most syscalls
  2023-10-06 10:27 [PATCH 1/4] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
@ 2023-10-06 10:27 ` Eric Wong
  2023-10-06 10:27 ` [PATCH 3/4] import: use autodie, rely on PerlIO for retries Eric Wong
  2023-10-06 10:27 ` [PATCH 4/4] rename ProcessPipe to ProcessIO Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2023-10-06 10:27 UTC (permalink / raw)
  To: spew

I'm not sure how/if we should bother recovering from these
syscall failures, so just die and let some caller deal with them.
---
 lib/PublicInbox/IPC.pm | 39 ++++++++++++++++-----------------------
 1 file changed, 16 insertions(+), 23 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 4309d672..c36860be 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -8,9 +8,9 @@
 # use ipc_do when you need work done on a certain process
 # use wq_io_do when your work can be done on any idle worker
 package PublicInbox::IPC;
-use strict;
-use v5.10.1;
+use v5.12;
 use parent qw(Exporter);
+use autodie qw(fork pipe read socketpair sysread);
 use Carp qw(croak);
 use PublicInbox::DS qw(awaitpid);
 use PublicInbox::Spawn;
@@ -54,9 +54,9 @@ our $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
 
 sub _get_rec ($) {
 	my ($r) = @_;
-	defined(my $len = <$r>) or return;
+	my $len = <$r> // return;
 	chop($len) eq "\n" or croak "no LF byte in $len";
-	defined(my $n = read($r, my $buf, $len)) or croak "read error: $!";
+	my $n = read($r, my $buf, $len);
 	$n == $len or croak "short read: $n != $len";
 	ipc_thaw($buf);
 }
@@ -98,12 +98,12 @@ sub ipc_worker_spawn {
 	my ($self, $ident, $oldset, $fields, @cb_args) = @_;
 	return if ($self->{-ipc_ppid} // -1) == $$; # idempotent
 	delete(@$self{qw(-ipc_req -ipc_res -ipc_ppid -ipc_pid)});
-	pipe(my ($r_req, $w_req)) or die "pipe: $!";
-	pipe(my ($r_res, $w_res)) or die "pipe: $!";
+	pipe(my $r_req, my $w_req);
+	pipe(my $r_res, my $w_res);
 	my $sigset = $oldset // PublicInbox::DS::block_signals();
 	$self->ipc_atfork_prepare;
 	my $seed = rand(0xffffffff);
-	my $pid = fork // die "fork: $!";
+	my $pid = fork;
 	if ($pid == 0) {
 		srand($seed);
 		eval { Net::SSLeay::randomize() };
@@ -211,15 +211,12 @@ sub recv_and_run {
 	my $n = length($buf) or return 0;
 	my $nfd = 0;
 	for my $fd (@fds) {
-		if (open(my $cmdfh, '+<&=', $fd)) {
-			$self->{$nfd++} = $cmdfh;
-			$cmdfh->autoflush(1);
-		} else {
-			die "$$ open(+<&=$fd) (FD:$nfd): $!";
-		}
+		open(my $cmdfh, '+<&=', $fd);
+		$self->{$nfd++} = $cmdfh;
+		$cmdfh->autoflush(1);
 	}
 	while ($full_stream && $n < $len) {
-		my $r = sysread($s2, $buf, $len - $n, $n) // croak "read: $!";
+		my $r = sysread($s2, $buf, $len - $n, $n);
 		croak "read EOF after $n/$len bytes" if $r == 0;
 		$n = length($buf);
 	}
@@ -267,8 +264,7 @@ sub wq_broadcast {
 
 sub stream_in_full ($$$) {
 	my ($s1, $fds, $buf) = @_;
-	socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
-		croak "socketpair: $!";
+	socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0);
 	my $n = $send_cmd->($s1, [ fileno($r) ],
 			ipc_freeze(['do_sock_stream', length($buf)]),
 			0) // croak "sendmsg: $!";
@@ -315,7 +311,7 @@ sub wq_sync_run {
 sub wq_do {
 	my ($self, $sub, @args) = @_;
 	if (defined(wantarray)) {
-		pipe(my ($r, $w)) or die "pipe: $!";
+		pipe(my $r, my $w);
 		wq_io_do($self, 'wq_sync_run', [ $w ], wantarray, $sub, @args);
 		undef $w;
 		_wait_return($r, $sub);
@@ -344,10 +340,9 @@ sub wq_nonblock_do { # always async
 sub _wq_worker_start {
 	my ($self, $oldset, $fields, $one, @cb_args) = @_;
 	my ($bcast1, $bcast2);
-	$one or socketpair($bcast1, $bcast2, AF_UNIX, SOCK_SEQPACKET, 0) or
-							die "socketpair: $!";
+	$one or socketpair($bcast1, $bcast2, AF_UNIX, SOCK_SEQPACKET, 0);
 	my $seed = rand(0xffffffff);
-	my $pid = fork // die "fork: $!";
+	my $pid = fork;
 	if ($pid == 0) {
 		srand($seed);
 		eval { Net::SSLeay::randomize() };
@@ -381,9 +376,7 @@ sub wq_workers_start {
 	my ($self, $ident, $nr_workers, $oldset, $fields, @cb_args) = @_;
 	($send_cmd && $recv_cmd) or return;
 	return if $self->{-wq_s1}; # idempotent
-	$self->{-wq_s1} = $self->{-wq_s2} = undef;
-	socketpair($self->{-wq_s1}, $self->{-wq_s2}, AF_UNIX, SOCK_SEQPACKET, 0)
-		or die "socketpair: $!";
+	socketpair($self->{-wq_s1}, $self->{-wq_s2},AF_UNIX, SOCK_SEQPACKET, 0);
 	$self->ipc_atfork_prepare;
 	$nr_workers //= $self->{-wq_nr_workers}; # was set earlier
 	my $sigset = $oldset // PublicInbox::DS::block_signals();

^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH 3/4] import: use autodie, rely on PerlIO for retries
  2023-10-06 10:27 [PATCH 1/4] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
  2023-10-06 10:27 ` [PATCH 2/4] ipc: use autodie for most syscalls Eric Wong
@ 2023-10-06 10:27 ` Eric Wong
  2023-10-06 10:27 ` [PATCH 4/4] rename ProcessPipe to ProcessIO Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2023-10-06 10:27 UTC (permalink / raw)
  To: spew

As documented in perlipc(1), the default :perlio layer retries
the `read' perlop on EINTR.  The :perlio layer also makes `read'
perform read-in-full behavior; so there's no need to loop
ourselves.  Our responsibility is now only to detect short reads
in case fast-import is killed mid-stream.
---
 lib/PublicInbox/Import.pm | 45 ++++++++++++++-------------------------
 1 file changed, 16 insertions(+), 29 deletions(-)

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 59462e9a..7175884c 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -6,9 +6,8 @@
 # and public-inbox-watch. Not the WWW or NNTP code which only
 # requires read-only access.
 package PublicInbox::Import;
-use strict;
+use v5.12;
 use parent qw(PublicInbox::Lock);
-use v5.10.1;
 use PublicInbox::Spawn qw(run_die popen_rd);
 use PublicInbox::MID qw(mids mid2path);
 use PublicInbox::Address;
@@ -18,13 +17,15 @@ use PublicInbox::ContentHash qw(content_digest);
 use PublicInbox::MDA;
 use PublicInbox::Eml;
 use POSIX qw(strftime);
+use autodie qw(read close);
+use Carp qw(croak);
 
 sub default_branch () {
 	state $default_branch = do {
 		my $r = popen_rd([qw(git config --global init.defaultBranch)],
 				 { GIT_CONFIG => undef });
 		chomp(my $h = <$r> // '');
-		close $r;
+		CORE::close $r;
 		$h eq '' ? 'refs/heads/master' : "refs/heads/$h";
 	}
 }
@@ -113,20 +114,10 @@ sub _cat_blob ($$$) {
 	local $/ = "\n";
 	my $info = <$r> // die "EOF from fast-import / cat-blob: $!";
 	$info =~ /\A[a-f0-9]{40,} blob ([0-9]+)\n\z/ or return;
-	my $left = $1;
-	my $offset = 0;
-	my $buf = '';
-	my $n;
-	while ($left > 0) {
-		$n = read($r, $buf, $left, $offset) //
-			die "read cat-blob failed: $!";
-		$n == 0 and die 'fast-export (cat-blob) died';
-		$left -= $n;
-		$offset += $n;
-	}
-	$n = read($r, my $lf, 1) //
-		die "read final byte of cat-blob failed: $!";
-	die "bad read on final byte: <$lf>" if $lf ne "\n";
+	my $n = read($r, my $buf, my $len = $1 + 1);
+	$n == $len or croak "cat-blob: short read: $n < $len";
+	my $lf = chop $buf;
+	croak "bad read on final byte: <$lf>" if $lf ne "\n";
 
 	# fixup some bugginess in old versions:
 	$buf =~ s/\A[\r\n]*From [^\r\n]*\r?\n//s;
@@ -479,9 +470,9 @@ EOM
 	while (my ($fn, $contents) = splice(@fn_contents, 0, 2)) {
 		my $f = $dir.'/'.$fn;
 		next if -f $f;
-		open my $fh, '>', $f or die "open $f: $!";
-		print $fh $contents or die "print $f: $!";
-		close $fh or die "close $f: $!";
+		open my $fh, '>', $f;
+		print $fh $contents;
+		close $fh;
 	}
 }
 
@@ -494,7 +485,7 @@ sub done {
 	eval {
 		my $r = delete $self->{in} or die 'BUG: missing {in} when done';
 		print $w "done\n" or wfail;
-		close $r or die "fast-import failed: $?"; # ProcessPipe::CLOSE
+		close $r;
 	};
 	my $wait_err = $@;
 	my $nchg = delete $self->{nchg};
@@ -509,10 +500,7 @@ sub done {
 
 sub atfork_child {
 	my ($self) = @_;
-	foreach my $f (qw(in out)) {
-		next unless defined($self->{$f});
-		close $self->{$f} or die "failed to close import[$f]: $!\n";
-	}
+	close($_) for (grep defined, delete(@$self{qw(in out)}));
 }
 
 sub digest2mid ($$;$) {
@@ -583,10 +571,9 @@ sub replace_oids {
 			push @buf, "commit $tmp\n";
 		} elsif (/^data ([0-9]+)/) {
 			# only commit message, so $len is small:
-			my $len = $1; # + 1 for trailing "\n"
 			push @buf, $_;
-			my $n = read($rd, my $buf, $len) or die "read: $!";
-			$len == $n or die "short read ($n < $len)";
+			my $n = read($rd, my $buf, my $len = $1);
+			$len == $n or croak "short read ($n < $len)";
 			push @buf, $buf;
 		} elsif (/^M 100644 ([a-f0-9]+) (\w+)/) {
 			my ($oid, $path) = ($1, $2);
@@ -625,7 +612,7 @@ sub replace_oids {
 			push @buf, $_;
 		}
 	}
-	close $rd or die "close fast-export failed: $?";
+	close $rd;
 	if (@buf) {
 		print $w @buf or wfail;
 	}

^ permalink raw reply related	[flat|nested] 4+ messages in thread

* [PATCH 4/4] rename ProcessPipe to ProcessIO
  2023-10-06 10:27 [PATCH 1/4] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
  2023-10-06 10:27 ` [PATCH 2/4] ipc: use autodie for most syscalls Eric Wong
  2023-10-06 10:27 ` [PATCH 3/4] import: use autodie, rely on PerlIO for retries Eric Wong
@ 2023-10-06 10:27 ` Eric Wong
  2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2023-10-06 10:27 UTC (permalink / raw)
  To: spew

Since we deal with pipes (of either direction) and bidirectional
stream sockets for this class, it's better to remove the `Pipe'
from the name and replace it with `IO' to communicate that it
works for any form of IO::Handle-like object tied to a process.
---
 MANIFEST                                         | 2 +-
 lib/PublicInbox/Gcf2Client.pm                    | 4 ++--
 lib/PublicInbox/Git.pm                           | 4 ++--
 lib/PublicInbox/HTTPD/Async.pm                   | 2 +-
 lib/PublicInbox/LeiRediff.pm                     | 2 +-
 lib/PublicInbox/LeiToMail.pm                     | 4 ++--
 lib/PublicInbox/{ProcessPipe.pm => ProcessIO.pm} | 8 +++-----
 lib/PublicInbox/Qspawn.pm                        | 4 ++--
 lib/PublicInbox/Spamcheck/Spamc.pm               | 2 +-
 lib/PublicInbox/Spawn.pm                         | 6 +++---
 t/spawn.t                                        | 4 ++--
 11 files changed, 20 insertions(+), 22 deletions(-)
 rename lib/PublicInbox/{ProcessPipe.pm => ProcessIO.pm} (86%)

diff --git a/MANIFEST b/MANIFEST
index 4693cbe0..7b96f9f0 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -318,7 +318,7 @@ lib/PublicInbox/OverIdx.pm
 lib/PublicInbox/POP3.pm
 lib/PublicInbox/POP3D.pm
 lib/PublicInbox/PktOp.pm
-lib/PublicInbox/ProcessPipe.pm
+lib/PublicInbox/ProcessIO.pm
 lib/PublicInbox/Qspawn.pm
 lib/PublicInbox/Reply.pm
 lib/PublicInbox/RepoAtom.pm
diff --git a/lib/PublicInbox/Gcf2Client.pm b/lib/PublicInbox/Gcf2Client.pm
index 4a0348b4..f63a0335 100644
--- a/lib/PublicInbox/Gcf2Client.pm
+++ b/lib/PublicInbox/Gcf2Client.pm
@@ -10,7 +10,7 @@ use PublicInbox::Gcf2; # fails if Inline::C or libgit2-dev isn't available
 use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::ProcessPipe;
+use PublicInbox::ProcessIO;
 use autodie qw(socketpair);
 
 # fields:
@@ -33,7 +33,7 @@ sub new  {
 	my $cmd = [$^X, $^W ? ('-w') : (),
 			qw[-MPublicInbox::Gcf2 -e PublicInbox::Gcf2::loop]];
 	my $pid = spawn($cmd, $env, $opt);
-	my $sock = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
+	my $sock = PublicInbox::ProcessIO->maybe_new($pid, $s1);
 	$self->{inflight} = [];
 	$self->{epwatch} = \undef; # for Git->cleanup
 	$self->SUPER::new($sock, EPOLLIN);
diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm
index 0fd621e1..94d5dcee 100644
--- a/lib/PublicInbox/Git.pm
+++ b/lib/PublicInbox/Git.pm
@@ -165,7 +165,7 @@ sub _sock_cmd {
 						$self->fail("tmpfile($id): $!");
 	}
 	my $pid = spawn(\@cmd, undef, $opt);
-	$self->{sock} = PublicInbox::ProcessPipe->maybe_new($pid, $s1);
+	$self->{sock} = PublicInbox::ProcessIO->maybe_new($pid, $s1);
 }
 
 sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
@@ -626,7 +626,7 @@ sub cleanup_if_unlinked {
 	my $ret = 0;
 	for my $obj ($self, ($self->{ck} // ())) {
 		my $sock = $obj->{sock} // next;
-		my PublicInbox::ProcessPipe $pp = tied *$sock; # ProcessPipe
+		my PublicInbox::ProcessIO $pp = tied *$sock; # ProcessIO
 		my $pid = $pp->{pid} // next;
 		open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
 		while (<$fh>) {
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index 7bbab1e1..b9d2159c 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -37,7 +37,7 @@ sub new {
 		arg => $arg, # arg for $cb
 		end_obj => $end_obj, # like END{}, can ->event_step
 	}, $class;
-	my $pp = tied *$io; # ProcessPipe
+	my $pp = tied *$io; # ProcessIO
 	$pp->{fh}->blocking(0) // die "$io->blocking(0): $!";
 	$self->SUPER::new($io, EPOLLIN);
 }
diff --git a/lib/PublicInbox/LeiRediff.pm b/lib/PublicInbox/LeiRediff.pm
index a886931c..b894342b 100644
--- a/lib/PublicInbox/LeiRediff.pm
+++ b/lib/PublicInbox/LeiRediff.pm
@@ -152,7 +152,7 @@ sub requote ($$) {
 	# $^X (perl) is overkill, but maybe there's a weird system w/o sed
 	my ($w, $pid) = popen_wr([$^X, '-pe', "s/^/$pfx/"], $lei->{env}, $opt);
 	$w->autoflush(1);
-	binmode $w, ':utf8'; # incompatible with ProcessPipe due to syswrite
+	binmode $w, ':utf8'; # incompatible with ProcessIO due to syswrite
 	$lei->{1} = $w;
 	PublicInbox::OnDestroy->new(\&wait_requote, $lei, $pid, $old_1);
 }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index f239da82..f56ad330 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -7,7 +7,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
-use PublicInbox::ProcessPipe;
+use PublicInbox::ProcessIO;
 use PublicInbox::Spawn qw(spawn);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
@@ -162,7 +162,7 @@ sub _post_augment_mbox { # open a compressor process from top-level lei-daemon
 	my ($r, $w) = @{delete $lei->{zpipe}};
 	my $rdr = { 0 => $r, 1 => $lei->{1}, 2 => $lei->{2}, pgid => 0 };
 	my $pid = spawn($cmd, undef, $rdr);
-	$lei->{1} = PublicInbox::ProcessPipe->maybe_new($pid, $w, {
+	$lei->{1} = PublicInbox::ProcessIO->maybe_new($pid, $w, {
 			cb_arg => [\&reap_compress, $lei, $cmd, $lei->{1} ] });
 }
 
diff --git a/lib/PublicInbox/ProcessPipe.pm b/lib/PublicInbox/ProcessIO.pm
similarity index 86%
rename from lib/PublicInbox/ProcessPipe.pm
rename to lib/PublicInbox/ProcessIO.pm
index ba2c1ecb..eeb66139 100644
--- a/lib/PublicInbox/ProcessPipe.pm
+++ b/lib/PublicInbox/ProcessIO.pm
@@ -1,11 +1,9 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# a tied handle for auto reaping of children tied to a read-only pipe, see perltie(1)
-# DO NOT use this as-is for bidirectional pipes/sockets (e.g. in PublicInbox::Git),
-# both ends of the pipe must be at the same level of the Perl object hierarchy
-# to ensure orderly destruction.
-package PublicInbox::ProcessPipe;
+# a tied handle for auto reaping of children tied to a pipe or socket,
+# see perltie(1) for details.
+package PublicInbox::ProcessIO;
 use v5.12;
 use PublicInbox::DS qw(awaitpid);
 use Symbol qw(gensym);
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index 5e4fd5cb..ea7ae647 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -124,7 +124,7 @@ sub finish ($;$) {
 
 	# we can safely finalize if pipe was closed before, or if
 	# {_err} is defined by waitpid_err.  Deleting {rpipe} will
-	# trigger PublicInbox::ProcessPipe::DESTROY -> waitpid_err,
+	# trigger PublicInbox::ProcessIO::DESTROY -> waitpid_err,
 	# but it may not fire right away if inside the event loop.
 	my $closed_before = !delete($self->{rpipe});
 	finalize($self) if $closed_before || defined($self->{_err});
@@ -251,7 +251,7 @@ sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
 	if (ref($r) ne 'ARRAY' || scalar(@$r) == 3) { # error
 		if ($async) { # calls rpipe->close && ->event_step
 			$async->close; # PublicInbox::HTTPD::Async::close
-		} else { # generic PSGI, use PublicInbox::ProcessPipe::CLOSE
+		} else { # generic PSGI, use PublicInbox::ProcessIO::CLOSE
 			delete($self->{rpipe})->close;
 			event_step($self);
 		}
diff --git a/lib/PublicInbox/Spamcheck/Spamc.pm b/lib/PublicInbox/Spamcheck/Spamc.pm
index 726866c8..cba33a66 100644
--- a/lib/PublicInbox/Spamcheck/Spamc.pm
+++ b/lib/PublicInbox/Spamcheck/Spamc.pm
@@ -27,7 +27,7 @@ sub spamcheck {
 		$out = \$buf;
 	}
 	$$out = do { local $/; <$fh> };
-	close $fh; # PublicInbox::ProcessPipe::CLOSE
+	close $fh; # PublicInbox::ProcessIO::CLOSE
 	($? || $$out eq '') ? 0 : 1;
 }
 
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 4c7e0f80..cb8b21c6 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -21,7 +21,7 @@ use PublicInbox::Lock;
 use Fcntl qw(SEEK_SET);
 use IO::Handle ();
 use Carp qw(croak);
-use PublicInbox::ProcessPipe;
+use PublicInbox::ProcessIO;
 our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait);
 our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
 
@@ -368,13 +368,13 @@ sub spawn ($;$$) {
 sub popen_rd {
 	my ($cmd, $env, $opt) = @_;
 	pipe(my $r, local $opt->{1}) or die "pipe: $!\n";
-	PublicInbox::ProcessPipe->maybe_new(spawn($cmd, $env, $opt), $r, $opt)
+	PublicInbox::ProcessIO->maybe_new(spawn($cmd, $env, $opt), $r, $opt)
 }
 
 sub popen_wr {
 	my ($cmd, $env, $opt) = @_;
 	pipe(local $opt->{0}, my $w) or die "pipe: $!\n";
-	PublicInbox::ProcessPipe->maybe_new(spawn($cmd, $env, $opt), $w, $opt)
+	PublicInbox::ProcessIO->maybe_new(spawn($cmd, $env, $opt), $w, $opt)
 }
 
 sub run_wait ($;$$) {
diff --git a/t/spawn.t b/t/spawn.t
index 04589437..be5aaf9f 100644
--- a/t/spawn.t
+++ b/t/spawn.t
@@ -149,8 +149,8 @@ EOF
 	$fh = popen_rd(['true'], undef, { cb_arg => [sub { @c = caller }] });
 	undef $fh; # ->DESTROY
 	ok(scalar(@c), 'callback fired by ->DESTROY');
-	ok(grep(!m[/PublicInbox/ProcessPipe\.pm\z], @c),
-		'callback not invoked by ProcessPipe');
+	ok(grep(!m[/PublicInbox/ProcessIO\.pm\z], @c),
+		'callback not invoked by ProcessIO');
 }
 
 { # children don't wait on siblings

^ permalink raw reply related	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2023-10-06 10:27 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-06 10:27 [PATCH 1/4] ipc: require fork+SOCK_SEQPACKET for wq_* functions Eric Wong
2023-10-06 10:27 ` [PATCH 2/4] ipc: use autodie for most syscalls Eric Wong
2023-10-06 10:27 ` [PATCH 3/4] import: use autodie, rely on PerlIO for retries Eric Wong
2023-10-06 10:27 ` [PATCH 4/4] rename ProcessPipe to ProcessIO Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).