From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id C8E481F406 for ; Mon, 28 Aug 2023 06:33:44 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1693204424; bh=pYKGw8ifYiC4Pehg6Eh5N8jfnwd+/cUZlwU1+oHmB84=; h=From:To:Subject:Date:From; b=youQdgLIp7V/iN6yoTB0p17Wvjws/hUJMmrbkBAhYtvOYMYlvSXSNKTX5O5HHVG2+ yOlDFMtl7YJEszmN8uTIGdBTFLRO5hFWJK+6i1kTezYeLr1tx1guxI/wgNZcuh5kUH ZLtNfKDd3PLnR16V4JgWSyV09vGRudTD2JZivlDY= From: Eric Wong To: spew@80x24.org Subject: [PATCH] treewide: drop MSG_EOR with AF_UNIX+SOCK_SEQPACKET Date: Mon, 28 Aug 2023 06:33:44 +0000 Message-Id: <20230828063344.131703-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: MSG_EOR is apparently not needed for AF_UNIX + SOCK_SEQPACKET, as our receivers never check for MSG_EOR in "struct msghdr".msg_flags anyway. This should work around truncation in OpenBSD recvmsg when MSG_EOR is used by the sender. 
--- lib/PublicInbox/CodeSearchIdx.pm | 8 ++++---- lib/PublicInbox/IPC.pm | 10 +++++----- lib/PublicInbox/LEI.pm | 28 ++++++++++++++-------------- lib/PublicInbox/PktOp.pm | 4 ++-- lib/PublicInbox/WQBlocked.pm | 3 +-- lib/PublicInbox/XapClient.pm | 4 ++-- script/lei | 10 +++++----- t/check-www-inbox.perl | 4 ++-- t/cmd_ipc.t | 8 ++++---- t/lei-daemon.t | 4 ++-- t/xap_helper.t | 4 ++-- 11 files changed, 43 insertions(+), 44 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index f9251be5..2e46e30a 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -55,7 +55,7 @@ use PublicInbox::CidxLogP; use PublicInbox::CidxComm; use PublicInbox::Git qw(%OFMT2HEXLEN); use PublicInbox::Compat qw(uniqstr); -use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET); +use Socket qw(AF_UNIX SOCK_SEQPACKET); use Carp (); our ( $LIVE, # pid => cmd @@ -253,7 +253,7 @@ sub cidx_reap_log { # awaitpid cb my ($pid, $self, $op_p) = @_; if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT || ($? 
& 127) == POSIX::SIGPIPE))) { - send($op_p, "shard_done $self->{shard}", MSG_EOR); + send($op_p, "shard_done $self->{shard}", 0); } else { warn "E: git @LOG_STDIN: \$?=$?\n"; $self->{xdb}->cancel_transaction; @@ -501,7 +501,7 @@ sub shard_commit { # via wq_io_do my ($self) = @_; my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; $self->commit_txn_lazy; - send($op_p, "shard_done $self->{shard}", MSG_EOR); + send($op_p, "shard_done $self->{shard}", 0); } sub assoc_max_init ($) { @@ -782,7 +782,7 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p'; my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef'; cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr; - send($prune_op_p, "prune_done $self->{shard}", MSG_EOR); + send($prune_op_p, "prune_done $self->{shard}", 0); } sub shards_active { # post_loop_do diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 84765748..766c377f 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -16,7 +16,7 @@ use PublicInbox::DS qw(awaitpid); use PublicInbox::Spawn; use PublicInbox::OnDestroy; use PublicInbox::WQWorker; -use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM); +use Socket qw(AF_UNIX SOCK_STREAM); my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough? our @EXPORT_OK = qw(ipc_freeze ipc_thaw nproc_shards); @@ -279,7 +279,7 @@ sub wq_broadcast { my $buf = ipc_freeze([$sub, @args]); for my $bcast1 (values %$wkr) { my $sock = $bcast1 // $self->{-wq_s1} // next; - send($sock, $buf, MSG_EOR) // croak "send: $!"; + send($sock, $buf, 0) // croak "send: $!"; # XXX shouldn't have to deal with EMSGSIZE here... 
} } else { @@ -294,7 +294,7 @@ sub stream_in_full ($$$) { croak "socketpair: $!"; my $n = send_cmd($s1, [ fileno($r) ], ipc_freeze(['do_sock_stream', length($buf)]), - MSG_EOR) // croak "sendmsg: $!"; + 0) // croak "sendmsg: $!"; undef $r; $n = send_cmd($w, $fds, $buf, 0) // croak "sendmsg: $!"; while ($n < length($buf)) { @@ -316,7 +316,7 @@ sub wq_io_do { # always async if (length($buf) > $MY_MAX_ARG_STRLEN) { stream_in_full($s1, $fds, $buf); } else { - my $n = send_cmd $s1, $fds, $buf, MSG_EOR; + my $n = send_cmd $s1, $fds, $buf, 0; return if defined($n); # likely $!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)"; @@ -365,7 +365,7 @@ sub wq_nonblock_do { # always async if ($self->{wqb}) { # saturated once, assume saturated forever $self->{wqb}->flush_send($buf); } else { - $send_cmd->($self->{-wq_s1}, [], $buf, MSG_EOR) // + $send_cmd->($self->{-wq_s1}, [], $buf, 0) // ($!{EAGAIN} ? PublicInbox::WQBlocked->new($self, $buf) : croak("sendmsg: $!")); } diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 578686e2..5fbb1211 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -10,7 +10,7 @@ use v5.12; use parent qw(PublicInbox::DS PublicInbox::LeiExternal PublicInbox::LeiQuery); use Getopt::Long (); -use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); +use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un); use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET); use Cwd qw(getcwd); use POSIX qw(strftime); @@ -481,7 +481,7 @@ sub x_it ($$) { if ($self->{pkt_op_p}) { # worker => lei-daemon $self->{pkt_op_p}->pkt_do('x_it', $code); } elsif ($self->{sock}) { # lei->daemon => lei(1) client - send($self->{sock}, "x_it $code", MSG_EOR); + send($self->{sock}, "x_it $code", 0); } elsif ($quit == \&CORE::exit) { # an admin (one-shot) command exit($code >> 8); } # else ignore if client disconnected @@ -548,7 +548,7 @@ sub child_error { # passes non-fatal curl exit codes to user if ($self->{pkt_op_p}) { # to top 
lei-daemon $self->{pkt_op_p}->pkt_do('child_error', $child_error); } elsif ($self->{sock}) { # to lei(1) client - send($self->{sock}, "child_error $child_error", MSG_EOR); + send($self->{sock}, "child_error $child_error", 0); } # else noop if client disconnected } @@ -1017,7 +1017,7 @@ sub send_exec_cmd { # tell script/lei to execute a command PublicInbox::IPC::send_cmd( $self->{sock} // die('lei client gone'), [ map { fileno($_) } @$io ], - exec_buf($cmd, $env), MSG_EOR) // + exec_buf($cmd, $env), 0) // Carp::croak("sendmsg: $!"); } @@ -1028,7 +1028,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail while (my $op = shift(@$alerts)) { if ($op eq ':WINCH') { # hit the process group that started the MUA - send($sock, '-WINCH', MSG_EOR) if $sock; + send($sock, '-WINCH', 0) if $sock; } elsif ($op eq ':bell') { out($self, "\a"); } elsif ($op =~ /(?{2} } @msg; $self->{2}->autoflush(1); stop_pager($self); - send($self->{sock}, 'wait', MSG_EOR); # wait for user to quit pager + send($self->{sock}, 'wait', 0); # wait for user to quit pager } sub stop_pager { @@ -1110,25 +1110,25 @@ sub accept_dispatch { # Listener {post_accept} callback my $self = bless { sock => $sock }, __PACKAGE__; vec(my $rvec = '', fileno($sock), 1) = 1; select($rvec, undef, undef, 60) or - return send($sock, 'timed out waiting to recv FDs', MSG_EOR); + return send($sock, 'timed out waiting to recv FDs', 0); # (4096 * 33) >MAX_ARG_STRLEN my @fds = PublicInbox::IPC::recv_cmd($sock, my $buf, 4096 * 33) or return; # EOF if (!defined($fds[0])) { warn(my $msg = "recv_cmd failed: $!"); - return send($sock, $msg, MSG_EOR); + return send($sock, $msg, 0); } else { my $i = 0; for my $fd (@fds) { open($self->{$i++}, '+<&=', $fd) and next; - send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR); + send($sock, "open(+<&=$fd) (FD=$i): $!", 0); } - $i == 4 or return send($sock, 'not enough FDs='.($i-1), MSG_EOR) + $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0) } # $ENV_STR = 
join('', map { "\0$_=$ENV{$_}" } keys %ENV); # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0"; substr($buf, -2, 2, '') eq "\0\0" or # s/\0\0\z// - return send($sock, 'request command truncated', MSG_EOR); + return send($sock, 'request command truncated', 0); my ($argc, @argv) = split(/\0/, $buf, -1); undef $buf; my %env = map { split(/=/, $_, 2) } splice(@argv, $argc); @@ -1174,7 +1174,7 @@ sub event_step { die "unrecognized client signal: $buf"; } my $s = $self->{-socks} // []; # lei up --all - @$s = grep { send($_, $buf, MSG_EOR) } @$s; + @$s = grep { send($_, $buf, 0) } @$s; }; if (my $err = $@) { eval { $self->fail($err) }; @@ -1534,7 +1534,7 @@ sub cfg_dump ($$) { sub request_umask { my ($lei) = @_; my $s = $lei->{sock} // return; - send($s, 'umask', MSG_EOR) // die "send: $!"; + send($s, 'umask', 0) // die "send: $!"; vec(my $rvec = '', fileno($s), 1) = 1; select($rvec, undef, undef, 2) or die 'timeout waiting for umask'; recv($s, my $v, 5, 0) // die "recv: $!"; diff --git a/lib/PublicInbox/PktOp.pm b/lib/PublicInbox/PktOp.pm index 4c434566..dc432307 100644 --- a/lib/PublicInbox/PktOp.pm +++ b/lib/PublicInbox/PktOp.pm @@ -11,7 +11,7 @@ use v5.10.1; use parent qw(PublicInbox::DS); use Errno qw(EAGAIN ECONNRESET); use PublicInbox::Syscall qw(EPOLLIN); -use Socket qw(AF_UNIX MSG_EOR SOCK_SEQPACKET); +use Socket qw(AF_UNIX SOCK_SEQPACKET); use PublicInbox::IPC qw(ipc_freeze ipc_thaw); use Scalar::Util qw(blessed); @@ -32,7 +32,7 @@ sub pair { sub pkt_do { # for the producer to trigger event_step in consumer my ($self, $cmd, @args) = @_; - send($self->{op_p}, @args ? "$cmd\0".ipc_freeze(\@args) : $cmd, MSG_EOR) + send($self->{op_p}, @args ? 
"$cmd\0".ipc_freeze(\@args) : $cmd, 0) } sub event_step { diff --git a/lib/PublicInbox/WQBlocked.pm b/lib/PublicInbox/WQBlocked.pm index fbb43600..8d931fa9 100644 --- a/lib/PublicInbox/WQBlocked.pm +++ b/lib/PublicInbox/WQBlocked.pm @@ -8,7 +8,6 @@ use parent qw(PublicInbox::DS); use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); use PublicInbox::IPC; use Carp (); -use Socket qw(MSG_EOR); sub new { my ($cls, $wq, $buf) = @_; @@ -25,7 +24,7 @@ sub flush_send { } else { my $wq_s1 = $self->{sock}; my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf, - MSG_EOR); + 0); next if defined($n); Carp::croak("sendmsg: $!") unless $!{EAGAIN}; PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT); diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm index 56e3c3b4..f6c09c3b 100644 --- a/lib/PublicInbox/XapClient.pm +++ b/lib/PublicInbox/XapClient.pm @@ -9,7 +9,7 @@ package PublicInbox::XapClient; use v5.12; use PublicInbox::Spawn qw(spawn); -use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR); +use Socket qw(AF_UNIX SOCK_SEQPACKET); use PublicInbox::IPC; sub mkreq { @@ -21,7 +21,7 @@ sub mkreq { } my @fds = map fileno($_), @$ios; my $buf = join("\0", @arg, ''); - $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, MSG_EOR) // + $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, 0) // die "send_cmd: $!"; $n == length($buf) or die "send_cmd: $n != ".length($buf); $r; diff --git a/script/lei b/script/lei index 5feb7751..a77ea880 100755 --- a/script/lei +++ b/script/lei @@ -2,7 +2,7 @@ # Copyright (C) all contributors # License: AGPL-3.0+ use v5.12; -use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); +use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un); use PublicInbox::CmdIPC4; my $narg = 5; my $sock; @@ -109,9 +109,9 @@ open my $dh, '<', '.' or die "open(.) 
$!"; my $buf = join("\0", scalar(@ARGV), @ARGV); while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" } $buf .= "\0\0"; -$send_cmd->($sock, [0, 1, 2, fileno($dh)], $buf, MSG_EOR) or die "sendmsg: $!"; -$SIG{TSTP} = sub { send($sock, 'STOP', MSG_EOR); kill 'STOP', $$ }; -$SIG{CONT} = sub { send($sock, 'CONT', MSG_EOR) }; +$send_cmd->($sock, [0, 1, 2, fileno($dh)], $buf, 0) or die "sendmsg: $!"; +$SIG{TSTP} = sub { send($sock, 'STOP', 0); kill 'STOP', $$ }; +$SIG{CONT} = sub { send($sock, 'CONT', 0) }; my $x_it_code = 0; while (1) { @@ -126,7 +126,7 @@ while (1) { } elsif ($buf eq '-WINCH') { kill($buf, @parent); # for MUA } elsif ($buf eq 'umask') { - send($sock, 'u'.pack('V', umask), MSG_EOR) or die "send: $!" + send($sock, 'u'.pack('V', umask), 0) or die "send: $!" } elsif ($buf =~ /\Ax_it ([0-9]+)\z/) { $x_it_code ||= $1 + 0; last; diff --git a/t/check-www-inbox.perl b/t/check-www-inbox.perl index 033b90d1..46f9ce1e 100644 --- a/t/check-www-inbox.perl +++ b/t/check-www-inbox.perl @@ -123,7 +123,7 @@ while (keys %workers) { # reacts to SIGCHLD } } while ($u = shift @queue) { - my $s = $todo[1]->send($u, MSG_EOR); + my $s = $todo[1]->send($u, 0); if ($!{EAGAIN}) { unshift @queue, $u; last; @@ -177,7 +177,7 @@ sub worker_loop { foreach my $l (@links, "DONE\t$u") { next if $l eq '' || $l =~ /\.mbox(?:\.gz)\z/; do { - $s = $done_wr->send($l, MSG_EOR); + $s = $done_wr->send($l, 0); } while (!defined $s && $!{EINTR}); die "$$ send $!\n" unless defined $s; my $n = length($l); diff --git a/t/cmd_ipc.t b/t/cmd_ipc.t index cd76d5e8..403d0eed 100644 --- a/t/cmd_ipc.t +++ b/t/cmd_ipc.t @@ -5,7 +5,7 @@ use strict; use v5.10.1; use Test::More; use PublicInbox::TestCommon; -use Socket qw(AF_UNIX SOCK_STREAM MSG_EOR); +use Socket qw(AF_UNIX SOCK_STREAM); pipe(my ($r, $w)) or BAIL_OUT; my ($send, $recv); require_ok 'PublicInbox::Spawn'; @@ -122,7 +122,7 @@ SKIP: { $send = $send_ic; $recv = $recv_ic; $do_test->(SOCK_STREAM, 0, 'Inline::C stream'); - $do_test->($SOCK_SEQPACKET, 
MSG_EOR, 'Inline::C seqpacket'); + $do_test->($SOCK_SEQPACKET, 0, 'Inline::C seqpacket'); } SKIP: { @@ -131,7 +131,7 @@ SKIP: { $send = PublicInbox::CmdIPC4->can('send_cmd4'); $recv = PublicInbox::CmdIPC4->can('recv_cmd4'); $do_test->(SOCK_STREAM, 0, 'MsgHdr stream'); - $do_test->($SOCK_SEQPACKET, MSG_EOR, 'MsgHdr seqpacket'); + $do_test->($SOCK_SEQPACKET, 0, 'MsgHdr seqpacket'); SKIP: { ($send_ic && $recv_ic) or skip 'Inline::C not installed/enabled', 12; @@ -149,7 +149,7 @@ SKIP: { $recv = PublicInbox::Syscall->can('recv_cmd4') or skip 'recv_cmd4 not defined for arch'; $do_test->(SOCK_STREAM, 0, 'PP Linux stream'); - $do_test->($SOCK_SEQPACKET, MSG_EOR, 'PP Linux seqpacket'); + $do_test->($SOCK_SEQPACKET, 0, 'PP Linux seqpacket'); } done_testing; diff --git a/t/lei-daemon.t b/t/lei-daemon.t index e11105bc..78ed265e 100644 --- a/t/lei-daemon.t +++ b/t/lei-daemon.t @@ -2,7 +2,7 @@ # Copyright (C) all contributors # License: AGPL-3.0+ use strict; use v5.10.1; use PublicInbox::TestCommon; -use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un); +use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un); test_lei({ daemon_only => 1 }, sub { my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do { @@ -40,7 +40,7 @@ test_lei({ daemon_only => 1 }, sub { socket(my $c, AF_UNIX, SOCK_SEQPACKET, 0) or BAIL_OUT "socket: $!"; connect($c, $addr) or BAIL_OUT "connect: $!"; - $send_cmd->($c, \@fds, 'hi', MSG_EOR); + $send_cmd->($c, \@fds, 'hi', 0); } lei_ok('daemon-pid'); chomp($pid = $lei_out); diff --git a/t/xap_helper.t b/t/xap_helper.t index 3646cf97..73c1c849 100644 --- a/t/xap_helper.t +++ b/t/xap_helper.t @@ -7,7 +7,7 @@ require_mods(qw(DBD::SQLite Search::Xapian)); my $msg = no_scm_rights; plan(skip_all => $msg) if $msg; # TODO: FIFO support? 
use PublicInbox::Spawn qw(spawn); -use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM MSG_EOR); +use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM); require PublicInbox::AutoReap; require PublicInbox::IPC; require PublicInbox::XapClient; @@ -54,7 +54,7 @@ my $doreq = sub { my $buf = join("\0", @arg, ''); my @fds = fileno($y); push @fds, fileno($err) if $err; - my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, MSG_EOR); + my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, 0); $n // xbail "send: $!"; my $arg = "@arg"; $arg =~ s/\Q$tmp\E/\$TMP/gs;