dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 1/4] ipc: require fork+SOCK_SEQPACKET for wq_* functions
Date: Fri,  6 Oct 2023 10:27:28 +0000	[thread overview]
Message-ID: <20231006102731.4009551-1-e@80x24.org> (raw)

None of the lei internals works properly without forking and
sockets and it increases the potential to accidentally call subs
in the wrong process during the teardown phase.

We'll still support ipc_do w/o forking for now since it
doesn't benefit small indexing runs.
---
 lib/PublicInbox/IPC.pm | 43 ++++++++++++++++--------------------------
 t/ipc.t                | 19 ++++++++-----------
 2 files changed, 24 insertions(+), 38 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 839281b2..4309d672 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -256,16 +256,12 @@ sub do_sock_stream { # via wq_io_do, for big requests
 
 sub wq_broadcast {
 	my ($self, $sub, @args) = @_;
-	if (my $wkr = $self->{-wq_workers}) {
-		my $buf = ipc_freeze([$sub, @args]);
-		for my $bcast1 (values %$wkr) {
-			my $sock = $bcast1 // $self->{-wq_s1} // next;
-			send($sock, $buf, 0) // croak "send: $!";
-			# XXX shouldn't have to deal with EMSGSIZE here...
-		}
-	} else {
-		eval { $self->$sub(@args) };
-		warn "wq_broadcast: $@" if $@;
+	my $wkr = $self->{-wq_workers} or Carp::confess('no -wq_workers');
+	my $buf = ipc_freeze([$sub, @args]);
+	for my $bcast1 (values %$wkr) {
+		my $sock = $bcast1 // $self->{-wq_s1} // next;
+		send($sock, $buf, 0) // croak "send: $!";
+		# XXX shouldn't have to deal with EMSGSIZE here...
 	}
 }
 
@@ -291,24 +287,17 @@ sub stream_in_full ($$$) {
 
 sub wq_io_do { # always async
 	my ($self, $sub, $ios, @args) = @_;
-	if (my $s1 = $self->{-wq_s1}) { # run in worker
-		my $fds = [ map { fileno($_) } @$ios ];
-		my $buf = ipc_freeze([$sub, @args]);
-		if (length($buf) > $MY_MAX_ARG_STRLEN) {
-			stream_in_full($s1, $fds, $buf);
-		} else {
-			my $n = $send_cmd->($s1, $fds, $buf, 0);
-			return if defined($n); # likely
-			$!{ETOOMANYREFS} and
-				croak "sendmsg: $! (check RLIMIT_NOFILE)";
-			$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
-				croak("sendmsg: $!");
-		}
+	my $s1 = $self->{-wq_s1} or Carp::confess('no -wq_s1');
+	my $fds = [ map { fileno($_) } @$ios ];
+	my $buf = ipc_freeze([$sub, @args]);
+	if (length($buf) > $MY_MAX_ARG_STRLEN) {
+		stream_in_full($s1, $fds, $buf);
 	} else {
-		@$self{0..$#$ios} = @$ios;
-		eval { $self->$sub(@args) };
-		warn "wq_io_do: $@" if $@;
-		delete @$self{0..$#$ios}; # don't close
+		my $n = $send_cmd->($s1, $fds, $buf, 0);
+		return if defined($n); # likely
+		$!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)";
+		$!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+			croak("sendmsg: $!");
 	}
 }
 
diff --git a/t/ipc.t b/t/ipc.t
index 7bdf2218..519ef089 100644
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -1,9 +1,7 @@
 #!perl -w
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use v5.10.1;
-use Test::More;
+use v5.12;
 use PublicInbox::TestCommon;
 use Fcntl qw(SEEK_SET);
 use PublicInbox::SHA qw(sha1_hex);
@@ -108,7 +106,9 @@ open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!";
 my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!";
 close $agpl or BAIL_OUT "close: $!";
 
-for my $t ('local', 'worker', 'worker again') {
+for my $t ('worker', 'worker again') {
+	my $ppid = $ipc->wq_workers_start('wq', 1);
+	push(@ppids, $ppid);
 	$ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
 	my $i = 0;
 	for my $fh ($ra, $rb, $rc) {
@@ -132,14 +132,12 @@ for my $t ('local', 'worker', 'worker again') {
 		$exp = sha1_hex($bigger)."\n";
 		is(readline($rb), $exp, "SHA WQWorker limit ($t)");
 	}
-	my $ppid = $ipc->wq_workers_start('wq', 1);
-	push(@ppids, $ppid);
 }
 
 # wq_io_do works across fork (siblings can feed)
 SKIP: {
 	skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
-	is_deeply(\@ppids, [$$, undef, undef],
+	is_xdeeply(\@ppids, [$$, undef],
 		'parent pid returned in wq_workers_start');
 	my $pid = fork // BAIL_OUT $!;
 	if ($pid == 0) {
@@ -173,10 +171,9 @@ SKIP: {
 	skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
 	seek($warn, 0, SEEK_SET) or BAIL_OUT;
 	my @warn = <$warn>;
-	is(scalar(@warn), 3, 'warned 3 times');
-	like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do');
-	like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
-	is($warn[2], $warn[1], 'worker did not die');
+	is(scalar(@warn), 2, 'warned 3 times');
+	like($warn[0], qr/ wq_worker: /, '2nd warned from wq_worker');
+	is($warn[0], $warn[1], 'worker did not die');
 
 	$SIG{__WARN__} = 'DEFAULT';
 	is($ipc->wq_workers_start('wq', 2), $$, 'workers started again');

             reply	other threads:[~2023-10-06 10:27 UTC|newest]

Thread overview: 4+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-10-06 10:27 Eric Wong [this message]
2023-10-06 10:27 ` [PATCH 2/4] ipc: use autodie for most syscalls Eric Wong
2023-10-06 10:27 ` [PATCH 3/4] import: use autodie, rely on PerlIO for retries Eric Wong
2023-10-06 10:27 ` [PATCH 4/4] rename ProcessPipe to ProcessIO Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20231006102731.4009551-1-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).