From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 0E2F81F55F for ; Fri, 1 Sep 2023 20:02:05 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1693598525; bh=cVM5mJFd4+JAEIrxNBEOTy7nbfviHwZc3CnFIxtp+b8=; h=From:To:Subject:Date:From; b=MHuEv+b7cdBLgHwRkWEXfJW7LDgHCtMa+9ozQ8CFRkMeLr//MvmU98mVeMEA1FqyA Ll2SzVsugvUQi3PwsxwDmrtvC6r9Qs/Z8y9tX8Wu4e0NdDe1LtFpSgHsN1FCBe8cZS VA8IzW13FBO8ri3JsqOT3xQgA0jhwvLPx+hSRYEQ= From: Eric Wong To: spew@80x24.org Subject: [PATCH] xap_helper: support SIGTTIN+SIGTTOU worker adjustments Date: Fri, 1 Sep 2023 20:02:04 +0000 Message-ID: <20230901200204.1608663-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: Being able to tune worker process counts on-the-fly when xap_helper gets used with -{netd,httpd,imapd} will be useful for tuning new setups. 
--- lib/PublicInbox/IPC.pm | 2 +- lib/PublicInbox/XapHelper.pm | 103 +++++++++++--- lib/PublicInbox/xap_helper.h | 262 ++++++++++++++++++++++++++++++----- t/xap_helper.t | 54 ++++++-- 4 files changed, 349 insertions(+), 72 deletions(-) diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 766c377f..528b9133 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -42,7 +42,7 @@ if ($enc && $dec) { # should be custom ops *ipc_thaw = \&Storable::thaw; } -my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); +our $recv_cmd = PublicInbox::Spawn->can('recv_cmd4'); our $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do { require PublicInbox::CmdIPC4; $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4'); diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm index 36266e65..e39aa078 100644 --- a/lib/PublicInbox/XapHelper.pm +++ b/lib/PublicInbox/XapHelper.pm @@ -10,10 +10,14 @@ $GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev)); use PublicInbox::Search qw(xap_terms); use PublicInbox::CodeSearch; use PublicInbox::IPC; +use PublicInbox::DS qw(awaitpid); +use POSIX qw(:signal_h); use Fcntl qw(LOCK_UN LOCK_EX); my $X = \%PublicInbox::Search::X; -our (%SRCH, %PIDS, $parent_pid); +our (%SRCH, %WORKERS, $parent_pid, $alive, $nworker, $workerset); our $stderr = \*STDERR; +our $fullset = POSIX::SigSet->new; +$fullset->fillset or die "fillset: $!"; # only short options for portability in C++ implementation our @SPEC = ( @@ -171,11 +175,17 @@ sub dispatch { sub recv_loop { local $SIG{__WARN__} = sub { print $stderr @_ }; my $rbuf; - while (!defined($parent_pid) || getppid != $parent_pid) { - my $req = bless {}, __PACKAGE__; - my @fds = PublicInbox::IPC::recv_cmd(\*STDIN, $rbuf, 4096*33); + my $in = \*STDIN; + while (!defined($parent_pid) || getppid == $parent_pid) { + PublicInbox::DS::sig_setmask($workerset); + my @fds = $PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33); scalar(@fds) or exit(66); # EX_NOINPUT - 
$fds[0] // die "recvmsg: $!"; + if (!defined($fds[0])) { + next if $!{EINTR}; + die "recvmsg: $!"; + } + PublicInbox::DS::sig_setmask($fullset); + my $req = bless {}, __PACKAGE__; my $i = 0; for my $fd (@fds) { open($req->{$i++}, '+<&=', $fd) and next; @@ -195,38 +205,85 @@ sub recv_loop { } } +sub reap_worker { # awaitpid CB + my ($pid, $nr) = @_; + delete $WORKERS{$nr}; + if (($? >> 8) == 66) { # EX_NOINPUT + $alive = undef; + PublicInbox::DS->SetLoopTimeout(1); + } elsif ($?) { + warn "worker[$nr] died \$?=$?\n"; + } + PublicInbox::DS::requeue(\&start_workers) if $alive; +} + sub start_worker ($) { my ($nr) = @_; - my $pid = fork // return warn("fork: $!"); + my $pid = fork; + if (!defined($pid)) { + warn("fork: $!"); + return undef; + }; if ($pid == 0) { - undef %PIDS; + undef %WORKERS; + PublicInbox::DS::Reset(); + $SIG{TERM} = sub { $parent_pid = -1 }; + $SIG{TTIN} = $SIG{TTOU} = 'IGNORE'; + $SIG{CHLD} = 'DEFAULT'; recv_loop(); exit(0); } else { - $PIDS{$pid} = $nr; + $WORKERS{$nr} = $pid; + awaitpid($pid, \&reap_worker, $nr); + } +} + +sub start_workers { + for my $nr (grep { !defined($WORKERS{$_}) } (0..($nworker - 1))) { + start_worker($nr) if $alive; } } +sub do_sigttou { + if ($alive && $nworker > 1) { + --$nworker; + my @nr = grep { $_ >= $nworker } keys %WORKERS; + kill('TERM', @WORKERS{@nr}); + } +} + +sub xh_alive { $alive || scalar(keys %WORKERS) } + sub start (@) { my (@argv) = @_; - local (%SRCH, %PIDS, $parent_pid); + local (%SRCH, %WORKERS); + local $alive = 1; PublicInbox::Search::load_xapian(); $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or die 'bad args'; - return recv_loop() if !$opt->{j}; - die '-j must be >= 0' if $opt->{j} < 0; - start_worker($_) for (1..($opt->{j})); - - my $quit; - until ($quit) { - my $p = waitpid(-1, 0) or return; - if (defined(my $nr = delete $PIDS{$p})) { - $quit = 1 if ($? 
>> 8) == 66; # EX_NOINPUT - start_worker($nr) if !$quit; - } else { - warn "W: unknown pid=$p reaped\n"; - } - } + local $workerset = POSIX::SigSet->new; + $workerset->fillset or die "fillset: $!"; + + local $nworker = $opt->{j}; + return recv_loop() if $nworker == 0; + die '-j must be >= 0' if $nworker < 0; + $workerset->delset(POSIX::SIGTERM); + $workerset->delset(POSIX::SIGCHLD); + local $parent_pid = $$; + my $sig = { + TTIN => sub { + if ($alive) { + ++$nworker; + PublicInbox::DS::requeue(\&start_workers) + } + }, + TTOU => \&do_sigttou, + CHLD => \&PublicInbox::DS::enqueue_reap, + }; + my $oldset = PublicInbox::DS::block_signals(); + start_workers(); + @PublicInbox::DS::post_loop_do = \&xh_alive; + PublicInbox::DS::event_loop($sig, $oldset); } 1; diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 871a381c..95eef58d 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -56,15 +56,18 @@ # define STDERR_ASSIGNABLE (0) #endif -static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P -static pid_t parent_pid; +static const int sock_fd = STDIN_FILENO; // SOCK_SEQPACKET as stdin :P +static volatile pid_t parent_pid; // may be set in worker sighandler (sigw) +static sigset_t fullset, workerset; +static bool alive = true; #if STDERR_ASSIGNABLE static FILE *orig_err = stderr; #endif static int orig_err_fd = -1; static void *srch_tree; // tsearch + tdelete + twalk static pid_t *worker_pids; // nr => pid -static unsigned long nworker; +static unsigned long nworker, nworker_hwm; +static int pipefds[2]; // PublicInbox::Search and PublicInbox::CodeSearch generate these: static void mail_nrp_init(void); @@ -598,11 +601,21 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len) msg.msg_control = &cmsg.hdr; msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE); + // allow SIGTERM to hit + if (sigprocmask(SIG_SETMASK, &workerset, NULL)) + err(EXIT_FAILURE, "SIG_SETMASK"); ssize_t r = recvmsg(sock_fd, &msg, 0); - if (r 
< 0) - err(EXIT_FAILURE, "recvmsg"); - if (r == 0) + if (r == 0) { exit(EX_NOINPUT); /* grandparent went away */ + } else if (r < 0) { + if (errno == EINTR) + return false; // retry recv_loop + err(EXIT_FAILURE, "recvmsg"); + } + + // success! no signals for the rest of the request/response cycle + if (sigprocmask(SIG_SETMASK, &fullset, NULL)) + err(EXIT_FAILURE, "SIG_SETMASK"); *len = r; if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET && cmsg.hdr.cmsg_type == SCM_RIGHTS) { @@ -875,9 +888,19 @@ static void stderr_restore(FILE *tmp_err) clearerr(stderr); } +static void sigw(int sig) // SIGTERM handler for worker +{ + parent_pid = -1; // break out of recv_loop +} + static void recv_loop(void) // worker process loop { static char rbuf[4096 * 33]; // per-process + struct sigaction sa = {}; + sa.sa_handler = sigw; + if (sigaction(SIGTERM, &sa, NULL)) + err(EXIT_FAILURE, "sigaction TERM"); + while (!parent_pid || getppid() == parent_pid) { size_t len = sizeof(rbuf); struct req req = {}; @@ -904,18 +927,6 @@ static void insert_pid(pid_t pid, unsigned nr) worker_pids[nr] = pid; } -static int delete_pid(pid_t pid) -{ - for (unsigned nr = 0; nr < nworker; nr++) { - if (worker_pids[nr] == pid) { - worker_pids[nr] = 0; - return nr; - } - } - warnx("W: unknown pid=%d reaped", (int)pid); - return -1; -} - static void start_worker(unsigned nr) { pid_t pid = fork(); @@ -925,11 +936,32 @@ static void start_worker(unsigned nr) insert_pid(pid, nr); } else { cleanup_pids(); + xclose(pipefds[0]); + xclose(pipefds[1]); + if (signal(SIGCHLD, SIG_DFL) == SIG_ERR) + err(EXIT_FAILURE, "signal CHLD"); + if (signal(SIGTTIN, SIG_IGN) == SIG_ERR) + err(EXIT_FAILURE, "signal TTIN"); + if (signal(SIGTTOU, SIG_IGN) == SIG_ERR) + err(EXIT_FAILURE, "signal TTOU"); recv_loop(); exit(0); } } +static void start_workers(void) +{ + sigset_t old; + if (sigprocmask(SIG_SETMASK, &fullset, &old)) + err(EXIT_FAILURE, "SIG_SETMASK"); + for (unsigned long nr = 0; nr < nworker; nr++) { + if (!worker_pids[nr]) + 
start_worker(nr); + } + if (sigprocmask(SIG_SETMASK, &old, NULL)) + err(EXIT_FAILURE, "SIG_SETMASK"); +} + static void cleanup_all(void) { cleanup_pids(); @@ -939,6 +971,121 @@ static void cleanup_all(void) #endif } +static void sigp(int sig) // parent signal handler +{ + static const char eagain[] = "signals coming in too fast"; + static const char bad_sig[] = "BUG: bad sig\n"; + static const char write_err[] = "BUG: sigp write: "; + char c = 0; + + switch (sig) { + case SIGCHLD: c = '.'; break; + case SIGTTOU: c = '-'; break; + case SIGTTIN: c = '+'; break; + default: + write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1); + _exit(EXIT_FAILURE); + } + ssize_t w = write(pipefds[1], &c, 1); + if (w == sizeof(c)) return; + int e = 0; + if (w < 0) { + e = errno; + if (e == EAGAIN) { + write(STDERR_FILENO, eagain, sizeof(eagain) - 1); + return; + } + } + struct iovec iov[3]; + iov[0].iov_base = (void *)write_err; + iov[0].iov_len = sizeof(write_err) - 1; + iov[1].iov_base = (void *)(e ? strerror(e) : "zero write"); + iov[1].iov_len = strlen((const char *)iov[1].iov_base); + iov[2].iov_base = (void *)"\n"; + iov[2].iov_len = 1; + (void)writev(STDERR_FILENO, iov, MY_ARRAY_SIZE(iov)); + _exit(EXIT_FAILURE); +} + +static void reaped_worker(pid_t pid, int st) +{ + unsigned long nr = 0; + for (; nr < nworker_hwm; nr++) { + if (worker_pids[nr] == pid) { + worker_pids[nr] = 0; + break; + } + } + if (nr >= nworker_hwm) { + warnx("W: unknown pid=%d reaped $?=%d", (int)pid, st); + return; + } + if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT) + alive = false; + else if (st) + warnx("worker[%lu] died $?=%d", nr, st); + if (alive) + start_workers(); +} + +static void do_sigchld(void) +{ + while (1) { + int st; + pid_t pid = waitpid(-1, &st, WNOHANG); + if (pid > 0) { + reaped_worker(pid, st); + } else if (pid == 0) { + return; + } else { + switch (errno) { + case ECHILD: return; + case EINTR: break; // can it happen w/ WNOHANG? 
+ default: err(EXIT_FAILURE, "BUG: waitpid"); + } + } + } +} + +static void do_sigttin(void) +{ + if (!alive) return; + unsigned long n = nworker < nworker_hwm ? nworker_hwm : nworker + 1; + void *p = reallocarray(worker_pids, n, sizeof(pid_t)); // never shrinks + if (!p) { + warn("reallocarray"); + } else { + worker_pids = (pid_t *)p; + for (; nworker_hwm < n; nworker_hwm++) worker_pids[nworker_hwm] = 0; + nworker++; // reused slots keep the PID of a not-yet-reaped worker + start_workers(); + } +} + +static void do_sigttou(void) +{ + if (!alive || nworker <= 1) return; + + // worker_pids array does not shrink + --nworker; + for (unsigned long nr = nworker; nr < nworker_hwm; nr++) { + pid_t pid = worker_pids[nr]; + if (pid != 0) + kill(pid, SIGTERM); + } +} + +static size_t living_workers(void) +{ + size_t ret = 0; + + for (unsigned long nr = 0; nr < nworker_hwm; nr++) { + if (worker_pids[nr]) + ret++; + } + return ret; +} + int main(int argc, char *argv[]) { int c; @@ -953,7 +1100,7 @@ int main(int argc, char *argv[]) err(EXIT_FAILURE, "dup(2)"); } - nworker = 0; + nworker = 1; #ifdef _SC_NPROCESSORS_ONLN long j = sysconf(_SC_NPROCESSORS_ONLN); if (j > 0) @@ -981,27 +1128,66 @@ int main(int argc, char *argv[]) errx(EXIT_FAILURE, "BUG: `-%c'", c); } } - if (nworker == 0) { - recv_loop(); - } else { - parent_pid = getpid(); - worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); - if (!worker_pids) - err(EXIT_FAILURE, "calloc"); - for (unsigned i = 0; i < nworker; i++) - start_worker(i); - int st; - pid_t pid; - bool quit = false; - while ((pid = wait(&st)) > 0) { - int nr = delete_pid(pid); - if (nr < 0) continue; - if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT) - quit = true; - if (!quit) - start_worker(nr); + // global sigsets: + if (sigfillset(&fullset)) err(EXIT_FAILURE, "sigfillset"); + if (sigfillset(&workerset)) err(EXIT_FAILURE, "sigfillset"); + + if (nworker == 0) { // no SIGTERM handling w/o workers + recv_loop(); + return 0; + } + if (sigdelset(&workerset, SIGTERM)) err(EXIT_FAILURE, "del TERM"); + if (sigdelset(&workerset, SIGCHLD)) err(EXIT_FAILURE, "del CHLD"); + parent_pid = 
getpid(); + nworker_hwm = nworker; + worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t)); + if (!worker_pids) err(EXIT_FAILURE, "calloc"); + + if (pipe(pipefds)) err(EXIT_FAILURE, "pipe"); + int fl = fcntl(pipefds[1], F_GETFL); + if (fl == -1) err(EXIT_FAILURE, "F_GETFL"); + if (fcntl(pipefds[1], F_SETFL, fl | O_NONBLOCK)) + err(EXIT_FAILURE, "F_SETFL"); + + sigset_t pset; // parent-only + if (sigfillset(&pset)) err(EXIT_FAILURE, "sigfillset"); + if (sigdelset(&pset, SIGCHLD)) err(EXIT_FAILURE, "del CHLD"); + if (sigdelset(&pset, SIGTTIN)) err(EXIT_FAILURE, "del TTIN"); + if (sigdelset(&pset, SIGTTOU)) err(EXIT_FAILURE, "del TTOU"); + + struct sigaction sa = {}; + sa.sa_handler = sigp; + + if (sigaction(SIGTTIN, &sa, NULL)) err(EXIT_FAILURE, "sig TTIN"); + if (sigaction(SIGTTOU, &sa, NULL)) err(EXIT_FAILURE, "sig TTOU"); + sa.sa_flags = SA_NOCLDSTOP; + if (sigaction(SIGCHLD, &sa, NULL)) err(EXIT_FAILURE, "sig CHLD"); + + if (sigprocmask(SIG_SETMASK, &pset, NULL)) + err(EXIT_FAILURE, "SIG_SETMASK"); + + start_workers(); + + char sbuf[64]; + while (alive || living_workers()) { + ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf)); + if (n < 0) { + if (errno == EINTR) continue; + err(EXIT_FAILURE, "read"); + } else if (n == 0) { + errx(EXIT_FAILURE, "read EOF"); + } + do_sigchld(); + for (ssize_t i = 0; i < n; i++) { + switch (sbuf[i]) { + case '.': break; // do_sigchld already called + case '-': do_sigttou(); break; + case '+': do_sigttin(); break; + default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); + } } } + return 0; } diff --git a/t/xap_helper.t b/t/xap_helper.t index b68f2773..c90ce81c 100644 --- a/t/xap_helper.t +++ b/t/xap_helper.t @@ -95,21 +95,54 @@ my $test = sub { my $stats = do { local $/; <$err_rd> }; is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported'); - if ($arg[-1] !~ /\('-j0'\)/) { - kill('KILL', $cinfo{pid}); + return $ar if $cinfo{pid} == $pid; + + # test worker management: + kill('TERM', $cinfo{pid}); + my $tries = 0; + do { $r = 
$doreq->($s, qw(test_inspect -d), $ibx_idx[0]); - %info = map { - split(/=/, $_, 2) - } split(/ /, do { local $/; <$r> }); - isnt($info{pid}, $cinfo{pid}, 'spawned new worker'); + %info = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> }); + } while ($info{pid} == $cinfo{pid} && ++$tries < 10); + isnt($info{pid}, $cinfo{pid}, 'spawned new worker'); + + my %pids; + $tries = 0; + my @ins = ($s, qw(test_inspect -d), $ibx_idx[0]); + kill('TTIN', $pid); + until (scalar(keys %pids) >= 2 || ++$tries > 10) { + tick; + my @r = map { $doreq->(@ins) } (0..5); + for my $fh (@r) { + my $buf = do { local $/; <$fh> } // die "read: $!"; + $buf =~ /\bpid=(\d+)/ and $pids{$1} = undef; + } + } + is(scalar keys %pids, 2, 'have two pids'); + + kill('TTOU', $pid); + %pids = (); + my $delay = $tries * 0.11 * ($ENV{VALGRIND} ? 10 : 1); + $tries = 0; + diag 'waiting '.$delay.'s for SIGTTOU'; + tick($delay); + until (scalar(keys %pids) == 1 || ++$tries > 100) { + %pids = (); + my @r = map { $doreq->(@ins) } (0..5); + for my $fh (@r) { + my $buf = do { local $/; <$fh> } // die "read: $!"; + $buf =~ /\bpid=(\d+)/ and $pids{$1} = undef; + } } + is(scalar keys %pids, 1, 'have one pid') or diag explain(\%pids); + is($info{pid}, (keys %pids)[0], 'kept oldest PID after TTOU'); + $ar; }; -my $ar; my @NO_CXX = (1); unless ($ENV{TEST_XH_CXX_ONLY}) { - $ar = $test->(qw[-MPublicInbox::XapHelper -e + my $ar = $test->(qw[-MPublicInbox::XapHelper -e PublicInbox::XapHelper::start('-j0')]); $ar = $test->(qw[-MPublicInbox::XapHelper -e PublicInbox::XapHelper::start('-j1')]); @@ -119,10 +152,10 @@ SKIP: { require PublicInbox::XapHelperCxx; PublicInbox::XapHelperCxx::check_build(); }; - skip "XapHelperCxx build: $@", 1 if $@; + skip "XapHelperCxx build: $@", 1 if $@ || $ENV{PI_NO_CXX}; @NO_CXX = $ENV{TEST_XH_CXX_ONLY} ? 
(0) : (0, 1); - $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e + my $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e PublicInbox::XapHelperCxx::start('-j0')]); $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e PublicInbox::XapHelperCxx::start('-j1')]); @@ -142,6 +175,7 @@ my @id2root; close $fh; } +my $ar; for my $n (@NO_CXX) { local $ENV{PI_NO_CXX} = $n; my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');