about summary refs log tree commit homepage
path: root/lib/PublicInbox/Spawn.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Spawn.pm')
-rw-r--r--lib/PublicInbox/Spawn.pm269
1 files changed, 167 insertions, 102 deletions
diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm
index 3f69108a..e36659ce 100644
--- a/lib/PublicInbox/Spawn.pm
+++ b/lib/PublicInbox/Spawn.pm
@@ -6,10 +6,8 @@
 # is explicitly defined in the environment (and writable).
 # Under Linux, vfork can make a big difference in spawning performance
 # as process size increases (fork still needs to mark pages for CoW use).
-# Currently, we only use this for code intended for long running
-# daemons (inside the PSGI code (-httpd) and -nntpd).  The short-lived
-# scripts (-mda, -index, -learn, -init) either use IPC::run or standard
-# Perl routines.
+# None of this is intended to be thread-safe since Perl5 maintainers
+# officially discourage the use of threads.
 #
 # There'll probably be more OS-level C stuff here, down the line.
 # We don't want too many DSOs: https://udrepper.livejournal.com/8790.html
@@ -17,14 +15,17 @@
 package PublicInbox::Spawn;
 use v5.12;
 use parent qw(Exporter);
-use Symbol qw(gensym);
-use Fcntl qw(LOCK_EX SEEK_SET);
+use PublicInbox::Lock;
+use Fcntl qw(SEEK_SET);
 use IO::Handle ();
-use PublicInbox::ProcessPipe;
-our @EXPORT_OK = qw(which spawn popen_rd run_die);
-our @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
+use Carp qw(croak);
+use PublicInbox::IO;
+our @EXPORT_OK = qw(which spawn popen_rd popen_wr run_die run_wait run_qx);
+our (@RLIMITS, %RLIMITS);
+use autodie qw(close open pipe seek sysseek truncate);
 
 BEGIN {
+        @RLIMITS = qw(RLIMIT_CPU RLIMIT_CORE RLIMIT_DATA);
         my $all_libc = <<'ALL_LIBC'; # all *nix systems we support
 #include <sys/resource.h>
 #include <sys/socket.h>
@@ -37,12 +38,6 @@ BEGIN {
 #include <time.h>
 #include <stdio.h>
 #include <string.h>
-
-/* some platforms need alloca.h, but some don't */
-#if defined(__GNUC__) && !defined(alloca)
-#  define alloca(sz) __builtin_alloca(sz)
-#endif
-
 #include <signal.h>
 #include <assert.h>
 
@@ -54,11 +49,17 @@ BEGIN {
  *   This is unlike "sv_len", which returns what you would expect.
  */
 #define AV2C_COPY(dst, src) do { \
+        static size_t dst##__capa; \
         I32 i; \
         I32 top_index = av_len(src); \
         I32 real_len = top_index + 1; \
         I32 capa = real_len + 1; \
-        dst = alloca(capa * sizeof(char *)); \
+        if (capa > dst##__capa) { \
+                dst##__capa = 0; /* in case Newx croaks */ \
+                Safefree(dst); \
+                Newx(dst, capa, char *); \
+                dst##__capa = capa; \
+        } \
         for (i = 0; i < real_len; i++) { \
                 SV **sv = av_fetch(src, i, 0); \
                 dst[i] = SvPV_nolen(*sv); \
@@ -87,23 +88,28 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
         AV *env = (AV *)SvRV(envref);
         AV *rlim = (AV *)SvRV(rlimref);
         const char *filename = SvPV_nolen(file);
-        pid_t pid;
-        char **argv, **envp;
+        pid_t pid = -1;
+        static char **argv, **envp;
         sigset_t set, old;
         int ret, perrnum;
         volatile int cerrnum = 0; /* shared due to vfork */
-        int chld_is_member;
+        int chld_is_member; /* needed due to shared memory w/ vfork */
         I32 max_fd = av_len(redir);
 
         AV2C_COPY(argv, cmd);
         AV2C_COPY(envp, env);
 
-        if (sigfillset(&set)) return -1;
-        if (sigprocmask(SIG_SETMASK, &set, &old)) return -1;
+        if (sigfillset(&set)) goto out;
+        if (sigdelset(&set, SIGABRT)) goto out;
+        if (sigdelset(&set, SIGBUS)) goto out;
+        if (sigdelset(&set, SIGFPE)) goto out;
+        if (sigdelset(&set, SIGILL)) goto out;
+        if (sigdelset(&set, SIGSEGV)) goto out;
+        /* no XCPU/XFSZ here */
+        if (sigprocmask(SIG_SETMASK, &set, &old)) goto out;
         chld_is_member = sigismember(&old, SIGCHLD);
-        if (chld_is_member < 0) return -1;
-        if (chld_is_member > 0)
-                sigdelset(&old, SIGCHLD);
+        if (chld_is_member < 0) goto out;
+        if (chld_is_member > 0 && sigdelset(&old, SIGCHLD)) goto out;
 
         pid = vfork();
         if (pid == 0) {
@@ -122,8 +128,10 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
                         exit_err("setpgid", &cerrnum);
                 for (sig = 1; sig < NSIG; sig++)
                         signal(sig, SIG_DFL); /* ignore errors on signals */
-                if (*cd && chdir(cd) < 0)
-                        exit_err("chdir", &cerrnum);
+                if (*cd && chdir(cd) < 0) {
+                        write(2, "cd ", 3);
+                        exit_err(cd, &cerrnum);
+                }
 
                 max_rlim = av_len(rlim);
                 for (i = 0; i < max_rlim; i += 3) {
@@ -162,22 +170,26 @@ int pi_fork_exec(SV *redirref, SV *file, SV *cmdref, SV *envref, SV *rlimref,
         } else if (perrnum) {
                 errno = perrnum;
         }
+out:
+        if (pid < 0)
+                croak("E: fork_exec %s: %s\n", filename, strerror(errno));
         return (int)pid;
 }
 
-static int sleep_wait(unsigned *tries, int err)
+static int sendmsg_retry(unsigned *tries)
 {
         const struct timespec req = { 0, 100000000 }; /* 100ms */
+        int err = errno;
         switch (err) {
+        case EINTR: PERL_ASYNC_CHECK(); return 1;
         case ENOBUFS: case ENOMEM: case ETOOMANYREFS:
-                if (++*tries < 50) {
-                        fprintf(stderr, "sleeping on sendmsg: %s (#%u)\n",
-                                strerror(err), *tries);
-                        nanosleep(&req, NULL);
-                        return 1;
-                }
-        default:
-                return 0;
+                if (++*tries >= 50) return 0;
+                fprintf(stderr, "# sleeping on sendmsg: %s (#%u)\n",
+                        strerror(err), *tries);
+                nanosleep(&req, NULL);
+                PERL_ASYNC_CHECK();
+                return 1;
+        default: return 0;
         }
 }
 
@@ -229,7 +241,7 @@ SV *send_cmd4(PerlIO *s, SV *svfds, SV *data, int flags)
         }
         do {
                 sent = sendmsg(PerlIO_fileno(s), &msg, flags);
-        } while (sent < 0 && sleep_wait(&tries, errno));
+        } while (sent < 0 && sendmsg_retry(&tries));
         return sent >= 0 ? newSViv(sent) : &PL_sv_undef;
 }
 
@@ -251,58 +263,76 @@ void recv_cmd4(PerlIO *s, SV *buf, STRLEN n)
         msg.msg_control = &cmsg.hdr;
         msg.msg_controllen = CMSG_SPACE(SEND_FD_SPACE);
 
-        i = recvmsg(PerlIO_fileno(s), &msg, 0);
-        if (i < 0)
-                Inline_Stack_Push(&PL_sv_undef);
-        else
+        for (;;) {
+                i = recvmsg(PerlIO_fileno(s), &msg, 0);
+                if (i >= 0 || errno != EINTR) break;
+                PERL_ASYNC_CHECK();
+        }
+        if (i >= 0) {
                 SvCUR_set(buf, i);
-        if (i > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
-                        cmsg.hdr.cmsg_type == SCM_RIGHTS) {
-                size_t len = cmsg.hdr.cmsg_len;
-                int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-                for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
-                        Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
+                if (cmsg.hdr.cmsg_level == SOL_SOCKET &&
+                                cmsg.hdr.cmsg_type == SCM_RIGHTS) {
+                        size_t len = cmsg.hdr.cmsg_len;
+                        int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
+                        for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
+                                Inline_Stack_Push(sv_2mortal(newSViv(*fdp++)));
+                }
+        } else {
+                Inline_Stack_Push(&PL_sv_undef);
+                SvCUR_set(buf, 0);
         }
         Inline_Stack_Done;
 }
 #endif /* defined(CMSG_SPACE) && defined(CMSG_LEN) */
-ALL_LIBC
 
-        my $inline_dir = $ENV{PERL_INLINE_DIRECTORY} //= (
+void rlimit_map()
+{
+        Inline_Stack_Vars;
+        Inline_Stack_Reset;
+ALL_LIBC
+        my $inline_dir = $ENV{PERL_INLINE_DIRECTORY} // (
                         $ENV{XDG_CACHE_HOME} //
                         ( ($ENV{HOME} // '/nonexistent').'/.cache' )
                 ).'/public-inbox/inline-c';
-        warn "$inline_dir exists, not writable\n" if -e $inline_dir && !-w _;
-        $all_libc = undef unless -d _ && -w _;
+        undef $all_libc unless -d $inline_dir;
         if (defined $all_libc) {
-                my $f = "$inline_dir/.public-inbox.lock";
-                open my $oldout, '>&', \*STDOUT or die "dup(1): $!";
-                open my $olderr, '>&', \*STDERR or die "dup(2): $!";
-                open my $fh, '+>', $f or die "open($f): $!";
-                open STDOUT, '>&', $fh or die "1>$f: $!";
-                open STDERR, '>&', $fh or die "2>$f: $!";
+                for (@RLIMITS, 'RLIM_INFINITY') {
+                        $all_libc .= <<EOM;
+        Inline_Stack_Push(sv_2mortal(newSVpvs("$_")));
+        Inline_Stack_Push(sv_2mortal(newSViv($_)));
+EOM
+                }
+                $all_libc .= <<EOM;
+        Inline_Stack_Done;
+} // rlimit_map
+EOM
+                local $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
+                # CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
+                my $lk = PublicInbox::Lock->new($inline_dir.
+                                                '/.public-inbox.lock');
+                my $fh = $lk->lock_acquire;
+                open my $oldout, '>&', \*STDOUT;
+                open my $olderr, '>&', \*STDERR;
+                open STDOUT, '>&', $fh;
+                open STDERR, '>&', $fh;
                 STDERR->autoflush(1);
                 STDOUT->autoflush(1);
-
-                # CentOS 7.x ships Inline 0.53, 0.64+ has built-in locking
-                flock($fh, LOCK_EX) or die "LOCK_EX($f): $!";
-                eval <<'EOM';
-use Inline C => $all_libc, BUILD_NOISY => 1;
-EOM
+                eval 'use Inline C => $all_libc, BUILD_NOISY => 1';
                 my $err = $@;
-                my $ndc_err = '';
-                $err = $@;
-                open(STDERR, '>&', $olderr) or warn "restore stderr: $!";
-                open(STDOUT, '>&', $oldout) or warn "restore stdout: $!";
+                open(STDERR, '>&', $olderr);
+                open(STDOUT, '>&', $oldout);
                 if ($err) {
                         seek($fh, 0, SEEK_SET);
                         my @msg = <$fh>;
-                        warn "Inline::C build failed:\n",
-                                $ndc_err, $err, "\n", @msg;
+                        truncate($fh, 0);
+                        warn "Inline::C build failed:\n", $err, "\n", @msg;
                         $all_libc = undef;
                 }
         }
-        unless ($all_libc) {
+        if (defined $all_libc) { # set for Gcf2
+                $ENV{PERL_INLINE_DIRECTORY} = $inline_dir;
+                %RLIMITS = rlimit_map();
+        } else {
                 require PublicInbox::SpawnPP;
                 *pi_fork_exec = \&PublicInbox::SpawnPP::pi_fork_exec
         }
@@ -319,59 +349,94 @@ sub which ($) {
 }
 
 sub spawn ($;$$) {
-        my ($cmd, $env, $opts) = @_;
+        my ($cmd, $env, $opt) = @_;
         my $f = which($cmd->[0]) // die "$cmd->[0]: command not found\n";
-        my @env;
-        $opts ||= {};
+        my (@env, @rdr);
         my %env = (%ENV, $env ? %$env : ());
         while (my ($k, $v) = each %env) {
                 push @env, "$k=$v" if defined($v);
         }
-        my $redir = [];
         for my $child_fd (0..2) {
-                my $parent_fd = $opts->{$child_fd};
-                if (defined($parent_fd) && $parent_fd !~ /\A[0-9]+\z/) {
-                        my $fd = fileno($parent_fd) //
-                                        die "$parent_fd not an IO GLOB? $!";
-                        $parent_fd = $fd;
+                my $pfd = $opt->{$child_fd};
+                if ('SCALAR' eq ref($pfd)) {
+                        open my $fh, '+>', undef;
+                        $opt->{"fh.$child_fd"} = $fh; # for read_out_err
+                        if ($child_fd == 0) {
+                                print $fh $$pfd;
+                                $fh->flush or die "flush: $!";
+                                sysseek($fh, 0, SEEK_SET);
+                        }
+                        $pfd = fileno($fh);
+                } elsif (defined($pfd) && $pfd !~ /\A[0-9]+\z/) {
+                        my $fd = fileno($pfd) //
+                                        croak "BUG: $pfd not an IO GLOB? $!";
+                        $pfd = $fd;
                 }
-                $redir->[$child_fd] = $parent_fd // $child_fd;
+                $rdr[$child_fd] = $pfd // $child_fd;
         }
         my $rlim = [];
-
         foreach my $l (@RLIMITS) {
-                my $v = $opts->{$l} // next;
-                my $r = eval "require BSD::Resource; BSD::Resource::$l();";
-                unless (defined $r) {
-                        warn "$l undefined by BSD::Resource: $@\n";
-                        next;
-                }
+                my $v = $opt->{$l} // next;
+                my $r = $RLIMITS{$l} //
+                        eval "require BSD::Resource; BSD::Resource::$l();" //
+                        do {
+                                warn "$l undefined by BSD::Resource: $@\n";
+                                next;
+                        };
                 push @$rlim, $r, @$v;
         }
-        my $cd = $opts->{'-C'} // ''; # undef => NULL mapping doesn't work?
-        my $pgid = $opts->{pgid} // -1;
-        my $pid = pi_fork_exec($redir, $f, $cmd, \@env, $rlim, $cd, $pgid);
-        die "fork_exec @$cmd failed: $!\n" unless $pid > 0;
-        $pid;
+        my $cd = $opt->{'-C'} // ''; # undef => NULL mapping doesn't work?
+        my $pgid = $opt->{pgid} // -1;
+        pi_fork_exec(\@rdr, $f, $cmd, \@env, $rlim, $cd, $pgid);
 }
 
 sub popen_rd {
+        my ($cmd, $env, $opt, @cb_arg) = @_;
+        pipe(my $r, local $opt->{1});
+        PublicInbox::IO::attach_pid($r, spawn($cmd, $env, $opt), @cb_arg);
+}
+
+sub popen_wr {
+        my ($cmd, $env, $opt, @cb_arg) = @_;
+        pipe(local $opt->{0}, my $w);
+        $w->autoflush(1);
+        PublicInbox::IO::attach_pid($w, spawn($cmd, $env, $opt), @cb_arg);
+}
+
+sub read_out_err ($) {
+        my ($opt) = @_;
+        for my $fd (1, 2) { # read stdout/stderr
+                my $fh = delete($opt->{"fh.$fd"}) // next;
+                seek($fh, 0, SEEK_SET);
+                PublicInbox::IO::read_all $fh, undef, $opt->{$fd};
+        }
+}
+
+sub run_wait ($;$$) {
         my ($cmd, $env, $opt) = @_;
-        pipe(my ($r, $w)) or die "pipe: $!\n";
-        $opt ||= {};
-        $opt->{1} = fileno($w);
-        my $pid = spawn($cmd, $env, $opt);
-        return ($r, $pid) if wantarray;
-        my $ret = gensym;
-        tie *$ret, 'PublicInbox::ProcessPipe', $pid, $r, @$opt{qw(cb arg)};
-        $ret;
+        waitpid(spawn($cmd, $env, $opt), 0);
+        read_out_err($opt);
+        $?
 }
 
 sub run_die ($;$$) {
         my ($cmd, $env, $rdr) = @_;
-        my $pid = spawn($cmd, $env, $rdr);
-        waitpid($pid, 0) == $pid or die "@$cmd did not finish";
-        $? == 0 or die "@$cmd failed: \$?=$?\n";
+        run_wait($cmd, $env, $rdr) and croak "E: @$cmd failed: \$?=$?";
+}
+
+sub run_qx {
+        my ($cmd, $env, $opt) = @_;
+        my $fh = popen_rd($cmd, $env, $opt);
+        my @ret;
+        if (wantarray) {
+                @ret = <$fh>;
+        } else {
+                local $/;
+                $ret[0] = <$fh>;
+        }
+        $fh->close; # caller should check $?
+        read_out_err($opt);
+        wantarray ? @ret : $ret[0];
 }
 
 1;