about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-01-13 19:06:15 -1200
committerEric Wong <e@80x24.org>2021-01-14 23:14:08 +0000
commit39d44555e3f04c97e98c7f5d3538bbba6a19656b (patch)
tree034da2d3634118b076b95e126f73f875795ddb3a
parent7dd5b28cb9bdcfa262ddad47d7f033f600675dc3 (diff)
downloadpublic-inbox-39d44555e3f04c97e98c7f5d3538bbba6a19656b.tar.gz
The new test ensures consistency between oneshot and
client/daemon users.  Cancelling an in-progress result now also
stops xsearch workers to avoid wasted CPU and I/O.

Note the lei->atfork_child_wq usage changes, it is to workaround
a bug in Perl 5: http://nntp.perl.org/group/perl.perl5.porters/258784
<CAHhgV8hPbcmkzWizp6Vijw921M5BOXixj4+zTh3nRS9vRBYk8w@mail.gmail.com>

This switches the internal protocol to use SOCK_SEQPACKET
AF_UNIX sockets to prevent merging messages from the daemon to
client to run pager and kill/exit the client script.
-rw-r--r--MANIFEST1
-rw-r--r--lib/PublicInbox/IPC.pm45
-rw-r--r--lib/PublicInbox/LEI.pm158
-rw-r--r--lib/PublicInbox/LeiOverview.pm5
-rw-r--r--lib/PublicInbox/LeiQuery.pm22
-rw-r--r--lib/PublicInbox/LeiXSearch.pm34
-rwxr-xr-xscript/lei74
-rw-r--r--t/lei.t2
-rw-r--r--xt/lei-sigpipe.t32
9 files changed, 225 insertions, 148 deletions
diff --git a/MANIFEST b/MANIFEST
index 810aec42..2ca240fc 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -429,6 +429,7 @@ xt/git_async_cmp.t
 xt/httpd-async-stream.t
 xt/imapd-mbsync-oimap.t
 xt/imapd-validate.t
+xt/lei-sigpipe.t
 xt/mem-imapd-tls.t
 xt/mem-msgview.t
 xt/msgtime_cmp.t
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index c54fcc64..fbc91f6f 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -130,7 +130,8 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
         my ($self, $pid) = @_;
-        warn "PID:$pid died with \$?=$?\n" if $?;
+        # SIGTERM (15) is our default exit signal
+        warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
 }
 
 # for base class, override in sub classes
@@ -236,50 +237,31 @@ sub ipc_sibling_atfork_child {
         $pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _close_recvd ($) {
-        my ($self) = @_;
-        my $x = $self->{-wq_recv_modes};
-        my $end = $x ? $#$x : 2;
-        close($_) for (grep { defined } (delete @$self{0..$end}));
-}
-
 sub wq_worker_loop ($) {
         my ($self) = @_;
-        my $buf;
         my $len = $self->{wq_req_len} // (4096 * 33);
-        my ($sub, $args);
         my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-        local $SIG{PIPE} = sub {
-                my $cur_sub = $sub;
-                _close_recvd($self);
-                die(bless(\$cur_sub, 'PublicInbox::SIGPIPE')) if $cur_sub;
-        };
         while (1) {
-                my (@fds) = $recv_cmd->($s2, $buf, $len) or return; # EOF
-                my $i = 0;
+                my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
                 my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
+                my $nfd = 0;
                 for my $fd (@fds) {
                         my $mode = shift(@m);
                         if (open(my $cmdfh, $mode, $fd)) {
-                                $self->{$i++} = $cmdfh;
+                                $self->{$nfd++} = $cmdfh;
                                 $cmdfh->autoflush(1);
                         } else {
-                                die "$$ open($mode$fd) (FD:$i): $!";
+                                die "$$ open($mode$fd) (FD:$nfd): $!";
                         }
                 }
                 # Sereal dies on truncated data, Storable returns undef
-                $args = thaw($buf) //
+                my $args = thaw($buf) //
                         die "thaw error on buffer of size:".length($buf);
-                eval {
-                        $sub = shift @$args;
-                        eval { $self->$sub(@$args) };
-                        undef $sub; # quiet SIG{PIPE} handler
-                        die $@ if $@;
-                };
+                my $sub = shift @$args;
+                eval { $self->$sub(@$args) };
                 warn "$$ wq_worker: $@" if $@ &&
                                         ref($@) ne 'PublicInbox::SIGPIPE';
-                # need to close explicitly to avoid warnings after SIGPIPE
-                _close_recvd($self);
+                delete @$self{0..($nfd-1)};
         }
 }
 
@@ -400,9 +382,16 @@ sub wq_close {
         }
 }
 
+sub wq_kill {
+        my ($self, $sig) = @_;
+        my $workers = $self->{-wq_workers} or return;
+        kill($sig // 'TERM', keys %$workers);
+}
+
 sub WQ_MAX_WORKERS { $WQ_MAX_WORKERS }
 
 sub DESTROY {
+        wq_kill($_[0]);
         wq_close($_[0]);
         ipc_worker_stop($_[0]);
 }
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 7313738e..2889fa76 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -11,13 +11,13 @@ use v5.10.1;
 use parent qw(PublicInbox::DS PublicInbox::LeiExternal
         PublicInbox::LeiQuery);
 use Getopt::Long ();
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
-use Errno qw(EAGAIN ECONNREFUSED ENOENT);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
 use POSIX ();
 use IO::Handle ();
 use Sys::Syslog qw(syslog openlog);
 use PublicInbox::Config;
-use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLONESHOT);
+use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
 use PublicInbox::Sigfd;
 use PublicInbox::DS qw(now dwaitpid);
 use PublicInbox::Spawn qw(spawn run_die popen_rd);
@@ -238,16 +238,15 @@ my %CONFIG_KEYS = (
         'leistore.dir' => 'top-level storage location',
 );
 
-sub x_it ($$) { # pronounced "exit"
+# pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
+sub x_it ($$) {
         my ($self, $code) = @_;
-        $self->{1}->autoflush(1); # make sure client sees stdout before exit
-        my $sig = ($code & 127);
-        $code >>= 8 unless $sig;
+        # make sure client sees stdout before exit
+        $self->{1}->autoflush(1) if $self->{1};
         if (my $sock = $self->{sock}) {
-                my $fds = [ map { fileno($_) } @$self{0..2} ];
-                $send_cmd->($sock, $fds, "exit=$code\n", 0);
-        } else { # for oneshot
-                $quit->($code);
+                send($sock, "x_it $code", MSG_EOR);
+        } elsif (!($code & 127)) { # oneshot, ignore signals
+                $quit->($code >> 8);
         }
 }
 
@@ -274,22 +273,20 @@ sub atfork_prepare_wq {
                                 grep { defined } @$self{qw(0 1 2 sock)}
 }
 
-# usage: local %SIG = (%SIG, $lei->atfork_child_wq($wq));
+# usage: my %sig = $lei->atfork_child_wq($wq);
+#         local @SIG{keys %sig} = values %sig;
 sub atfork_child_wq {
         my ($self, $wq) = @_;
-        return () if $self->{0}; # did not fork
-        $self->{$_} = $wq->{$_} for (0..2);
-        $self->{sock} = $wq->{3} // die 'BUG: no {sock}'; # may be undef
-        my $oldpipe = $SIG{PIPE};
+        @$self{qw(0 1 2 sock)} = delete(@$wq{0..3});
         %PATH2CFG = ();
         @TO_CLOSE_ATFORK_CHILD = ();
-        (
-                __WARN__ => sub { err($self, @_) },
-                PIPE => sub {
-                        $self->x_it(141);
-                        $oldpipe->() if ref($oldpipe) eq 'CODE';
-                }
-        );
+        (__WARN__ => sub { err($self, @_) },
+        PIPE => sub {
+                $self->x_it(13); # SIGPIPE = 13
+                # we need to close explicitly to avoid Perl warning on SIGPIPE
+                close($_) for (delete @$self{1..2});
+                die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
+        });
 }
 
 # usage: ($lei, @io) = $lei->atfork_parent_wq($wq);
@@ -300,9 +297,9 @@ sub atfork_parent_wq {
                 my $ret = bless { %$self }, ref($self);
                 $self->{env} = $env;
                 delete @$ret{qw(-lei_store cfg pgr)};
-                ($ret, delete @$ret{qw(0 1 2 sock)});
+                ($ret, delete @$ret{0..2}, delete($ret->{sock}) // ());
         } else {
-                ($self, @$self{qw(0 1 2 sock)});
+                ($self, @$self{0..2}, $self->{sock} // ());
         }
 }
 
@@ -647,7 +644,7 @@ sub start_pager {
                 my $buf = "exec 1\0".$pager;
                 while (my ($k, $v) = each %new_env) { $buf .= "\0$k=$v" };
                 my $fds = [ map { fileno($_) } @$rdr{0..2} ];
-                $send_cmd->($sock, $fds, $buf .= "\n", 0);
+                $send_cmd->($sock, $fds, $buf, MSG_EOR);
         } else {
                 $pgr->[0] = spawn([$pager], $env, $rdr);
         }
@@ -660,50 +657,39 @@ sub start_pager {
 sub stop_pager {
         my ($self) = @_;
         my $pgr = delete($self->{pgr}) or return;
-        my $pid = $pgr->[0];
-        close $self->{1};
-        # {2} may not be redirected
-        $self->{1} = $pgr->[1];
         $self->{2} = $pgr->[2];
+        # do not restore original stdout, just close it so we error out
+        close(delete($self->{1})) if $self->{1};
+        my $pid = $pgr->[0];
         dwaitpid($pid, undef, $self->{sock}) if $pid;
 }
 
 sub accept_dispatch { # Listener {post_accept} callback
         my ($sock) = @_; # ignore other
-        $sock->blocking(1);
         $sock->autoflush(1);
         my $self = bless { sock => $sock }, __PACKAGE__;
-        vec(my $rin = '', fileno($sock), 1) = 1;
-        # `say $sock' triggers "die" in lei(1)
-        my $buf;
-        if (select(my $rout = $rin, undef, undef, 1)) {
-                my @fds = $recv_cmd->($sock, $buf, 4096 * 33); # >MAX_ARG_STRLEN
-                if (scalar(@fds) == 3) {
-                        my $i = 0;
-                        for my $rdr (qw(<&= >&= >&=)) {
-                                my $fd = shift(@fds);
-                                if (open(my $fh, $rdr, $fd)) {
-                                        $self->{$i++} = $fh;
-                                }  else {
-                                        say $sock "open($rdr$fd) (FD=$i): $!";
-                                        return;
-                                }
+        vec(my $rvec, fileno($sock), 1) = 1;
+        select($rvec, undef, undef, 1) or
+                return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
+        my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
+        if (scalar(@fds) == 3) {
+                my $i = 0;
+                for my $rdr (qw(<&= >&= >&=)) {
+                        my $fd = shift(@fds);
+                        if (open(my $fh, $rdr, $fd)) {
+                                $self->{$i++} = $fh;
+                                next;
                         }
-                } else {
-                        say $sock "recv_cmd failed: $!";
-                        return;
+                        return send($sock, "open($rdr$fd) (FD=$i): $!", MSG_EOR);
                 }
         } else {
-                say $sock "timed out waiting to recv FDs";
-                return;
+                return send($sock, "recv_cmd failed: $!", MSG_EOR);
         }
         $self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
         # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
         # $buf = "$$\0$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
-        if (substr($buf, -2, 2, '') ne "\0\0") { # s/\0\0\z//
-                say $sock "request command truncated";
-                return;
-        }
+        substr($buf, -2, 2, '') eq "\0\0" or  # s/\0\0\z//
+                return send($sock, 'request command truncated', MSG_EOR);
         my ($argc, @argv) = split(/\0/, $buf, -1);
         undef $buf;
         my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -711,23 +697,50 @@ sub accept_dispatch { # Listener {post_accept} callback
                 local %ENV = %env;
                 $self->{env} = \%env;
                 eval { dispatch($self, @argv) };
-                say $sock $@ if $@;
+                send($sock, $@, MSG_EOR) if $@;
         } else {
-                say $sock "chdir($env{PWD}): $!"; # implicit close
+                send($sock, "chdir($env{PWD}): $!", MSG_EOR); # implicit close
         }
 }
 
+sub dclose {
+        my ($self) = @_;
+        delete $self->{lxs}; # stops LeiXSearch queries
+        $self->close; # PublicInbox::DS::close
+}
+
 # for long-running results
 sub event_step {
         my ($self) = @_;
         local %ENV = %{$self->{env}};
-        eval {}; # TODO
-        if ($@) {
-                say { $self->{sock} } $@;
-                $self->close; # PublicInbox::DS::close
+        my $sock = $self->{sock};
+        eval {
+                while (my @fds = $recv_cmd->($sock, my $buf, 4096)) {
+                        if (scalar(@fds) == 1 && !defined($fds[0])) {
+                                return if $! == EAGAIN;
+                                next if $! == EINTR;
+                                last if $! == ECONNRESET;
+                                die "recvmsg: $!";
+                        }
+                        for my $fd (@fds) {
+                                open my $rfh, '+<&=', $fd;
+                        }
+                        die "unrecognized client signal: $buf";
+                }
+                dclose($self);
+        };
+        if (my $err = $@) {
+                eval { $self->fail($err) };
+                dclose($self);
         }
 }
 
+sub event_step_init {
+        my ($self) = @_;
+        $self->{sock}->blocking(0);
+        $self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET);
+}
+
 sub noop {}
 
 our $oldset; sub oldset { $oldset }
@@ -742,7 +755,7 @@ sub lazy_start {
                 die "connect($path): $!";
         }
         umask(077) // die("umask(077): $!");
-        socket(my $l, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+        socket(my $l, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
         bind($l, pack_sockaddr_un($path)) or die "bind($path): $!";
         listen($l, 1024) or die "listen: $!";
         my @st = stat($path) or die "stat($path): $!";
@@ -793,7 +806,7 @@ sub lazy_start {
                 USR2 => \&noop,
         };
         my $sigfd = PublicInbox::Sigfd->new($sig, SFD_NONBLOCK);
-        local %SIG = (%SIG, %$sig) if !$sigfd;
+        local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
         local $SIG{PIPE} = 'IGNORE';
         if ($sigfd) { # TODO: use inotify/kqueue to detect unlinked sockets
                 push @TO_CLOSE_ATFORK_CHILD, $sigfd->{sock};
@@ -853,24 +866,19 @@ sub oneshot {
         local $quit = $exit if $exit;
         local %PATH2CFG;
         umask(077) // die("umask(077): $!");
-        local $SIG{PIPE} = sub { die(bless(\"$_[0]", 'PublicInbox::SIGPIPE')) };
-        eval {
-                my $self = bless {
-                        0 => *STDIN{GLOB},
-                        1 => *STDOUT{GLOB},
-                        2 => *STDERR{GLOB},
-                        env => \%ENV
-                }, __PACKAGE__;
-                dispatch($self, @ARGV);
-        };
-        die $@ if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+        dispatch((bless {
+                0 => *STDIN{GLOB},
+                1 => *STDOUT{GLOB},
+                2 => *STDERR{GLOB},
+                env => \%ENV
+        }, __PACKAGE__), @ARGV);
 }
 
 # ensures stdout hits the FS before sock disconnects so a client
 # can immediately reread it
 sub DESTROY {
         my ($self) = @_;
-        $self->{1}->autoflush(1);
+        $self->{1}->autoflush(1) if $self->{1};
         stop_pager($self);
 }
 
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 8a1f4f82..194c5e28 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -108,8 +108,9 @@ sub _unbless_smsg {
 
 sub ovv_atexit_child {
         my ($self, $lei) = @_;
-        my $bref = delete $lei->{ovv_buf} or return;
-        print { $lei->{1} } $$bref;
+        if (my $bref = delete $lei->{ovv_buf}) {
+                print { $lei->{1} } $$bref;
+        }
 }
 
 # JSON module ->pretty output wastes too much vertical white space,
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 7ca01454..1a3e1193 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -40,14 +40,13 @@ sub lei_q {
         if ($opt->{external} // 1) {
                 $self->_externals_each(\&_vivify_external, \@srcs);
         }
-        my $j = $opt->{jobs} // scalar(@srcs) > 3 ? 3 : scalar(@srcs);
+        my $j = $opt->{jobs} // (scalar(@srcs) > 3 ? 3 : scalar(@srcs));
         $j = 1 if !$opt->{thread};
         $j++ if $opt->{'local'}; # for sto->search below
-        if ($self->{sock}) {
-                $self->atfork_prepare_wq($lxs);
-                $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
-                        // $lxs->wq_workers($j);
-        }
+        $self->atfork_prepare_wq($lxs);
+        $lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
+                // $lxs->wq_workers($j);
+
         unshift(@srcs, $sto->search) if $opt->{'local'};
         # no forking workers after this
         require PublicInbox::LeiOverview;
@@ -77,16 +76,7 @@ sub lei_q {
         # my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
         $self->{mset_opt} = \%mset_opt;
         $self->{ovv}->ovv_begin($self);
-        pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
-        require PublicInbox::EOFpipe;
-        my $eof = PublicInbox::EOFpipe->new($eof_wait, \&query_done, $self);
-        $lxs->do_query($self, $qry_done, \@srcs);
-        $eof->event_step unless $self->{sock};
-}
-
-sub query_done { # PublicInbox::EOFpipe callback
-        my ($self) = @_;
-        $self->{ovv}->ovv_end($self);
+        $lxs->do_query($self, \@srcs);
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index c030b2b2..d06b6f1d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -92,7 +92,9 @@ sub _mset_more ($$) {
 
 sub query_thread_mset { # for --thread
         my ($self, $lei, $ibxish) = @_;
-        local %SIG = (%SIG, $lei->atfork_child_wq($self));
+        my %sig = $lei->atfork_child_wq($self);
+        local @SIG{keys %sig} = values %sig;
+
         my ($srch, $over) = ($ibxish->search, $ibxish->over);
         unless ($srch && $over) {
                 my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -125,9 +127,10 @@ sub query_thread_mset { # for --thread
 
 sub query_mset { # non-parallel for non-"--thread" users
         my ($self, $lei, $srcs) = @_;
+        my %sig = $lei->atfork_child_wq($self);
+        local @SIG{keys %sig} = values %sig;
         my $mo = { %{$lei->{mset_opt}} };
         my $mset;
-        local %SIG = (%SIG, $lei->atfork_child_wq($self));
         $self->attach_external($_) for @$srcs;
         my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
         do {
@@ -143,9 +146,17 @@ sub query_mset { # non-parallel for non-"--thread" users
         $lei->{ovv}->ovv_atexit_child($lei);
 }
 
+sub query_done { # PublicInbox::EOFpipe callback
+        my ($lei) = @_;
+        $lei->{ovv}->ovv_end($lei);
+        $lei->dclose;
+}
+
 sub do_query {
-        my ($self, $lei_orig, $qry_done, $srcs) = @_;
+        my ($self, $lei_orig, $srcs) = @_;
         my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+
+        pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
         $io[0] = $qry_done; # don't need stdin
         $io[1]->autoflush(1);
         $io[2]->autoflush(1);
@@ -160,9 +171,20 @@ sub do_query {
         for my $rmt (@{$self->{remotes} // []}) {
                 $self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
         }
-
-        # sent off to children, they will drop remaining references to it
-        close $qry_done;
+        @io = ();
+        close $qry_done; # fully closed when children are done
+
+        # query_done will run when query_*mset close $qry_done
+        if ($lei_orig->{sock}) { # watch for client premature exit
+                require PublicInbox::EOFpipe;
+                PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
+                $lei_orig->{lxs} = $self;
+                $lei_orig->event_step_init;
+        } else {
+                $self->wq_close;
+                read($eof_wait, my $buf, 1); # wait for close($lei->{0})
+                query_done($lei_orig); # may SIGPIPE
+        }
 }
 
 sub ipc_atfork_child {
diff --git a/script/lei b/script/lei
index 5c32ab88..9610a876 100755
--- a/script/lei
+++ b/script/lei
@@ -3,32 +3,47 @@
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use strict;
 use v5.10.1;
-use Socket qw(AF_UNIX SOCK_STREAM pack_sockaddr_un);
+use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Errno qw(EINTR ECONNRESET);
 use PublicInbox::CmdIPC4;
 my $narg = 4;
+my ($sock, $pwd);
 my $recv_cmd = PublicInbox::CmdIPC4->can('recv_cmd4');
 my $send_cmd = PublicInbox::CmdIPC4->can('send_cmd4') // do {
         require PublicInbox::Spawn; # takes ~50ms even if built *sigh*
-        $narg = 4;
         $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
         PublicInbox::Spawn->can('send_cmd4');
 };
 
+sub sigchld {
+        my ($sig) = @_;
+        my $flags = $sig ? POSIX::WNOHANG() : 0;
+        while (waitpid(-1, $flags) > 0) {}
+}
+
 sub exec_cmd {
         my ($fds, $argc, @argv) = @_;
-        my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
-        my @m = (*STDIN{IO}, '<&=',  *STDOUT{IO}, '>&=',
-                *STDERR{IO}, '>&=');
+        my @m = (*STDIN{IO}, '<&=',  *STDOUT{IO}, '>&=', *STDERR{IO}, '>&=');
+        my @rdr;
         for my $fd (@$fds) {
                 my ($old_io, $mode) = splice(@m, 0, 2);
-                open($old_io, $mode, $fd) or die "open $mode$fd: $!";
+                open(my $tmpfh, $mode, $fd) or die "open $mode$fd: $!";
+                push @rdr, $old_io, $mode, $tmpfh;
+        }
+        require POSIX; # WNOHANG
+        $SIG{CHLD} = \&sigchld;
+        my $pid = fork // die "fork: $!";
+        if ($pid == 0) {
+                my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
+                while (my ($old_io, $mode, $tmpfh) = splice(@rdr, 0, 3)) {
+                        open $old_io, $mode, $tmpfh or die "open $mode: $!";
+                }
+                %ENV = (%ENV, %env);
+                exec(@argv);
+                die "exec: @argv: $!";
         }
-        %ENV = (%ENV, %env);
-        exec(@argv);
-        die "exec: @argv: $!";
 }
 
-my ($sock, $pwd);
 if ($send_cmd && eval {
         my $path = do {
                 my $runtime_dir = ($ENV{XDG_RUNTIME_DIR} // '') . '/lei';
@@ -40,10 +55,10 @@ if ($send_cmd && eval {
                         require File::Path;
                         File::Path::mkpath($runtime_dir, 0, 0700);
                 }
-                "$runtime_dir/$narg.sock";
+                "$runtime_dir/$narg.seq.sock";
         };
         my $addr = pack_sockaddr_un($path);
-        socket($sock, AF_UNIX, SOCK_STREAM, 0) or die "socket: $!";
+        socket($sock, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";
         unless (connect($sock, $addr)) { # start the daemon if not started
                 local $ENV{PERL5LIB} = join(':', @INC);
                 open(my $daemon, '-|', $^X, qw[-MPublicInbox::LEI
@@ -73,22 +88,41 @@ Falling back to (slow) one-shot mode
         }
         1;
 }) { # (Socket::MsgHdr|Inline::C), $sock, $pwd are all available:
-        local $ENV{PWD} = $pwd;
+        $ENV{PWD} = $pwd;
         my $buf = join("\0", scalar(@ARGV), @ARGV);
         while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
         $buf .= "\0\0";
-        select $sock;
-        $| = 1; # unbuffer selected $sock
-        $send_cmd->($sock, [ 0, 1, 2 ], $buf, 0);
-        while (my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33)) {
-                if ($buf =~ /\Aexit=([0-9]+)\n\z/) {
-                        exit($1);
-                } elsif ($buf =~ /\Aexec (.+)\n\z/) {
+        $send_cmd->($sock, [ 0, 1, 2 ], $buf, MSG_EOR);
+        $SIG{TERM} = $SIG{INT} = $SIG{QUIT} = sub {
+                my ($sig) = @_; # 'TERM', not an integer :<
+                $SIG{$sig} = 'DEFAULT';
+                kill($sig, $$); # exit($signo + 128)
+        };
+        my $x_it_code = 0;
+        while (1) {
+                my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33);
+                if (scalar(@fds) == 1 && !defined($fds[0])) {
+                        last if $! == ECONNRESET;
+                        next if $! == EINTR;
+                        die "recvmsg: $!";
+                }
+                last if $buf eq '';
+                if ($buf =~ /\Ax_it ([0-9]+)\z/) {
+                        $x_it_code = $1 + 0;
+                        last;
+                } elsif ($buf =~ /\Aexec (.+)\z/) {
                         exec_cmd(\@fds, split(/\0/, $1));
                 } else {
+                        sigchld();
                         die $buf;
                 }
         }
+        sigchld();
+        if (my $sig = ($x_it_code & 127)) {
+                kill $sig, $$;
+                sleep;
+        }
+        exit($x_it_code >> 8);
 } else { # for systems lacking Socket::MsgHdr or Inline::C
         warn $@ if $@;
         require PublicInbox::LEI;
diff --git a/t/lei.t b/t/lei.t
index 6819f182..3ebaade6 100644
--- a/t/lei.t
+++ b/t/lei.t
@@ -215,7 +215,7 @@ SKIP: { # real socket
         skip 'Socket::MsgHdr or Inline::C missing or unconfigured', $nr;
 
         local $ENV{XDG_RUNTIME_DIR} = "$home/xdg_run";
-        my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.sock";
+        my $sock = "$ENV{XDG_RUNTIME_DIR}/lei/$nfd.seq.sock";
 
         ok($lei->('daemon-pid'), 'daemon-pid');
         is($err, '', 'no error from daemon-pid');
diff --git a/xt/lei-sigpipe.t b/xt/lei-sigpipe.t
new file mode 100644
index 00000000..4d35bbb3
--- /dev/null
+++ b/xt/lei-sigpipe.t
@@ -0,0 +1,32 @@
+#!perl -w
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use PublicInbox::TestCommon;
+use POSIX qw(WTERMSIG WIFSIGNALED SIGPIPE);
+require_mods(qw(json DBD::SQLite Search::Xapian));
+# XXX this needs an already configured lei instance with many messages
+
+my $do_test = sub {
+        my $env = shift // {};
+        pipe(my ($r, $w)) or BAIL_OUT $!;
+        open my $err, '+>', undef or BAIL_OUT $!;
+        my $opt = { run_mode => 0, 1 => $w, 2 => $err };
+        my $tp = start_script([qw(lei q -t), 'bytes:1..'], $env, $opt);
+        close $w;
+        sysread($r, my $buf, 1);
+        close $r; # trigger SIGPIPE
+        $tp->join;
+        ok(WIFSIGNALED($?), 'signaled');
+        is(WTERMSIG($?), SIGPIPE, 'got SIGPIPE');
+        seek($err, 0, 0);
+        my @err = grep(!m{mkdir /dev/null\b}, <$err>);
+        is_deeply(\@err, [], 'no errors');
+};
+
+$do_test->();
+$do_test->({XDG_RUNTIME_DIR => '/dev/null'});
+
+done_testing;