dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH] ipc: lower-level send_cmd/recv_cmd handle EINTR directly
@ 2023-10-06  1:02 Eric Wong
  0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2023-10-06  1:02 UTC (permalink / raw)
  To: spew

Handling EINTR inside the lower-level send_cmd/recv_cmd
implementations ensures script/lei $send_cmd usage is EINTR-safe
(script/lei calls them directly since I prefer to avoid loading
PublicInbox::IPC for startup time).
Overall, it saves us some code, too.
---
 lib/PublicInbox/CmdIPC4.pm       | 24 +++++++++++++------
 lib/PublicInbox/IPC.pm           | 26 ++++----------------
 lib/PublicInbox/LEI.pm           |  6 ++---
 lib/PublicInbox/LeiSelfSocket.pm |  3 ++-
 lib/PublicInbox/Spawn.pm         | 41 ++++++++++++++++++--------------
 lib/PublicInbox/Syscall.pm       | 21 ++++++++--------
 lib/PublicInbox/XapClient.pm     |  2 +-
 lib/PublicInbox/XapHelper.pm     |  2 +-
 script/lei                       |  5 +---
 t/cmd_ipc.t                      | 12 ++++++----
 t/xap_helper.t                   |  4 ++--
 11 files changed, 72 insertions(+), 74 deletions(-)

diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm
index 4bc4c729..2f102ec6 100644
--- a/lib/PublicInbox/CmdIPC4.pm
+++ b/lib/PublicInbox/CmdIPC4.pm
@@ -7,6 +7,16 @@
 package PublicInbox::CmdIPC4;
 use v5.12;
 use Socket qw(SOL_SOCKET SCM_RIGHTS);
+
+sub sendmsg_retry ($) {
+	return 1 if $!{EINTR};
+	return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS});
+	return if ++$_[0] >= 50;
+	warn "# sleeping on sendmsg: $! (#$_[0])\n";
+	select(undef, undef, undef, 0.1);
+	1;
+}
+
 BEGIN { eval {
 require Socket::MsgHdr; # XS
 no warnings 'once';
@@ -20,21 +30,21 @@ no warnings 'once';
 	my $try = 0;
 	do {
 		$s = Socket::MsgHdr::sendmsg($sock, $mh, $flags);
-	} while (!defined($s) &&
-			($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) &&
-			(++$try < 50) &&
-			warn "# sleeping on sendmsg: $! (#$try)\n" &&
-			select(undef, undef, undef, 0.1) == 0);
+	} while (!defined($s) && sendmsg_retry($try));
 	$s;
 };
 
 *recv_cmd4 = sub ($$$) {
 	my ($s, undef, $len) = @_; # $_[1] = destination buffer
 	my $mh = Socket::MsgHdr->new(buflen => $len, controllen => 256);
-	my $r = Socket::MsgHdr::recvmsg($s, $mh, 0) // do {
+	my $r;
+	do {
+		$r = Socket::MsgHdr::recvmsg($s, $mh, 0);
+	} while (!defined($r) && $!{EINTR});
+	if (!defined($r)) {
 		$_[1] = '';
 		return (undef);
-	};
+	}
 	$_[1] = $mh->buf;
 	return () if $r == 0;
 	my (undef, undef, $data) = $mh->cmsghdr;
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 9b4b1508..839281b2 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -204,27 +204,9 @@ sub ipc_sibling_atfork_child {
 	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub send_cmd ($$$$) {
-	my ($s, $fds, $buf, $fl) = @_;
-	while (1) {
-		my $n = $send_cmd->($s, $fds, $buf, $fl);
-		next if !defined($n) && $!{EINTR};
-		return $n;
-	}
-}
-
-sub recv_cmd ($$$) {
-	my ($s, undef, $len) = @_; # $_[1] is $buf
-	while (1) {
-		my @fds = $recv_cmd->($s, $_[1], $len);
-		next if scalar(@fds) == 1 && !defined($fds[0]) && $!{EINTR};
-		return @fds;
-	}
-}
-
 sub recv_and_run {
 	my ($self, $s2, $len, $full_stream) = @_;
-	my @fds = recv_cmd($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
+	my @fds = $recv_cmd->($s2, my $buf, $len // $MY_MAX_ARG_STRLEN);
 	return if scalar(@fds) && !defined($fds[0]);
 	my $n = length($buf) or return 0;
 	my $nfd = 0;
@@ -291,11 +273,11 @@ sub stream_in_full ($$$) {
 	my ($s1, $fds, $buf) = @_;
 	socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
 		croak "socketpair: $!";
-	my $n = send_cmd($s1, [ fileno($r) ],
+	my $n = $send_cmd->($s1, [ fileno($r) ],
 			ipc_freeze(['do_sock_stream', length($buf)]),
 			0) // croak "sendmsg: $!";
 	undef $r;
-	$n = send_cmd($w, $fds, $buf, 0) // croak "sendmsg: $!";
+	$n = $send_cmd->($w, $fds, $buf, 0) // croak "sendmsg: $!";
 	while ($n < length($buf)) {
 		my $x = syswrite($w, $buf, length($buf) - $n, $n);
 		if (!defined($n)) {
@@ -315,7 +297,7 @@ sub wq_io_do { # always async
 		if (length($buf) > $MY_MAX_ARG_STRLEN) {
 			stream_in_full($s1, $fds, $buf);
 		} else {
-			my $n = send_cmd $s1, $fds, $buf, 0;
+			my $n = $send_cmd->($s1, $fds, $buf, 0);
 			return if defined($n); # likely
 			$!{ETOOMANYREFS} and
 				croak "sendmsg: $! (check RLIMIT_NOFILE)";
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index e300f0a4..f8bcd43d 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1041,7 +1041,7 @@ sub start_mua {
 
 sub send_exec_cmd { # tell script/lei to execute a command
 	my ($self, $io, $cmd, $env) = @_;
-	PublicInbox::IPC::send_cmd(
+	$PublicInbox::IPC::send_cmd->(
 			$self->{sock} // die('lei client gone'),
 			[ map { fileno($_) } @$io ],
 			exec_buf($cmd, $env), 0) //
@@ -1139,7 +1139,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 	select($rvec, undef, undef, 60) or
 		return send($sock, 'timed out waiting to recv FDs', 0);
 	# (4096 * 33) >MAX_ARG_STRLEN
-	my @fds = PublicInbox::IPC::recv_cmd($sock, my $buf, 4096 * 33) or
+	my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
 		return; # EOF
 	if (!defined($fds[0])) {
 		warn(my $msg = "recv_cmd failed: $!");
@@ -1178,7 +1178,7 @@ sub event_step {
 	local %ENV = %{$self->{env}};
 	local $current_lei = $self;
 	eval {
-		my @fds = PublicInbox::IPC::recv_cmd(
+		my @fds = $PublicInbox::IPC::recv_cmd->(
 			$self->{sock} // return, my $buf, 4096);
 		if (scalar(@fds) == 1 && !defined($fds[0])) {
 			return if $! == EAGAIN;
diff --git a/lib/PublicInbox/LeiSelfSocket.pm b/lib/PublicInbox/LeiSelfSocket.pm
index b8745252..0e15bc7c 100644
--- a/lib/PublicInbox/LeiSelfSocket.pm
+++ b/lib/PublicInbox/LeiSelfSocket.pm
@@ -21,7 +21,8 @@ sub new {
 
 sub event_step {
 	my ($self) = @_;
-	my @fds = PublicInbox::IPC::recv_cmd($self->{sock}, my $buf, 4096 * 33);
+	my ($buf, @fds);
+	@fds = $PublicInbox::IPC::recv_cmd->($self->{sock}, $buf, 4096 * 33);
 	if (scalar(@fds) == 1 && !defined($fds[0])) {
 		return if $!{EAGAIN};
 		die "recvmsg: $!" unless $!{ECONNRESET};
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index bb2abe28..4c7e0f80 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -173,19 +173,20 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
 	return (int)pid;
 }
 
-static int sleep_wait(unsigned *tries, int err)
+static int sendmsg_retry(unsigned *tries)
 {
 	const struct timespec req = { 0, 100000000 }; /* 100ms */
+	int err = errno;
 	switch (err) {
+	case EINTR: PERL_ASYNC_CHECK(); return 1;
 	case ENOBUFS: case ENOMEM: case ETOOMANYREFS:
-		if (++*tries < 50) {
-			fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
-				strerror(err), *tries);
-			nanosleep(&req, NULL);
-			return 1;
-		}
-	default:
-		return 0;
+		if (++*tries >= 50) return 0;
+		fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
+			strerror(err), *tries);
+		nanosleep(&req, NULL);
+		PERL_ASYNC_CHECK();
+		return 1;
+	default: return 0;
 	}
 }
 
@@ -237,7 +238,7 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
 	}
 	do {
 		sent = sendmsg(PerlIO_fileno(s), &msg, flags);
-	} while (sent < 0 && sleep_wait(&tries, errno));
+	} while (sent < 0 && sendmsg_retry(&tries));
 	return sent >= 0 ? newSViv(sent) : &PL_sv_undef;
 }
 
@@ -259,20 +260,24 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
 	msg.msg_control = &cmsg.hdr;
 	msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
 
-	i = recvmsg(PerlIO_fileno(s), &msg, 0);
+	for (;;) {
+		i = recvmsg(PerlIO_fileno(s), &msg, 0);
+		if (i >= 0 || errno != EINTR) break;
+		PERL_ASYNC_CHECK();
+	}
 	if (i >= 0) {
 		SvCUR_set(buf, i);
+		if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+				cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+			size_t len = cmsg.hdr.cmsg_len;
+			int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+			for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
+				Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
+		}
 	} else {
 		Inline_Stack_Push(&PL_sv_undef);
 		SvCUR_set(buf, 0);
 	}
-	if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
-			cmsg.hdr.cmsg_type == SCM_RIGHTS) {
-		size_t len = cmsg.hdr.cmsg_len;
-		int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-		for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
-			Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
-	}
 	Inline_Stack_Done;
 }
 #endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 4cf45d0f..e83beb6a 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -394,6 +394,8 @@ use constant msg_controllen => CMSG_SPACE(10 * SIZEOF_int) + 16; # 10 FDs
 
 if (defined($SYS_sendmsg) && defined($SYS_recvmsg)) {
 no warnings 'once';
+require PublicInbox::CmdIPC4;
+
 *send_cmd4 = sub ($$$$) {
 	my ($sock, $fds, undef, $flags) = @_;
 	my $iov = pack('P'.TMPL_size_t,
@@ -418,16 +420,12 @@ no warnings 'once';
 			$cmsghdr, # msg_control
 			$msg_controllen,
 			0); # msg_flags
-	my $sent;
+	my $s;
 	my $try = 0;
 	do {
-		$sent = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
-	} while ($sent < 0 &&
-			($!{ENOBUFS} || $!{ENOMEM} || $!{ETOOMANYREFS}) &&
-			(++$try < 50) &&
-			warn "# sleeping on sendmsg: $! (#$try)\n" &&
-			select(undef, undef, undef, 0.1) == 0);
-	$sent >= 0 ? $sent : undef;
+		$s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
+	} while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($try));
+	$s >= 0 ? $s : undef;
 };
 
 *recv_cmd4 = sub ($$$) {
@@ -446,8 +444,11 @@ no warnings 'once';
 			$cmsghdr, # msg_control
 			msg_controllen,
 			0); # msg_flags
-	my $r = syscall($SYS_recvmsg, fileno($sock), $mh, 0);
-	if ($r < 0) { # $! is set
+	my $r;
+	do {
+		$r = syscall($SYS_recvmsg, fileno($sock), $mh, 0);
+	} while ($r < 0 && $!{EINTR});
+	if ($r < 0) {
 		$_[1] = '';
 		return (undef);
 	}
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
index f6c09c3b..9e2d71a0 100644
--- a/lib/PublicInbox/XapClient.pm
+++ b/lib/PublicInbox/XapClient.pm
@@ -21,7 +21,7 @@ sub mkreq {
 	}
 	my @fds = map fileno($_), @$ios;
 	my $buf = join("\0", @arg, '');
-	$n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, 0) //
+	$n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0) //
 		die "send_cmd: $!";
 	$n == length($buf) or die "send_cmd: $n != ".length($buf);
 	$r;
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index c98708e3..ae907766 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -177,7 +177,7 @@ sub recv_loop {
 	my $in = \*STDIN;
 	while (!defined($parent_pid) || getppid == $parent_pid) {
 		PublicInbox::DS::sig_setmask($workerset);
-		my @fds = PublicInbox::IPC::recv_cmd($in, $rbuf, 4096*33);
+		my @fds = $PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33);
 		scalar(@fds) or exit(66); # EX_NOINPUT
 		die "recvmsg: $!" if !defined($fds[0]);
 		PublicInbox::DS::block_signals();
diff --git a/script/lei b/script/lei
index 1d90be0a..087afc33 100755
--- a/script/lei
+++ b/script/lei
@@ -116,10 +116,7 @@ $SIG{CONT} = sub { send($sock, 'CONT', 0) };
 my $x_it_code = 0;
 while (1) {
 	my (@fds) = $recv_cmd->($sock, my $buf, 4096 * 33);
-	if (scalar(@fds) == 1 && !defined($fds[0])) {
-		next if $!{EINTR};
-		die "recvmsg: $!";
-	}
+	die "recvmsg: $!" if scalar(@fds) == 1 && !defined($fds[0]);
 	last if $buf eq '';
 	if ($buf =~ /\Aexec (.+)\z/) {
 		$exec_cmd->(\@fds, split(/\0/, $1));
diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t
index e5d22aab..ccf4ca31 100644
--- a/t/cmd_ipc.t
+++ b/t/cmd_ipc.t
@@ -59,18 +59,20 @@ my $do_test = sub { SKIP: {
 			if ($pid == 0) {
 				# need to loop since Perl signals are racy
 				# (the interpreter doesn't self-pipe)
-				CORE::kill('ALRM', $tgt) while (tick(0.05));
+				my $n = 3;
+				while (tick(0.01 * $n) && --$n) {
+					kill('ALRM', $tgt)
+				}
+				close $s1;
 				POSIX::_exit(1);
 			}
+			close $s1;
 			@fds = $recv->($s2, $buf, length($src) + 1);
-			ok($!{EINTR}, "EINTR set by ($desc)");
-			kill('KILL', $pid);
 			waitpid($pid, 0);
-			is_deeply(\@fds, [ undef ], "EINTR $desc");
+			is_deeply(\@fds, [], "EINTR->EOF $desc");
 			ok($alrm, 'SIGALRM hit');
 		}
 
-		close $s1;
 		@fds = $recv->($s2, $buf, length($src) + 1);
 		is_deeply(\@fds, [], "no FDs on EOF $desc");
 		is($buf, '', "buffer cleared on EOF ($desc)");
diff --git a/t/xap_helper.t b/t/xap_helper.t
index 2303301d..27742cad 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -52,8 +52,8 @@ my $doreq = sub {
 	my $buf = join("\0", @arg, '');
 	my @fds = fileno($y);
 	push @fds, fileno($err) if $err;
-	my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, 0);
-	$n // xbail "send: $!";
+	my $n = $PublicInbox::IPC::send_cmd->($s, \@fds, $buf, 0) //
+		xbail "send: $!";
 	my $exp = length($buf);
 	$exp == $n or xbail "req @arg sent short ($n != $exp)";
 	$x;

^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2023-10-06  1:02 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-10-06  1:02 [PATCH] ipc: lower-level send_cmd/recv_cmd handle EINTR directly Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).