about summary refs log tree commit homepage
path: root/lib/PublicInbox/xap_helper.h
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r--lib/PublicInbox/xap_helper.h372
1 files changed, 256 insertions, 116 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 3456910b..51ab48bf 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -7,7 +7,7 @@
  * this is not linked to Perl in any way.
  * C (not C++) is used as much as possible to lower the contribution
  * barrier for hackers who mainly know C (this includes the maintainer).
- * Yes, that means we use C stdlib stuff like hsearch and open_memstream
+ * Yes, that means we use C stdlib stuff like open_memstream
  * instead their equivalents in the C++ stdlib :P
  * Everything here is an unstable internal API of public-inbox and
  * NOT intended for ordinary users; only public-inbox hackers
@@ -15,6 +15,9 @@
 #ifndef _ALL_SOURCE
 #        define _ALL_SOURCE
 #endif
+#ifndef _GNU_SOURCE
+#        define _GNU_SOURCE
+#endif
 #if defined(__NetBSD__) && !defined(_OPENBSD_SOURCE) // for reallocarray(3)
 #        define _OPENBSD_SOURCE
 #endif
@@ -27,13 +30,13 @@
 #include <sys/types.h>
 #include <sys/uio.h>
 #include <sys/wait.h>
+#include <poll.h>
 
 #include <assert.h>
 #include <err.h> // BSD, glibc, and musl all have this
 #include <errno.h>
 #include <fcntl.h>
 #include <limits.h>
-#include <search.h>
 #include <signal.h>
 #include <stddef.h>
 #include <stdint.h>
@@ -82,6 +85,62 @@
 #define ABORT(...) do { warnx(__VA_ARGS__); abort(); } while (0)
 #define EABORT(...) do { warn(__VA_ARGS__); abort(); } while (0)
 
+static void *xcalloc(size_t nmemb, size_t size)
+{
+        void *ret = calloc(nmemb, size);
+        if (!ret) EABORT("calloc(%zu, %zu)", nmemb, size);
+        return ret;
+}
+
+#if defined(__GLIBC__) && defined(__GLIBC_MINOR__) && \
+                MY_VER(__GLIBC__, __GLIBC_MINOR__, 0) >= MY_VER(2, 28, 0)
+#        define HAVE_REALLOCARRAY 1
+#elif defined(__OpenBSD__) || defined(__DragonFly__) || \
+                defined(__FreeBSD__) || defined(__NetBSD__)
+#        define HAVE_REALLOCARRAY 1
+#endif
+
+static void *xreallocarray(void *ptr, size_t nmemb, size_t size)
+{
+#ifdef HAVE_REALLOCARRAY
+        void *ret = reallocarray(ptr, nmemb, size);
+#else // can't rely on __builtin_mul_overflow in gcc 4.x :<
+        void *ret = NULL;
+        if (nmemb && size > SIZE_MAX / nmemb)
+                errno = ENOMEM;
+        else
+                ret = realloc(ptr, nmemb * size);
+#endif
+        if (!ret) EABORT("reallocarray(..., %zu, %zu)", nmemb, size);
+        return ret;
+}
+
+#include "khashl.h"
+
+struct srch {
+        int ckey_len; // int for comparisons
+        unsigned qp_flags;
+        bool qp_extra_done;
+        Xapian::Database *db;
+        Xapian::QueryParser *qp;
+        unsigned char ckey[]; // $shard_path0\0$shard_path1\0...
+};
+
+static khint_t srch_hash(const struct srch *srch)
+{
+        return kh_hash_bytes(srch->ckey_len, srch->ckey);
+}
+
+static int srch_eq(const struct srch *a, const struct srch *b)
+{
+        return a->ckey_len == b->ckey_len ?
+                !memcmp(a->ckey, b->ckey, (size_t)a->ckey_len) : 0;
+}
+
+KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *,
+                srch_hash, srch_eq)
+static srch_set *srch_cache;
+static long my_fd_max, shard_nfd;
 // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET
 static volatile int sock_fd = STDIN_FILENO;
 static sigset_t fullset, workerset;
@@ -90,11 +149,12 @@ static bool alive = true;
 static FILE *orig_err = stderr;
 #endif
 static int orig_err_fd = -1;
-static void *srch_tree; // tsearch + tdelete + twalk
 static pid_t *worker_pids; // nr => pid
 #define WORKER_MAX USHRT_MAX
 static unsigned long nworker, nworker_hwm;
 static int pipefds[2];
+static const char *stdout_path, *stderr_path; // for SIGUSR1
+static sig_atomic_t worker_needs_reopen;
 
 // PublicInbox::Search and PublicInbox::CodeSearch generate these:
 static void mail_nrp_init(void);
@@ -108,14 +168,6 @@ enum exc_iter {
         ITER_ABORT
 };
 
-struct srch {
-        int paths_len; // int for comparisons
-        unsigned qp_flags;
-        Xapian::Database *db;
-        Xapian::QueryParser *qp;
-        char paths[]; // $shard_path0\0$shard_path1\0...
-};
-
 #define MY_ARG_MAX 256
 typedef bool (*cmd)(struct req *);
 
@@ -123,6 +175,8 @@ typedef bool (*cmd)(struct req *);
 struct req { // argv and pfxv point into global rbuf
         char *argv[MY_ARG_MAX];
         char *pfxv[MY_ARG_MAX]; // -A <prefix>
+        char *qpfxv[MY_ARG_MAX]; // -Q <user_prefix>[:=]<INTERNAL_PREFIX>
+        char *dirv[MY_ARG_MAX]; // -d /path/to/XDB(shard)
         size_t *lenv; // -A <prefix>LENGTH
         struct srch *srch;
         char *Pgit_dir;
@@ -134,15 +188,12 @@ struct req { // argv and pfxv point into global rbuf
         unsigned long timeout_sec;
         size_t nr_out;
         long sort_col; // value column, negative means BoolWeight
-        int argc;
-        int pfxc;
+        int argc, pfxc, qpfxc, dirc;
         FILE *fp[2]; // [0] response pipe or sock, [1] status/errors (optional)
         bool has_input; // fp[0] is bidirectional
         bool collapse_threads;
         bool code_search;
         bool relevance; // sort by relevance before column
-        bool emit_percent;
-        bool emit_docdata;
         bool asc; // ascending sort
 };
 
@@ -226,6 +277,13 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
                 qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
                                         Xapian::Query(req->Oeidx_key));
         }
+        // TODO: uid_range
+        if (req->threadid != ULLONG_MAX) {
+                std::string tid = Xapian::sortable_serialise(req->threadid);
+                qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+                        Xapian::Query(Xapian::Query::OP_VALUE_RANGE, THREADID,
+                                        tid, tid));
+        }
         Xapian::Enquire enq = prep_enquire(req);
         enq.set_query(qry);
         // THREADID is a CPP macro defined on CLI (see) XapHelperCxx.pm
@@ -364,25 +422,6 @@ static size_t off2size(off_t n)
         return (size_t)n;
 }
 
-static char *hsearch_enter_key(char *s)
-{
-#if defined(__OpenBSD__) || defined(__DragonFly__)
-        // hdestroy frees each key on some platforms,
-        // so give it something to free:
-        char *ret = strdup(s);
-        if (!ret) err(EXIT_FAILURE, "strdup");
-        return ret;
-// AFAIK there's no way to detect musl, assume non-glibc Linux is musl:
-#elif defined(__GLIBC__) || defined(__linux__) || \
-        defined(__FreeBSD__) || defined(__NetBSD__)
-        // do nothing on these platforms
-#else
-#warning untested platform detected, unsure if hdestroy(3) frees keys
-#warning contact us at meta@public-inbox.org if you get segfaults
-#endif
-        return s;
-}
-
 // for test usage only, we need to ensure the compiler supports
 // __cleanup__ when exceptions are thrown
 struct inspect { struct req *req; };
@@ -406,6 +445,11 @@ static bool cmd_test_inspect(struct req *req)
         return false;
 }
 
+static bool cmd_test_sleep(struct req *req)
+{
+        for (;;) poll(NULL, 0, 10);
+        return false;
+}
 #include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff
 #include "xh_cidx.h" // CodeSearchIdx.pm stuff
 
@@ -420,6 +464,7 @@ static const struct cmd_entry {
         CMD(dump_ibx), // many inboxes
         CMD(dump_roots), // per-cidx shard
         CMD(test_inspect), // least common commands last
+        CMD(test_sleep), // least common commands last
 };
 
 #define MY_ARRAY_SIZE(x)        (sizeof(x)/sizeof((x)[0]))
@@ -495,15 +540,6 @@ again:
         return false;
 }
 
-static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
-{
-        const struct srch *a = (const struct srch *)pa;
-        const struct srch *b = (const struct srch *)pb;
-        int diff = a->paths_len - b->paths_len;
-
-        return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len);
-}
-
 static bool is_chert(const char *dir)
 {
         char iamchert[PATH_MAX];
@@ -517,49 +553,85 @@ static bool is_chert(const char *dir)
         return false;
 }
 
-static bool srch_init(struct req *req)
+static void srch_free(struct srch *srch)
+{
+        delete srch->qp;
+        delete srch->db;
+        free(srch);
+}
+
+static void srch_cache_renew(struct srch *keep)
+{
+        khint_t k;
+
+        // can't delete while iterating, so just free each + clear
+        for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) {
+                if (!kh_exist(srch_cache, k)) continue;
+                struct srch *cur = kh_key(srch_cache, k);
+
+                if (cur != keep)
+                        srch_free(cur);
+        }
+        srch_set_cs_clear(srch_cache);
+        if (keep) {
+                int absent;
+                k = srch_set_put(srch_cache, keep, &absent);
+                assert(absent);
+                assert(k < kh_end(srch_cache));
+        }
+}
+
+static void srch_init(struct req *req)
 {
-        char *dirv[MY_ARG_MAX];
         int i;
         struct srch *srch = req->srch;
-        int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
         const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
-        srch->qp_flags = FLAG_PHRASE |
-                        Xapian::QueryParser::FLAG_BOOLEAN |
+        srch->qp_flags = Xapian::QueryParser::FLAG_BOOLEAN |
                         Xapian::QueryParser::FLAG_LOVEHATE |
                         Xapian::QueryParser::FLAG_WILDCARD;
-        if (is_chert(dirv[0]))
-                srch->qp_flags &= ~FLAG_PHRASE;
-        try {
-                srch->db = new Xapian::Database(dirv[0]);
-        } catch (...) {
-                warn("E: Xapian::Database(%s)", dirv[0]);
-                return false;
+        long nfd = req->dirc * SHARD_COST;
+
+        shard_nfd += nfd;
+        if (shard_nfd > my_fd_max) {
+                srch_cache_renew(srch);
+                shard_nfd = nfd;
         }
-        try {
-                for (i = 1; i < dirc; i++) {
-                        if (srch->qp_flags & FLAG_PHRASE && is_chert(dirv[i]))
+        for (int retried = 0; retried < 2; retried++) {
+                srch->qp_flags |= FLAG_PHRASE;
+                i = 0;
+                try {
+                        srch->db = new Xapian::Database(req->dirv[i]);
+                        if (is_chert(req->dirv[0]))
                                 srch->qp_flags &= ~FLAG_PHRASE;
-                        srch->db->add_database(Xapian::Database(dirv[i]));
+                        for (i = 1; i < req->dirc; i++) {
+                                const char *dir = req->dirv[i];
+                                if (srch->qp_flags & FLAG_PHRASE &&
+                                                is_chert(dir))
+                                        srch->qp_flags &= ~FLAG_PHRASE;
+                                srch->db->add_database(Xapian::Database(dir));
+                        }
+                        break;
+                } catch (const Xapian::Error & e) {
+                        warnx("E: Xapian::Error: %s (%s)",
+                                e.get_description().c_str(), req->dirv[i]);
+                } catch (...) { // does this happen?
+                        warn("E: add_database(%s)", req->dirv[i]);
+                }
+                if (retried) {
+                        errx(EXIT_FAILURE, "E: can't open %s", req->dirv[i]);
+                } else {
+                        warnx("retrying...");
+                        if (srch->db)
+                                delete srch->db;
+                        srch->db = NULL;
+                        srch_cache_renew(srch);
                 }
-        } catch (...) {
-                warn("E: add_database(%s)", dirv[i]);
-                return false;
-        }
-        try {
-                srch->qp = new Xapian::QueryParser;
-        } catch (...) {
-                perror("E: Xapian::QueryParser");
-                return false;
         }
+        // these will raise and die on ENOMEM or other errors
+        srch->qp = new Xapian::QueryParser;
         srch->qp->set_default_op(Xapian::Query::OP_AND);
         srch->qp->set_database(*srch->db);
-        try {
-                srch->qp->set_stemmer(Xapian::Stem("english"));
-        } catch (...) {
-                perror("E: Xapian::Stem");
-                return false;
-        }
+        srch->qp->set_stemmer(Xapian::Stem("english"));
         srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
         srch->qp->SET_MAX_EXPANSION(100);
 
@@ -567,15 +639,31 @@ static bool srch_init(struct req *req)
                 qp_init_code_search(srch->qp); // CodeSearch.pm
         else
                 qp_init_mail_search(srch->qp); // Search.pm
-        return true;
 }
 
-static void free_srch(void *p) // tdestroy
+// setup query parser for altid and arbitrary headers
+static void srch_init_extra(struct req *req)
 {
-        struct srch *srch = (struct srch *)p;
-        delete srch->qp;
-        delete srch->db;
-        free(srch);
+        const char *XPFX;
+        for (int i = 0; i < req->qpfxc; i++) {
+                size_t len = strlen(req->qpfxv[i]);
+                char *c = (char *)memchr(req->qpfxv[i], '=', len);
+
+                if (c) { // it's boolean "gmane=XGMANE"
+                        XPFX = c + 1;
+                        *c = 0;
+                        req->srch->qp->add_boolean_prefix(req->qpfxv[i], XPFX);
+                        continue;
+                }
+                // maybe it's a non-boolean prefix "blob:XBLOBID"
+                c = (char *)memchr(req->qpfxv[i], ':', len);
+                if (!c)
+                        errx(EXIT_FAILURE, "bad -Q %s", req->qpfxv[i]);
+                XPFX = c + 1;
+                *c = 0;
+                req->srch->qp->add_prefix(req->qpfxv[i], XPFX);
+        }
+        req->srch->qp_extra_done = true;
 }
 
 static void dispatch(struct req *req)
@@ -588,7 +676,6 @@ static void dispatch(struct req *req)
         } kbuf;
         char *end;
         FILE *kfp;
-        struct srch **s;
         req->threadid = ULLONG_MAX;
         for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
                 if (cmds[c].fn_len == size &&
@@ -602,7 +689,7 @@ static void dispatch(struct req *req)
         kfp = open_memstream(&kbuf.ptr, &size);
         if (!kfp) err(EXIT_FAILURE, "open_memstream(kbuf)");
         // write padding, first (contents don't matter)
-        fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp);
+        fwrite(&req->argv[0], offsetof(struct srch, ckey), 1, kfp);
 
         // global getopt variables:
         optopt = 0;
@@ -614,7 +701,11 @@ static void dispatch(struct req *req)
                 switch (c) {
                 case 'a': req->asc = true; break;
                 case 'c': req->code_search = true; break;
-                case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+                case 'd':
+                        req->dirv[req->dirc++] = optarg;
+                        if (MY_ARG_MAX == req->dirc) ABORT("too many -d");
+                        fprintf(kfp, "-d%c%s%c", 0, optarg, 0);
+                        break;
                 case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix
                 case 'k':
                         req->sort_col = strtol(optarg, &end, 10);
@@ -633,7 +724,6 @@ static void dispatch(struct req *req)
                         if (*end || req->off == ULLONG_MAX)
                                 ABORT("-o %s", optarg);
                         break;
-                case 'p': req->emit_percent = true; break;
                 case 'r': req->relevance = true; break;
                 case 't': req->collapse_threads = true; break;
                 case 'A':
@@ -641,7 +731,6 @@ static void dispatch(struct req *req)
                         if (MY_ARG_MAX == req->pfxc)
                                 ABORT("too many -A");
                         break;
-                case 'D': req->emit_docdata = true; break;
                 case 'K':
                         req->timeout_sec = strtoul(optarg, &end, 10);
                         if (*end || req->timeout_sec == ULONG_MAX)
@@ -653,28 +742,38 @@ static void dispatch(struct req *req)
                         if (*end || req->threadid == ULLONG_MAX)
                                 ABORT("-T %s", optarg);
                         break;
+                case 'Q':
+                        req->qpfxv[req->qpfxc++] = optarg;
+                        if (MY_ARG_MAX == req->qpfxc) ABORT("too many -Q");
+                        fprintf(kfp, "-Q%c%s%c", 0, optarg, 0);
+                        break;
                 default: ABORT("bad switch `-%c'", c);
                 }
         }
         ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch
         kbuf.srch->db = NULL;
         kbuf.srch->qp = NULL;
-        kbuf.srch->paths_len = size - offsetof(struct srch, paths);
-        if (kbuf.srch->paths_len <= 0)
-                ABORT("no -d args");
-        s = (struct srch **)tsearch(kbuf.srch, &srch_tree, srch_cmp);
-        if (!s) err(EXIT_FAILURE, "tsearch"); // likely ENOMEM
-        req->srch = *s;
-        if (req->srch != kbuf.srch) { // reuse existing
-                free_srch(kbuf.srch);
-        } else if (!srch_init(req)) {
-                assert(kbuf.srch == *((struct srch **)tfind(
-                                        kbuf.srch, &srch_tree, srch_cmp)));
-                void *del = tdelete(kbuf.srch, &srch_tree, srch_cmp);
-                assert(del);
-                free_srch(kbuf.srch);
-                goto cmd_err; // srch_init already warned
+        kbuf.srch->qp_extra_done = false;
+        kbuf.srch->ckey_len = size - offsetof(struct srch, ckey);
+        if (kbuf.srch->ckey_len <= 0 || !req->dirc)
+                ABORT("no -d args (or too many)");
+
+        int absent;
+        khint_t ki = srch_set_put(srch_cache, kbuf.srch, &absent);
+        assert(ki < kh_end(srch_cache));
+        req->srch = kh_key(srch_cache, ki);
+        if (absent) {
+                srch_init(req);
+        } else {
+                assert(req->srch != kbuf.srch);
+                srch_free(kbuf.srch);
+                req->srch->db->reopen();
         }
+        if (req->qpfxc && !req->srch->qp_extra_done)
+                srch_init_extra(req);
+        if (req->timeout_sec)
+                alarm(req->timeout_sec > UINT_MAX ?
+                        UINT_MAX : (unsigned)req->timeout_sec);
         try {
                 if (!req->fn(req))
                         warnx("`%s' failed", req->argv[0]);
@@ -683,8 +782,8 @@ static void dispatch(struct req *req)
         } catch (...) {
                 warn("unhandled exception");
         }
-cmd_err:
-        return; // just be silent on errors, for now
+        if (req->timeout_sec)
+                alarm(0);
 }
 
 static void cleanup_pids(void)
@@ -723,9 +822,12 @@ static void stderr_restore(FILE *tmp_err)
         clearerr(stderr);
 }
 
-static void sigw(int sig) // SIGTERM handler for worker
+static void sigw(int sig) // SIGTERM+SIGUSR1 handler for worker
 {
-        sock_fd = -1; // break out of recv_loop
+        switch (sig) {
+        case SIGUSR1: worker_needs_reopen = 1; break;
+        default: sock_fd = -1; // break out of recv_loop
+        }
 }
 
 #define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup)))
@@ -735,6 +837,18 @@ static void req_cleanup(void *ptr)
         free(req->lenv);
 }
 
+static void reopen_logs(void)
+{
+        if (stdout_path && *stdout_path && !freopen(stdout_path, "a", stdout))
+                err(EXIT_FAILURE, "freopen %s", stdout_path);
+        if (stderr_path && *stderr_path) {
+                if (!freopen(stderr_path, "a", stderr))
+                        err(EXIT_FAILURE, "freopen %s", stderr_path);
+                if (my_setlinebuf(stderr))
+                        err(EXIT_FAILURE, "setlinebuf(stderr)");
+        }
+}
+
 static void recv_loop(void) // worker process loop
 {
         static char rbuf[4096 * 33]; // per-process
@@ -742,6 +856,7 @@ static void recv_loop(void) // worker process loop
         sa.sa_handler = sigw;
 
         CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
+        CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
 
         while (sock_fd == 0) {
                 size_t len = sizeof(rbuf);
@@ -758,6 +873,10 @@ static void recv_loop(void) // worker process loop
                         stderr_restore(req.fp[1]);
                         ERR_CLOSE(req.fp[1], 0);
                 }
+                if (worker_needs_reopen) {
+                        worker_needs_reopen = 0;
+                        reopen_logs();
+                }
         }
 }
 
@@ -804,10 +923,21 @@ static void start_workers(void)
 static void cleanup_all(void)
 {
         cleanup_pids();
-#ifdef __GLIBC__
-        tdestroy(srch_tree, free_srch);
-        srch_tree = NULL;
-#endif
+        if (!srch_cache)
+                return;
+        srch_cache_renew(NULL);
+        srch_set_destroy(srch_cache);
+        srch_cache = NULL;
+}
+
+static void parent_reopen_logs(void)
+{
+        reopen_logs();
+        for (unsigned long nr = nworker; nr < nworker_hwm; nr++) {
+                pid_t pid = worker_pids[nr];
+                if (pid != 0 && kill(pid, SIGUSR1))
+                        warn("BUG?: kill(%d, SIGUSR1)", (int)pid);
+        }
 }
 
 static void sigp(int sig) // parent signal handler
@@ -822,6 +952,7 @@ static void sigp(int sig) // parent signal handler
         case SIGCHLD: c = '.'; break;
         case SIGTTOU: c = '-'; break;
         case SIGTTIN: c = '+'; break;
+        case SIGUSR1: c = '#'; break;
         default:
                 write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1);
                 _exit(EXIT_FAILURE);
@@ -928,14 +1059,25 @@ int main(int argc, char *argv[])
 {
         int c;
         socklen_t slen = (socklen_t)sizeof(c);
+        stdout_path = getenv("STDOUT_PATH");
+        stderr_path = getenv("STDERR_PATH");
+        struct rlimit rl;
 
         if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
                 err(EXIT_FAILURE, "getsockopt");
         if (c != SOCK_SEQPACKET)
                 errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET");
 
+        if (getrlimit(RLIMIT_NOFILE, &rl))
+                err(EXIT_FAILURE, "getrlimit");
+        my_fd_max = rl.rlim_cur;
+        if (my_fd_max < 72)
+                warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max);
+        my_fd_max -= 64;
+
         mail_nrp_init();
         code_nrp_init();
+        srch_cache = srch_set_init();
         atexit(cleanup_all);
 
         if (!STDERR_ASSIGNABLE) {
@@ -945,12 +1087,6 @@ int main(int argc, char *argv[])
         }
 
         nworker = 1;
-#ifdef _SC_NPROCESSORS_ONLN
-        long j = sysconf(_SC_NPROCESSORS_ONLN);
-        if (j > 0)
-                nworker = j > WORKER_MAX ? WORKER_MAX : j;
-#endif // _SC_NPROCESSORS_ONLN
-
         // make warn/warnx/err multi-process friendly:
         if (my_setlinebuf(stderr))
                 err(EXIT_FAILURE, "setlinebuf(stderr)");
@@ -992,6 +1128,8 @@ int main(int argc, char *argv[])
         DELSET(SIGXCPU);
         DELSET(SIGXFSZ);
 #undef DELSET
+        CHECK(int, 0, sigdelset(&workerset, SIGUSR1));
+        CHECK(int, 0, sigdelset(&fullset, SIGALRM));
 
         if (nworker == 0) { // no SIGTERM handling w/o workers
                 recv_loop();
@@ -1000,8 +1138,7 @@ int main(int argc, char *argv[])
         CHECK(int, 0, sigdelset(&workerset, SIGTERM));
         CHECK(int, 0, sigdelset(&workerset, SIGCHLD));
         nworker_hwm = nworker;
-        worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
-        if (!worker_pids) err(EXIT_FAILURE, "calloc");
+        worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t));
 
         if (pipe(pipefds)) err(EXIT_FAILURE, "pipe");
         int fl = fcntl(pipefds[1], F_GETFL);
@@ -1012,10 +1149,12 @@ int main(int argc, char *argv[])
         CHECK(int, 0, sigdelset(&pset, SIGCHLD));
         CHECK(int, 0, sigdelset(&pset, SIGTTIN));
         CHECK(int, 0, sigdelset(&pset, SIGTTOU));
+        CHECK(int, 0, sigdelset(&pset, SIGUSR1));
 
         struct sigaction sa = {};
         sa.sa_handler = sigp;
 
+        CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
         CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL));
         CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL));
         sa.sa_flags = SA_NOCLDSTOP;
@@ -1040,6 +1179,7 @@ int main(int argc, char *argv[])
                         case '.': break; // do_sigchld already called
                         case '-': do_sigttou(); break;
                         case '+': do_sigttin(); break;
+                        case '#': parent_reopen_logs(); break;
                         default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
                         }
                 }