about summary refs log tree commit homepage
path: root/lib/PublicInbox/xap_helper.h
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2023-08-24 01:22:35 +0000
committer Eric Wong <e@80x24.org> 2023-08-24 07:47:53 +0000
commit 6d834eeb3dafe63c3c221d5e8ccf64d1d8837a70 (patch)
tree bed23c09892fd6e6c0e205116942acc6c171eb7e /lib/PublicInbox/xap_helper.h
parent 1c8430a7fa407e476ef70a6a199983faf071d7a5 (diff)
download public-inbox-6d834eeb3dafe63c3c221d5e8ccf64d1d8837a70.tar.gz
It's now just `dump_roots' instead of `dump_shard_roots', since
this doesn't need to be tied to the concept of shards.  I'm
still shaky with C++, but intend to keep using stuff like
hsearch(3) to make life easier for C hackers :P
Diffstat (limited to 'lib/PublicInbox/xap_helper.h')
-rw-r--r-- lib/PublicInbox/xap_helper.h 332
1 files changed, 294 insertions, 38 deletions
diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h
index 52db92b7..c9b4e0cc 100644
--- a/lib/PublicInbox/xap_helper.h
+++ b/lib/PublicInbox/xap_helper.h
@@ -6,12 +6,16 @@
  * this is not linked to Perl in any way.
  * C (not C++) is used as much as possible to lower the contribution
  * barrier for hackers who mainly know C (this includes the maintainer).
+ * Yes, that means we use C stdlib stuff like hsearch and open_memstream
+ * instead of their equivalents in the C++ stdlib :P
  * Everything here is an unstable internal API of public-inbox and
  * NOT intended for ordinary users; only public-inbox hackers
  */
 #ifndef _ALL_SOURCE
 #        define _ALL_SOURCE
 #endif
+#include <sys/file.h>
+#include <sys/mman.h>
 #include <sys/resource.h>
 #include <sys/socket.h>
 #include <sys/stat.h>
@@ -80,6 +84,7 @@ struct req { // argv and pfxv point into global rbuf
         unsigned long long max;
         unsigned long long off;
         unsigned long timeout_sec;
+        size_t nr_out;
         long sort_col; // value column, negative means BoolWeight
         int argc;
         int pfxc;
@@ -96,6 +101,28 @@ struct worker {
         unsigned nr;
 };
 
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static size_t split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+        if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
+                warnx("bogus argument given");
+                return 0;
+        }
+        size_t nr = 0;
+        char *c = buf;
+        for (size_t i = 1; i < len; i++) {
+                if (!buf[i]) {
+                        dst[nr++] = c;
+                        c = buf + i + 1;
+                }
+                if (nr == limit) {
+                        warnx("too many args: %zu", nr);
+                        return 0;
+                }
+        }
+        return (long)nr;
+}
+
 static bool has_threadid(const struct srch *srch)
 {
         return srch->db->get_metadata("has_threadid") == "1";
@@ -118,8 +145,12 @@ static Xapian::Enquire prep_enquire(const struct req *req)
 
 static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
 {
-        if (!req->max)
-                req->max = 50;
+        if (!req->max) {
+                switch (sizeof(Xapian::doccount)) {
+                case 4: req->max = UINT_MAX; break;
+                default: req->max = ULLONG_MAX;
+                }
+        }
         for (int i = 0; i < 9; i++) {
                 try {
                         Xapian::MSet mset = enq->get_mset(req->off, req->max);
@@ -131,13 +162,13 @@ static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
         return enq->get_mset(req->off, req->max);
 }
 
+// for v1, v2, and extindex
 static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
 {
         struct srch *srch = req->srch;
         Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
         if (req->Oeidx_key) {
                 req->Oeidx_key[0] = 'O'; // modifies static rbuf
-                fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key);
                 qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
                                         Xapian::Query(req->Oeidx_key));
         }
@@ -150,6 +181,21 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
         return enquire_mset(req, &enq);
 }
 
+// for cindex
+static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
+{
+        struct srch *srch = req->srch;
+        Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+        // TODO: git_dir + roots_filter
+
+        // we only want commits:
+        qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+                                Xapian::Query("T" "c"));
+        Xapian::Enquire enq = prep_enquire(req);
+        enq.set_query(qry);
+        return enquire_mset(req, &enq);
+}
+
 static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
 {
         return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
@@ -165,9 +211,11 @@ static void dump_ibx_term(struct req *req, const char *pfx,
         for (cur.skip_to(pfx); cur != end; cur++) {
                 std::string tn = *cur;
 
-                if (starts_with(&tn, pfx, pfx_len))
+                if (starts_with(&tn, pfx, pfx_len)) {
                         fprintf(req->fp[0], "%s %s\n",
                                 tn.c_str() + pfx_len, ibx_id);
+                        ++req->nr_out;
+                }
         }
 }
 
@@ -194,7 +242,6 @@ static bool cmd_dump_ibx(struct req *req)
         }
         req->asc = true;
         req->sort_col = -1;
-        req->max = (unsigned long long)req->srch->db->get_doccount();
         Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
         for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
                 try {
@@ -208,8 +255,244 @@ static bool cmd_dump_ibx(struct req *req)
                 }
         }
         if (req->fp[1])
-                fprintf(req->fp[1], "mset.size=%llu\n",
-                        (unsigned long long)mset.size());
+                fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+                        (unsigned long long)mset.size(), req->nr_out);
+        return true;
+}
+
+struct fbuf {
+        FILE *fp;
+        char *ptr;
+        size_t len;
+};
+
+struct dump_roots_tmp {
+        struct stat sb;
+        void *mm_ptr;
+        char **entries;
+        struct fbuf wbuf;
+        int root2id_fd;
+};
+
+#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure)))
+static void fbuf_ensure(void *ptr)
+{
+        struct fbuf *fbuf = (struct fbuf *)ptr;
+        if (fbuf->fp && fclose(fbuf->fp))
+                perror("fclose(fbuf->fp)");
+        fbuf->fp = NULL;
+        free(fbuf->ptr);
+}
+
+static bool fbuf_init(struct fbuf *fbuf)
+{
+        assert(!fbuf->ptr);
+        fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len);
+        if (fbuf->fp) return true;
+        perror("open_memstream(fbuf)");
+        return false;
+}
+
+static void xclose(int fd)
+{
+        if (close(fd) < 0 && errno != EINTR)
+                err(EXIT_FAILURE, "BUG: close");
+}
+
+#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure)))
+static void dump_roots_ensure(void *ptr)
+{
+        struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr;
+        if (drt->root2id_fd >= 0)
+                xclose(drt->root2id_fd);
+        hdestroy(); // idempotent
+        if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size))
+                err(EXIT_FAILURE, "BUG: munmap");
+        free(drt->entries);
+        fbuf_ensure(&drt->wbuf);
+}
+
+static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt,
+                        Xapian::Document *doc)
+{
+        if (!fbuf_init(root_ids)) return false;
+
+        bool ok = true;
+        Xapian::TermIterator cur = doc->termlist_begin();
+        Xapian::TermIterator end = doc->termlist_end();
+        ENTRY e, *ep;
+        for (cur.skip_to("G"); cur != end; cur++) {
+                std::string tn = *cur;
+                if (!starts_with(&tn, "G", 1))
+                        continue;
+                union { const char *in; char *out; } u;
+                u.in = tn.c_str() + 1;
+                e.key = u.out;
+                ep = hsearch(e, FIND);
+                if (!ep) {
+                        warnx("hsearch miss `%s'", e.key);
+                        return false;
+                }
+                // ep->data is a NUL-terminated string matching /[0-9]+/
+                fputc(' ', root_ids->fp);
+                fputs((const char *)ep->data, root_ids->fp);
+        }
+        fputc('\n', root_ids->fp);
+        if (ferror(root_ids->fp) | fclose(root_ids->fp)) {
+                perror("ferror|fclose(root_ids)");
+                ok = false;
+        }
+        root_ids->fp = NULL;
+        return ok;
+}
+
+// writes term values matching @pfx for a given @doc, ending the line
+// with the contents of @root_ids
+static void dump_roots_term(struct req *req, const char *pfx,
+                                struct dump_roots_tmp *drt,
+                                struct fbuf *root_ids,
+                                Xapian::Document *doc)
+{
+        Xapian::TermIterator cur = doc->termlist_begin();
+        Xapian::TermIterator end = doc->termlist_end();
+        size_t pfx_len = strlen(pfx);
+
+        for (cur.skip_to(pfx); cur != end; cur++) {
+                std::string tn = *cur;
+                if (!starts_with(&tn, pfx, pfx_len))
+                        continue;
+                fputs(tn.c_str() + pfx_len, drt->wbuf.fp);
+                fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp);
+                ++req->nr_out;
+        }
+}
+
+// we may have lines which exceed PIPE_BUF, so we do our own
+// buffering and rely on flock(2) here
+static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
+{
+        char *p;
+        int fd = fileno(req->fp[0]);
+        bool ok = true;
+
+        if (!drt->wbuf.fp) return true;
+        if (fd < 0) err(EXIT_FAILURE, "BUG: fileno");
+        if (fclose(drt->wbuf.fp)) {
+                warn("fclose(drt->wbuf.fp)"); // malloc failure?
+                return false;
+        }
+        drt->wbuf.fp = NULL;
+        if (!drt->wbuf.len) goto done_free;
+        if (flock(drt->root2id_fd, LOCK_EX)) {
+                perror("LOCK_EX");
+                return false;
+        }
+        p = drt->wbuf.ptr;
+        do {
+                ssize_t n = write(fd, p, drt->wbuf.len);
+                if (n > 0) {
+                        drt->wbuf.len -= n;
+                        p += n;
+                } else {
+                        perror(n ? "write" : "write (zero bytes)");
+                        return false;
+                }
+        } while (drt->wbuf.len);
+        if (flock(drt->root2id_fd, LOCK_UN)) {
+                perror("LOCK_UN");
+                return false;
+        }
+done_free:
+        free(drt->wbuf.ptr);
+        drt->wbuf.ptr = NULL;
+        return ok;
+}
+
+static bool cmd_dump_roots(struct req *req)
+{
+        CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
+        if ((optind + 1) >= req->argc) {
+                warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
+                return false; // need file + qry_str
+        }
+        if (!req->pfxc) {
+                warnx("dump_roots requires -A PREFIX");
+                return false;
+        }
+        const char *root2id_file = req->argv[optind];
+        drt.root2id_fd = open(root2id_file, O_RDONLY);
+        if (drt.root2id_fd < 0) {
+                warn("open(%s)", root2id_file);
+                return false;
+        }
+        if (fstat(drt.root2id_fd, &drt.sb)) {
+                warn("fstat(%s)", root2id_file);
+                return false;
+        }
+        // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
+        // so /32 overestimates the number of expected entries by
+        // ~25% (as recommended by Linux hcreate(3) manpage)
+        size_t est = (drt.sb.st_size / 32) + 1;
+        if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) {
+                warnx("%s size too big (%lld bytes > %zu)", root2id_file,
+                        (long long)drt.sb.st_size, SIZE_MAX);
+                return false;
+        }
+        drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ,
+                                MAP_PRIVATE, drt.root2id_fd, 0);
+        if (drt.mm_ptr == MAP_FAILED) {
+                warn("mmap(%s)", root2id_file);
+                return false;
+        }
+        drt.entries = (char **)calloc(est * 2, sizeof(char *));
+        if (!drt.entries) {
+                warn("calloc(%zu * 2, %zu)", est, sizeof(char *));
+                return false;
+        }
+        size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr,
+                                drt.sb.st_size, est * 2);
+        if (tot <= 0) return false; // split2argv already warned on error
+        if (!hcreate(est)) {
+                warn("hcreate(%zu)", est);
+                return false;
+        }
+        for (size_t i = 0; i < tot; ) {
+                ENTRY e;
+                e.key = drt.entries[i++];
+                e.data = drt.entries[i++];
+                if (!hsearch(e, ENTER)) {
+                        warn("hsearch(%s => %s, ENTER)", e.key,
+                                (const char *)e.data);
+                        return false;
+                }
+        }
+        req->asc = true;
+        req->sort_col = -1;
+        Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+        for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+                CLEANUP_FBUF struct fbuf root_ids = { 0 };
+                if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf))
+                        return false;
+                try {
+                        Xapian::Document doc = i.get_document();
+                        if (!root2ids_str(&root_ids, &drt, &doc))
+                                return false;
+                        for (int p = 0; p < req->pfxc; p++)
+                                dump_roots_term(req, req->pfxv[p], &drt,
+                                                &root_ids, &doc);
+                } catch (const Xapian::Error & e) {
+                        fprintf(orig_err, "W: %s (#%ld)\n",
+                                e.get_description().c_str(), (long)(*i));
+                        continue;
+                }
+                if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
+                        return false;
+        }
+        if (!dump_roots_flush(req, &drt))
+                return false;
+        if (req->fp[1])
+                fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+                        (unsigned long long)mset.size(), req->nr_out);
         return true;
 }
 
@@ -228,7 +511,8 @@ static const struct cmd_entry {
         cmd fn;
 } cmds[] = { // should be small enough to not need bsearch || gperf
         // most common commands first
-        CMD(dump_ibx),
+        CMD(dump_ibx), // many inboxes
+        CMD(dump_roots), // per-cidx shard
         CMD(test_inspect), // least common commands last
 };
 
@@ -240,12 +524,6 @@ union my_cmsg {
         char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
 };
 
-static void xclose(int fd)
-{
-        if (close(fd) < 0 && errno != EINTR)
-                err(EXIT_FAILURE, "BUG: close");
-}
-
 static bool recv_req(struct req *req, char *rbuf, size_t *len)
 {
         union my_cmsg cmsg = { 0 };
@@ -306,28 +584,6 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len)
         return false;
 }
 
-#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
-static int split2argv(char **dst, char *buf, size_t len, size_t limit)
-{
-        if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
-                warnx("bogus argument given");
-                return 0;
-        }
-        size_t nr = 0;
-        char *c = buf;
-        for (size_t i = 1; i < len; i++) {
-                if (!buf[i]) {
-                        dst[nr++] = c;
-                        c = buf + i + 1;
-                }
-                if (nr == limit) {
-                        warnx("too many args: %zu", nr);
-                        return 0;
-                }
-        }
-        return (int)nr;
-}
-
 static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
 {
         const struct srch *a = (const struct srch *)pa;
@@ -355,7 +611,7 @@ static bool srch_init(struct req *req)
         char *dirv[MY_ARG_MAX];
         int i;
         struct srch *srch = req->srch;
-        int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+        int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
         const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
         srch->qp_flags = FLAG_PHRASE |
                         Xapian::QueryParser::FLAG_BOOLEAN |
@@ -538,7 +794,7 @@ static void recv_loop(void) // worker process loop
                                 perror("W: setlinebuf(req.fp[1])");
                         stderr = req.fp[1];
                 }
-                req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+                req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
                 if (req.argc > 0)
                         dispatch(&req);
                 if (ferror(req.fp[0]) | fclose(req.fp[0]))