From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id EB4901F406 for ; Mon, 21 Aug 2023 10:34:16 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1692614057; bh=mF7aidGeSrFnf5lV7uIcBET2KWpMtHKE0eavJONSG5I=; h=From:To:Subject:Date:From; b=SHy4z0yZzIJvi/fi1ekv/8X0NkUvwc1WWhZo6CNU9vEpZQCviLb5tNeqQZwFq920Z +xONk7xVLZihr9ptf3x22udJ7jIsl5qXT3KWL+Otr8m4RwVQiSprfEYBxbwX9Z6tr7 7TqutKhsqrkLdbOkoF6laW3+l0cEKFfteB1s62JA= From: Eric Wong To: spew@80x24.org Subject: [PATCH] xap_helper_cxx work + generation Date: Mon, 21 Aug 2023 10:34:16 +0000 Message-Id: <20230821103416.3600148-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: --- lib/PublicInbox/CodeSearch.pm | 29 +++ lib/PublicInbox/Search.pm | 33 ++++ lib/PublicInbox/XapHelperCxx.pm | 7 +- lib/PublicInbox/xap_helper.h | 335 ++++++++++++++++++++++++-------- t/xap_helper.t | 73 +++++++ 5 files changed, 396 insertions(+), 81 deletions(-) create mode 100644 t/xap_helper.t diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm index a5ccce03..ddd09e5f 100644 --- a/lib/PublicInbox/CodeSearch.pm +++ b/lib/PublicInbox/CodeSearch.pm @@ -63,6 +63,35 @@ sub cqparse_new ($) { $qp; } +sub generate_cxx () { # generates snippet for xap_helper.h + my ($line, $file) = (__LINE__ + 2, __FILE__); + my $ret = <ADD_VRP(d); + qp->ADD_VRP(dt); + qp->ADD_VRP(ct); +EOM + for my $name (sort keys %bool_pfx_external) { + for (split(/ /, $bool_pfx_external{$name})) { + $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n} + } + } + for my $name (sort keys %prob_prefix) { + for (split(/ /, $prob_prefix{$name})) { + $ret .= qq{\tqp->add_prefix("$name", "$_");\n} + } + } + $ret .= "}\n"; +} + # returns a Xapian::Query to filter by roots sub roots_filter { # retry_reopen callback my ($self, $git_dir) = @_; diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index 537647d4..91f2f934 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -527,6 +527,39 @@ EOF $qp; } +sub generate_cxx () { # generates snippet for xap_helper.h + my $ret = <ADD_VRP(d); + qp->ADD_VRP(dt); + qp->ADD_VRP(z); + qp->ADD_VRP(rt); + qp->ADD_VRP(uid); +EOM + for my $name (sort keys %bool_pfx_external) { + for (split(/ /, $bool_pfx_external{$name})) { + $ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n} + } + } + # TODO: altid support + for my $name (sort keys %prob_prefix) { + for (split(/ /, $prob_prefix{$name})) { + $ret .= qq{\tqp->add_prefix("$name", "$_");\n} + } + } + $ret .= "}\n"; +} + sub help { my ($self) = @_; $self->{qp} //= $self->qparse_new; # parse altids diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm index 8b374bab..e7d08b28 100644 --- a/lib/PublicInbox/XapHelperCxx.pm +++ b/lib/PublicInbox/XapHelperCxx.pm @@ -14,6 +14,7 @@ my $dir = ($ENV{PERL_INLINE_DIRECTORY} // my $bin = "$dir/xap_helper"; my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!); my @srcs = map { $srcpfx.$_ } qw(xap_helper.h); +my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm); my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -O0') . ' ' . ($ENV{LDFLAGS} // '-Wl,-O1 -Wl,--compress-debug-sections=zlib'); @@ -43,7 +44,11 @@ sub build () { local $/; print $fh readline($rfh); } + require PublicInbox::CodeSearch; + print $fh PublicInbox::Search::generate_cxx(); + print $fh PublicInbox::CodeSearch::generate_cxx(); close $fh; + my $cmd = "$pkg_config --libs --cflags xapian-core"; chomp(my $fl = `$cmd`); die "$cmd failed: \$?=$?" if $?; @@ -61,7 +66,7 @@ sub build () { sub start (@) { my $ctime = 0; my @bin = stat($bin); - for ((@bin ? @srcs : ())) { + for ((@bin ? (@srcs, @pm_dep) : ())) { my @st = stat($_) or die "stat $_: $!"; $ctime = $st[10] if $st[10] > $ctime; } diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 1719eb55..5f25ae3b 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -2,9 +2,11 @@ * Copyright (C) all contributors * License: AGPL-3.0+ * - * Read-only helper process using minimal C++ for Xapian. + * Standalone helper process using minimal C++ for Xapian. * Use C11 as much as possible to make life easier for contributors * and the maintainer (who doesn't know C++ well) + * The socket API is internal to the public-inbox and NOT intended + * for ordinary users, only public-inbox hackers. */ #include #include @@ -15,6 +17,8 @@ #include #include // BSD, glibc, and musl all have this +#include +#include #include #include #include @@ -23,23 +27,56 @@ #include #include // our only reason for using C++ -static const int sock_fd = 3; // SOCK_SEQPACKET +#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev)) +#define XAP_VER \ + MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION) + +#if XAP_VER >= MY_VER(1,3,6) +# define NVRP Xapian::NumberRangeProcessor +# define ADD_VRP add_rangeprocessor +#else +# define NVRP Xapian::NumberValueRangeProcessor +# define ADD_VRP add_valuerangeprocessor +#endif + + +static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P static pid_t parent_pid; -static void *pid_tree; -static void *xdb_tree; +static void *pid_tree, *srch_tree; // tsearch + tdelete + twalk -struct xdbkey { - int len; - char buf[]; // $shard_path0\0$shard_path1\0... +// PublicInbox::Search and PublicInbox::CodeSearch generate these: +static void qp_init_search(Xapian::QueryParser *); +static void qp_init_codesearch(Xapian::QueryParser *); + +struct srch { + int paths_len; // int for comparisons + Xapian::Database *db; + Xapian::QueryParser *qp; + char paths[]; // $shard_path0\0$shard_path1\0... }; -struct cmd_opt { +#define MY_ARG_MAX 128 +typedef bool (*cmd)(struct req *); + +// only one request per-process since we have RLIMIT_CPU timeout +struct req { // argv and pfxv point into global rbuf + char *argv[MY_ARG_MAX]; + char *pfxv[MY_ARG_MAX]; // -A + struct srch *srch; + const char *eidx_key; + cmd fn; unsigned long long max; - long sort_col; // value column unsigned long timeout_sec; + long sort_col; // value column + int argc; + int pfxc; + FILE *fp; // blocking buffered response pipe or socket + bool has_input; // fp is bidirectional bool collapse_threads; + bool codesearch; bool relevance; // sort by relevance before column bool asc; // ascending sort + bool pct; // return percentage }; struct worker { @@ -47,30 +84,58 @@ struct worker { unsigned nr; }; -#define MY_ARG_MAX 128 +static bool cmd_dump_ibx(struct req *req) +{ + if (optind >= req->argc) return false; // need ibx_id + const char *ibx_id = req->argv[optind]; + + return true; +} + +static bool cmd_test_pid(struct req *req) +{ + fprintf(req->fp, "%d", (int)getpid()); + return true; +} + +#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n } +struct cmd_entry { + size_t fn_len; + const char *fn_name; + cmd fn; +} cmds[] = { + CMD(dump_ibx), + CMD(test_pid), +}; + +#define MY_ARRAY_SIZE(x) (sizeof(x)/sizeof((x)[0])) #define RECV_FD_CAPA 1 -#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int)) +#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int)) union my_cmsg { struct cmsghdr hdr; char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE]; }; -static int recv_fd(char *buf, size_t *len) +static void xclose(int fd) +{ + if (close(fd) < 0 && errno != EINTR) + err(EXIT_FAILURE, "BUG: close"); +} + +static bool recv_req(struct req *req, char *rbuf, size_t *len) { union my_cmsg cmsg = { 0 }; struct msghdr msg = { .msg_iovlen = 1 }; struct iovec iov; - iov.iov_base = buf; + iov.iov_base = rbuf; iov.iov_len = *len; msg.msg_iov = &iov; msg.msg_control = &cmsg.hdr; msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE); ssize_t r = recvmsg(sock_fd, &msg, 0); - if (r < 0) { + if (r < 0) err(EXIT_FAILURE, "recvmsg"); - return -1; - } if (r == 0) exit(EX_NOINPUT); /* grandparent went away */ *len = r; @@ -78,10 +143,27 @@ static int recv_fd(char *buf, size_t *len) cmsg.hdr.cmsg_type == SCM_RIGHTS) { size_t len = cmsg.hdr.cmsg_len; int *fdp = (int *)CMSG_DATA(&cmsg.hdr); - for (size_t i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) - return *fdp; + size_t i; + for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) { + int fl = fcntl(*fdp, F_GETFL); + const char *mode = NULL; + switch (fl) { + case -1: warnx("invalid fd=%d", *fdp); return false; + case O_WRONLY: mode = "w"; break; + case O_RDWR: mode = "r+"; req->has_input = true; break; + default: warnx("invalid mode from F_GETFL: 0x%x", fl); + } + if (mode) { + req->fp = fdopen(*fdp, mode); + if (req->fp) return true; + warn("fdopen(fd=%d)", *fdp); + } + xclose(*fdp); + return false; + } } - return -2; + warnx("no FD received in %zd-byte request", r); + return false; } static int split2argv(char **argv, char *buf, size_t len) @@ -90,98 +172,187 @@ static int split2argv(char **argv, char *buf, size_t len) warnx("bogus argument given"); return 0; } - size_t nr = 1; - argv[0] = buf; - for (size_t i = 1; i <= len; i++) { - if (!buf[i]) - argv[nr++] = buf + i + 1; + size_t nr = 0; + char *c = buf; + for (size_t i = 1; i < len; i++) { + if (!buf[i]) { + argv[nr++] = c; + c = buf + i + 1; + } } return (int)nr; } -static int cmp_xdbkey(const void *pa, const void *pb) // for tfind|tsearch +static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch { - const struct xdbkey *a = (const struct xdbkey *)pa; - const struct xdbkey *b = (const struct xdbkey *)pb; - int diff = a->len - b->len; + const struct srch *a = (const struct srch *)pa; + const struct srch *b = (const struct srch *)pb; + int diff = a->paths_len - b->paths_len; - return diff ? diff : memcmp(a->buf, b->buf, (size_t)a->len); + return diff ? diff : memcmp(a->paths, b->paths, (size_t)a->paths_len); } -static void dispatch(int wfd, int argc, char **argv) +static bool srch_init(struct req *req) { - int c = 0; + char *dirv[MY_ARG_MAX]; + int i; + struct srch *srch = req->srch; + int dirc = split2argv(dirv, srch->paths, (size_t)srch->paths_len); + try { + srch->db = new Xapian::Database(dirv[0]); + } catch (...) { + warn("E: Xapian::Database(%s)", dirv[0]); + return false; + } + try { + for (i = 1; i < dirc; i++) + srch->db->add_database(Xapian::Database(dirv[i])); + } catch (...) { + warn("E: add_database(%s)", dirv[i]); + goto cleanup; + } + try { + srch->qp = new Xapian::QueryParser; + } catch (...) { + warn("E: Xapian::QueryParser"); + goto cleanup; + } + srch->qp->set_default_op(Xapian::Query::OP_AND); + srch->qp->set_database(*srch->db); + try { + srch->qp->set_stemmer(Xapian::Stem("english")); + } catch (...) { + warn("E: Xapian::Stem"); + goto cleanup; + } + srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME); + +#if XAP_VER >= MY_VER(1, 3, 3) + srch->qp->set_max_expansion(100); +#else // Xapian < 1.3.3 + srch->qp->set_max_wildcard_expansion(100); +#endif // Xapian < 1.3.3 + + if (req->codesearch) + qp_init_codesearch(srch->qp); // CodeSearch.pm + else + qp_init_search(srch->qp); // Search.pm + return true; +cleanup: + if (srch->db) delete srch->db; + if (srch->qp) delete srch->qp; + srch->qp = NULL; + srch->db = NULL; + return false; +} + +static void dispatch(struct req *req) +{ + int c; + size_t size = strlen(req->argv[0]); union { - struct xdbkey *xdbkey; + struct srch *srch; char *ptr; - } buf; - size_t size; - struct cmd_opt opt = { 0 }; + } fbuf; char *end; - FILE *fp = open_memstream(&buf.ptr, &size); + FILE *kfp; + struct srch **s; + req->fn = NULL; + for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) { + if (cmds[c].fn_len == size && + !memcmp(cmds[c].fn_name, req->argv[0], size)) { + req->fn = cmds[c].fn; + break; + } + } + if (!req->fn) goto cmd_err; - fwrite((const void *)&c, sizeof(c), 1, fp); // xdbkey.len placeholder + kfp = open_memstream(&fbuf.ptr, &size); + // write padding, first + fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp); + // global getopt variables: optind = 1; opterr = optopt = 0; optarg = NULL; - while ((c = getopt(argc, argv, "ad:k:m:rtT:")) != -1) { + while ((c = getopt(req->argc, req->argv, "aA:d:k:m:O:rtT:")) != -1) { switch (c) { - case 'a': opt.asc = true; break; - case 'd': fwrite(optarg, strlen(optarg) + 1, 1, fp); break; + case 'a': req->asc = true; break; + case 'A': req->pfxv[req->pfxc++] = optarg; break; + case 'c': req->codesearch = true; break; + case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break; case 'k': - opt.sort_col = strtol(optarg, &end, 10); + req->sort_col = strtol(optarg, &end, 10); if (*end) goto cmd_err; - switch (opt.sort_col) { + switch (req->sort_col) { case LONG_MAX: case LONG_MIN: goto cmd_err; } break; case 'm': - opt.max = strtoull(optarg, &end, 10); + req->max = strtoull(optarg, &end, 10); if (*end) goto cmd_err; - if (opt.max == ULLONG_MAX) goto cmd_err; + if (req->max == ULLONG_MAX) goto cmd_err; break; - case 'r': opt.relevance = true; break; - case 't': opt.collapse_threads = true; break; + case 'O': req->eidx_key = optarg; break; + case 'r': req->relevance = true; break; + case 't': req->collapse_threads = true; break; case 'T': - opt.timeout_sec = strtoul(optarg, &end, 10); + req->timeout_sec = strtoul(optarg, &end, 10); if (*end) goto cmd_err; - if (opt.timeout_sec == ULONG_MAX) goto cmd_err; + if (req->timeout_sec == ULONG_MAX) goto cmd_err; break; + default: goto cmd_err; } } - if (ferror(fp) | fclose(fp)) { + if (ferror(kfp) | fclose(kfp)) { perror("ferror|fclose"); goto cmd_err; } - buf.xdbkey->len = size - sizeof(int); - if (buf.xdbkey->len <= 0) { + fbuf.srch->db = NULL; + fbuf.srch->qp = NULL; + fbuf.srch->paths_len = size - offsetof(struct srch, paths); + if (fbuf.srch->paths_len <= 0) { + free(fbuf.srch); warnx("no -d args"); goto cmd_err; } - struct xdbkey *k; - k = (struct xdbkey *)tsearch(buf.xdbkey, &xdb_tree, cmp_xdbkey); - if (k != buf.xdbkey) - free(buf.xdbkey); + s = (struct srch **)tsearch(fbuf.srch, &srch_tree, srch_cmp); + if (!s) { + warn("tsearch"); + goto cmd_err; + } + req->srch = *s; + if (req->srch != fbuf.srch) { // reuse existing + free(fbuf.srch); + } else if (!srch_init(req)) { + assert(fbuf.srch == *((struct srch **)tfind( + fbuf.srch, &srch_tree, srch_cmp))); + void *del = tdelete(fbuf.srch, &srch_tree, srch_cmp); + assert(del); + free(fbuf.srch); + goto cmd_err; + } + if (!req->fn(req)) + goto cmd_err; cmd_err: - write(wfd, "E", 1); + return; // just be silent on errors, for now } static void recv_loop(void) { - static char buf[1024 * 128]; // per-process - char *argv[MY_ARG_MAX]; // points into buf + static char rbuf[1024 * 128]; // per-process while (1) { - if (getpid() != parent_pid) + if (getppid() != parent_pid) exit(EXIT_SUCCESS); - size_t len = sizeof(buf); - int wfd = recv_fd(buf, &len); - if (wfd < 0) continue; - int argc = split2argv(argv, buf, len); - if (argc > 0) - dispatch(wfd, argc, argv); - close(wfd); + size_t len = sizeof(rbuf); + struct req req = { 0 }; + if (!recv_req(&req, rbuf, &len)) + continue; + req.argc = split2argv(req.argv, rbuf, len); + if (req.argc > 0) + dispatch(&req); + fclose(req.fp); } } @@ -197,7 +368,7 @@ static void insert_pid(pid_t pid, unsigned nr) { struct worker *w = (struct worker *)malloc(sizeof(*w)); if (!w) { - warn("malloc for worker=%u", nr); + warn("E: malloc(worker=%u)", nr); kill(pid, SIGTERM); return; } @@ -205,12 +376,10 @@ static void insert_pid(pid_t pid, unsigned nr) w->nr = nr; assert(tfind((const void *)w, &pid_tree, cmp_worker) == NULL); void *ret = tsearch(w, &pid_tree, cmp_worker); - if (!ret) { - warn("tsearch for worker=%u", nr); + if (!ret) { // likely malloc failure + warn("E: tsearch(worker=%u)", nr); free(w); kill(pid, SIGTERM); - } else if (ret != w) { - err(EXIT_FAILURE, "BUG: tsearch %p != %p", ret, w); } } @@ -218,15 +387,15 @@ static int delete_pid(pid_t pid) { struct worker key; key.pid = pid; - struct worker *w; + struct worker **w; - w = (struct worker *)tdelete(&key, &pid_tree, cmp_worker); + w = (struct worker **)tdelete(&key, &pid_tree, cmp_worker); if (!w) { - warnx("invalid pid=%d reaped", (int)pid); + warnx("W: unknown pid=%d reaped", (int)pid); return -1; } - key.nr = w->nr; - free(w); + key.nr = (*w)->nr; + free(*w); return (int)key.nr; } @@ -234,7 +403,7 @@ static void start_worker(unsigned nr) { pid_t pid = fork(); if (pid < 0) - warn("fork worker=%u", nr); + warn("E: fork(worker=%u)", nr); else if (pid == 0) recv_loop(); else @@ -246,6 +415,12 @@ int main(int argc, char *argv[]) unsigned long jobs = 1; int c; +#ifdef _SC_NPROCESSORS_ONLN + long j = sysconf(_SC_NPROCESSORS_ONLN); + if (j > 0) + jobs = j > UCHAR_MAX ? UCHAR_MAX : j; +#endif // _SC_NPROCESSORS_ONLN + while ((c = getopt(argc, argv, "j:")) != -1) { char *end; @@ -274,13 +449,13 @@ int main(int argc, char *argv[]) int st; pid_t pid; - bool done = false; + bool quit = false; while ((pid = wait(&st)) > 0) { int nr = delete_pid(pid); if (nr >= 0) { if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT) - done = true; - if (!done) + quit = true; + if (!quit) start_worker(nr); } } diff --git a/t/xap_helper.t b/t/xap_helper.t new file mode 100644 index 00000000..0d4ba30c --- /dev/null +++ b/t/xap_helper.t @@ -0,0 +1,73 @@ +#!perl -w +# Copyright (C) all contributors +# License: AGPL-3.0+ +use v5.12; +use PublicInbox::TestCommon; +use PublicInbox::Spawn qw(spawn); +use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM MSG_EOR); +use POSIX qw(dup2); +use PublicInbox::AutoReap; +use PublicInbox::IPC; +use PublicInbox::SearchIdx; +use autodie; +my ($tmp, $for_destroy) = tmpdir(); +use Data::Dumper; $Data::Dumper::Useqq = 1; + +my $fi_data = './t/git.fast-import-data'; +open my $fi_fh, '<', $fi_data; +open my $dh, '<', '.'; +my $crepo = create_coderepo 'for-cindex', sub { + my ($d) = @_; + xsys_e([qw(git init -q --bare)]); + xsys_e([qw(git fast-import --quiet)], undef, { 0 => $fi_fh }); + chdir($dh); + run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j1), $d]) + or xbail '-cindex internal'; + run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j3 -d), + "$d/cidx-ext", $d]) or xbail '-cindex "external"'; +}; + +my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2, + tmpdir => "$tmp/v2", sub { + my ($im) = @_; + $im->add(eml_load 't/data/0001.patch') or BAIL_OUT; +}; + +my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?"); +my (@ext) = glob("$crepo/cidx-ext/cidx*/?"); +is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext); +is(scalar(@int), 1, 'have 1 internal shard') or diag explain(\@int); + +my $mkreq = sub { + my ($s, $mode, @arg) = @_; + my ($x, $y); + $mode eq 'r' ? pipe($x, $y) + : socketpair($x, $y, AF_UNIX, SOCK_STREAM, 0); + my $buf = join("\0", @arg, ''); + my $n = PublicInbox::IPC::send_cmd($s, [ fileno($y) ], $buf, MSG_EOR); + $n // xbail "send: $!"; + is(length($buf), $n, "req @arg sent"); + $x; +}; + +my $test = sub { + my (@arg) = @_; + socketpair(my $s, my $y, AF_UNIX, SOCK_SEQPACKET, 0); + my $pid = spawn([$^X, @arg], undef, { 0 => $y }); + my $ar = PublicInbox::AutoReap->new($pid); + close $y; + my $r = $mkreq->($s, 'r', qw(test_pid -d), $int[0]); + my $tpid = do { local $/; <$r> }; + like($tpid, qr/\A\d+\z/, 'got PID'); + + $r = $mkreq->($s, 'r', qw(dump_ibx -d), $int[0], '13'); + my $res = do { local $/; <$r> }; + diag "res=$res"; + sleep 1; + $ar; +}; + +my $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e + PublicInbox::XapHelperCxx::start('-j1')]); + +done_testing;