From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-3.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF,SUBJ_ALL_CAPS, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=no autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id CBDDB1F452 for ; Sat, 13 May 2023 22:16:32 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1684016192; bh=WsEsy4PtzCCEU3m8pWU+BBzQidrJ1xICO3BnJfnqPBs=; h=From:To:Subject:Date:In-Reply-To:References:From; b=B/5NY248XIwNhI01YpZOkLFI1/vHBct98b/CT3Oe/CIr+W+CxUhLxRnHuxlX9BzhE TkgNcXvyfS3rhs8R7tIGyDGqWmFCc0bEL1PF73QCL4YABUo+DHanZnbdCTk9FxW4+1 oi2AWpfXogGGZNjlCuevYvIasfS5qDudICGhoOac= From: Eric Wong To: spew@80x24.org Subject: [PATCH 2/2] WIP Date: Sat, 13 May 2023 22:16:32 +0000 Message-Id: <20230513221632.145626-2-e@80x24.org> In-Reply-To: <20230513221632.145626-1-e@80x24.org> References: <20230513221632.145626-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: --- lib/PublicInbox/CodeSearchIdx.pm | 233 +++++++++++++++++++++++++++---- lib/PublicInbox/Search.pm | 2 +- script/public-inbox-cindex | 3 +- 3 files changed, 208 insertions(+), 30 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 22097d03..b1492a22 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -31,7 +31,7 @@ use PublicInbox::OnDestroy; use PublicInbox::CidxLogP; use PublicInbox::CidxComm; use PublicInbox::Git qw(%OFMT2HEXLEN); -use Socket qw(MSG_EOR); +use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET); use Carp (); our ( $LIVE, # pid => cmd @@ -54,8 +54,11 @@ our ( %ALT_FH, # hexlen => tmp IO for TMPDIR git alternates $TMPDIR, # File::Temp->newdir object for prune @PRUNE_QUEUE, # GIT_DIRs to prepare 
for pruning + %TODO, @IBXQ, @IBXISH, + @JOIN, # join(1) command for associate $PRUNE_ENV, # env for awk(1), comm(1), sort(1) commands during prune @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands + @ASSOC_PFX ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes @@ -63,6 +66,9 @@ our ( # git walks commits quickly if it doesn't have to read trees our $SEEN_MAX = 100000; +# window for commits/emails to determine an inbox <-> coderepo association +my $ASSOC_MAX = 50000; + our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check); # TODO: do we care about committer name + email? or tree OID? @@ -454,6 +460,136 @@ sub shard_commit { # via wq_io_do send($op_p, "shard_done $self->{shard}", MSG_EOR); } +sub dump_roots_done { # via PktOp on dump_shard_roots completion + my ($self, $associate, $n, $sort_err) = @_; + warn "# dump_roots_done[$n] $sort_err"; +} + +# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2.. +sub dump_shard_roots { # via wq_io_do + my ($self) = @_; + my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; + local $self->{xdb}; + my $xdb = $self->idx_acquire; + my $qry = $PublicInbox::Search::X{Query}->new( + PublicInbox::Search::OP_FILTER(), + 'T'.'c'); + my $enq = $PublicInbox::Search::X{Enquire}->new($xdb); + $enq->set_query($qry); + $enq->set_weighting_scheme($PublicInbox::Search::X{BoolWeight}->new); + $enq->set_docid_order($PublicInbox::Search::ENQ_DESCENDING); + my ($off, $lim) = (0, 10000); + my ($mset, $doc, @roots); + pipe(my ($r, $w)) or die "pipe: $!"; + open my $fh, '>', "$TMPDIR/$self->{shard}.to-roots" or die "open: $!"; + my @sort = (@SORT, '-k1,1'); + my $pid = spawn(\@sort, undef, { 0 => $r, 1 => $fh }); + do { + $mset = $enq->get_mset($off, $lim); + for my $x ($mset->items) { + $doc = $x->get_document; + @roots = xap_terms('G', $doc); + say $w "$_ @roots" for (xap_terms('XDFID', $doc)); + } + } while ($mset->size != 0 && ($off += $lim)); + close $w or die "close: $!"; + waitpid($pid, 0) 
== $pid or die "waitpid($pid): $!"; + send($op_p, "dump_roots_done $self->{shard} $?", MSG_EOR); +} + +sub dump_roots_patchids { + my ($self, $associate) = @_; + progress($self, 'dumping patchids from coderepos'); + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{dump_roots_done} = [ $self, $associate ]; + for (@IDX_SHARDS) { $_->wq_io_do('dump_shard_roots', [ $p->{op_p} ]) } +} + +sub dump_ibx_patchids_done { + my ($self, $pid, $n, $sort_err) = @_; + warn "# dump_ibx_patchids_done[$n] $sort_err"; +} + +sub recv_ibx { # wq_io_do + my ($self) = @_; + my $op_p = delete($self->{0}) // die 'BUG: no op_p'; + my $r_ibx = delete($self->{1}) // die 'BUG: no {r_ibx}'; + my $dst = "$TMPDIR/$self->{shard}.to_ibx_id"; + pipe(my ($r, $w)) or die "pipe: $!"; + open my $fh, '>', "$dst.tmp" or die "open: $!"; + my @sort = (@SORT, '-k1,1'); + my $pid = spawn(\@sort, undef, { 0 => $r, 1 => $fh }); + close $r or die "close: $!"; + while (1) { + recv($r_ibx, my $d, 4096, 0) // die "recv: $!"; + last if $d eq ''; # EOF + $d =~ s/\A([0-9]+) // or die 'BUG: no ibx_idx'; + my $ibx_id = $1; + my ($ibx, $mset, $doc); + if (-f "$d/ei.lock") { + $ibx = PublicInbox::ExtSearch->new($d); + } elsif (-f "$d/inbox.lock" || -d "$d/public-inbox") { + $ibx = PublicInbox::Inbox->new({ inboxdir => $d }); + } else { + warn "W: unknown dir: $d (ignoring)\n"; + next; + } + my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX; + $max = $ASSOC_MAX if !$max; + $max = ((2 ** 31) - 1) if $max < 0; + my $srch = $ibx->search or do { + warn "W: no search index: $d (ignoring)\n"; + next; + }; + my $opt = { limit => 10000, offset => 0, relevance => -2 }; + do { + $max > 0 && $max < $opt->{limit} and + $opt->{limit} = $max; + last if $opt->{limit} <= 0; + $mset = $srch->mset('z:0..', $opt); + for my $x ($mset->items) { + my $doc = $x->get_document; + for my $p (@ASSOC_PFX) { + for (xap_terms($p, $doc)) { + print $w "$_ $ibx_id\n"; + } + } + } + } while ($mset->size && ($opt->{offset} += $opt->{limit})); + } 
+ close $w or die "close(sort): $!"; + waitpid($pid, 0) == $pid or die "waitpid($pid): $!"; + if ($? == 0) { + rename("$dst.tmp", $dst) or + warn "rename($dst.tmp => $dst): $!"; + } else { + warn "@sort failed for shard #$self->{shard}: $?"; + unlink "$dst.tmp"; + } + + send($op_p, "recv_ibx_done $self->{shard}", MSG_EOR); +} + +sub dump_ibx_patchids { + my ($self, $ibx) = @_; + my $associate = $TODO{associate} // return; + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{dump_ibx_patchids_done} = [ $self, $associate ]; + for my $s (@IDX_SHARDS) { + $s->wq_io_do('dump_ibx_patchids', [ $p->{op_p} ], $ibx); + } +} + +# repurpose shard workers to dump inbox patchids with perfect balance +sub dump_ibx_start { + my ($self) = @_; + my ($r, $w); + socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!"; + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{dump_ibx_patchids_done} = [ $self, $TODO{associate} ]; + $_->wq_io_do('recv_ibx', [ $p->{op_p}, $r ]) for @IDX_SHARDS; +} + sub index_next ($) { my ($self) = @_; return if $DO_QUIT; @@ -465,6 +601,10 @@ sub index_next ($) { $self, $git); fp_start($self, $git, $prep_repo); ct_start($self, $git, $prep_repo); + } elsif ($TMPDIR) { + delete $TODO{dump_ibx_start}; + return dump_ibx_patchids($self, shift @IBXQ) if @IBXQ; + dump_roots_patchids($self, delete($TODO{associate}) // return); } # else: wait for shards_active (post_loop_do) callback } @@ -629,7 +769,6 @@ EOM sub scan_git_dirs ($) { my ($self) = @_; @$GIT_TODO = @{$self->{git_dirs}}; - index_next($self) for (1..$LIVE_JOBS); } sub prune_do { # via wq_io_do in IDX_SHARDS @@ -717,6 +856,7 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt EOM unless ($ALT_FH{$hexlen}) { + require PublicInbox::Import; my $git_dir = "$TMPDIR/hexlen$hexlen.git"; PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt); my $f = "$git_dir/objects/info/alternates"; @@ 
-739,52 +879,76 @@ sub prep_alternate_start { awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune); } -sub prune_cmd_done { # awaitpid cb for sort, xapian-delve, sed failures +sub cmd_done { # awaitpid cb for sort, xapian-delve, sed failures my ($pid, $cmd, $run_prune) = @_; $? and die "@$cmd failed: \$?=$?"; } +sub associate { # join(1) + my ($self) = @_; + warn "# associating"; + # TODO + system "ls -Rl $TMPDIR >&2"; + warn "# Waiting for $TMPDIR/cont"; + sleep(1) until -f "$TMPDIR/cont"; +} + +sub require_progs { + my $op = shift; + while (my ($x, $argv) = splice(@_, 0, 2)) { + my $e = $x; + $e =~ tr/a-z-/A-Z_/; + my $c = $ENV{$e} // $x; + $argv->[0] //= which($c) // die "E: `$x' required for --$op\n"; + } +} + +sub init_associate ($) { + my ($self) = @_; + require PublicInbox::ExtSearch; + require PublicInbox::Inbox; + require_progs('associate', join => \@JOIN); + $TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self); + $TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$, + \&dump_ibx_start, $self, $TODO{associate}); + my @unknown; + my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]}; + for (map { split(/\s*,\s*/, $_) } @pfx) { + my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$_} // + push(@unknown, $_); + push(@ASSOC_PFX, $v); + } + die <{-opt}->{prune}; - require File::Temp; - require PublicInbox::Import; - $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); - # Dealing with millions of commits here at once, so use faster tools. # xapian-delve is nearly an order-of-magnitude faster than Xapian Perl # bindings. 
sed/awk are faster than Perl for simple stream ops, and # sort+comm are more memory-efficient with gigantic lists my @delve = (undef, qw(-A Q -1)); my @sed = (undef, '-ne', 's/^Q//p'); - @SORT = (undef, '-u'); @COMM = (undef, qw(-2 -3 indexed_commits -)); @AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output - my @x = ('xapian-delve' => \@delve, sed => \@sed, - sort => \@SORT, comm => \@COMM, awk => \@AWK); - while (my ($x, $argv) = splice(@x, 0, 2)) { - my $e = $x; - $e =~ tr/a-z-/A-Z_/; - my $c = $ENV{$e} // $x; - $argv->[0] = which($c) // die "E: `$x' required for --prune\n"; - } + require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed, + comm => \@COMM, awk => \@AWK); for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" } - for (qw(parallel compress-program buffer-size)) { # GNU sort options - my $v = $self->{-opt}->{"sort-$_"}; - push @SORT, "--$_=$v" if defined $v; - } my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self); pipe(my ($sed_in, $delve_out)) or die "pipe: $!"; pipe(my ($sort_in, $sed_out)) or die "pipe: $!"; open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!"; $PRUNE_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' }; my $pid = spawn(\@SORT, $PRUNE_ENV, { 0 => $sort_in, 1 => $sort_out }); - awaitpid($pid, \&prune_cmd_done, \@SORT, $run_prune); + awaitpid($pid, \&cmd_done, \@SORT, $run_prune); $pid = spawn(\@sed, $PRUNE_ENV, { 0 => $sed_in, 1 => $sed_out }); - awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune); + awaitpid($pid, \&cmd_done, \@sed, $run_prune); $pid = spawn(\@delve, undef, { 1 => $delve_out }); - awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune); + awaitpid($pid, \&cmd_done, \@delve, $run_prune); @PRUNE_QUEUE = @{$self->{git_dirs}}; for (1..$LIVE_JOBS) { prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune); @@ -814,9 +978,9 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done { 0 => $sort_in, 1 => $sort_out }); my ($comm_rd, 
$comm_pid) = popen_rd(\@COMM, $PRUNE_ENV, { 0 => $comm_in, -C => "$TMPDIR" }); - awaitpid($awk_pid, \&prune_cmd_done, \@AWK); - awaitpid($sort_pid, \&prune_cmd_done, \@SORT); - awaitpid($comm_pid, \&prune_cmd_done, \@COMM); + awaitpid($awk_pid, \&cmd_done, \@AWK); + awaitpid($sort_pid, \&cmd_done, \@SORT); + awaitpid($comm_pid, \&cmd_done, \@COMM); PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm my $git_ver = PublicInbox::Git::git_version(); push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6; @@ -868,10 +1032,21 @@ sub cidx_run { # main entry point local $IDX_TODO = []; local $GIT_TODO = []; local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE, - $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, @SORT, $PRUNE_ENV); + $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $PRUNE_ENV, + %TODO, @IBXQ, @IBXISH, @JOIN, @ASSOC_PFX); local $BATCH_BYTES = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; - local @IDX_SHARDS = cidx_init($self); + local @SORT = (undef, '-u'); + if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) { + require File::Temp; + $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); + require_progs('(prune|associate)', sort => \@SORT); + for (qw(parallel compress-program buffer-size)) { # GNU sort + my $v = $self->{-opt}->{"sort-$_"}; + push @SORT, "--$_=$v" if defined $v; + } + } + local @IDX_SHARDS = cidx_init($self); # forks local $self->{current_info} = ''; local $MY_SIG = { CHLD => \&PublicInbox::DS::enqueue_reap, @@ -920,7 +1095,9 @@ sub cidx_run { # main entry point PublicInbox::IPC::detect_nproc() || 2; local @RDONLY_XDB = $self->xdb_shards_flat; init_prune($self); + init_associate($self) if $self->{-opt}->{associate}; scan_git_dirs($self) if $self->{-opt}->{scan} // 1; + index_next($self) for (1..$LIVE_JOBS); # FreeBSD ignores/discards SIGCHLD while signals are blocked and # EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending diff --git a/lib/PublicInbox/Search.pm 
b/lib/PublicInbox/Search.pm index 6986cb88..36b6dce8 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -573,7 +573,7 @@ sub all_terms { my %ret; for (; $cur != $end; $cur++) { my $tn = $cur->get_termname; - index($tn, $pfx) == 0 and + # index($tn, $pfx) == 0 and $ret{substr($tn, length($pfx))} = undef; } wantarray ? (sort keys %ret) : \%ret; diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex index 2f7796e7..03a87685 100755 --- a/script/public-inbox-cindex +++ b/script/public-inbox-cindex @@ -26,8 +26,9 @@ See public-inbox-cindex(1) man page for full documentation. EOF my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous - indexlevel|index-level|L=s + indexlevel|index-level|L=s associate associate-max=i batch_size|batch-size=s max_size|max-size=s + include|I=s@ only=s@ all project-list=s exclude=s@ sort-parallel=s sort-compress-program=s sort-buffer-size=s d=s update|u scan! prune dry-run|n C=s@ help|h))