From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id AF5511F454 for ; Fri, 6 Oct 2023 09:07:26 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1696583246; bh=dE+zsMSb6sEYbyZGKV51Xot6UNZTuDSOjx85ieYriPQ=; h=From:To:Subject:Date:From; b=h6KSEFMAIxGZAA1sy7/beefVzx4G4SVD3kFCWfufVGQlJ88Y571oZKcYFkp64iqcj fTuV92hd+oG32cDGEtjDJLYmbQvD+Fmiq3q9u3bJ5OAnAeBXD3XKLjle27eJnYCy3r T8gUpT/XoQXlg78dDSugQQtSu7iuuMcjJA4ZCtsw= From: Eric Wong To: spew@80x24.org Subject: [PATCH 1/6] ipc: require fork+SOCK_SEQPACKET for wq_* functions Date: Fri, 6 Oct 2023 09:07:21 +0000 Message-ID: <20231006090726.3936839-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: None of the lei internals works properly without forking and sockets and it increases the potential to accidentally call subs in the wrong process during the teardown phase. We'll still support ipc_do w/o forking for now since it doesn't benefit small indexing runs. --- lib/PublicInbox/IPC.pm | 44 ++++++++++++++++-------------------------- t/ipc.t | 19 ++++++++---------- 2 files changed, 25 insertions(+), 38 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 9b4b1508..5c60a3e1 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -274,16 +274,12 @@ sub do_sock_stream { # via wq_io_do, for big requests sub wq_broadcast { my ($self, $sub, @args) = @_; - if (my $wkr = $self->{-wq_workers}) { - my $buf = ipc_freeze([$sub, @args]); - for my $bcast1 (values %$wkr) { - my $sock = $bcast1 // $self->{-wq_s1} // next; - send($sock, $buf, 0) // croak "send: $!"; - # XXX shouldn't have to deal with EMSGSIZE here... - } - } else { - eval { $self->$sub(@args) }; - warn "wq_broadcast: $@" if $@; + my $wkr = $self->{-wq_workers} or Carp::confess('no -wq_workers'); + my $buf = ipc_freeze([$sub, @args]); + for my $bcast1 (values %$wkr) { + my $sock = $bcast1 // $self->{-wq_s1} // next; + send($sock, $buf, 0) // croak "send: $!"; + # XXX shouldn't have to deal with EMSGSIZE here... } } @@ -309,24 +305,18 @@ sub stream_in_full ($$$) { sub wq_io_do { # always async my ($self, $sub, $ios, @args) = @_; - if (my $s1 = $self->{-wq_s1}) { # run in worker - my $fds = [ map { fileno($_) } @$ios ]; - my $buf = ipc_freeze([$sub, @args]); - if (length($buf) > $MY_MAX_ARG_STRLEN) { - stream_in_full($s1, $fds, $buf); - } else { - my $n = send_cmd $s1, $fds, $buf, 0; - return if defined($n); # likely - $!{ETOOMANYREFS} and - croak "sendmsg: $! (check RLIMIT_NOFILE)"; - $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) : - croak("sendmsg: $!"); - } + my $s1 = $self->{-wq_s1} or Carp::confess('no -wq_s1'); + my $fds = [ map { fileno($_) } @$ios ]; + my $buf = ipc_freeze([$sub, @args]); + if (length($buf) > $MY_MAX_ARG_STRLEN) { + stream_in_full($s1, $fds, $buf); } else { - @$self{0..$#$ios} = @$ios; - eval { $self->$sub(@args) }; - warn "wq_io_do: $@" if $@; - delete @$self{0..$#$ios}; # don't close + my $n = send_cmd $s1, $fds, $buf, 0; + return if defined($n); # likely + $!{ETOOMANYREFS} and + croak "sendmsg: $! (check RLIMIT_NOFILE)"; + $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) : + croak("sendmsg: $!"); } } diff --git a/t/ipc.t b/t/ipc.t index 7bdf2218..519ef089 100644 --- a/t/ipc.t +++ b/t/ipc.t @@ -1,9 +1,7 @@ #!perl -w # Copyright (C) all contributors # License: AGPL-3.0+ -use strict; -use v5.10.1; -use Test::More; +use v5.12; use PublicInbox::TestCommon; use Fcntl qw(SEEK_SET); use PublicInbox::SHA qw(sha1_hex); @@ -108,7 +106,9 @@ open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!"; my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!"; close $agpl or BAIL_OUT "close: $!"; -for my $t ('local', 'worker', 'worker again') { +for my $t ('worker', 'worker again') { + my $ppid = $ipc->wq_workers_start('wq', 1); + push(@ppids, $ppid); $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world'); my $i = 0; for my $fh ($ra, $rb, $rc) { @@ -132,14 +132,12 @@ for my $t ('local', 'worker', 'worker again') { $exp = sha1_hex($bigger)."\n"; is(readline($rb), $exp, "SHA WQWorker limit ($t)"); } - my $ppid = $ipc->wq_workers_start('wq', 1); - push(@ppids, $ppid); } # wq_io_do works across fork (siblings can feed) SKIP: { skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0]; - is_deeply(\@ppids, [$$, undef, undef], + is_xdeeply(\@ppids, [$$, undef], 'parent pid returned in wq_workers_start'); my $pid = fork // BAIL_OUT $!; if ($pid == 0) { @@ -173,10 +171,9 @@ SKIP: { skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0]; seek($warn, 0, SEEK_SET) or BAIL_OUT; my @warn = <$warn>; - is(scalar(@warn), 3, 'warned 3 times'); - like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do'); - like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker'); - is($warn[2], $warn[1], 'worker did not die'); + is(scalar(@warn), 2, 'warned 3 times'); + like($warn[0], qr/ wq_worker: /, '2nd warned from wq_worker'); + is($warn[0], $warn[1], 'worker did not die'); $SIG{__WARN__} = 'DEFAULT'; is($ipc->wq_workers_start('wq', 2), $$, 'workers started again');