* [PATCH] ipc: lower-level send_cmd/recv_cmd handle EINTR directly
@ 2023-10-06 1:02 Eric Wong
0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2023-10-06 1:02 UTC (permalink / raw)
To: spew
This ensures script/lei $send_cmd usage is EINTR-safe (since
I prefer to avoid loading PublicInbox::IPC there, to keep startup
time low).  Overall, it saves us some code, too.
---
lib/PublicInbox/CmdIPC4.pm | 24 +++++++++++++------
lib/PublicInbox/IPC.pm | 26 ++++----------------
lib/PublicInbox/LEI.pm | 6 ++---
lib/PublicInbox/LeiSelfSocket.pm | 3 ++-
lib/PublicInbox/Spawn.pm | 41 ++++++++++++++++++--------------
lib/PublicInbox/Syscall.pm | 21 ++++++++--------
lib/PublicInbox/XapClient.pm | 2 +-
lib/PublicInbox/XapHelper.pm | 2 +-
script/lei | 5 +---
t/cmd_ipc.t | 12 ++++++----
t/xap_helper.t | 4 ++--
11 files changed, 72 insertions(+), 74 deletions(-)
diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
index 4bc4c729..2f102ec6 100644
--- a/lib/PublicInbox/CmdIPC4.pm
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -7,6 +7,16 @@
package PublicInbox::CmdIPC4;
use v5.12;
use Socket qw(SOL_SOCKET SCM_RIGHTS);
+
+sub sendmsg_retry ($) {
+ return 1 if $!{EINTR};
+ return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS});
+ return if ++$_[0] >= 50;
+ warn "# sleeping on sendmsg: $! (#$_[0])\n";
+ select(undef, undef, undef, 0.1);
+ 1;
+}
+
BEGIN { eval {
require Socket::MsgHdr; # XS
no warnings 'once';
@@ -20,21 +30,21 @@ no warnings 'once';
my $try = 0;
do {
$s = Socket::MsgHdr::sendmsg($sock, $mh, $flags);
- } while (!defined($s) &&
- ($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) &&
- (++$try < 50) &&
- warn "# sleeping on sendmsg: $! (#$try)\n" &&
- select(undef, undef, undef, 0.1) == 0);
+ } while (!defined($s) && sendmsg_retry($try));
$s;
};
*recv_cmd4 = sub ($$$) {
my ($s, undef, $len) = @_; # $_[1] = destination buffer
my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256);
- my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // do {
+ my $r;
+ do {
+ $r = Socket::MsgHdr::recvmsg($s, $mh, 0);
+ } while (!defined($r) && $!{EINTR});
+ if (!defined($r)) {
$_[1] = '';
return (undef);
- };
+ }
$_[1] = $mh->buf;
return () if $r == 0;
my (undef, undef, $data) = $mh->cmsghdr;
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 9b4b1508..839281b2 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -204,27 +204,9 @@ sub ipc_sibling_atfork_child {
$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
}
-sub send_cmd ($$$$) {
- my ($s, $fds, $buf, $fl) = @_;
- while (1) {
- my $n = $send_cmd->($s, $fds, $buf, $fl);
- next if !defined($n) && $!{EINTR};
- return $n;
- }
-}
-
-sub recv_cmd ($$$) {
- my ($s, undef, $len) = @_; # $_[1] is $buf
- while (1) {
- my @fds = $recv_cmd->($s, $_[1], $len);
- next if scalar(@fds) == 1 && !defined($fds[0]) && $!{EINTR};
- return @fds;
- }
-}
-
sub recv_and_run {
my ($self, $s2, $len, $full_stream) = @_;
- my @fds = recv_cmd($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
+ my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
return if scalar(@fds) && !defined($fds[0]);
my $n = length($buf) or return 0;
my $nfd = 0;
@@ -291,11 +273,11 @@ sub stream_in_full ($$$) {
my ($s1, $fds, $buf) = @_;
socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
croak "socketpair: $!";
- my $n = send_cmd($s1, [ fileno($r) ],
+ my $n = $send_cmd->($s1, [ fileno($r) ],
ipc_freeze(['do_sock_stream', length($buf)]),
0) // croak "sendmsg: $!";
undef $r;
- $n = send_cmd($w, $fds, $buf, 0) // croak "sendmsg: $!";
+ $n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
while ($n < length($buf)) {
my $x = syswrite($w, $buf, length($buf) - $n, $n);
if (!defined($n)) {
@@ -315,7 +297,7 @@ sub wq_io_do { # always async
if (length($buf) > $MY_MAX_ARG_STRLEN) {
stream_in_full($s1, $fds, $buf);
} else {
- my $n = send_cmd $s1, $fds, $buf, 0;
+ my $n = $send_cmd->($s1, $fds, $buf, 0);
return if defined($n); # likely
$!{ETOOMANYREFS} and
croak "sendmsg: $! (check RLIMIT_NOFILE)";
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index e300f0a4..f8bcd43d 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1041,7 +1041,7 @@ sub start_mua {
sub send_exec_cmd { # tell script/lei to execute a command
my ($self, $io, $cmd, $env) = @_;
- PublicInbox::IPC::send_cmd(
+ $PublicInbox::IPC::send_cmd->(
$self->{sock} // die('lei client gone'),
[ map { fileno($_) } @$io ],
exec_buf($cmd, $env), 0) //
@@ -1139,7 +1139,7 @@ sub accept_dispatch { # Listener {post_accept} callback
select($rvec, undef, undef, 60) or
return send($sock, 'timed out waiting to recv FDs', 0);
# (4096 * 33) >MAX_ARG_STRLEN
- my @fds = PublicInbox::IPC::recv_cmd($sock, my $buf, 4096 * 33) or
+ my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
return; # EOF
if (!defined($fds[0])) {
warn(my $msg = "recv_cmd failed: $!");
@@ -1178,7 +1178,7 @@ sub event_step {
local %ENV = %{$self->{env}};
local $current_lei = $self;
eval {
- my @fds = PublicInbox::IPC::recv_cmd(
+ my @fds = $PublicInbox::IPC::recv_cmd->(
$self->{sock} // return, my $buf, 4096);
if (scalar(@fds) == 1 && !defined($fds[0])) {
return if $! == EAGAIN;
diff --git a/lib/PublicInbox/LeiSelfSocket.pm b/lib/PublicInbox/LeiSelfSocket.pm
index b8745252..0e15bc7c 100644
--- a/lib/PublicInbox/LeiSelfSocket.pm
+++ b/lib/PublicInbox/LeiSelfSocket.pm
@@ -21,7 +21,8 @@ sub new {
sub event_step {
my ($self) = @_;
- my @fds = PublicInbox::IPC::recv_cmd($self->{sock}, my $buf, 4096 * 33);
+ my ($buf, @fds);
+ @fds = $PublicInbox::IPC::recv_cmd->($self->{sock}, $buf, 4096 * 33);
if (scalar(@fds) == 1 && !defined($fds[0])) {
return if $!{EAGAIN};
die "recvmsg: $!" unless $!{ECONNRESET};
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index bb2abe28..4c7e0f80 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -173,19 +173,20 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
return (int)pid;
}
-static int sleep_wait(unsigned *tries, int err)
+static int sendmsg_retry(unsigned *tries)
{
const struct timespec req = { 0, 100000000 }; /* 100ms */
+ int err = errno;
switch (err) {
+ case EINTR: PERL_ASYNC_CHECK(); return 1;
case ENOBUFS: case ENOMEM: case ETOOMANYREFS:
- if (++*tries < 50) {
- fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
- strerror(err), *tries);
- nanosleep(&req, NULL);
- return 1;
- }
- default:
- return 0;
+ if (++*tries >= 50) return 0;
+ fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
+ strerror(err), *tries);
+ nanosleep(&req, NULL);
+ PERL_ASYNC_CHECK();
+ return 1;
+ default: return 0;
}
}
@@ -237,7 +238,7 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
}
do {
sent = sendmsg(PerlIO_fileno(s), &msg, flags);
- } while (sent < 0 && sleep_wait(&tries, errno));
+ } while (sent < 0 && sendmsg_retry(&tries));
return sent >= 0 ? newSViv(sent) : &PL_sv_undef;
}
@@ -259,20 +260,24 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
msg.msg_control = &cmsg.hdr;
msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
- i = recvmsg(PerlIO_fileno(s), &msg, 0);
+ for (;;) {
+ i = recvmsg(PerlIO_fileno(s), &msg, 0);
+ if (i >= 0 || errno != EINTR) break;
+ PERL_ASYNC_CHECK();
+ }
if (i >= 0) {
SvCUR_set(buf, i);
+ if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+ cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+ size_t len = cmsg.hdr.cmsg_len;
+ int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+ for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
+ Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
+ }
} else {
Inline_Stack_Push(&PL_sv_undef);
SvCUR_set(buf, 0);
}
- if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
- cmsg.hdr.cmsg_type == SCM_RIGHTS) {
- size_t len = cmsg.hdr.cmsg_len;
- int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
- for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
- Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
- }
Inline_Stack_Done;
}
#endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 4cf45d0f..e83beb6a 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -394,6 +394,8 @@ use constant msg_controllen => CMSG_SPACE(10 * SIZEOF_int) + 16; # 10 FDs
if (defined($SYS_sendmsg) && defined($SYS_recvmsg)) {
no warnings 'once';
+require PublicInbox::CmdIPC4;
+
*send_cmd4 = sub ($$$$) {
my ($sock, $fds, undef, $flags) = @_;
my $iov = pack('P'.TMPL_size_t,
@@ -418,16 +420,12 @@ no warnings 'once';
$cmsghdr, # msg_control
$msg_controllen,
0); # msg_flags
- my $sent;
+ my $s;
my $try = 0;
do {
- $sent = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
- } while ($sent < 0 &&
- ($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) &&
- (++$try < 50) &&
- warn "# sleeping on sendmsg: $! (#$try)\n" &&
- select(undef, undef, undef, 0.1) == 0);
- $sent >= 0 ? $sent : undef;
+ $s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
+ } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($try));
+ $s >= 0 ? $s : undef;
};
*recv_cmd4 = sub ($$$) {
@@ -446,8 +444,11 @@ no warnings 'once';
$cmsghdr, # msg_control
msg_controllen,
0); # msg_flags
- my $r = syscall($SYS_recvmsg, fileno($sock), $mh, 0);
- if ($r < 0) { # $! is set
+ my $r;
+ do {
+ $r = syscall($SYS_recvmsg, fileno($sock), $mh, 0);
+ } while ($r < 0 && $!{EINTR});
+ if ($r < 0) {
$_[1] = '';
return (undef);
}
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
index f6c09c3b..9e2d71a0 100644
--- a/lib/PublicInbox/XapClient.pm
+++ b/lib/PublicInbox/XapClient.pm
@@ -21,7 +21,7 @@ sub mkreq {
}
my @fds = map fileno($_), @$ios;
my $buf = join("\0", @arg, '');
- $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, 0) //
+ $n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0) //
die "send_cmd: $!";
$n == length($buf) or die "send_cmd: $n != ".length($buf);
$r;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index c98708e3..ae907766 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -177,7 +177,7 @@ sub recv_loop {
my $in = \*STDIN;
while (!defined($parent_pid) || getppid == $parent_pid) {
PublicInbox::DS::sig_setmask($workerset);
- my @fds = PublicInbox::IPC::recv_cmd($in, $rbuf, 4096*33);
+ my @fds = $PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33);
scalar(@fds) or exit(66); # EX_NOINPUT
die "recvmsg: $!" if !defined($fds[0]);
PublicInbox::DS::block_signals();
diff --git a/script/lei b/script/lei
index 1d90be0a..087afc33 100755
--- a/script/lei
+++ b/script/lei
@@ -116,10 +116,7 @@ $SIG{CONT} = sub { send($sock, 'CONT', 0) };
my $x_it_code = 0;
while (1) {
my (@fds) = $recv_cmd->($sock, my $buf, 4096 * 33);
- if (scalar(@fds) == 1 && !defined($fds[0])) {
- next if $!{EINTR};
- die "recvmsg: $!";
- }
+ die "recvmsg: $!" if scalar(@fds) == 1 && !defined($fds[0]);
last if $buf eq '';
if ($buf =~ /\Aexec (.+)\z/) {
$exec_cmd->(\@fds, split(/\0/, $1));
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
index e5d22aab..ccf4ca31 100644
--- a/t/cmd_ipc.t
+++ b/t/cmd_ipc.t
@@ -59,18 +59,20 @@ my $do_test = sub { SKIP: {
if ($pid == 0) {
# need to loop since Perl signals are racy
# (the interpreter doesn't self-pipe)
- CORE::kill('ALRM', $tgt) while (tick(0.05));
+ my $n = 3;
+ while (tick(0.01 * $n) && --$n) {
+ kill('ALRM', $tgt)
+ }
+ close $s1;
POSIX::_exit(1);
}
+ close $s1;
@fds = $recv->($s2, $buf, length($src) + 1);
- ok($!{EINTR}, "EINTR set by ($desc)");
- kill('KILL', $pid);
waitpid($pid, 0);
- is_deeply(\@fds, [ undef ], "EINTR $desc");
+ is_deeply(\@fds, [], "EINTR->EOF $desc");
ok($alrm, 'SIGALRM hit');
}
- close $s1;
@fds = $recv->($s2, $buf, length($src) + 1);
is_deeply(\@fds, [], "no FDs on EOF $desc");
is($buf, '', "buffer cleared on EOF ($desc)");
diff --git a/t/xap_helper.t b/t/xap_helper.t
index 2303301d..27742cad 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -52,8 +52,8 @@ my $doreq = sub {
my $buf = join("\0", @arg, '');
my @fds = fileno($y);
push @fds, fileno($err) if $err;
- my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, 0);
- $n // xbail "send: $!";
+ my $n = $PublicInbox::IPC::send_cmd->($s, \@fds, $buf, 0) //
+ xbail "send: $!";
my $exp = length($buf);
$exp == $n or xbail "req @arg sent short ($n != $exp)";
$x;
^ permalink raw reply related [flat|nested] only message in thread
only message in thread, other threads:[~2023-10-06 1:02 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-06 1:02 [PATCH] ipc: lower-level send_cmd/recv_cmd handle EINTR directly Eric Wong
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).