dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH] xap_helper_cxx work + generation
Date: Mon, 21 Aug 2023 10:34:16 +0000	[thread overview]
Message-ID: <20230821103416.3600148-1-e@80x24.org> (raw)

---
 lib/PublicInbox/CodeSearch.pm   |  29 +++
 lib/PublicInbox/Search.pm       |  33 ++++
 lib/PublicInbox/XapHelperCxx.pm |   7 +-
 lib/PublicInbox/xap_helper.h    | 335 ++++++++++++++++++++++++--------
 t/xap_helper.t                  |  73 +++++++
 5 files changed, 396 insertions(+), 81 deletions(-)
 create mode 100644 t/xap_helper.t

diff --git a/lib/PublicInbox/CodeSearch.pm b/lib/PublicInbox/CodeSearch.pm
index a5ccce03..ddd09e5f 100644
--- a/lib/PublicInbox/CodeSearch.pm
+++ b/lib/PublicInbox/CodeSearch.pm
@@ -63,6 +63,35 @@ sub cqparse_new ($) {
 	$qp;
 }
 
+sub generate_cxx () { # generates snippet for xap_helper.h
+	# `# line' makes C++ compiler diagnostics point back into this file
+	my $ret = <<EOM;
+# line ${\(__LINE__)} "${\(__FILE__)}"
+static void qp_init_codesearch(Xapian::QueryParser *qp)
+{
+	static NVRP *d, *dt, *ct;
+	if (!d) {
+		d = new NVRP(${\(AT)}, "d:");
+		dt = new NVRP(${\(AT)}, "dt:");
+		ct = new NVRP(${\(CT)}, "ct:");
+	}
+	qp->ADD_VRP(d);
+	qp->ADD_VRP(dt);
+	qp->ADD_VRP(ct);
+EOM
+	for my $name (sort keys %bool_pfx_external) {
+		for (split(/ /, $bool_pfx_external{$name})) {
+			$ret .= qq{\tqp->add_boolean_prefix("$name", "$_");\n}
+		}
+	}
+	for my $name (sort keys %prob_prefix) {
+		for (split(/ /, $prob_prefix{$name})) {
+			$ret .= qq{\tqp->add_prefix("$name", "$_");\n}
+		}
+	}
+	return $ret . "}\n"; # closes qp_init_codesearch()
+}
+
 # returns a Xapian::Query to filter by roots
 sub roots_filter { # retry_reopen callback
 	my ($self, $git_dir) = @_;
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 537647d4..91f2f934 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -527,6 +527,39 @@ EOF
 	$qp;
 }
 
+sub generate_cxx () { # generates snippet for xap_helper.h
+	my $cxx = <<EOM;
+# line ${\(__LINE__)} "${\(__FILE__)}"
+static void qp_init_search(Xapian::QueryParser *qp)
+{
+	static NVRP *d, *dt, *z, *rt, *uid;
+	if (!d) {
+		d = new NVRP(${\(YYYYMMDD)}, "d:");
+		dt = new NVRP(${\(DT)}, "dt:");
+		z = new NVRP(${\(BYTES)}, "z:");
+		rt = new NVRP(${\(TS)}, "rt:");
+		uid = new NVRP(${\(UID)}, "uid:");
+	}
+	qp->ADD_VRP(d);
+	qp->ADD_VRP(dt);
+	qp->ADD_VRP(z);
+	qp->ADD_VRP(rt);
+	qp->ADD_VRP(uid);
+EOM
+	for my $pfx (sort keys %bool_pfx_external) { # boolean (filter) terms
+		for my $term (split(/ /, $bool_pfx_external{$pfx})) {
+			$cxx .= qq{\tqp->add_boolean_prefix("$pfx", "$term");\n};
+		}
+	}
+	# TODO: altid support
+	for my $pfx (sort keys %prob_prefix) { # probabilistic (free-text) terms
+		for my $term (split(/ /, $prob_prefix{$pfx})) {
+			$cxx .= qq{\tqp->add_prefix("$pfx", "$term");\n};
+		}
+	}
+	return $cxx . "}\n"; # closes qp_init_search()
+}
+
 sub help {
 	my ($self) = @_;
 	$self->{qp} //= $self->qparse_new; # parse altids
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
index 8b374bab..e7d08b28 100644
--- a/lib/PublicInbox/XapHelperCxx.pm
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -14,6 +14,7 @@ my $dir = ($ENV{PERL_INLINE_DIRECTORY} //
 my $bin = "$dir/xap_helper";
 my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
 my @srcs = map { $srcpfx.$_ } qw(xap_helper.h);
+my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm); # generate_cxx() output goes into the binary
 my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -O0') . ' ' .
 	($ENV{LDFLAGS} // '-Wl,-O1 -Wl,--compress-debug-sections=zlib');
 
@@ -43,7 +44,11 @@ sub build () {
 		local $/;
 		print $fh readline($rfh);
 	}
+	require PublicInbox::CodeSearch;
+	print $fh PublicInbox::Search::generate_cxx();
+	print $fh PublicInbox::CodeSearch::generate_cxx();
 	close $fh;
+
 	my $cmd = "$pkg_config --libs --cflags xapian-core";
 	chomp(my $fl = `$cmd`);
 	die "$cmd failed: \$?=$?" if $?;
@@ -61,7 +66,7 @@ sub build () {
 sub start (@) {
 	my $ctime = 0;
 	my @bin = stat($bin);
-	for ((@bin ? @srcs : ())) {
+	for ((@bin ? (@srcs, @pm_dep) : ())) {
 		my @st = stat($_) or die "stat $_: $!";
 		$ctime = $st[10] if $st[10] > $ctime;
 	}
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 1719eb55..5f25ae3b 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -2,9 +2,11 @@
  * Copyright (C) all contributors <meta@public-inbox.org>
  * License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
  *
- * Read-only helper process using minimal C++ for Xapian.
+ * Standalone helper process using minimal C++ for Xapian.
  * Use C11 as much as possible to make life easier for contributors
  * and the maintainer (who doesn't know C++ well)
+ * The socket API is internal to public-inbox and NOT intended
+ * for ordinary users, only public-inbox hackers.
  */
 #include <sys/resource.h>
 #include <sys/socket.h>
@@ -15,6 +17,8 @@
 
 #include <assert.h>
 #include <err.h> // BSD, glibc, and musl all have this
+#include <errno.h>
+#include <fcntl.h>
 #include <limits.h>
 #include <search.h>
 #include <stdio.h>
@@ -23,23 +27,56 @@
 #include <unistd.h>
 #include <xapian.h> // our only reason for using C++
 
-static const int sock_fd = 3; // SOCK_SEQPACKET
+#define MY_VER(maj,min,rev) ((maj) << 16 | (min) << 8 | (rev)) // for #if
+#define XAP_VER \
+	MY_VER(XAPIAN_MAJOR_VERSION,XAPIAN_MINOR_VERSION,XAPIAN_REVISION)
+
+#if XAP_VER >= MY_VER(1,3,6) // pick the range processor API available
+#	define NVRP Xapian::NumberRangeProcessor
+#	define ADD_VRP add_rangeprocessor
+#else // older Xapian: ValueRangeProcessor API
+#	define NVRP Xapian::NumberValueRangeProcessor
+#	define ADD_VRP add_valuerangeprocessor
+#endif
+
+
+static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P
 static pid_t parent_pid;
-static void *pid_tree;
-static void *xdb_tree;
+static void *pid_tree, *srch_tree; // tsearch(3) roots: workers, cached srch
 
-struct xdbkey {
-	int len;
-	char buf[]; // $shard_path0\0$shard_path1\0...
+// PublicInbox::Search and PublicInbox::CodeSearch generate these:
+static void qp_init_search(Xapian::QueryParser *);
+static void qp_init_codesearch(Xapian::QueryParser *);
+
+struct srch { // one per unique set of -d paths (the tsearch key)
+	int paths_len; // int for comparisons
+	Xapian::Database *db;
+	Xapian::QueryParser *qp;
+	char paths[]; // $shard_path0\0$shard_path1\0...
 };
 
-struct cmd_opt {
+#define MY_ARG_MAX 128 // capacity of argv-style arrays (argv, pfxv, dirv)
+typedef bool (*cmd)(struct req *); // command handler, false on failure
+
+// only one request per-process since we have RLIMIT_CPU timeout
+struct req { // argv and pfxv point into recv_loop's static rbuf
+	char *argv[MY_ARG_MAX];
+	char *pfxv[MY_ARG_MAX]; // -A <prefix>
+	struct srch *srch;
+	const char *eidx_key;
+	cmd fn;
 	unsigned long long max;
-	long sort_col; // value column
 	unsigned long timeout_sec;
+	long sort_col; // value column
+	int argc;
+	int pfxc;
+	FILE *fp; // blocking buffered response pipe or socket
+	bool has_input; // fp is bidirectional
 	bool collapse_threads;
+	bool codesearch;
 	bool relevance; // sort by relevance before column
 	bool asc; // ascending sort
+	bool pct; // return percentage
 };
 
 struct worker {
@@ -47,30 +84,58 @@ struct worker {
 	unsigned nr;
 };
 
-#define MY_ARG_MAX 128
+static bool cmd_dump_ibx(struct req *req)
+{
+	if (optind >= req->argc) return false; // need ibx_id
+	const char *ibx_id = req->argv[optind];
+	(void)ibx_id; // TODO: not used yet; silences -Wunused-variable (-Wall)
+	return true;
+}
+
+static bool cmd_test_pid(struct req *r) // reply with this worker's PID
+{
+	(void)fprintf(r->fp, "%d", (int)getpid());
+	return true;
+}
+
+#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
+static const struct cmd_entry { // dispatch() matches argv[0] to fn_name
+	size_t fn_len;
+	const char *fn_name;
+	cmd fn;
+} cmds[] = { // file-local and read-only
+	CMD(dump_ibx),
+	CMD(test_pid),
+};
+
+#define MY_ARRAY_SIZE(x)	(sizeof(x)/sizeof((x)[0])) // true arrays only
 #define RECV_FD_CAPA 1
-#define RECV_FD_SPACE (RECV_FD_CAPA * sizeof(int))
+#define RECV_FD_SPACE	(RECV_FD_CAPA * sizeof(int)) // SCM_RIGHTS payload
 union my_cmsg {
 	struct cmsghdr hdr;
 	char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
 };
 
-static int recv_fd(char *buf, size_t *len)
+static void xclose(int fd) // close(2); any failure except EINTR is fatal
+{
+	int rc = close(fd);
+	if (rc < 0 && errno != EINTR) err(EXIT_FAILURE, "BUG: close");
+}
+
+static bool recv_req(struct req *req, char *rbuf, size_t *len)
 {
 	union my_cmsg cmsg = { 0 };
 	struct msghdr msg = { .msg_iovlen = 1 };
 	struct iovec iov;
-	iov.iov_base = buf;
+	iov.iov_base = rbuf;
 	iov.iov_len = *len;
 	msg.msg_iov = &iov;
 	msg.msg_control = &cmsg.hdr;
 	msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
 
 	ssize_t r = recvmsg(sock_fd, &msg, 0);
-	if (r < 0) {
+	if (r < 0)
 		err(EXIT_FAILURE, "recvmsg");
-		return -1;
-	}
 	if (r == 0)
 		exit(EX_NOINPUT); /* grandparent went away */
 	*len = r;
@@ -78,10 +143,27 @@ static int recv_fd(char *buf, size_t *len)
 			cmsg.hdr.cmsg_type == SCM_RIGHTS) {
 		size_t len = cmsg.hdr.cmsg_len;
 		int *fdp = (int *)CMSG_DATA(&cmsg.hdr);
-		for (size_t i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++)
-			return *fdp;
+		size_t i;
+		for (i = 0; CMSG_LEN((i + 1) * sizeof(int)) <= len; i++) {
+			int fl = fcntl(*fdp, F_GETFL);
+			const char *mode = NULL;
+			switch (fl < 0 ? fl : (fl & O_ACCMODE)) { // drop O_NONBLOCK etc
+			case -1: warnx("invalid fd=%d", *fdp); return false;
+			case O_WRONLY: mode = "w"; break;
+			case O_RDWR: mode = "r+"; req->has_input = true; break;
+			default: warnx("invalid mode from F_GETFL: 0x%x", fl);
+			}
+			if (mode) {
+				req->fp = fdopen(*fdp, mode);
+				if (req->fp) return true;
+				warn("fdopen(fd=%d)", *fdp);
+			}
+			xclose(*fdp);
+			return false;
+		}
 	}
-	return -2;
+	warnx("no FD received in %zd-byte request", r);
+	return false;
 }
 
 static int split2argv(char **argv, char *buf, size_t len)
@@ -90,98 +172,191 @@ static int split2argv(char **argv, char *buf, size_t len)
 		warnx("bogus argument given");
 		return 0;
 	}
-	size_t nr = 1;
-	argv[0] = buf;
-	for (size_t i = 1; i <= len; i++) {
-		if (!buf[i])
-			argv[nr++] = buf + i + 1;
+	size_t nr = 0;
+	char *c = buf; // start of the current NUL-terminated argument
+	for (size_t i = 0; i < len; i++) { // from 0: the first arg may be ""
+		if (!buf[i]) {
+			if (nr >= MY_ARG_MAX - 1) { // arrays hold MY_ARG_MAX; keep a spare
+				warnx("too many arguments (max=%d)", MY_ARG_MAX - 1);
+				return 0;
+			}
+			argv[nr++] = c;
+			c = buf + i + 1;
+		}
 	}
 	return (int)nr;
 }
 
-static int cmp_xdbkey(const void *pa, const void *pb) // for tfind|tsearch
+static int srch_cmp(const void *pa, const void *pb) // tsearch(3)/tfind(3) order
 {
-	const struct xdbkey *a = (const struct xdbkey *)pa;
-	const struct xdbkey *b = (const struct xdbkey *)pb;
-	int diff = a->len - b->len;
+	const struct srch *x = (const struct srch *)pa;
+	const struct srch *y = (const struct srch *)pb;
+	int d = x->paths_len - y->paths_len; // by length first, then bytes
 
-	return diff ? diff : memcmp(a->buf, b->buf, (size_t)a->len);
+	return d != 0 ? d : memcmp(x->paths, y->paths, (size_t)x->paths_len);
 }
 
-static void dispatch(int wfd, int argc, char **argv)
+static bool srch_init(struct req *req)
 {
-	int c = 0;
+	char *dirv[MY_ARG_MAX];
+	struct srch *srch = req->srch;
+	int i, dirc = split2argv(dirv, srch->paths, (size_t)srch->paths_len);
+	if (dirc <= 0) return false; // no usable -d path: dirv[0] would be unset
+	try {
+		srch->db = new Xapian::Database(dirv[0]);
+	} catch (...) {
+		warn("E: Xapian::Database(%s)", dirv[0]);
+		return false;
+	}
+	try {
+		for (i = 1; i < dirc; i++)
+			srch->db->add_database(Xapian::Database(dirv[i]));
+	} catch (...) {
+		warn("E: add_database(%s)", dirv[i]);
+		goto cleanup;
+	}
+	try {
+		srch->qp = new Xapian::QueryParser;
+	} catch (...) {
+		warn("E: Xapian::QueryParser");
+		goto cleanup;
+	}
+	srch->qp->set_default_op(Xapian::Query::OP_AND);
+	srch->qp->set_database(*srch->db);
+	try {
+		srch->qp->set_stemmer(Xapian::Stem("english"));
+	} catch (...) {
+		warn("E: Xapian::Stem");
+		goto cleanup;
+	}
+	srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
+
+#if XAP_VER >= MY_VER(1, 3, 3)
+	srch->qp->set_max_expansion(100);
+#else // Xapian < 1.3.3
+	srch->qp->set_max_wildcard_expansion(100);
+#endif // Xapian < 1.3.3
+
+	if (req->codesearch)
+		qp_init_codesearch(srch->qp); // CodeSearch.pm
+	else
+		qp_init_search(srch->qp); // Search.pm
+	return true;
+cleanup:
+	delete srch->db; // either may still be NULL; delete NULL is a no-op
+	delete srch->qp;
+	srch->qp = NULL;
+	srch->db = NULL;
+	return false;
+}
+
+static void dispatch(struct req *req)
+{
+	int c;
+	size_t size = strlen(req->argv[0]);
 	union {
-		struct xdbkey *xdbkey;
+		struct srch *srch;
 		char *ptr;
-	} buf;
-	size_t size;
-	struct cmd_opt opt = { 0 };
+	} fbuf;
 	char *end;
-	FILE *fp = open_memstream(&buf.ptr, &size);
+	FILE *kfp = NULL; // only non-NULL while the srch key is being written
+	struct srch **s;
+	req->fn = NULL;
+	for (c = 0; !req->fn && c < (int)MY_ARRAY_SIZE(cmds); c++)
+		if (cmds[c].fn_len == size &&
+				!memcmp(cmds[c].fn_name, req->argv[0], size))
+			req->fn = cmds[c].fn;
+	if (!req->fn) goto cmd_err;
 
-	fwrite((const void *)&c, sizeof(c), 1, fp); // xdbkey.len placeholder
+	kfp = open_memstream(&fbuf.ptr, &size);
+	if (!kfp) goto cmd_err; // nothing to clean up, yet
+	fwrite(&req->argv[0], offsetof(struct srch, paths), 1, kfp); // padding
 
+	// global getopt variables:
 	optind = 1;
 	opterr = optopt = 0;
 	optarg = NULL;
+#ifdef __GLIBC__ // optind=1 keeps glibc's stale in-cluster pointer; 0 resets
+	optind = 0;
+#endif
 
-	while ((c = getopt(argc, argv, "ad:k:m:rtT:")) != -1) {
+	while ((c = getopt(req->argc, req->argv, "aA:cd:k:m:O:rtT:")) != -1) {
 		switch (c) {
-		case 'a': opt.asc = true; break;
-		case 'd': fwrite(optarg, strlen(optarg) + 1, 1, fp); break;
+		case 'a': req->asc = true; break;
+		case 'A': req->pfxv[req->pfxc++] = optarg; break;
+		case 'c': req->codesearch = true; break;
+		case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
 		case 'k':
-			opt.sort_col = strtol(optarg, &end, 10);
+			req->sort_col = strtol(optarg, &end, 10);
 			if (*end) goto cmd_err;
-			switch (opt.sort_col) {
+			switch (req->sort_col) {
 			case LONG_MAX: case LONG_MIN: goto cmd_err;
 			}
 			break;
 		case 'm':
-			opt.max = strtoull(optarg, &end, 10);
+			req->max = strtoull(optarg, &end, 10);
 			if (*end) goto cmd_err;
-			if (opt.max == ULLONG_MAX) goto cmd_err;
+			if (req->max == ULLONG_MAX) goto cmd_err;
 			break;
-		case 'r': opt.relevance = true; break;
-		case 't': opt.collapse_threads = true; break;
+		case 'O': req->eidx_key = optarg; break;
+		case 'r': req->relevance = true; break;
+		case 't': req->collapse_threads = true; break;
 		case 'T':
-			opt.timeout_sec = strtoul(optarg, &end, 10);
+			req->timeout_sec = strtoul(optarg, &end, 10);
 			if (*end) goto cmd_err;
-			if (opt.timeout_sec == ULONG_MAX) goto cmd_err;
+			if (req->timeout_sec == ULONG_MAX) goto cmd_err;
 			break;
+		default: goto cmd_err;
 		}
 	}
-	if (ferror(fp) | fclose(fp)) {
+	c = ferror(kfp); // before fclose(3): operands of `|' are unsequenced
+	c |= fclose(kfp), kfp = NULL; // closed, so cmd_err must not close it
+	if (c) {
 		perror("ferror|fclose");
 		goto cmd_err;
 	}
-	buf.xdbkey->len = size - sizeof(int);
-	if (buf.xdbkey->len <= 0) {
+	fbuf.srch->db = NULL; fbuf.srch->qp = NULL; // srch_init() sets these
+	fbuf.srch->paths_len = size - offsetof(struct srch, paths);
+	if (fbuf.srch->paths_len <= 0) {
+		free(fbuf.srch);
 		warnx("no -d args");
 		goto cmd_err;
 	}
-	struct xdbkey *k;
-	k = (struct xdbkey *)tsearch(buf.xdbkey, &xdb_tree, cmp_xdbkey);
-	if (k != buf.xdbkey)
-		free(buf.xdbkey);
+	s = (struct srch **)tsearch(fbuf.srch, &srch_tree, srch_cmp);
+	if (!s) { // no node was allocated, so fbuf.srch is still ours
+		warn("tsearch");
+		free(fbuf.srch);
+		goto cmd_err;
+	}
+	req->srch = *s;
+	if (req->srch != fbuf.srch) { // reuse existing
+		free(fbuf.srch);
+	} else if (!srch_init(req)) {
+		assert(fbuf.srch == *((struct srch **)tfind(
+					fbuf.srch, &srch_tree, srch_cmp)));
+		tdelete(fbuf.srch, &srch_tree, srch_cmp); // presence asserted above
+		free(fbuf.srch);
+		goto cmd_err;
+	}
+	req->fn(req); // failures are not reported to the client, yet
 cmd_err:
-	write(wfd, "E", 1);
+	if (kfp) { fclose(kfp); free(fbuf.ptr); } // aborted mid-getopt
 }
 
 static void recv_loop(void)
 {
-	static char buf[1024 * 128]; // per-process
-	char *argv[MY_ARG_MAX]; // points into buf
+	static char rbuf[1024 * 128]; // per-process; req.argv points into it
 	while (1) {
-		if (getpid() != parent_pid)
+		if (getppid() != parent_pid) // reparented: presumably parent exited
 			exit(EXIT_SUCCESS);
-		size_t len = sizeof(buf);
-		int wfd = recv_fd(buf, &len);
-		if (wfd < 0) continue;
-		int argc = split2argv(argv, buf, len);
-		if (argc > 0)
-			dispatch(wfd, argc, argv);
-		close(wfd);
+		struct req req = { 0 }; // zeroed for every request
+		size_t len = sizeof(rbuf);
+		if (recv_req(&req, rbuf, &len)) {
+			req.argc = split2argv(req.argv, rbuf, len);
+			if (req.argc > 0)
+				dispatch(&req);
+			fclose(req.fp); // flushes the response; closes the received fd
+		}
 	}
 }
 
@@ -197,7 +368,7 @@ static void insert_pid(pid_t pid, unsigned nr)
 {
 	struct worker *w = (struct worker *)malloc(sizeof(*w));
 	if (!w) {
-		warn("malloc for worker=%u", nr);
+		warn("E: malloc(worker=%u)", nr);
 		kill(pid, SIGTERM);
 		return;
 	}
@@ -205,12 +376,10 @@ static void insert_pid(pid_t pid, unsigned nr)
 	w->nr = nr;
 	assert(tfind((const void *)w, &pid_tree, cmp_worker) == NULL);
 	void *ret = tsearch(w, &pid_tree, cmp_worker);
-	if (!ret) {
-		warn("tsearch for worker=%u", nr);
+	if (!ret) { // likely malloc failure
+		warn("E: tsearch(worker=%u)", nr);
 		free(w);
 		kill(pid, SIGTERM);
-	} else if (ret != w) {
-		err(EXIT_FAILURE, "BUG: tsearch %p != %p", ret, w);
 	}
 }
 
@@ -218,15 +387,18 @@ static int delete_pid(pid_t pid)
 {
 	struct worker key;
 	key.pid = pid;
-	struct worker *w;
+	struct worker **w, *found;
 
-	w = (struct worker *)tdelete(&key, &pid_tree, cmp_worker);
+	// tfind first: tdelete(3) returns the deleted node's *parent*
+	w = (struct worker **)tfind(&key, &pid_tree, cmp_worker);
 	if (!w) {
-		warnx("invalid pid=%d reaped", (int)pid);
+		warnx("W: unknown pid=%d reaped", (int)pid);
 		return -1;
 	}
-	key.nr = w->nr;
-	free(w);
+	found = *w; // *w lives in the tree node which tdelete frees
+	tdelete(&key, &pid_tree, cmp_worker);
+	key.nr = found->nr;
+	free(found);
 	return (int)key.nr;
 }
 
@@ -234,7 +403,7 @@ static void start_worker(unsigned nr)
 {
 	pid_t pid = fork();
 	if (pid < 0)
-		warn("fork worker=%u", nr);
+		warn("E: fork(worker=%u)", nr);
 	else if (pid == 0)
 		recv_loop();
 	else
@@ -246,6 +415,12 @@ int main(int argc, char *argv[])
 	unsigned long jobs = 1;
 	int c;
 
+#ifdef _SC_NPROCESSORS_ONLN
+	long j = sysconf(_SC_NPROCESSORS_ONLN);
+	if (j > 0)
+		jobs = j > UCHAR_MAX ? UCHAR_MAX : j;
+#endif // _SC_NPROCESSORS_ONLN
+
 	while ((c = getopt(argc, argv, "j:")) != -1) {
 		char *end;
 
@@ -274,13 +449,13 @@ int main(int argc, char *argv[])
 
 	int st;
 	pid_t pid;
-	bool done = false;
+	bool quit = false;
 	while ((pid = wait(&st)) > 0) {
 		int nr = delete_pid(pid);
 		if (nr >= 0) {
 			if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
-				done = true;
-			if (!done)
+				quit = true;
+			if (!quit)
 				start_worker(nr);
 		}
 	}
diff --git a/t/xap_helper.t b/t/xap_helper.t
new file mode 100644
index 00000000..0d4ba30c
--- /dev/null
+++ b/t/xap_helper.t
@@ -0,0 +1,74 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use v5.12;
+use PublicInbox::TestCommon;
+use PublicInbox::Spawn qw(spawn);
+use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM MSG_EOR);
+use POSIX qw(dup2);
+use PublicInbox::AutoReap;
+use PublicInbox::IPC;
+use PublicInbox::SearchIdx;
+use autodie;
+my ($tmp, $for_destroy) = tmpdir();
+use Data::Dumper; $Data::Dumper::Useqq = 1;
+
+my $fi_data = './t/git.fast-import-data';
+open my $fi_fh, '<', $fi_data;
+open my $dh, '<', '.';
+my $crepo = create_coderepo 'for-cindex', sub {
+	my ($d) = @_;
+	xsys_e([qw(git init -q --bare)]);
+	xsys_e([qw(git fast-import --quiet)], undef, { 0 => $fi_fh });
+	chdir($dh);
+	run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j1), $d])
+		or xbail '-cindex internal';
+	run_script([qw(-cindex --dangerous -L medium --no-fsync -q -j3 -d),
+		"$d/cidx-ext", $d]) or xbail '-cindex "external"';
+};
+
+my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
+			tmpdir => "$tmp/v2", sub {
+	my ($im) = @_;
+	$im->add(eml_load 't/data/0001.patch') or BAIL_OUT;
+};
+
+my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
+my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
+is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
+is(scalar(@int), 1, 'have 1 internal shard') or diag explain(\@int);
+
+# sends @arg over $s, passing $y of a new pipe ('r' mode) or socketpair;
+# returns $x, from which the helper's response is read
+my $mkreq = sub {
+	my ($s, $mode, @arg) = @_;
+	my ($x, $y);
+	$mode eq 'r' ? pipe($x, $y)
+		: socketpair($x, $y, AF_UNIX, SOCK_STREAM, 0);
+	my $buf = join("\0", @arg, '');
+	my $n = PublicInbox::IPC::send_cmd($s, [ fileno($y) ], $buf, MSG_EOR);
+	$n // xbail "send: $!";
+	is($n, length($buf), "req @arg sent");
+	$x;
+};
+
+my $test = sub {
+	my (@arg) = @_;
+	socketpair(my $s, my $y, AF_UNIX, SOCK_SEQPACKET, 0);
+	my $pid = spawn([$^X, @arg], undef, { 0 => $y });
+	my $ar = PublicInbox::AutoReap->new($pid);
+	close $y;
+	my $r = $mkreq->($s, 'r', qw(test_pid -d), $int[0]);
+	my $tpid = do { local $/; <$r> };
+	like($tpid, qr/\A\d+\z/, 'got PID');
+
+	$r = $mkreq->($s, 'r', qw(dump_ibx -d), $int[0], '13');
+	my $res = do { local $/; <$r> };
+	diag "res=$res";
+	$ar;
+};
+
+my $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+		PublicInbox::XapHelperCxx::start('-j1')]);
+
+done_testing;

                 reply	other threads:[~2023-08-21 10:34 UTC|newest]

Thread overview: [no followups] expand[flat|nested]  mbox.gz  Atom feed

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230821103416.3600148-1-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).