From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 2/2] WIP
Date: Sat, 13 May 2023 22:16:32 +0000 [thread overview]
Message-ID: <20230513221632.145626-2-e@80x24.org> (raw)
In-Reply-To: <20230513221632.145626-1-e@80x24.org>
---
lib/PublicInbox/CodeSearchIdx.pm | 233 +++++++++++++++++++++++++++----
lib/PublicInbox/Search.pm | 2 +-
script/public-inbox-cindex | 3 +-
3 files changed, 208 insertions(+), 30 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 22097d03..b1492a22 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -31,7 +31,7 @@ use PublicInbox::OnDestroy;
use PublicInbox::CidxLogP;
use PublicInbox::CidxComm;
use PublicInbox::Git qw(%OFMT2HEXLEN);
-use Socket qw(MSG_EOR);
+use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET);
use Carp ();
our (
$LIVE, # pid => cmd
@@ -54,8 +54,11 @@ our (
%ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
$TMPDIR, # File::Temp->newdir object for prune
@PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
+ %TODO, @IBXQ, @IBXISH,
+ @JOIN, # join(1) command for associate
$PRUNE_ENV, # env for awk(1), comm(1), sort(1) commands during prune
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
+ @ASSOC_PFX
);
# stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -63,6 +66,9 @@ our (
# git walks commits quickly if it doesn't have to read trees
our $SEEN_MAX = 100000;
+# window of newest commits/emails used to determine an inbox <-> coderepo
+# association; --associate-max overrides it (0 => this default,
+# negative => 2**31-1, i.e. effectively unlimited)
+our $ASSOC_MAX = 50000;
+
our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check);
# TODO: do we care about committer name + email? or tree OID?
@@ -454,6 +460,136 @@ sub shard_commit { # via wq_io_do
send($op_p, "shard_done $self->{shard}", MSG_EOR);
}
+# PktOp callback, invoked once per shard after its dump_shard_roots
+# sort(1) has exited; reports the shard number and sort(1) wait status.
+sub dump_roots_done {
+	my (undef, undef, $n, $sort_err) = @_; # ($self, $associate, ...)
+	warn "# dump_roots_done[$n] $sort_err";
+}
+
+# Dump the patch-ids of each coderepo shard (via wq_io_do): writes one
+# "$XDFID $ROOT1 $ROOT2.." line per patch-id of every commit document,
+# sorted on the first field, to "$TMPDIR/$SHARD.to-roots".  The file is
+# written under a ".tmp" name and only renamed into place if sort(1)
+# succeeds; the sort(1) wait status is reported via dump_roots_done.
+sub dump_shard_roots { # via wq_io_do
+	my ($self) = @_;
+	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+	local $self->{xdb};
+	my $xdb = $self->idx_acquire;
+	my $qry = $PublicInbox::Search::X{Query}->new(
+				PublicInbox::Search::OP_FILTER(),
+				'T'.'c');
+	my $enq = $PublicInbox::Search::X{Enquire}->new($xdb);
+	$enq->set_query($qry);
+	$enq->set_weighting_scheme($PublicInbox::Search::X{BoolWeight}->new);
+	$enq->set_docid_order($PublicInbox::Search::ENQ_DESCENDING);
+	my $dst = "$TMPDIR/$self->{shard}.to-roots";
+	pipe(my ($r, $w)) or die "pipe: $!";
+	open my $fh, '>', "$dst.tmp" or die "open: $!";
+	my @sort = (@SORT, '-k1,1');
+	# byte-order collation: the output is meant to be fed to join(1),
+	# which requires input sorted in the collation it runs under
+	my $env = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
+	my $pid = spawn(\@sort, $env, { 0 => $r, 1 => $fh });
+	close $r or die "close: $!"; # only sort(1) reads from the pipe
+	my ($off, $lim) = (0, 10000);
+	while (1) {
+		my $mset = $enq->get_mset($off, $lim);
+		$mset->size or last;
+		for my $x ($mset->items) {
+			my $doc = $x->get_document;
+			my @roots = xap_terms('G', $doc);
+			say $w "$_ @roots" for (xap_terms('XDFID', $doc));
+		}
+		$off += $lim;
+	}
+	close $w or die "close: $!";
+	waitpid($pid, 0) == $pid or die "waitpid($pid): $!";
+	my $sort_err = $?;
+	if ($sort_err == 0) {
+		rename("$dst.tmp", $dst) or
+			warn "rename($dst.tmp => $dst): $!";
+	} else {
+		warn "@sort failed for shard #$self->{shard}: $sort_err";
+		unlink "$dst.tmp";
+	}
+	send($op_p, "dump_roots_done $self->{shard} $sort_err", MSG_EOR);
+}
+
+# Ask every shard worker to run dump_shard_roots; each one replies with a
+# dump_roots_done PktOp message.  $associate (the OnDestroy armed by
+# init_associate) is stashed in the PktOp handler, presumably so it stays
+# alive until all shard replies are handled.
+sub dump_roots_patchids {
+	my ($self, $associate) = @_;
+	progress($self, 'dumping patchids from coderepos');
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{dump_roots_done} = [ $self, $associate ];
+	for my $shard (@IDX_SHARDS) {
+		$shard->wq_io_do('dump_shard_roots', [ $p->{op_p} ]);
+	}
+}
+
+# PktOp callback for the per-shard inbox patch-id dump; the second
+# argument is the $associate OnDestroy registered alongside the handler,
+# followed by the shard number and the sort(1) wait status.
+sub dump_ibx_patchids_done {
+	my (undef, undef, $n, $sort_err) = @_; # ($self, $associate, ...)
+	warn "# dump_ibx_patchids_done[$n] $sort_err";
+}
+
+# Shard-worker side of the inbox patch-id dump (via wq_io_do).
+# {0} is the PktOp op_p, {1} the receiving end of a SOCK_SEQPACKET socket
+# carrying "$IBX_ID $DIR" messages until EOF.  For each inbox (or extindex)
+# the newest messages -- at most --associate-max of them -- have their
+# @ASSOC_PFX terms written as "$TERM $IBX_ID" lines, sorted on the term,
+# to "$TMPDIR/$SHARD.to_ibx_id" (renamed into place only if sort(1)
+# succeeds).  Completion is reported as dump_ibx_patchids_done.
+sub recv_ibx { # wq_io_do
+	my ($self) = @_;
+	my $op_p = delete($self->{0}) // die 'BUG: no op_p';
+	my $r_ibx = delete($self->{1}) // die 'BUG: no {r_ibx}';
+	my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
+	$max = $ASSOC_MAX if !$max;
+	$max = ((2 ** 31) - 1) if $max < 0; # negative: effectively unlimited
+	my $dst = "$TMPDIR/$self->{shard}.to_ibx_id";
+	pipe(my ($r, $w)) or die "pipe: $!";
+	open my $fh, '>', "$dst.tmp" or die "open: $!";
+	my @sort = (@SORT, '-k1,1');
+	# byte-order collation: the output is meant to be fed to join(1),
+	# which requires input sorted in the collation it runs under
+	my $env = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
+	my $pid = spawn(\@sort, $env, { 0 => $r, 1 => $fh });
+	close $r or die "close: $!";
+	IBX: while (1) {
+		recv($r_ibx, my $d, 4096, 0) // die "recv: $!";
+		last IBX if $d eq ''; # EOF
+		$d =~ s/\A([0-9]+) // or die 'BUG: no ibx_idx';
+		my $ibx_id = $1;
+		my $ibx;
+		if (-f "$d/ei.lock") {
+			$ibx = PublicInbox::ExtSearch->new($d);
+		} elsif (-f "$d/inbox.lock" || -d "$d/public-inbox") {
+			$ibx = PublicInbox::Inbox->new({ inboxdir => $d });
+		} else {
+			warn "W: unknown dir: $d (ignoring)\n";
+			next IBX;
+		}
+		my $srch = $ibx->search or do {
+			warn "W: no search index: $d (ignoring)\n";
+			next IBX;
+		};
+		# A real loop (not do {} while, where `last' would escape to
+		# IBX) which counts down the window so --associate-max is
+		# honored across batches.
+		my $left = $max;
+		my $opt = { limit => 10000, offset => 0, relevance => -2 };
+		while ($left > 0) {
+			$opt->{limit} = $left if $left < $opt->{limit};
+			my $mset = $srch->mset('z:0..', $opt);
+			my $nr = $mset->size or last;
+			for my $x ($mset->items) {
+				my $doc = $x->get_document;
+				for my $pfx (@ASSOC_PFX) {
+					print $w "$_ $ibx_id\n" for xap_terms($pfx, $doc);
+				}
+			}
+			$left -= $nr;
+			$opt->{offset} += $nr;
+		}
+	}
+	close $w or die "close(sort): $!";
+	waitpid($pid, 0) == $pid or die "waitpid($pid): $!";
+	my $sort_err = $?;
+	if ($sort_err == 0) {
+		rename("$dst.tmp", $dst) or
+			warn "rename($dst.tmp => $dst): $!";
+	} else {
+		warn "@sort failed for shard #$self->{shard}: $sort_err";
+		unlink "$dst.tmp";
+	}
+	# dump_ibx_start registers a `dump_ibx_patchids_done' handler taking
+	# the shard number and sort(1) status, so reply under that name
+	send($op_p, "dump_ibx_patchids_done $self->{shard} $sort_err",
+		MSG_EOR);
+}
+
+# Have every shard worker dump inbox patch-ids for $ibx; a no-op unless
+# --associate armed $TODO{associate}.  Replies arrive as
+# dump_ibx_patchids_done PktOp messages.
+sub dump_ibx_patchids {
+	my ($self, $ibx) = @_;
+	my $associate = $TODO{associate} // return;
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{dump_ibx_patchids_done} = [ $self, $associate ];
+	# NOTE(review): no shard-side `dump_ibx_patchids' worker method is
+	# visible here (recv_ibx reads inbox dirs from a socket instead);
+	# confirm which method the workers are meant to run
+	for my $shard (@IDX_SHARDS) {
+		$shard->wq_io_do('dump_ibx_patchids', [ $p->{op_p} ], $ibx);
+	}
+}
+
+# Repurpose the shard workers to dump inbox patch-ids: every shard runs
+# recv_ibx on the receiving end of a single SOCK_SEQPACKET socketpair,
+# presumably so whichever worker is idle takes the next inbox message
+# (hence "perfect balance").
+sub dump_ibx_start {
+	my ($self) = @_;
+	my ($rd, $wr);
+	socketpair($rd, $wr, AF_UNIX, SOCK_SEQPACKET, 0) or
+		die "socketpair: $!";
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{dump_ibx_patchids_done} = [ $self, $TODO{associate} ];
+	# NOTE(review): the sending end $wr is not stored anywhere, so it is
+	# presumably closed when this sub returns and recv_ibx sees EOF at
+	# once; confirm where inbox dirs are meant to be sent from
+	for my $shard (@IDX_SHARDS) {
+		$shard->wq_io_do('recv_ibx', [ $p->{op_p}, $rd ]);
+	}
+}
+
sub index_next ($) {
my ($self) = @_;
return if $DO_QUIT;
@@ -465,6 +601,10 @@ sub index_next ($) {
$self, $git);
fp_start($self, $git, $prep_repo);
ct_start($self, $git, $prep_repo);
+ } elsif ($TMPDIR) {
+ delete $TODO{dump_ibx_start};
+ return dump_ibx_patchids($self, shift @IBXQ) if @IBXQ;
+ dump_roots_patchids($self, delete($TODO{associate}) // return);
}
# else: wait for shards_active (post_loop_do) callback
}
@@ -629,7 +769,6 @@ EOM
sub scan_git_dirs ($) {
my ($self) = @_;
+	# only queues git dirs; cidx_run now calls index_next after init_prune
+	# and init_associate have run
@$GIT_TODO = @{$self->{git_dirs}};
- index_next($self) for (1..$LIVE_JOBS);
}
sub prune_do { # via wq_io_do in IDX_SHARDS
@@ -717,6 +856,7 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
EOM
unless ($ALT_FH{$hexlen}) {
+ require PublicInbox::Import;
my $git_dir = "$TMPDIR/hexlen$hexlen.git";
PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
my $f = "$git_dir/objects/info/alternates";
@@ -739,52 +879,76 @@ sub prep_alternate_start {
awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
}
-sub prune_cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
+sub cmd_done { # awaitpid cb: dies if sort/xapian-delve/sed/awk/comm failed
my ($pid, $cmd, $run_prune) = @_;
$? and die "@$cmd failed: \$?=$?";
}
+# Final --associate step, run as the $TODO{associate} OnDestroy callback;
+# meant to join(1) the dumped files.  For now it apparently serves as a
+# debugging aid: it lists $TMPDIR and blocks until "$TMPDIR/cont" exists.
+sub associate { # join(1)
+	my ($self) = @_;
+	warn "# associating";
+	# TODO
+	system "ls -Rl $TMPDIR >&2";
+	warn "# Waiting for $TMPDIR/cont";
+	until (-f "$TMPDIR/cont") {
+		sleep(1);
+	}
+}
+
+# Fill in argv[0] of external command arrays.
+# Args: the option name used in the error message, then PROGRAM => \@argv
+# pairs.  An env var named after PROGRAM (upper-cased, `-' => `_') may
+# override the program looked up via which(); an argv[0] that is already
+# defined is left alone.  Dies if a program cannot be found.
+sub require_progs {
+	my ($op, @pairs) = @_;
+	while (@pairs) {
+		my ($prog, $argv) = splice(@pairs, 0, 2);
+		next if defined $argv->[0];
+		(my $env_key = $prog) =~ tr/a-z-/A-Z_/;
+		my $cmd = $ENV{$env_key} // $prog;
+		$argv->[0] = which($cmd) //
+			die "E: `$prog' required for --$op\n";
+	}
+}
+
+# Set up --associate: resolve join(1), map --associate-prefixes (default:
+# patchid) to term prefixes in @ASSOC_PFX, then arm the OnDestroy
+# callbacks.  All validation happens first: dying after the callbacks are
+# armed would run dump_ibx_start and associate while %TODO unwinds.
+# NOTE(review): `associate-prefixes' is not among the GetOptions specs in
+# public-inbox-cindex; confirm it is meant to be user-settable.
+sub init_associate ($) {
+	my ($self) = @_;
+	require PublicInbox::ExtSearch;
+	require PublicInbox::Inbox;
+	require_progs('associate', join => \@JOIN);
+	my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
+	my @unknown;
+	for my $p (map { split(/\s*,\s*/, $_) } @pfx) {
+		my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$p};
+		if (defined $v) {
+			push @ASSOC_PFX, $v;
+		} else {
+			push @unknown, $p;
+		}
+	}
+	die <<EOM if @unknown;
+--associate-prefixes contains unsupported prefixes: @unknown
+EOM
+	$TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self);
+	$TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$,
+				\&dump_ibx_start, $self, $TODO{associate});
+}
+
sub init_prune ($) {
my ($self) = @_;
return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};
- require File::Temp;
- require PublicInbox::Import;
- $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
-
# Dealing with millions of commits here at once, so use faster tools.
# xapian-delve is nearly an order-of-magnitude faster than Xapian Perl
# bindings. sed/awk are faster than Perl for simple stream ops, and
# sort+comm are more memory-efficient with gigantic lists
my @delve = (undef, qw(-A Q -1));
my @sed = (undef, '-ne', 's/^Q//p');
- @SORT = (undef, '-u');
@COMM = (undef, qw(-2 -3 indexed_commits -));
@AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output
- my @x = ('xapian-delve' => \@delve, sed => \@sed,
- sort => \@SORT, comm => \@COMM, awk => \@AWK);
- while (my ($x, $argv) = splice(@x, 0, 2)) {
- my $e = $x;
- $e =~ tr/a-z-/A-Z_/;
- my $c = $ENV{$e} // $x;
- $argv->[0] = which($c) // die "E: `$x' required for --prune\n";
- }
+ require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed,
+ comm => \@COMM, awk => \@AWK);
for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
- for (qw(parallel compress-program buffer-size)) { # GNU sort options
- my $v = $self->{-opt}->{"sort-$_"};
- push @SORT, "--$_=$v" if defined $v;
- }
my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
$PRUNE_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
my $pid = spawn(\@SORT, $PRUNE_ENV, { 0 => $sort_in, 1 => $sort_out });
- awaitpid($pid, \&prune_cmd_done, \@SORT, $run_prune);
+ awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
$pid = spawn(\@sed, $PRUNE_ENV, { 0 => $sed_in, 1 => $sed_out });
- awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune);
+ awaitpid($pid, \&cmd_done, \@sed, $run_prune);
$pid = spawn(\@delve, undef, { 1 => $delve_out });
- awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune);
+ awaitpid($pid, \&cmd_done, \@delve, $run_prune);
@PRUNE_QUEUE = @{$self->{git_dirs}};
for (1..$LIVE_JOBS) {
prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune);
@@ -814,9 +978,9 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
{ 0 => $sort_in, 1 => $sort_out });
my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $PRUNE_ENV,
{ 0 => $comm_in, -C => "$TMPDIR" });
- awaitpid($awk_pid, \&prune_cmd_done, \@AWK);
- awaitpid($sort_pid, \&prune_cmd_done, \@SORT);
- awaitpid($comm_pid, \&prune_cmd_done, \@COMM);
+ awaitpid($awk_pid, \&cmd_done, \@AWK);
+ awaitpid($sort_pid, \&cmd_done, \@SORT);
+ awaitpid($comm_pid, \&cmd_done, \@COMM);
PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
my $git_ver = PublicInbox::Git::git_version();
push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6;
@@ -868,10 +1032,21 @@ sub cidx_run { # main entry point
local $IDX_TODO = [];
local $GIT_TODO = [];
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
- $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, @SORT, $PRUNE_ENV);
+ $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $PRUNE_ENV,
+ %TODO, @IBXQ, @IBXISH, @JOIN, @ASSOC_PFX);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
- local @IDX_SHARDS = cidx_init($self);
+ local @SORT = (undef, '-u');
+ if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
+ require File::Temp;
+ $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
+ require_progs('(prune|associate)', sort => \@SORT);
+ for (qw(parallel compress-program buffer-size)) { # GNU sort
+ my $v = $self->{-opt}->{"sort-$_"};
+ push @SORT, "--$_=$v" if defined $v;
+ }
+ }
+ local @IDX_SHARDS = cidx_init($self); # forks
local $self->{current_info} = '';
local $MY_SIG = {
CHLD => \&PublicInbox::DS::enqueue_reap,
@@ -920,7 +1095,9 @@ sub cidx_run { # main entry point
PublicInbox::IPC::detect_nproc() || 2;
local @RDONLY_XDB = $self->xdb_shards_flat;
init_prune($self);
+ init_associate($self) if $self->{-opt}->{associate};
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+ index_next($self) for (1..$LIVE_JOBS);
# FreeBSD ignores/discards SIGCHLD while signals are blocked and
# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm
index 6986cb88..36b6dce8 100644
--- a/lib/PublicInbox/Search.pm
+++ b/lib/PublicInbox/Search.pm
@@ -573,7 +573,7 @@ sub all_terms {
my %ret;
for (; $cur != $end; $cur++) {
my $tn = $cur->get_termname;
- index($tn, $pfx) == 0 and
+ # index($tn, $pfx) == 0 and
$ret{substr($tn, length($pfx))} = undef;
}
wantarray ? (sort keys %ret) : \%ret;
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index 2f7796e7..03a87685 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -26,8 +26,9 @@ See public-inbox-cindex(1) man page for full documentation.
EOF
my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden
GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
- indexlevel|index-level|L=s
+ indexlevel|index-level|L=s associate associate-max=i
batch_size|batch-size=s max_size|max-size=s
+ include|I=s@ only=s@ all
project-list=s exclude=s@
sort-parallel=s sort-compress-program=s sort-buffer-size=s
d=s update|u scan! prune dry-run|n C=s@ help|h))
next prev parent reply other threads:[~2023-05-13 22:16 UTC|newest]
Thread overview: 11+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-05-13 22:16 [PATCH 1/2] search: add comments wrt codesearch, reduce ops Eric Wong
2023-05-13 22:16 ` Eric Wong [this message]
-- strict thread matches above, loose matches on Subject: below --
2023-05-04 11:07 [PATCH 1/2] reuse open(2) from rb_file_load_ok on POSIX-like system Eric Wong
2023-05-04 11:07 ` [PATCH 2/2] wip Eric Wong
2023-04-17 23:06 [PATCH 1/2] reuse open(2) from rb_file_load_ok on POSIX-like system Eric Wong
2023-04-17 23:06 ` [PATCH 2/2] wip Eric Wong
2021-01-15 10:24 [PATCH 1/2] lei_to_mail: prepare Eric Wong
2021-01-15 10:24 ` [PATCH 2/2] wip Eric Wong
2020-03-31 6:42 [PATCH 1/2] v2writable: index Message-IDs w/ spaces properly Eric Wong
2020-03-31 6:42 ` [PATCH 2/2] WIP Eric Wong
2019-11-03 3:17 [PATCH 1/2] edit: hoist out prompt functions into sub functions Eric Wong
2019-11-03 3:17 ` [PATCH 2/2] wip Eric Wong
2019-09-23 3:16 [PATCH 1/2] edit: hoist out prompt functions into sub functions ew
2019-09-23 3:16 ` [PATCH 2/2] wip ew
2019-01-07 8:02 [PATCH 1/2] hoist out resolve_repo_dir from -index Eric Wong
2019-01-07 8:02 ` [PATCH 2/2] wip Eric Wong
2018-09-09 8:35 [PATCH 1/2] cont.c (fiber_memsize): do not rely on ROOT_FIBER_CONTEXT Eric Wong
2018-09-09 8:35 ` [PATCH 2/2] wip Eric Wong
2018-09-09 8:00 [PATCH 1/2] test_fiber: show crash on GC after forking Eric Wong
2018-09-09 8:00 ` [PATCH 2/2] wip Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20230513221632.145626-2-e@80x24.org \
--to=e@80x24.org \
--cc=spew@80x24.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).