diff options
author | Eric Wong <e@80x24.org> | 2023-08-24 01:22:35 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2023-08-24 07:47:53 +0000 |
commit | 6d834eeb3dafe63c3c221d5e8ccf64d1d8837a70 (patch) | |
tree | bed23c09892fd6e6c0e205116942acc6c171eb7e /lib/PublicInbox/xap_helper.h | |
parent | 1c8430a7fa407e476ef70a6a199983faf071d7a5 (diff) | |
download | public-inbox-6d834eeb3dafe63c3c221d5e8ccf64d1d8837a70.tar.gz |
It's now just `dump_roots` instead of `dump_shard_roots`, since this doesn't need to be tied to the concept of shards. I'm still shaky with C++, but intend to keep using stuff like hsearch(3) to make life easier for C hackers :P
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- | lib/PublicInbox/xap_helper.h | 332 |
1 files changed, 294 insertions, 38 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 52db92b7..c9b4e0cc 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -6,12 +6,16 @@ * this is not linked to Perl in any way. * C (not C++) is used as much as possible to lower the contribution * barrier for hackers who mainly know C (this includes the maintainer). + * Yes, that means we use C stdlib stuff like hsearch and open_memstream + * instead their equivalents in the C++ stdlib :P * Everything here is an unstable internal API of public-inbox and * NOT intended for ordinary users; only public-inbox hackers */ #ifndef _ALL_SOURCE # define _ALL_SOURCE #endif +#include <sys/file.h> +#include <sys/mman.h> #include <sys/resource.h> #include <sys/socket.h> #include <sys/stat.h> @@ -80,6 +84,7 @@ struct req { // argv and pfxv point into global rbuf unsigned long long max; unsigned long long off; unsigned long timeout_sec; + size_t nr_out; long sort_col; // value column, negative means BoolWeight int argc; int pfxc; @@ -96,6 +101,28 @@ struct worker { unsigned nr; }; +#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst)) +static size_t split2argv(char **dst, char *buf, size_t len, size_t limit) +{ + if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) { + warnx("bogus argument given"); + return 0; + } + size_t nr = 0; + char *c = buf; + for (size_t i = 1; i < len; i++) { + if (!buf[i]) { + dst[nr++] = c; + c = buf + i + 1; + } + if (nr == limit) { + warnx("too many args: %zu", nr); + return 0; + } + } + return (long)nr; +} + static bool has_threadid(const struct srch *srch) { return srch->db->get_metadata("has_threadid") == "1"; @@ -118,8 +145,12 @@ static Xapian::Enquire prep_enquire(const struct req *req) static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq) { - if (!req->max) - req->max = 50; + if (!req->max) { + switch (sizeof(Xapian::doccount)) { + case 4: req->max = UINT_MAX; break; + default: req->max = ULLONG_MAX; + } 
+ } for (int i = 0; i < 9; i++) { try { Xapian::MSet mset = enq->get_mset(req->off, req->max); @@ -131,13 +162,13 @@ static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq) return enq->get_mset(req->off, req->max); } +// for v1, v2, and extindex static Xapian::MSet mail_mset(struct req *req, const char *qry_str) { struct srch *srch = req->srch; Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags); if (req->Oeidx_key) { req->Oeidx_key[0] = 'O'; // modifies static rbuf - fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key); qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, Xapian::Query(req->Oeidx_key)); } @@ -150,6 +181,21 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str) return enquire_mset(req, &enq); } +// for cindex +static Xapian::MSet commit_mset(struct req *req, const char *qry_str) +{ + struct srch *srch = req->srch; + Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags); + // TODO: git_dir + roots_filter + + // we only want commits: + qry = Xapian::Query(Xapian::Query::OP_FILTER, qry, + Xapian::Query("T" "c")); + Xapian::Enquire enq = prep_enquire(req); + enq.set_query(qry); + return enquire_mset(req, &enq); +} + static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len) { return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len); @@ -165,9 +211,11 @@ static void dump_ibx_term(struct req *req, const char *pfx, for (cur.skip_to(pfx); cur != end; cur++) { std::string tn = *cur; - if (starts_with(&tn, pfx, pfx_len)) + if (starts_with(&tn, pfx, pfx_len)) { fprintf(req->fp[0], "%s %s\n", tn.c_str() + pfx_len, ibx_id); + ++req->nr_out; + } } } @@ -194,7 +242,6 @@ static bool cmd_dump_ibx(struct req *req) } req->asc = true; req->sort_col = -1; - req->max = (unsigned long long)req->srch->db->get_doccount(); Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]); for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { try { @@ -208,8 +255,244 @@ static bool 
cmd_dump_ibx(struct req *req) } } if (req->fp[1]) - fprintf(req->fp[1], "mset.size=%llu\n", - (unsigned long long)mset.size()); + fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n", + (unsigned long long)mset.size(), req->nr_out); + return true; +} + +struct fbuf { + FILE *fp; + char *ptr; + size_t len; +}; + +struct dump_roots_tmp { + struct stat sb; + void *mm_ptr; + char **entries; + struct fbuf wbuf; + int root2id_fd; +}; + +#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure))) +static void fbuf_ensure(void *ptr) +{ + struct fbuf *fbuf = (struct fbuf *)ptr; + if (fbuf->fp && fclose(fbuf->fp)) + perror("fclose(fbuf->fp)"); + fbuf->fp = NULL; + free(fbuf->ptr); +} + +static bool fbuf_init(struct fbuf *fbuf) +{ + assert(!fbuf->ptr); + fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len); + if (fbuf->fp) return true; + perror("open_memstream(fbuf)"); + return false; +} + +static void xclose(int fd) +{ + if (close(fd) < 0 && errno != EINTR) + err(EXIT_FAILURE, "BUG: close"); +} + +#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure))) +static void dump_roots_ensure(void *ptr) +{ + struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr; + if (drt->root2id_fd >= 0) + xclose(drt->root2id_fd); + hdestroy(); // idempotent + if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size)) + err(EXIT_FAILURE, "BUG: munmap"); + free(drt->entries); + fbuf_ensure(&drt->wbuf); +} + +static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt, + Xapian::Document *doc) +{ + if (!fbuf_init(root_ids)) return false; + + bool ok = true; + Xapian::TermIterator cur = doc->termlist_begin(); + Xapian::TermIterator end = doc->termlist_end(); + ENTRY e, *ep; + for (cur.skip_to("G"); cur != end; cur++) { + std::string tn = *cur; + if (!starts_with(&tn, "G", 1)) + continue; + union { const char *in; char *out; } u; + u.in = tn.c_str() + 1; + e.key = u.out; + ep = hsearch(e, FIND); + if (!ep) { + warnx("hsearch miss `%s'", e.key); + return false; + } + // 
ep->data is a NUL-terminated string matching /[0-9]+/ + fputc(' ', root_ids->fp); + fputs((const char *)ep->data, root_ids->fp); + } + fputc('\n', root_ids->fp); + if (ferror(root_ids->fp) | fclose(root_ids->fp)) { + perror("ferror|fclose(root_ids)"); + ok = false; + } + root_ids->fp = NULL; + return ok; +} + +// writes term values matching @pfx for a given @doc, ending the line +// with the contents of @root_ids +static void dump_roots_term(struct req *req, const char *pfx, + struct dump_roots_tmp *drt, + struct fbuf *root_ids, + Xapian::Document *doc) +{ + Xapian::TermIterator cur = doc->termlist_begin(); + Xapian::TermIterator end = doc->termlist_end(); + size_t pfx_len = strlen(pfx); + + for (cur.skip_to(pfx); cur != end; cur++) { + std::string tn = *cur; + if (!starts_with(&tn, pfx, pfx_len)) + continue; + fputs(tn.c_str() + pfx_len, drt->wbuf.fp); + fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp); + ++req->nr_out; + } +} + +// we may have lines which exceed PIPE_BUF, so we do our own +// buffering and rely on flock(2), here +static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt) +{ + char *p; + int fd = fileno(req->fp[0]); + bool ok = true; + + if (!drt->wbuf.fp) return true; + if (fd < 0) err(EXIT_FAILURE, "BUG: fileno"); + if (fclose(drt->wbuf.fp)) { + warn("fclose(drt->wbuf.fp)"); // malloc failure? + return false; + } + drt->wbuf.fp = NULL; + if (!drt->wbuf.len) goto done_free; + if (flock(drt->root2id_fd, LOCK_EX)) { + perror("LOCK_EX"); + return false; + } + p = drt->wbuf.ptr; + do { + ssize_t n = write(fd, p, drt->wbuf.len); + if (n > 0) { + drt->wbuf.len -= n; + p += n; + } else { + perror(n ? 
"write" : "write (zero bytes)"); + return false; + } + } while (drt->wbuf.len); + if (flock(drt->root2id_fd, LOCK_UN)) { + perror("LOCK_UN"); + return false; + } +done_free: + free(drt->wbuf.ptr); + drt->wbuf.ptr = NULL; + return ok; +} + +static bool cmd_dump_roots(struct req *req) +{ + CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 }; + if ((optind + 1) >= req->argc) { + warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR"); + return false; // need file + qry_str + } + if (!req->pfxc) { + warnx("dump_roots requires -A PREFIX"); + return false; + } + const char *root2id_file = req->argv[optind]; + drt.root2id_fd = open(root2id_file, O_RDONLY); + if (drt.root2id_fd < 0) { + warn("open(%s)", root2id_file); + return false; + } + if (fstat(drt.root2id_fd, &drt.sb)) { + warn("fstat(%s)", root2id_file); + return false; + } + // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0), + // so /32 overestimates the number of expected entries by + // ~%25 (as recommended by Linux hcreate(3) manpage) + size_t est = (drt.sb.st_size / 32) + 1; + if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) { + warnx("%s size too big (%lld bytes > %zu)", root2id_file, + (long long)drt.sb.st_size, SIZE_MAX); + return false; + } + drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ, + MAP_PRIVATE, drt.root2id_fd, 0); + if (drt.mm_ptr == MAP_FAILED) { + warn("mmap(%s)", root2id_file); + return false; + } + drt.entries = (char **)calloc(est * 2, sizeof(char *)); + if (!drt.entries) { + warn("calloc(%zu * 2, %zu)", est, sizeof(char *)); + return false; + } + size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr, + drt.sb.st_size, est * 2); + if (tot <= 0) return false; // split2argv already warned on error + if (!hcreate(est)) { + warn("hcreate(%zu)", est); + return false; + } + for (size_t i = 0; i < tot; ) { + ENTRY e; + e.key = drt.entries[i++]; + e.data = drt.entries[i++]; + if (!hsearch(e, ENTER)) { + warn("hsearch(%s => %s, ENTER)", e.key, + (const char *)e.data); + 
return false; + } + } + req->asc = true; + req->sort_col = -1; + Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]); + for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { + CLEANUP_FBUF struct fbuf root_ids = { 0 }; + if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf)) + return false; + try { + Xapian::Document doc = i.get_document(); + if (!root2ids_str(&root_ids, &drt, &doc)) + return false; + for (int p = 0; p < req->pfxc; p++) + dump_roots_term(req, req->pfxv[p], &drt, + &root_ids, &doc); + } catch (const Xapian::Error & e) { + fprintf(orig_err, "W: %s (#%ld)\n", + e.get_description().c_str(), (long)(*i)); + continue; + } + if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt)) + return false; + } + if (!dump_roots_flush(req, &drt)) + return false; + if (req->fp[1]) + fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n", + (unsigned long long)mset.size(), req->nr_out); return true; } @@ -228,7 +511,8 @@ static const struct cmd_entry { cmd fn; } cmds[] = { // should be small enough to not need bsearch || gperf // most common commands first - CMD(dump_ibx), + CMD(dump_ibx), // many inboxes + CMD(dump_roots), // per-cidx shard CMD(test_inspect), // least common commands last }; @@ -240,12 +524,6 @@ union my_cmsg { char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE]; }; -static void xclose(int fd) -{ - if (close(fd) < 0 && errno != EINTR) - err(EXIT_FAILURE, "BUG: close"); -} - static bool recv_req(struct req *req, char *rbuf, size_t *len) { union my_cmsg cmsg = { 0 }; @@ -306,28 +584,6 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len) return false; } -#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst)) -static int split2argv(char **dst, char *buf, size_t len, size_t limit) -{ - if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) { - warnx("bogus argument given"); - return 0; - } - size_t nr = 0; - char *c = buf; - for (size_t i = 1; i < len; i++) { - if (!buf[i]) { - dst[nr++] = c; - c = buf + i + 1; - } - 
if (nr == limit) { - warnx("too many args: %zu", nr); - return 0; - } - } - return (int)nr; -} - static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch { const struct srch *a = (const struct srch *)pa; @@ -355,7 +611,7 @@ static bool srch_init(struct req *req) char *dirv[MY_ARG_MAX]; int i; struct srch *srch = req->srch; - int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); + int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len); const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE; srch->qp_flags = FLAG_PHRASE | Xapian::QueryParser::FLAG_BOOLEAN | @@ -538,7 +794,7 @@ static void recv_loop(void) // worker process loop perror("W: setlinebuf(req.fp[1])"); stderr = req.fp[1]; } - req.argc = SPLIT2ARGV(req.argv, rbuf, len); + req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len); if (req.argc > 0) dispatch(&req); if (ferror(req.fp[0]) | fclose(req.fp[0])) |