* [PATCH 1/2] WIP-cidx-xh-split
@ 2023-11-26 1:57 Eric Wong
2023-11-26 1:57 ` [PATCH 2/2] WIP-cidx Eric Wong
0 siblings, 1 reply; 2+ messages in thread
From: Eric Wong @ 2023-11-26 1:57 UTC (permalink / raw)
To: spew
---
MANIFEST | 1 +
lib/PublicInbox/XapHelperCxx.pm | 10 +-
lib/PublicInbox/xap_helper.h | 270 +-------------------------------
lib/PublicInbox/xh_cidx.h | 260 ++++++++++++++++++++++++++++++
4 files changed, 273 insertions(+), 268 deletions(-)
create mode 100644 lib/PublicInbox/xh_cidx.h
diff --git a/MANIFEST b/MANIFEST
index 85811133..bbbe0b91 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -378,6 +378,7 @@ lib/PublicInbox/XapHelperCxx.pm
lib/PublicInbox/Xapcmd.pm
lib/PublicInbox/gcf2_libgit2.h
lib/PublicInbox/xap_helper.h
+lib/PublicInbox/xh_cidx.h
sa_config/Makefile
sa_config/README
sa_config/root/etc/spamassassin/public-inbox.pre
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
index f421c7bc..8a66fdcd 100644
--- a/lib/PublicInbox/XapHelperCxx.pm
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -20,7 +20,7 @@ $ENV{PERL_INLINE_DIRECTORY} // die('BUG: PERL_INLINE_DIRECTORY unset');
substr($dir, 0, 0) = "$ENV{PERL_INLINE_DIRECTORY}/";
my $bin = "$dir/xap_helper";
my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
-my @srcs = map { $srcpfx.$_ } qw(xap_helper.h);
+my @srcs = map { $srcpfx.$_ } qw(xap_helper.h xh_cidx.h);
my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm);
my $ldflags = '-Wl,-O1';
$ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd';
@@ -61,11 +61,9 @@ sub build () {
require PublicInbox::OnDestroy;
my ($prog) = ($bin =~ m!/([^/]+)\z!);
my $lk = PublicInbox::Lock->new("$dir/$prog.lock")->lock_for_scope;
- open my $fh, '>', "$dir/$prog.cpp";
- say $fh qq(# include "$_") for @srcs;
- print $fh PublicInbox::Search::generate_cxx();
- print $fh PublicInbox::CodeSearch::generate_cxx();
- close $fh;
+ write_file '>', "$dir/$prog.cpp", qq{#include "xap_helper.h"\n},
+ PublicInbox::Search::generate_cxx(),
+ PublicInbox::CodeSearch::generate_cxx();
opendir my $dh, '.';
my $restore = PublicInbox::OnDestroy->new(\&chdir, $dh);
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index b6b517d5..0824ce71 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -146,6 +146,12 @@ struct worker {
unsigned nr;
};
+struct fbuf {
+ FILE *fp;
+ char *ptr;
+ size_t len;
+};
+
#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
static size_t split2argv(char **dst, char *buf, size_t len, size_t limit)
{
@@ -253,87 +259,11 @@ static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
}
-static void dump_ibx_term(struct req *req, const char *pfx,
- Xapian::Document *doc, const char *ibx_id)
-{
- Xapian::TermIterator cur = doc->termlist_begin();
- Xapian::TermIterator end = doc->termlist_end();
- size_t pfx_len = strlen(pfx);
-
- for (cur.skip_to(pfx); cur != end; cur++) {
- std::string tn = *cur;
-
- if (starts_with(&tn, pfx, pfx_len)) {
- fprintf(req->fp[0], "%s %s\n",
- tn.c_str() + pfx_len, ibx_id);
- ++req->nr_out;
- }
- }
-}
-
static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
{
return setvbuf(fp, NULL, _IOLBF, 0);
}
-static enum exc_iter dump_ibx_iter(struct req *req, const char *ibx_id,
- Xapian::MSetIterator *i)
-{
- try {
- Xapian::Document doc = i->get_document();
- for (int p = 0; p < req->pfxc; p++)
- dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
- } catch (const Xapian::DatabaseModifiedError & e) {
- req->srch->db->reopen();
- return ITER_RETRY;
- } catch (const Xapian::DocNotFoundError & e) { // oh well...
- warnx("doc not found: %s", e.get_description().c_str());
- }
- return ITER_OK;
-}
-
-static bool cmd_dump_ibx(struct req *req)
-{
- if ((optind + 1) >= req->argc)
- ABORT("usage: dump_ibx [OPTIONS] IBX_ID QRY_STR");
- if (!req->pfxc)
- ABORT("dump_ibx requires -A PREFIX");
-
- const char *ibx_id = req->argv[optind];
- if (my_setlinebuf(req->fp[0])) // for sort(1) pipe
- EABORT("setlinebuf(fp[0])"); // WTF?
- req->asc = true;
- req->sort_col = -1;
- Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
-
- // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
- // in case we need to retry on DB reopens
- for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
- for (int t = 10; t > 0; --t)
- switch (dump_ibx_iter(req, ibx_id, &i)) {
- case ITER_OK: t = 0; break; // leave inner loop
- case ITER_RETRY: break; // continue for-loop
- case ITER_ABORT: return false; // error
- }
- }
- emit_mset_stats(req, &mset);
- return true;
-}
-
-struct fbuf {
- FILE *fp;
- char *ptr;
- size_t len;
-};
-
-struct dump_roots_tmp {
- struct stat sb;
- void *mm_ptr;
- char **entries;
- struct fbuf wbuf;
- int root2off_fd;
-};
-
// n.b. __cleanup__ works fine with C++ exceptions, but not longjmp
// Only clang and g++ are supported, as AFAIK there's no other
// relevant Free(-as-in-speech) C++ compilers.
@@ -360,126 +290,6 @@ static void xclose(int fd)
EABORT("BUG: close");
}
-#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure)))
-static void dump_roots_ensure(void *ptr)
-{
- struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr;
- if (drt->root2off_fd >= 0)
- xclose(drt->root2off_fd);
- hdestroy(); // idempotent
- if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size))
- EABORT("BUG: munmap(%p, %zu)", drt->mm_ptr, drt->sb.st_size);
- free(drt->entries);
- fbuf_ensure(&drt->wbuf);
-}
-
-static bool root2offs_str(struct fbuf *root_offs, Xapian::Document *doc)
-{
- Xapian::TermIterator cur = doc->termlist_begin();
- Xapian::TermIterator end = doc->termlist_end();
- ENTRY e, *ep;
- fbuf_init(root_offs);
- for (cur.skip_to("G"); cur != end; cur++) {
- std::string tn = *cur;
- if (!starts_with(&tn, "G", 1))
- continue;
- union { const char *in; char *out; } u;
- u.in = tn.c_str() + 1;
- e.key = u.out;
- ep = hsearch(e, FIND);
- if (!ep) ABORT("hsearch miss `%s'", e.key);
- // ep->data is a NUL-terminated string matching /[0-9]+/
- fputc(' ', root_offs->fp);
- fputs((const char *)ep->data, root_offs->fp);
- }
- fputc('\n', root_offs->fp);
- if (ferror(root_offs->fp) | fclose(root_offs->fp))
- err(EXIT_FAILURE, "ferror|fclose(root_offs)"); // ENOMEM
- root_offs->fp = NULL;
- return true;
-}
-
-// writes term values matching @pfx for a given @doc, ending the line
-// with the contents of @root_offs
-static void dump_roots_term(struct req *req, const char *pfx,
- struct dump_roots_tmp *drt,
- struct fbuf *root_offs,
- Xapian::Document *doc)
-{
- Xapian::TermIterator cur = doc->termlist_begin();
- Xapian::TermIterator end = doc->termlist_end();
- size_t pfx_len = strlen(pfx);
-
- for (cur.skip_to(pfx); cur != end; cur++) {
- std::string tn = *cur;
- if (!starts_with(&tn, pfx, pfx_len))
- continue;
- fputs(tn.c_str() + pfx_len, drt->wbuf.fp);
- fwrite(root_offs->ptr, root_offs->len, 1, drt->wbuf.fp);
- ++req->nr_out;
- }
-}
-
-// we may have lines which exceed PIPE_BUF, so we do our own
-// buffering and rely on flock(2), here
-static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
-{
- char *p;
- int fd = fileno(req->fp[0]);
- bool ok = true;
-
- if (!drt->wbuf.fp) return true;
- if (fd < 0) EABORT("BUG: fileno");
- if (ferror(drt->wbuf.fp) | fclose(drt->wbuf.fp)) // ENOMEM?
- err(EXIT_FAILURE, "ferror|fclose(drt->wbuf.fp)");
- drt->wbuf.fp = NULL;
- if (!drt->wbuf.len) goto done_free;
- while (flock(drt->root2off_fd, LOCK_EX)) {
- if (errno == EINTR) continue;
- err(EXIT_FAILURE, "LOCK_EX"); // ENOLCK?
- }
- p = drt->wbuf.ptr;
- do { // write to client FD
- ssize_t n = write(fd, p, drt->wbuf.len);
- if (n > 0) {
- drt->wbuf.len -= n;
- p += n;
- } else {
- perror(n ? "write" : "write (zero bytes)");
- return false;
- }
- } while (drt->wbuf.len);
- while (flock(drt->root2off_fd, LOCK_UN)) {
- if (errno == EINTR) continue;
- err(EXIT_FAILURE, "LOCK_UN"); // ENOLCK?
- }
-done_free: // OK to skip on errors, dump_roots_ensure calls fbuf_ensure
- free(drt->wbuf.ptr);
- drt->wbuf.ptr = NULL;
- return ok;
-}
-
-static enum exc_iter dump_roots_iter(struct req *req,
- struct dump_roots_tmp *drt,
- Xapian::MSetIterator *i)
-{
- CLEANUP_FBUF struct fbuf root_offs = {}; // " $ID0 $ID1 $IDx..\n"
- try {
- Xapian::Document doc = i->get_document();
- if (!root2offs_str(&root_offs, &doc))
- return ITER_ABORT; // bad request, abort
- for (int p = 0; p < req->pfxc; p++)
- dump_roots_term(req, req->pfxv[p], drt,
- &root_offs, &doc);
- } catch (const Xapian::DatabaseModifiedError & e) {
- req->srch->db->reopen();
- return ITER_RETRY;
- } catch (const Xapian::DocNotFoundError & e) { // oh well...
- warnx("doc not found: %s", e.get_description().c_str());
- }
- return ITER_OK;
-}
-
static char *hsearch_enter_key(char *s)
{
#if defined(__OpenBSD__) || defined(__DragonFly__)
@@ -499,72 +309,6 @@ static char *hsearch_enter_key(char *s)
return s;
}
-static bool cmd_dump_roots(struct req *req)
-{
- CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt = {};
- drt.root2off_fd = -1;
- if ((optind + 1) >= req->argc)
- ABORT("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
- if (!req->pfxc)
- ABORT("dump_roots requires -A PREFIX");
- const char *root2off_file = req->argv[optind];
- drt.root2off_fd = open(root2off_file, O_RDONLY);
- if (drt.root2off_fd < 0)
- EABORT("open(%s)", root2off_file);
- if (fstat(drt.root2off_fd, &drt.sb)) // ENOMEM?
- err(EXIT_FAILURE, "fstat(%s)", root2off_file);
- // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
- // so /32 overestimates the number of expected entries by
- // ~%25 (as recommended by Linux hcreate(3) manpage)
- size_t est = (drt.sb.st_size / 32) + 1; //+1 for "\0" termination
- if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX)
- err(EXIT_FAILURE, "%s size too big (%lld bytes > %zu)",
- root2off_file, (long long)drt.sb.st_size, SIZE_MAX);
- drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ,
- MAP_PRIVATE, drt.root2off_fd, 0);
- if (drt.mm_ptr == MAP_FAILED)
- err(EXIT_FAILURE, "mmap(%zu, %s)",
- drt.sb.st_size, root2off_file);
- drt.entries = (char **)calloc(est * 2, sizeof(char *));
- if (!drt.entries)
- err(EXIT_FAILURE, "calloc(%zu * 2, %zu)", est, sizeof(char *));
- size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr,
- drt.sb.st_size, est * 2);
- if (tot <= 0) return false; // split2argv already warned on error
- if (!hcreate(est))
- err(EXIT_FAILURE, "hcreate(%zu)", est);
- for (size_t i = 0; i < tot; ) {
- ENTRY e;
- e.key = hsearch_enter_key(drt.entries[i++]); // dies on ENOMEM
- e.data = drt.entries[i++];
- if (!hsearch(e, ENTER))
- err(EXIT_FAILURE, "hsearch(%s => %s, ENTER)", e.key,
- (const char *)e.data);
- }
- req->asc = true;
- req->sort_col = -1;
- Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
-
- // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
- // in case we need to retry on DB reopens
- for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
- if (!drt.wbuf.fp)
- fbuf_init(&drt.wbuf);
- for (int t = 10; t > 0; --t)
- switch (dump_roots_iter(req, &drt, &i)) {
- case ITER_OK: t = 0; break; // leave inner loop
- case ITER_RETRY: break; // continue for-loop
- case ITER_ABORT: return false; // error
- }
- if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
- return false;
- }
- if (!dump_roots_flush(req, &drt))
- return false;
- emit_mset_stats(req, &mset);
- return true;
-}
-
// for test usage only, we need to ensure the compiler supports
// __cleanup__ when exceptions are thrown
struct inspect { struct req *req; };
@@ -588,6 +332,8 @@ static bool cmd_test_inspect(struct req *req)
return false;
}
+#include "xh_cidx.h" // CodeSearchIdx.pm stuff
+
#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
static const struct cmd_entry {
size_t fn_len;
diff --git a/lib/PublicInbox/xh_cidx.h b/lib/PublicInbox/xh_cidx.h
new file mode 100644
index 00000000..8513d88b
--- /dev/null
+++ b/lib/PublicInbox/xh_cidx.h
@@ -0,0 +1,260 @@
+// Copyright (C) all contributors <meta@public-inbox.org>
+// License: GPL-2.0+ <https://www.gnu.org/licenses/gpl-2.0.txt>
+// This file is only intended to be included by xap_helper.h
+// it implements pieces used by CodeSearchIdx.pm
+
+static void dump_ibx_term(struct req *req, const char *pfx,
+ Xapian::Document *doc, const char *ibx_id)
+{
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ size_t pfx_len = strlen(pfx);
+
+ for (cur.skip_to(pfx); cur != end; cur++) {
+ std::string tn = *cur;
+
+ if (starts_with(&tn, pfx, pfx_len)) {
+ fprintf(req->fp[0], "%s %s\n",
+ tn.c_str() + pfx_len, ibx_id);
+ ++req->nr_out;
+ }
+ }
+}
+
+static enum exc_iter dump_ibx_iter(struct req *req, const char *ibx_id,
+ Xapian::MSetIterator *i)
+{
+ try {
+ Xapian::Document doc = i->get_document();
+ for (int p = 0; p < req->pfxc; p++)
+ dump_ibx_term(req, req->pfxv[p], &doc, ibx_id);
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ req->srch->db->reopen();
+ return ITER_RETRY;
+ } catch (const Xapian::DocNotFoundError & e) { // oh well...
+ warnx("doc not found: %s", e.get_description().c_str());
+ }
+ return ITER_OK;
+}
+
+static bool cmd_dump_ibx(struct req *req)
+{
+ if ((optind + 1) >= req->argc)
+ ABORT("usage: dump_ibx [OPTIONS] IBX_ID QRY_STR");
+ if (!req->pfxc)
+ ABORT("dump_ibx requires -A PREFIX");
+
+ const char *ibx_id = req->argv[optind];
+ if (my_setlinebuf(req->fp[0])) // for sort(1) pipe
+ EABORT("setlinebuf(fp[0])"); // WTF?
+ req->asc = true;
+ req->sort_col = -1;
+ Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
+
+ // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
+ // in case we need to retry on DB reopens
+ for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+ for (int t = 10; t > 0; --t)
+ switch (dump_ibx_iter(req, ibx_id, &i)) {
+ case ITER_OK: t = 0; break; // leave inner loop
+ case ITER_RETRY: break; // continue for-loop
+ case ITER_ABORT: return false; // error
+ }
+ }
+ emit_mset_stats(req, &mset);
+ return true;
+}
+
+struct dump_roots_tmp {
+ struct stat sb;
+ void *mm_ptr;
+ char **entries;
+ struct fbuf wbuf;
+ int root2off_fd;
+};
+
+#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure)))
+static void dump_roots_ensure(void *ptr)
+{
+ struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr;
+ if (drt->root2off_fd >= 0)
+ xclose(drt->root2off_fd);
+ hdestroy(); // idempotent
+ if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size))
+ EABORT("BUG: munmap(%p, %zu)", drt->mm_ptr, drt->sb.st_size);
+ free(drt->entries);
+ fbuf_ensure(&drt->wbuf);
+}
+
+static bool root2offs_str(struct fbuf *root_offs, Xapian::Document *doc)
+{
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ ENTRY e, *ep;
+ fbuf_init(root_offs);
+ for (cur.skip_to("G"); cur != end; cur++) {
+ std::string tn = *cur;
+ if (!starts_with(&tn, "G", 1))
+ continue;
+ union { const char *in; char *out; } u;
+ u.in = tn.c_str() + 1;
+ e.key = u.out;
+ ep = hsearch(e, FIND);
+ if (!ep) ABORT("hsearch miss `%s'", e.key);
+ // ep->data is a NUL-terminated string matching /[0-9]+/
+ fputc(' ', root_offs->fp);
+ fputs((const char *)ep->data, root_offs->fp);
+ }
+ fputc('\n', root_offs->fp);
+ if (ferror(root_offs->fp) | fclose(root_offs->fp))
+ err(EXIT_FAILURE, "ferror|fclose(root_offs)"); // ENOMEM
+ root_offs->fp = NULL;
+ return true;
+}
+
+// writes term values matching @pfx for a given @doc, ending the line
+// with the contents of @root_offs
+static void dump_roots_term(struct req *req, const char *pfx,
+ struct dump_roots_tmp *drt,
+ struct fbuf *root_offs,
+ Xapian::Document *doc)
+{
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ size_t pfx_len = strlen(pfx);
+
+ for (cur.skip_to(pfx); cur != end; cur++) {
+ std::string tn = *cur;
+ if (!starts_with(&tn, pfx, pfx_len))
+ continue;
+ fputs(tn.c_str() + pfx_len, drt->wbuf.fp);
+ fwrite(root_offs->ptr, root_offs->len, 1, drt->wbuf.fp);
+ ++req->nr_out;
+ }
+}
+
+// we may have lines which exceed PIPE_BUF, so we do our own
+// buffering and rely on flock(2), here
+static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
+{
+ char *p;
+ int fd = fileno(req->fp[0]);
+ bool ok = true;
+
+ if (!drt->wbuf.fp) return true;
+ if (fd < 0) EABORT("BUG: fileno");
+ if (ferror(drt->wbuf.fp) | fclose(drt->wbuf.fp)) // ENOMEM?
+ err(EXIT_FAILURE, "ferror|fclose(drt->wbuf.fp)");
+ drt->wbuf.fp = NULL;
+ if (!drt->wbuf.len) goto done_free;
+ while (flock(drt->root2off_fd, LOCK_EX)) {
+ if (errno == EINTR) continue;
+ err(EXIT_FAILURE, "LOCK_EX"); // ENOLCK?
+ }
+ p = drt->wbuf.ptr;
+ do { // write to client FD
+ ssize_t n = write(fd, p, drt->wbuf.len);
+ if (n > 0) {
+ drt->wbuf.len -= n;
+ p += n;
+ } else {
+ perror(n ? "write" : "write (zero bytes)");
+ return false;
+ }
+ } while (drt->wbuf.len);
+ while (flock(drt->root2off_fd, LOCK_UN)) {
+ if (errno == EINTR) continue;
+ err(EXIT_FAILURE, "LOCK_UN"); // ENOLCK?
+ }
+done_free: // OK to skip on errors, dump_roots_ensure calls fbuf_ensure
+ free(drt->wbuf.ptr);
+ drt->wbuf.ptr = NULL;
+ return ok;
+}
+
+static enum exc_iter dump_roots_iter(struct req *req,
+ struct dump_roots_tmp *drt,
+ Xapian::MSetIterator *i)
+{
+ CLEANUP_FBUF struct fbuf root_offs = {}; // " $ID0 $ID1 $IDx..\n"
+ try {
+ Xapian::Document doc = i->get_document();
+ if (!root2offs_str(&root_offs, &doc))
+ return ITER_ABORT; // bad request, abort
+ for (int p = 0; p < req->pfxc; p++)
+ dump_roots_term(req, req->pfxv[p], drt,
+ &root_offs, &doc);
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ req->srch->db->reopen();
+ return ITER_RETRY;
+ } catch (const Xapian::DocNotFoundError & e) { // oh well...
+ warnx("doc not found: %s", e.get_description().c_str());
+ }
+ return ITER_OK;
+}
+
+static bool cmd_dump_roots(struct req *req)
+{
+ CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt = {};
+ drt.root2off_fd = -1;
+ if ((optind + 1) >= req->argc)
+ ABORT("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
+ if (!req->pfxc)
+ ABORT("dump_roots requires -A PREFIX");
+ const char *root2off_file = req->argv[optind];
+ drt.root2off_fd = open(root2off_file, O_RDONLY);
+ if (drt.root2off_fd < 0)
+ EABORT("open(%s)", root2off_file);
+ if (fstat(drt.root2off_fd, &drt.sb)) // ENOMEM?
+ err(EXIT_FAILURE, "fstat(%s)", root2off_file);
+ // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
+ // so /32 overestimates the number of expected entries by
+ // ~25% (as recommended by Linux hcreate(3) manpage)
+ size_t est = (drt.sb.st_size / 32) + 1; //+1 for "\0" termination
+ if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX)
+ err(EXIT_FAILURE, "%s size too big (%lld bytes > %zu)",
+ root2off_file, (long long)drt.sb.st_size, SIZE_MAX);
+ drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ,
+ MAP_PRIVATE, drt.root2off_fd, 0);
+ if (drt.mm_ptr == MAP_FAILED)
+ err(EXIT_FAILURE, "mmap(%zu, %s)",
+ drt.sb.st_size, root2off_file);
+ drt.entries = (char **)calloc(est * 2, sizeof(char *));
+ if (!drt.entries)
+ err(EXIT_FAILURE, "calloc(%zu * 2, %zu)", est, sizeof(char *));
+ size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr,
+ drt.sb.st_size, est * 2);
+ if (tot <= 0) return false; // split2argv already warned on error
+ if (!hcreate(est))
+ err(EXIT_FAILURE, "hcreate(%zu)", est);
+ for (size_t i = 0; i < tot; ) {
+ ENTRY e;
+ e.key = hsearch_enter_key(drt.entries[i++]); // dies on ENOMEM
+ e.data = drt.entries[i++];
+ if (!hsearch(e, ENTER))
+ err(EXIT_FAILURE, "hsearch(%s => %s, ENTER)", e.key,
+ (const char *)e.data);
+ }
+ req->asc = true;
+ req->sort_col = -1;
+ Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+
+ // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
+ // in case we need to retry on DB reopens
+ for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+ if (!drt.wbuf.fp)
+ fbuf_init(&drt.wbuf);
+ for (int t = 10; t > 0; --t)
+ switch (dump_roots_iter(req, &drt, &i)) {
+ case ITER_OK: t = 0; break; // leave inner loop
+ case ITER_RETRY: break; // continue for-loop
+ case ITER_ABORT: return false; // error
+ }
+ if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
+ return false;
+ }
+ if (!dump_roots_flush(req, &drt))
+ return false;
+ emit_mset_stats(req, &mset);
+ return true;
+}
^ permalink raw reply related [flat|nested] 2+ messages in thread
* [PATCH 2/2] WIP-cidx
2023-11-26 1:57 [PATCH 1/2] WIP-cidx-xh-split Eric Wong
@ 2023-11-26 1:57 ` Eric Wong
0 siblings, 0 replies; 2+ messages in thread
From: Eric Wong @ 2023-11-26 1:57 UTC (permalink / raw)
To: spew
more work
---
MANIFEST | 1 +
Makefile.PL | 8 ++-
lib/PublicInbox/Search.pm | 6 ++
lib/PublicInbox/XapClient.pm | 1 +
lib/PublicInbox/XapHelper.pm | 39 +++++++++++-
lib/PublicInbox/XapHelperCxx.pm | 2 +-
lib/PublicInbox/xap_helper.h | 107 +++++++++++++++++++++++++++-----
lib/PublicInbox/xh_cidx.h | 37 ++++-------
lib/PublicInbox/xh_mset.h | 96 ++++++++++++++++++++++++++++
t/cindex.t | 52 +++++++++++++++-
t/xap_helper.t | 49 +++++++++++++--
11 files changed, 344 insertions(+), 54 deletions(-)
create mode 100644 lib/PublicInbox/xh_mset.h
diff --git a/MANIFEST b/MANIFEST
index bbbe0b91..7b6178f9 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -379,6 +379,7 @@ lib/PublicInbox/Xapcmd.pm
lib/PublicInbox/gcf2_libgit2.h
lib/PublicInbox/xap_helper.h
lib/PublicInbox/xh_cidx.h
+lib/PublicInbox/xh_mset.h
sa_config/Makefile
sa_config/README
sa_config/root/etc/spamassassin/public-inbox.pre
diff --git a/Makefile.PL b/Makefile.PL
index 38e030f5..28f8263e 100644
--- a/Makefile.PL
+++ b/Makefile.PL
@@ -273,14 +273,16 @@ pm_to_blib : lib/PublicInbox.pm
lib/PublicInbox.pm : FORCE
VERSION=\$(VERSION) \$(PERL) -w ./version-gen.perl
+XH_TESTS = t/xap_helper.t t/cindex.t
+
test-asan : pure_all
- TEST_XH_CXX_ONLY=1 CXXFLAGS='-O0 -Wall -ggdb3 -fsanitize=address' \\
- prove -bvw t/xap_helper.t
+ TEST_XH_CXX_ONLY=1 CXXFLAGS='-Wall -ggdb3 -fsanitize=address' \\
+ prove -bvw \$(XH_TESTS)
VG_OPT = -v --trace-children=yes --track-fds=yes
VG_OPT += --leak-check=yes --track-origins=yes
test-valgrind : pure_all
TEST_XH_CXX_ONLY=1 VALGRIND="valgrind \$(VG_OPT)" \\
- prove -bvw t/xap_helper.t
+ prove -bvw \$(XH_TESTS)
EOF
}
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 477f77dc..c4a79473 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -247,6 +247,12 @@ sub mdocid {
int(($docid - 1) / $nshard) + 1;
}
+sub docids_to_artnums {
+ my $nshard = shift->{nshard};
+ # XXX does array vs arrayref make a difference in modern Perls?
+ map { int(($_ - 1) / $nshard) + 1 } @_;
+}
+
sub mset_to_artnums {
my ($self, $mset) = @_;
my $nshard = $self->{nshard};
diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm
index 4dcbbe5d..7dbc867f 100644
--- a/lib/PublicInbox/XapClient.pm
+++ b/lib/PublicInbox/XapClient.pm
@@ -11,6 +11,7 @@ use v5.12;
use PublicInbox::Spawn qw(spawn);
use Socket qw(AF_UNIX SOCK_SEQPACKET);
use PublicInbox::IPC;
+use PublicInbox::IO;
use autodie qw(fork pipe socketpair);
sub mkreq {
diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm
index fe831b8f..7539634f 100644
--- a/lib/PublicInbox/XapHelper.pm
+++ b/lib/PublicInbox/XapHelper.pm
@@ -26,14 +26,18 @@ our @SPEC = (
'a', # ascending sort
'c', # code search
'd=s@', # shard dirs
+ 'g=s', # git dir (with -c)
'k=i', # sort column (like sort(1))
'm=i', # maximum number of results
'o=i', # offset
+ 'p', # show percent
'r', # 1=relevance then column
't', # collapse threads
'A=s@', # prefixes
+ 'D', # emit docdata
+ 'K=i', # timeout kill after i seconds
'O=s', # eidx_key
- 'T=i', # timeout in seconds
+ 'T=i', # threadid
);
sub cmd_test_inspect {
@@ -144,6 +148,39 @@ sub cmd_dump_roots {
emit_mset_stats($req, $mset);
}
+sub mset_iter ($$) {
+ my ($req, $it) = @_;
+ eval {
+ my $buf = $it->get_docid;
+ $buf .= "\0".$it->get_percent if $req->{p};
+ my $doc = ($req->{A} || $req->{D}) ? $it->get_document : undef;
+ for my $p (@{$req->{A}}) {
+ $buf .= "\0".$p.$_ for xap_terms($p, $doc);
+ }
+ $buf .= "\0".$doc->get_data if $req->{D};
+ say { $req->{0} } $buf;
+ };
+ $@ ? iter_retry_check($req) : 0;
+}
+
+sub cmd_mset { # to be used by WWW + IMAP
+ my ($req, $qry_str) = @_;
+ $qry_str // die 'usage: mset [OPTIONS] QRY_STR';
+ my $opt = { limit => $req->{'m'}, offset => $req->{o} // 0 };
+ $opt->{relevance} = 1 if $req->{r};
+ $opt->{threads} = 1 if defined $req->{t};
+ $opt->{git_dir} = $req->{g} if defined $req->{g};
+ $opt->{eidx_key} = $req->{O} if defined $req->{O};
+ $opt->{threadid} = $req->{T} if defined $req->{T};
+ my $mset = $req->{srch}->mset($qry_str, $opt);
+ say { $req->{0} } 'mset.size=', $mset->size;
+ for my $it ($mset->items) {
+ for (my $t = 10; $t > 0; --$t) {
+ $t = mset_iter($req, $it) // $t;
+ }
+ }
+}
+
sub dispatch {
my ($req, $cmd, @argv) = @_;
my $fn = $req->can("cmd_$cmd") or return;
diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm
index 8a66fdcd..9414e80e 100644
--- a/lib/PublicInbox/XapHelperCxx.pm
+++ b/lib/PublicInbox/XapHelperCxx.pm
@@ -20,7 +20,7 @@ $ENV{PERL_INLINE_DIRECTORY} // die('BUG: PERL_INLINE_DIRECTORY unset');
substr($dir, 0, 0) = "$ENV{PERL_INLINE_DIRECTORY}/";
my $bin = "$dir/xap_helper";
my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
-my @srcs = map { $srcpfx.$_ } qw(xap_helper.h xh_cidx.h);
+my @srcs = map { $srcpfx.$_ } qw(xh_mset.h xh_cidx.h xap_helper.h);
my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm);
my $ldflags = '-Wl,-O1';
$ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd';
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 0824ce71..14738591 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -124,10 +124,12 @@ struct req { // argv and pfxv point into global rbuf
char *argv[MY_ARG_MAX];
char *pfxv[MY_ARG_MAX]; // -A <prefix>
struct srch *srch;
+ char *Pgit_dir;
char *Oeidx_key;
cmd fn;
unsigned long long max;
unsigned long long off;
+ unsigned long long threadid;
unsigned long timeout_sec;
size_t nr_out;
long sort_col; // value column, negative means BoolWeight
@@ -138,6 +140,8 @@ struct req { // argv and pfxv point into global rbuf
bool collapse_threads;
bool code_search;
bool relevance; // sort by relevance before column
+ bool emit_percent;
+ bool emit_docdata;
bool asc; // ascending sort
};
@@ -230,12 +234,53 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
return enquire_mset(req, &enq);
}
+static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
+{
+ return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
+}
+
+static void apply_roots_filter(struct req *req, Xapian::Query *qry)
+{
+ if (!req->Pgit_dir) return;
+ req->Pgit_dir[0] = 'P'; // modifies static rbuf
+ Xapian::Database *xdb = req->srch->db;
+ for (int i = 0; i < 9; i++) { // bounded retries after reopen()
+ try {
+ std::string P = req->Pgit_dir;
+ Xapian::PostingIterator p = xdb->postlist_begin(P);
+ if (p == xdb->postlist_end(P)) {
+ warnx("W: %s not indexed?", req->Pgit_dir + 1);
+ return;
+ }
+ Xapian::TermIterator cur = xdb->termlist_begin(*p);
+ Xapian::TermIterator end = xdb->termlist_end(*p);
+ cur.skip_to("G"); // first term >= "G", may not be a "G*" term
+ if (cur == end || (*cur)[0] != 'G') {
+ warnx("W: %s has no root commits?",
+ req->Pgit_dir + 1);
+ return;
+ }
+ Xapian::Query f = Xapian::Query(*cur);
+ for (++cur; cur != end; ++cur) {
+ std::string tn = *cur;
+ if (!starts_with(&tn, "G", 1))
+ continue;
+ f = Xapian::Query(Xapian::Query::OP_OR, f, tn);
+ }
+ *qry = Xapian::Query(Xapian::Query::OP_FILTER, *qry, f);
+ return;
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ xdb->reopen();
+ }
+ }
+}
+
// for cindex
static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
{
struct srch *srch = req->srch;
Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
- // TODO: git_dir + roots_filter
+ apply_roots_filter(req, &qry);
// we only want commits:
qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
@@ -254,11 +299,6 @@ static void emit_mset_stats(struct req *req, const Xapian::MSet *mset)
ABORT("BUG: %s caller only passed 1 FD", req->argv[0]);
}
-static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
-{
- return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
-}
-
static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
{
return setvbuf(fp, NULL, _IOLBF, 0);
@@ -284,6 +324,32 @@ static void fbuf_init(struct fbuf *fbuf)
if (!fbuf->fp) err(EXIT_FAILURE, "open_memstream(fbuf)");
}
+static bool write_all(int fd, const struct fbuf *wbuf, size_t len)
+{
+ const char *p = wbuf->ptr;
+ assert(wbuf->len >= len);
+ while (len) { // write to client FD; len == 0 is a successful no-op
+ ssize_t n = write(fd, p, len);
+ if (n > 0) {
+ len -= n;
+ p += n;
+ } else {
+ perror(n ? "write" : "write (zero bytes)");
+ return false;
+ }
+ }
+ return true;
+}
+
+#define ERR_FLUSH(f) do { \
+ if (ferror(f) | fflush(f)) err(EXIT_FAILURE, "ferror|fflush "#f); \
+} while (0)
+
+#define ERR_CLOSE(f, e) do { \
+ if (ferror(f) | fclose(f)) \
+ e ? err(e, "ferror|fclose "#f) : perror("ferror|fclose "#f); \
+} while (0)
+
static void xclose(int fd)
{
if (close(fd) < 0 && errno != EINTR)
@@ -332,6 +398,7 @@ static bool cmd_test_inspect(struct req *req)
return false;
}
+#include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff
#include "xh_cidx.h" // CodeSearchIdx.pm stuff
#define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
@@ -341,6 +408,7 @@ static const struct cmd_entry {
cmd fn;
} cmds[] = { // should be small enough to not need bsearch || gperf
// most common commands first
+ CMD(mset), // WWW and IMAP requests
CMD(dump_ibx), // many inboxes
CMD(dump_roots), // per-cidx shard
CMD(test_inspect), // least common commands last
@@ -513,7 +581,7 @@ static void dispatch(struct req *req)
char *end;
FILE *kfp;
struct srch **s;
- req->fn = NULL;
+ req->threadid = ULLONG_MAX;
for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
if (cmds[c].fn_len == size &&
!memcmp(cmds[c].fn_name, req->argv[0], size)) {
@@ -534,11 +602,13 @@ static void dispatch(struct req *req)
MY_DO_OPTRESET();
// keep sync with @PublicInbox::XapHelper::SPEC
- while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) {
+ while ((c = getopt(req->argc, req->argv,
+ "acd:g:k:m:o:prtA:DK:O:T:")) != -1) {
switch (c) {
case 'a': req->asc = true; break;
case 'c': req->code_search = true; break;
case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+ case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix
case 'k':
req->sort_col = strtol(optarg, &end, 10);
if (*end) ABORT("-k %s", optarg);
@@ -556,6 +626,7 @@ static void dispatch(struct req *req)
if (*end || req->off == ULLONG_MAX)
ABORT("-o %s", optarg);
break;
+ case 'p': req->emit_percent = true; break;
case 'r': req->relevance = true; break;
case 't': req->collapse_threads = true; break;
case 'A':
@@ -563,17 +634,22 @@ static void dispatch(struct req *req)
if (MY_ARG_MAX == req->pfxc)
ABORT("too many -A");
break;
+ case 'D': req->emit_docdata = true; break;
case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
- case 'T':
+ case 'K':
req->timeout_sec = strtoul(optarg, &end, 10);
if (*end || req->timeout_sec == ULONG_MAX)
+ ABORT("-K %s", optarg);
+ break;
+ case 'T':
+ req->threadid = strtoull(optarg, &end, 10);
+ if (*end || req->threadid == ULLONG_MAX)
ABORT("-T %s", optarg);
break;
default: ABORT("bad switch `-%c'", c);
}
}
- if (ferror(kfp) | fclose(kfp)) /* sets kbuf.srch */
- err(EXIT_FAILURE, "ferror|fclose"); // likely ENOMEM
+ ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch
kbuf.srch->db = NULL;
kbuf.srch->qp = NULL;
kbuf.srch->paths_len = size - offsetof(struct srch, paths);
@@ -632,8 +708,7 @@ static void stderr_restore(FILE *tmp_err)
stderr = orig_err;
return;
#endif
- if (ferror(stderr) | fflush(stderr))
- err(EXIT_FAILURE, "ferror|fflush stderr");
+ ERR_FLUSH(stderr); // flush only: stderr must stay open for the dup2() below
while (dup2(orig_err_fd, STDERR_FILENO) < 0) {
if (errno != EINTR)
err(EXIT_FAILURE, "dup2(%d => 2)", orig_err_fd);
@@ -663,12 +738,10 @@ static void recv_loop(void) // worker process loop
stderr_set(req.fp[1]);
req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
dispatch(&req);
- if (ferror(req.fp[0]) | fclose(req.fp[0]))
- perror("ferror|fclose fp[0]");
+ ERR_CLOSE(req.fp[0], 0);
if (req.fp[1]) {
stderr_restore(req.fp[1]);
- if (ferror(req.fp[1]) | fclose(req.fp[1]))
- perror("ferror|fclose fp[1]");
+ ERR_CLOSE(req.fp[1], 0);
}
}
}
diff --git a/lib/PublicInbox/xh_cidx.h b/lib/PublicInbox/xh_cidx.h
index 8513d88b..49190214 100644
--- a/lib/PublicInbox/xh_cidx.h
+++ b/lib/PublicInbox/xh_cidx.h
@@ -106,8 +106,7 @@ static bool root2offs_str(struct fbuf *root_offs, Xapian::Document *doc)
fputs((const char *)ep->data, root_offs->fp);
}
fputc('\n', root_offs->fp);
- if (ferror(root_offs->fp) | fclose(root_offs->fp))
- err(EXIT_FAILURE, "ferror|fclose(root_offs)"); // ENOMEM
+ ERR_CLOSE(root_offs->fp, EXIT_FAILURE); // ENOMEM
root_offs->fp = NULL;
return true;
}
@@ -137,38 +136,24 @@ static void dump_roots_term(struct req *req, const char *pfx,
// buffering and rely on flock(2), here
static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
{
- char *p;
- int fd = fileno(req->fp[0]);
bool ok = true;
+ off_t off = ftello(drt->wbuf.fp);
+ if (off < 0) EABORT("ftello");
+ if (!off) return ok;
+
+ ERR_FLUSH(drt->wbuf.fp); // ENOMEM
+ int fd = fileno(req->fp[0]);
- if (!drt->wbuf.fp) return true;
- if (fd < 0) EABORT("BUG: fileno");
- if (ferror(drt->wbuf.fp) | fclose(drt->wbuf.fp)) // ENOMEM?
- err(EXIT_FAILURE, "ferror|fclose(drt->wbuf.fp)");
- drt->wbuf.fp = NULL;
- if (!drt->wbuf.len) goto done_free;
while (flock(drt->root2off_fd, LOCK_EX)) {
if (errno == EINTR) continue;
err(EXIT_FAILURE, "LOCK_EX"); // ENOLCK?
}
- p = drt->wbuf.ptr;
- do { // write to client FD
- ssize_t n = write(fd, p, drt->wbuf.len);
- if (n > 0) {
- drt->wbuf.len -= n;
- p += n;
- } else {
- perror(n ? "write" : "write (zero bytes)");
- return false;
- }
- } while (drt->wbuf.len);
+ ok = write_all(fd, &drt->wbuf, (size_t)off);
while (flock(drt->root2off_fd, LOCK_UN)) {
if (errno == EINTR) continue;
err(EXIT_FAILURE, "LOCK_UN"); // ENOLCK?
}
-done_free: // OK to skip on errors, dump_roots_ensure calls fbuf_ensure
- free(drt->wbuf.ptr);
- drt->wbuf.ptr = NULL;
+ if (fseeko(drt->wbuf.fp, 0, SEEK_SET)) EABORT("fseeko");
return ok;
}
@@ -239,11 +224,11 @@ static bool cmd_dump_roots(struct req *req)
req->sort_col = -1;
Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+ fbuf_init(&drt.wbuf);
+
// @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
// in case we need to retry on DB reopens
for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
- if (!drt.wbuf.fp)
- fbuf_init(&drt.wbuf);
for (int t = 10; t > 0; --t)
switch (dump_roots_iter(req, &drt, &i)) {
case ITER_OK: t = 0; break; // leave inner loop
diff --git a/lib/PublicInbox/xh_mset.h b/lib/PublicInbox/xh_mset.h
new file mode 100644
index 00000000..056fe22b
--- /dev/null
+++ b/lib/PublicInbox/xh_mset.h
@@ -0,0 +1,96 @@
+// Copyright (C) all contributors <meta@public-inbox.org>
+// License: GPL-2.0+ <https://www.gnu.org/licenses/gpl-2.0.txt>
+// This file is only intended to be included by xap_helper.h
+// it implements pieces used by WWW, IMAP and lei
+
+static void emit_doc_term(FILE *fp, const char *pfx, Xapian::Document *doc)
+{ // write every term of `doc' starting with `pfx' to `fp', each after a NUL
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ size_t pfx_len = strlen(pfx);
+
+ for (cur.skip_to(pfx); cur != end; cur++) { // begin the scan at pfx
+ std::string tn = *cur;
+ if (!starts_with(&tn, pfx, pfx_len)) continue; // other prefix, skip
+ fputc(0, fp); // NUL field separator goes before the term
+ fwrite(tn.data(), tn.size(), 1, fp); // whole term, pfx included
+ }
+}
+
+static enum exc_iter mset_iter(const struct req *req, FILE *fp, off_t off,
+ Xapian::MSetIterator *i)
+{ // append one row for match *i to fp; `off' is the fp offset of that row
+ try {
+ fprintf(fp, "%llu", (unsigned long long)(*(*i))); // get_docid
+ if (req->emit_percent) // set by -p
+ fprintf(fp, "%c%d", 0, i->get_percent()); // NUL, then percent
+ if (req->pfxc || req->emit_docdata) { // document only if needed
+ Xapian::Document doc = i->get_document();
+ for (int p = 0; p < req->pfxc; p++) // one pass per prefix
+ emit_doc_term(fp, req->pfxv[p], &doc);
+ if (req->emit_docdata) { // set by -D
+ std::string d = doc.get_data();
+ fputc(0, fp); // NUL separator before the document data
+ fwrite(d.data(), d.size(), 1, fp);
+ }
+ }
+ fputc('\n', fp); // newline terminates the row
+ } catch (const Xapian::DatabaseModifiedError & e) {
+ req->srch->db->reopen();
+ if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko"); // drop partial row
+ return ITER_RETRY; // caller is expected to call again for *i
+ } catch (const Xapian::DocNotFoundError & e) { // oh well...
+ warnx("doc not found: %s", e.get_description().c_str());
+ if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko"); // drop partial row
+ }
+ return ITER_OK; // also reached after DocNotFoundError: that doc is skipped
+}
+
+#ifndef WBUF_FLUSH_THRESHOLD // a definition supplied earlier takes precedence
+# define WBUF_FLUSH_THRESHOLD (BUFSIZ - 1000) // 1000 bytes below BUFSIZ
+#endif
+#if WBUF_FLUSH_THRESHOLD < 0 // negative value: fall back to plain BUFSIZ
+# undef WBUF_FLUSH_THRESHOLD
+# define WBUF_FLUSH_THRESHOLD BUFSIZ
+#endif // cmd_mset flushes once its buffered offset exceeds this value
+
+static bool cmd_mset(struct req *req)
+{ // "mset" command: "mset.size=N\n" then one row per match, sent via fp[0]
+ if (optind >= req->argc) ABORT("usage: mset [OPTIONS] WANT QRY_STR"); // NOTE(review): usage names WANT, only argv[optind] is read
+ if (req->fp[1]) ABORT("mset only accepts 1 FD");
+ const char *qry_str = req->argv[optind];
+ CLEANUP_FBUF struct fbuf wbuf = {}; // presumably auto-released; macro defined elsewhere
+ Xapian::MSet mset = req->code_search ? commit_mset(req, qry_str) :
+ mail_mset(req, qry_str); // code_search is set by -c
+ fbuf_init(&wbuf); // wbuf.fp is used from here on
+ fprintf(wbuf.fp, "mset.size=%llu\n", (unsigned long long)mset.size());
+ int fd = fileno(req->fp[0]); // destination for write_all below
+ for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+ off_t off = ftello(wbuf.fp); // where the next row will start
+ if (off < 0) EABORT("ftello");
+ /*
+ * TODO verify our fflush + fseeko use isn't affected by a
+ * glibc <2.25 bug:
+ * https://sourceware.org/bugzilla/show_bug.cgi?id=20181
+ * CentOS 7.x only has glibc 2.17. In any case, bug #20181
+ * shouldn't affect us since our use of fseeko is used to
+ * effectively discard data.
+ */
+ if (off > WBUF_FLUSH_THRESHOLD) { // enough buffered, send it now
+ ERR_FLUSH(wbuf.fp); // presumably fflush + error check; macro not shown
+ if (!write_all(fd, &wbuf, (size_t)off)) return false;
+ if (fseeko(wbuf.fp, 0, SEEK_SET)) EABORT("fseeko"); // rewind for reuse
+ off = 0; // the next row now starts at offset zero
+ }
+ for (int t = 10; t > 0; --t) // at most 10 attempts for this match
+ switch (mset_iter(req, wbuf.fp, off, &i)) {
+ case ITER_OK: t = 0; break; // leave inner loop
+ case ITER_RETRY: break; // continue for-loop
+ case ITER_ABORT: return false; // error
+ }
+ }
+ off_t off = ftello(wbuf.fp); // amount still buffered after the last row
+ if (off < 0) EABORT("ftello");
+ ERR_FLUSH(wbuf.fp);
+ return off > 0 ? write_all(fd, &wbuf, (size_t)off) : true; // send the rest
+}
diff --git a/t/cindex.t b/t/cindex.t
index 1a9e564a..ac7a6000 100644
--- a/t/cindex.t
+++ b/t/cindex.t
@@ -121,22 +121,70 @@ my $no_metadata_set = sub {
use_ok 'PublicInbox::CodeSearch';
+
+my @xh_args;
+my $exp = [ 'initial with NUL character', 'remove NUL character' ];
+my $zp_git = abs_path("$zp/.git");
if ('multi-repo search') {
my $csrch = PublicInbox::CodeSearch->new("$tmp/ext");
my $mset = $csrch->mset('NUL');
is(scalar($mset->items), 2, 'got results');
- my $exp = [ 'initial with NUL character', 'remove NUL character' ];
my @have = sort(map { $_->get_document->get_data } $mset->items);
is_xdeeply(\@have, $exp, 'got expected subjects');
$mset = $csrch->mset('NUL', { git_dir => "$tmp/wt0/.git" });
is(scalar($mset->items), 0, 'no results with other GIT_DIR');
- $mset = $csrch->mset('NUL', { git_dir => abs_path("$zp/.git") });
+ $mset = $csrch->mset('NUL', { git_dir => $zp_git });
@have = sort(map { $_->get_document->get_data } $mset->items);
is_xdeeply(\@have, $exp, 'got expected subjects w/ GIT_DIR filter');
my @xdb = $csrch->xdb_shards_flat;
$no_metadata_set->(0, ['indexlevel'], \@xdb);
+ @xh_args = $csrch->xh_args;
+}
+
+my $test_xhc = sub { # checks the `mset' command on the given helper client
+ my ($xhc) = @_;
+ my $impl = $xhc->{impl}; # only used to label test names
+ my ($r, @l);
+ $r = $xhc->mkreq([], qw(mset -D -c -g), $zp_git, @xh_args, 'NUL'); # -D asks for docdata
+ chomp(@l = <$r>);
+ is(shift(@l), 'mset.size=2', "got expected header $impl"); # header line first
+ my %docid2data;
+ my @got = sort map { # remaining lines: one NUL-delimited row per match
+ my @f = split /\0/;
+ is scalar(@f), 2, 'got 2 entries'; # docid and docdata
+ $docid2data{$f[0]} = $f[1];
+ $f[1];
+ } @l;
+ is_deeply(\@got, $exp, "expected doc_data $impl");
+
+ $r = $xhc->mkreq([], qw(mset -c -g), "$tmp/wt0/.git", @xh_args, 'NUL'); # different GIT_DIR
+ chomp(@l = <$r>);
+ is(shift(@l), 'mset.size=0', "got miss in wrong dir $impl");
+ is_deeply(\@l, [], "no extra lines $impl");
+
+ my $csrch = PublicInbox::CodeSearch->new("$tmp/ext"); # cross-check via Perl API
+ while (my ($did, $expect) = each %docid2data) {
+ is_deeply($csrch->xdb->get_document($did)->get_data,
+ $expect, "docid=$did data matches");
+ }
+ ok(!$xhc->{io}->close, "$impl close"); # false close expected: exit is non-zero
+ is($?, 66 << 8, "got EX_NOINPUT from $impl exit");
+};
+
+SKIP: { # run $test_xhc on the default helper, then again with PI_NO_CXX set
+ require_mods('+SCM_RIGHTS', 1); # presumably skips when unavailable -- confirm
+ require PublicInbox::XapClient;
+ my $xhc = PublicInbox::XapClient::start_helper('-j0');
+ $test_xhc->($xhc); # first pass: whichever implementation was started
+ skip 'PI_NO_CXX set', 1 if $ENV{PI_NO_CXX}; # second pass would be identical
+ $xhc->{impl} =~ /Cxx/ or
+ skip 'C++ compiler or xapian development libs missing', 1;
+ skip 'TEST_XH_CXX_ONLY set', 1 if $ENV{TEST_XH_CXX_ONLY};
+ local $ENV{PI_NO_CXX} = 1; # force XS or SWIG binding test
+ $xhc = PublicInbox::XapClient::start_helper('-j0');
+ $test_xhc->($xhc); # second pass: non-C++ implementation
 }
if ('--update') {
diff --git a/t/xap_helper.t b/t/xap_helper.t
index e3abeded..ee25b2dc 100644
--- a/t/xap_helper.t
+++ b/t/xap_helper.t
@@ -40,6 +40,7 @@ my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
};
my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
+my @ibx_shard_args = map { ('-d', $_) } @ibx_idx;
my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
@@ -76,8 +77,7 @@ my $test = sub {
is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
- my @dump = (qw(dump_ibx -A XDFID), (map { ('-d', $_) } @ibx_idx),
- qw(13 rt:0..));
+ my @dump = (qw(dump_ibx -A XDFID), @ibx_shard_args, qw(13 rt:0..));
$r = $doreq->($s, @dump);
my @res;
while (sysread($r, my $buf, 512) != 0) { push @res, $buf }
@@ -89,7 +89,8 @@ my $test = sub {
my $res = do { local $/; <$r> };
is(join('', @res), $res, 'got identical response w/ error pipe');
my $stats = do { local $/; <$err_rd> };
- is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
+ is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported') or
+ diag "res=$res";
return wantarray ? ($ar, $s) : $ar if $cinfo{pid} == $pid;
@@ -198,7 +199,47 @@ for my $n (@NO_CXX) {
is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
'entries match format');
$err = do { local $/; <$err_r> };
- is($err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})");
+ is $err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})";
+
+ $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args,
+ 'dfn:lib/PublicInbox/Search.pm');
+ chomp((my $hdr, @res) = readline($r));
+ is $hdr, 'mset.size=1', "got expected header via mset ($xhc->{impl}";
+ is scalar(@res), 1, 'got one result';
+ @res = split /\0/, $res[0];
+ {
+ my $doc = $v2->search->xdb->get_document($res[0]);
+ my @q = PublicInbox::Search::xap_terms('Q', $doc);
+ is_deeply \@q, [ $mid ], 'docid usable';
+ }
+ ok $res[1] > 0 && $res[1] <= 100, 'pct > 0 && <= 100';
+ is $res[2], 'XDFID'.$dfid, 'XDFID result matches';
+ is $res[3], 'Q'.$mid, 'Q (msgid) mset result matches';
+ is scalar(@res), 4, 'only 4 columns in result';
+
+ $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args,
+ 'dt:19700101'.'000000..');
+ chomp(($hdr, @res) = readline($r));
+ is $hdr, 'mset.size=6',
+ "got expected header via multi-result mset ($xhc->{impl}";
+ is(scalar(@res), 6, 'got 6 rows');
+ for my $r (@res) {
+ my ($docid, $pct, @rest) = split /\0/, $r;
+ my $doc = $v2->search->xdb->get_document($docid);
+ ok $pct > 0 && $pct <= 100,
+ "pct > 0 && <= 100 #$docid ($xhc->{impl})";
+ my %terms;
+ for (@rest) {
+ s/\A([A-Z]+)// or xbail 'no prefix=', \@rest;
+ push @{$terms{$1}}, $_;
+ }
+ while (my ($pfx, $vals) = each %terms) {
+ @$vals = sort @$vals;
+ my @q = PublicInbox::Search::xap_terms($pfx, $doc);
+ is_deeply $vals, \@q,
+ "#$docid $pfx as expected ($xhc->{impl})";
+ }
+ }
}
done_testing;
^ permalink raw reply related [flat|nested] 2+ messages in thread
end of thread, other threads:[~2023-11-26 1:57 UTC | newest]
Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2023-11-26 1:57 [PATCH 1/2] WIP-cidx-xh-split Eric Wong
2023-11-26 1:57 ` [PATCH 2/2] WIP-cidx Eric Wong
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).