From 116c399b665df76f766da4e6967455def2d290fd Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 24 Aug 2023 01:22:36 +0000 Subject: xap_helper: reopen+retry in MSetIterator loops It's possible to hit a DatabaseModifiedError while iterating through an MSet. We'll retry in these cases and cleanup some code in both the Perl and C++ implementations. --- lib/PublicInbox/XapHelper.pm | 102 ++++++++++++++++++++++++++++--------------- lib/PublicInbox/xap_helper.h | 101 ++++++++++++++++++++++++++++-------------- 2 files changed, 136 insertions(+), 67 deletions(-) (limited to 'lib') diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm index c80be810..ef6a47a3 100644 --- a/lib/PublicInbox/XapHelper.pm +++ b/lib/PublicInbox/XapHelper.pm @@ -36,28 +36,74 @@ sub cmd_test_inspect { ($req->{srch}->has_threadid ? 1 : 0) } +sub iter_retry_check ($) { + die unless ref($@) =~ /\bDatabaseModifiedError\b/; + $_[0]->{srch}->reopen; + undef; # retries +} + +sub dump_ibx_iter ($$$) { + my ($req, $ibx_id, $it) = @_; + my $out = $req->{0}; + eval { + my $doc = $it->get_document; + for my $p (@{$req->{A}}) { + for (xap_terms($p, $doc)) { + print $out "$_ $ibx_id\n" or die "print: $!"; + ++$req->{nr_out}; + } + } + }; + $@ ? iter_retry_check($req) : 0; +} + +sub emit_mset_stats ($$) { + my ($req, $mset) = @_; + my $err = $req->{1} or return; + say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out} +} + sub cmd_dump_ibx { my ($req, $ibx_id, $qry_str) = @_; $qry_str // return warn('usage: dump_ibx [OPTIONS] IBX_ID QRY_STR'); - my @pfx = @{$req->{A}} or return warn('dump_ibx requires -A PREFIX'); + $req->{A} or return warn('dump_ibx requires -A PREFIX'); my $max = $req->{srch}->{xdb}->get_doccount; my $opt = { relevance => -1, limit => $max, offset => $req->{o} // 0 }; $opt->{eidx_key} = $req->{O} if defined $req->{O}; my $mset = $req->{srch}->mset($qry_str, $opt); - my $out = $req->{0}; - $out->autoflush(1); - my $nr = 0; + $req->{0}->autoflush(1); for my $it ($mset->items) { + for (my $t = 10; $t > 0; --$t) { + $t = dump_ibx_iter($req, $ibx_id, $it) // $t; + } + } + if (my $err = $req->{1}) { + say $err 'mset.size='.$mset->size.' nr_out='.$req->{nr_out} + } +} + +sub dump_roots_iter ($$$) { + my ($req, $root2id, $it) = @_; + eval { my $doc = $it->get_document; - for my $p (@pfx) { + my $G = join(' ', map { $root2id->{$_} } xap_terms('G', $doc)); + for my $p (@{$req->{A}}) { for (xap_terms($p, $doc)) { - print $out "$_ $ibx_id\n" or die "print: $!"; - ++$nr; + $req->{wbuf} .= "$_ $G\n"; + ++$req->{nr_out}; } } - } - if (my $err = $req->{1}) { - say $err 'mset.size='.$mset->size.' nr_out='.$nr + }; + $@ ? iter_retry_check($req) : 0; +} + +sub dump_roots_flush ($$) { + my ($req, $fh) = @_; + if ($req->{wbuf} ne '') { + flock($fh, LOCK_EX) or die "flock: $!"; + print { $req->{0} } $req->{wbuf} or die "print: $!"; + flock($fh, LOCK_UN) or die "flock: $!"; + $req->{wbuf} = ''; } } @@ -65,44 +111,29 @@ sub cmd_dump_roots { my ($req, $root2id_file, $qry_str) = @_; $qry_str // return warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR'); - my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX'); + $req->{A} or return warn('dump_roots requires -A PREFIX'); open my $fh, '<', $root2id_file or die "open($root2id_file): $!"; - my %root2id; # record format: $OIDHEX "\0" uint32_t + my $root2id; # record format: $OIDHEX "\0" uint32_t my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!"); while (@x) { my $oidhex = shift @x; - $root2id{$oidhex} = shift @x; + $root2id->{$oidhex} = shift @x; } my $opt = { relevance => -1, limit => $req->{'m'}, offset => $req->{o} // 0 }; my $mset = $req->{srch}->mset($qry_str, $opt); $req->{0}->autoflush(1); - my $buf = ''; - my $nr = 0; + $req->{wbuf} = ''; for my $it ($mset->items) { - my $doc = $it->get_document; - my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc)); - for my $p (@pfx) { - for (xap_terms($p, $doc)) { - $buf .= "$_ $G\n"; - ++$nr; - } + for (my $t = 10; $t > 0; --$t) { + $t = dump_roots_iter($req, $root2id, $it) // $t; } - if (!($nr & 0x3fff)) { - flock($fh, LOCK_EX) or die "flock: $!"; - print { $req->{0} } $buf or die "print: $!"; - flock($fh, LOCK_UN) or die "flock: $!"; - $buf = ''; + if (!($req->{nr_out} & 0x3fff)) { + dump_roots_flush($req, $fh); } } - if ($buf ne '') { - flock($fh, LOCK_EX) or die "flock: $!"; - print { $req->{0} } $buf or die "print: $!"; - flock($fh, LOCK_UN) or die "flock: $!"; - } - if (my $err = $req->{1}) { - say $err 'mset.size='.$mset->size.' nr_out='.$nr - } + dump_roots_flush($req, $fh); + emit_mset_stats($req, $mset); } sub dispatch { @@ -153,6 +184,7 @@ sub recv_loop { next; } my @argv = split(/\0/, $rbuf); + $req->{nr_out} = 0; eval { $req->dispatch(@argv) } if @argv; } } diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index c9b4e0cc..e3ccfd41 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -63,6 +63,12 @@ static void code_nrp_init(void); static void qp_init_mail_search(Xapian::QueryParser *); static void qp_init_code_search(Xapian::QueryParser *); +enum exc_iter { + ITER_OK = 0, + ITER_RETRY, + ITER_ABORT +}; + struct srch { int paths_len; // int for comparisons unsigned qp_flags; @@ -196,6 +202,13 @@ static Xapian::MSet commit_mset(struct req *req, const char *qry_str) return enquire_mset(req, &enq); } +static void emit_mset_stats(struct req *req, const Xapian::MSet *mset) +{ + if (req->fp[1]) + fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n", + (unsigned long long)mset->size(), req->nr_out); +} + static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len) { return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len); @@ -224,6 +237,20 @@ static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors return setvbuf(fp, NULL, _IOLBF, 0); } +static enum exc_iter dump_ibx_iter(struct req *req, const char *ibx_id, + Xapian::MSetIterator *i) +{ + try { + Xapian::Document doc = i->get_document(); + for (int p = 0; p < req->pfxc; p++) + dump_ibx_term(req, req->pfxv[p], &doc, ibx_id); + } catch (const Xapian::DatabaseModifiedError & e) { + req->srch->db->reopen(); + return ITER_RETRY; + } + return ITER_OK; +} + static bool cmd_dump_ibx(struct req *req) { if ((optind + 1) >= req->argc) { @@ -243,20 +270,18 @@ static bool cmd_dump_ibx(struct req *req) req->asc = true; req->sort_col = -1; Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]); + + // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine + // in case we need to retry on DB reopens for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { - try { - Xapian::Document doc = i.get_document(); - for (int p = 0; p < req->pfxc; p++) - dump_ibx_term(req, req->pfxv[p], &doc, ibx_id); - } catch (const Xapian::Error & e) { - fprintf(orig_err, "W: %s (#%ld)\n", - e.get_description().c_str(), (long)(*i)); - continue; - } + for (int t = 10; t > 0; --t) + switch (dump_ibx_iter(req, ibx_id, &i)) { + case ITER_OK: t = 0; break; // leave inner loop + case ITER_RETRY: break; // continue for-loop + case ITER_ABORT: return false; // error + } } - if (req->fp[1]) - fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n", - (unsigned long long)mset.size(), req->nr_out); + emit_mset_stats(req, &mset); return true; } @@ -312,8 +337,7 @@ static void dump_roots_ensure(void *ptr) fbuf_ensure(&drt->wbuf); } -static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt, - Xapian::Document *doc) +static bool root2ids_str(struct fbuf *root_ids, Xapian::Document *doc) { if (!fbuf_init(root_ids)) return false; @@ -408,9 +432,28 @@ done_free: return ok; } +static enum exc_iter dump_roots_iter(struct req *req, + struct dump_roots_tmp *drt, + Xapian::MSetIterator *i) +{ + CLEANUP_FBUF struct fbuf root_ids = { 0 }; // " $ID0 $ID1 $IDx..\n" + try { + Xapian::Document doc = i->get_document(); + if (!root2ids_str(&root_ids, &doc)) + return ITER_ABORT; // bad request, abort + for (int p = 0; p < req->pfxc; p++) + dump_roots_term(req, req->pfxv[p], drt, + &root_ids, &doc); + } catch (const Xapian::DatabaseModifiedError & e) { + req->srch->db->reopen(); + return ITER_RETRY; + } + return ITER_OK; +} + static bool cmd_dump_roots(struct req *req) { - CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 }; + CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt = { .root2id_fd = -1 }; if ((optind + 1) >= req->argc) { warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR"); return false; // need file + qry_str @@ -432,7 +475,7 @@ static bool cmd_dump_roots(struct req *req) // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0), // so /32 overestimates the number of expected entries by // ~%25 (as recommended by Linux hcreate(3) manpage) - size_t est = (drt.sb.st_size / 32) + 1; + size_t est = (drt.sb.st_size / 32) + 1; //+1 for "\0" termination if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) { warnx("%s size too big (%lld bytes > %zu)", root2id_file, (long long)drt.sb.st_size, SIZE_MAX); @@ -469,30 +512,24 @@ static bool cmd_dump_roots(struct req *req) req->asc = true; req->sort_col = -1; Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]); + + // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine + // in case we need to retry on DB reopens for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { - CLEANUP_FBUF struct fbuf root_ids = { 0 }; if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf)) return false; - try { - Xapian::Document doc = i.get_document(); - if (!root2ids_str(&root_ids, &drt, &doc)) - return false; - for (int p = 0; p < req->pfxc; p++) - dump_roots_term(req, req->pfxv[p], &drt, - &root_ids, &doc); - } catch (const Xapian::Error & e) { - fprintf(orig_err, "W: %s (#%ld)\n", - e.get_description().c_str(), (long)(*i)); - continue; - } + for (int t = 10; t > 0; --t) + switch (dump_roots_iter(req, &drt, &i)) { + case ITER_OK: t = 0; break; // leave inner loop + case ITER_RETRY: break; // continue for-loop + case ITER_ABORT: return false; // error + } if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt)) return false; } if (!dump_roots_flush(req, &drt)) return false; - if (req->fp[1]) - fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n", - (unsigned long long)mset.size(), req->nr_out); + emit_mset_stats(req, &mset); return true; } -- cgit v1.2.3-24-ge0c7