From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF, T_SCC_BODY_TEXT_LINE shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id EC5B11F452 for ; Sat, 22 Apr 2023 09:07:50 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1682154471; bh=6bVtATPuU3rXvZbDiPb9VeKMt0EBEgxGgVBRmDC7/ok=; h=From:To:Subject:Date:From; b=TgwuljC4te+LTY3omRirjv2MWJe9+BbDswylMJw7LmDK2qYd3GygBUiW/74ZxtoXd jfwKr9RumzGHYZeX5WxISOhyZpRcAURWYfSUGdtxPZ9Vkx44qrE8UbTLbQLfnCG3F+ z2okSp8rxCzsS23uuDqy/5Log93FjI2Rv9FGdlh8= From: Eric Wong To: spew@80x24.org Subject: [PATCH] cindex: rewrite prune, again... Date: Sat, 22 Apr 2023 09:07:50 +0000 Message-Id: <20230422090750.2278059-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This should be faster, since xapian-delve is nearly an order of magnitude faster than the Xapian Perl bindings at listing indexed commits. --- MANIFEST | 1 + lib/PublicInbox/CidxComm.pm | 28 ++++ lib/PublicInbox/CodeSearchIdx.pm | 243 ++++++++++++++++++------------- script/public-inbox-cindex | 1 + 4 files changed, 168 insertions(+), 105 deletions(-) create mode 100644 lib/PublicInbox/CidxComm.pm diff --git a/MANIFEST b/MANIFEST index c338f0f4..69730623 100644 --- a/MANIFEST +++ b/MANIFEST @@ -160,6 +160,7 @@ lib/PublicInbox/AdminEdit.pm lib/PublicInbox/AltId.pm lib/PublicInbox/AutoReap.pm lib/PublicInbox/Cgit.pm +lib/PublicInbox/CidxComm.pm lib/PublicInbox/CidxLogP.pm lib/PublicInbox/CmdIPC4.pm lib/PublicInbox/CodeSearch.pm diff --git a/lib/PublicInbox/CidxComm.pm b/lib/PublicInbox/CidxComm.pm new file mode 100644 index 00000000..fb7be0aa --- /dev/null +++ b/lib/PublicInbox/CidxComm.pm @@ -0,0 +1,28 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ +# +# Waits for initial
comm(1) output for PublicInbox::CodeSearchIdx. +# The initial output from `comm' can take a while to generate because +# it needs to wait on: +# `git cat-file --batch-all-objects --batch-check --unordered | sort' +# We still rely on blocking reads here, since comm should be fast once +# it sees input. (`--unordered | sort' is intentional for HDDs) +package PublicInbox::CidxComm; +use v5.12; +use parent qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); + +sub new { + my ($cls, $rd, $cidx) = @_; + my $self = bless { cidx => $cidx }, $cls; + $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT); +} + +sub event_step { + my ($self) = @_; + my $rd = $self->{sock} // return warn('BUG?: no {sock}'); + $self->close; # PublicInbox::DS::close, deferred, so $sock is usable + delete($self->{cidx})->cidx_read_comm($rd); +} + +1; diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index fa761ed0..440c537e 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -28,9 +28,10 @@ use PublicInbox::SHA qw(sha256_hex); use PublicInbox::Search qw(xap_terms); use PublicInbox::SearchIdx qw(add_val); use PublicInbox::Config qw(glob2re); -use PublicInbox::Spawn qw(spawn popen_rd); +use PublicInbox::Spawn qw(which spawn popen_rd); use PublicInbox::OnDestroy; use PublicInbox::CidxLogP; +use PublicInbox::CidxComm; use PublicInbox::Git qw(%OFMT2HEXLEN); use Socket qw(MSG_EOR); use Carp (); @@ -47,21 +48,16 @@ our ( $MAX_SIZE, $REINDEX, # PublicInbox::SharedKV @GIT_DIR_GONE, # [ git_dir1, git_dir2 ] - %TO_PRUNE, # (docid => docid) mapping (hash in case of retry_reopen) - $PRUNE_CUR, # per-shard document ID - $PRUNE_MAX, # per-shard document ID to stop at - $PRUNE_OP_P, # prune_done() notification socket - $PRUNE_NR, # total number pruned $PRUNE_DONE, # marks off prune completions $NCHANGE, # current number of changes $REPO_CTX, # current repo being indexed in shards - %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for
prune $IDX_TODO, # [ $git0, $root0, $git1, $root1, ...] $GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...] - %HEXLEN2TMPGIT, # ((40|64) => PublicInbox::Git for prune) - %ALT_FH, # '', or 'sha256' => tmp IO for TMPGIT alternates - $TMPDIR, # File::Temp->newdir object + %ALT_FH, # hexlen => tmp IO for TMPDIR git alternates + $TMPDIR, # File::Temp->newdir object for prune @PRUNE_QUEUE, # GIT_DIRs to prepare for pruning + $PRUNE_ENV, # env for awk(1), comm(1), sed(1), sort(1) commands during prune + @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes @@ -69,6 +65,8 @@ our ( # git walks commits quickly if it doesn't have to read trees our $SEEN_MAX = 100000; +our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check); + # TODO: do we care about committer name + email? or tree OID? my @FMT = qw(H P ct an ae at s b); # (b)ody must be last @@ -180,14 +178,6 @@ sub cidx_ckpoint ($;$) { my ($self, $msg) = @_; progress($self, $msg) if defined($msg); $TXN_BYTES = $BATCH_BYTES; # reset - if (my @to_prune = values(%TO_PRUNE)) { - %TO_PRUNE = (); - $PRUNE_NR += scalar(@to_prune); - progress($self, - "prune [$self->{shard}] $PRUNE_NR ($PRUNE_CUR/$PRUNE_MAX)"); - $self->begin_txn_lazy; - $self->{xdb}->delete_document($_) for @to_prune; - } return if $PublicInbox::Search::X{CLOEXEC_UNSET}; $self->commit_txn_lazy; $self->begin_txn_lazy; @@ -294,7 +284,7 @@ sub shard_done { # called via PktOp on shard_index completion $repo_ctx->{shard_ok}->{$n} = 1; } -sub prune_done { # called via PktOp->event_step completion +sub prune_done { # called via prune_do completion my ($self, $n) = @_; return if $DO_QUIT || !$PRUNE_DONE; die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n]; @@ -662,79 +652,30 @@ sub scan_git_dirs ($) { index_next($self) for (1..$LIVE_JOBS); } -sub prune_cb { # git->check_async callback - my ($hex, $type, undef, $self_id) = @_; - my ($self, $id) = @$self_id; - return if $type eq
'commit'; - progress($self, "$hex $type #$id") if ($self->{-opt}->{verbose}//0) > 1; - my $len = $self->{xdb}->get_doclength($id); - $TO_PRUNE{$id} = $id; - - # all math around TXN_BYTES calculation is pretty fuzzy, - # but need a way to regularly flush output to avoid OOM, - # so assume the average term + position overhead is the - # answer to everything: 42 - cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0; -} - -sub prune_git_dir ($$$) { - my ($self, $id, $doc) = @_; - my @P = xap_terms('P', $doc); - scalar(@P) == 1 or warn -"BUG? shard[$self->{shard}] #$id has zero or multiple paths: @P"; - for my $P (@P) { - next if exists($ACTIVE_GIT_DIR{$P}) && -d $P; - $TO_PRUNE{$id} = $id; - progress($self, "$P gone #$id"); - my $len = $self->{xdb}->get_doclength($id); - cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0; - } -} - -sub event_step { # may be requeued via DS +sub prune_do { # via wq_io_do in IDX_SHARDS my ($self) = @_; - my $PRUNE_BATCH = 1000; + my $gone = delete $self->{0} // die 'BUG: no {0} gone input'; + my $prune_op_p = delete $self->{1} // die 'BUG: no {1} op_p'; $TXN_BYTES = $BATCH_BYTES; - for (; --$PRUNE_BATCH && !$DO_QUIT && $PRUNE_CUR <= $PRUNE_MAX; - $PRUNE_CUR++) { - my $doc = $self->get_doc($PRUNE_CUR) // next; - my @cmt = xap_terms('Q', $doc); - if (scalar(@cmt) == 0) { - prune_git_dir($self, $PRUNE_CUR, $doc); - } else { - scalar(@cmt) == 1 or warn -"BUG?
shard[$self->{shard}] #$PRUNE_CUR has multiple commits: @cmt"; - for my $o (@cmt) { - $HEXLEN2TMPGIT{length($o)}->check_async($o, - \&prune_cb, [$self, $PRUNE_CUR]) - } + $self->begin_txn_lazy; + my $xdb = $self->{xdb}; + my $nr = 0; + local $/ = "\0"; + while (my $p = <$gone>) { # Q$cmt or P$git_dir + chomp $p; + my @docids = docids_by_postlist($self, $p) or warn <{shard}] +EOM + for (@docids) { + $TXN_BYTES -= $xdb->get_doclength($_) * 42; # fuzzy guess at avg term+position overhead + $xdb->delete_document($_); } + ++$nr; + $TXN_BYTES < 0 and + cidx_ckpoint($self, "prune [$self->{shard}] $nr"); } - $_->async_wait_all for (values %HEXLEN2TMPGIT); - cidx_ckpoint($self); - return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX; - send($PRUNE_OP_P, "prune_done $self->{shard}", MSG_EOR); - $PRUNE_NR //= 0; - progress($self, "prune [$self->{shard}] $PRUNE_NR done"); - $_->cleanup for (values %HEXLEN2TMPGIT); - $PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef; - undef %ACTIVE_GIT_DIR; - undef %HEXLEN2TMPGIT; -} - -sub prune_start { # via wq_io_do in IDX_SHARDS - my ($self, $tmpdir, @active_git_dir) = @_; - $PRUNE_CUR = 1; - $PRUNE_OP_P = delete $self->{0} // die 'BUG: no {0} op_p'; - %ACTIVE_GIT_DIR = map { $_ => undef } @active_git_dir; - for my $git_dir (<$tmpdir/*.git>) { - my ($hexlen) = ($git_dir =~ m!/hexlen([0-9]+)\.git\z!); - $hexlen or die "BUG: no hexlen in $git_dir"; - $HEXLEN2TMPGIT{$hexlen} = PublicInbox::Git->new($git_dir); - } - $self->begin_txn_lazy; - $PRUNE_MAX = $self->{xdb}->get_lastdocid // 1; - event_step($self); + cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr; # commit before notifying parent + send($prune_op_p, "prune_done $self->{shard}", MSG_EOR); } sub shards_active { # post_loop_do @@ -781,10 +722,10 @@ sub prep_umask ($) { } sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat - my ($pid, $objdir, $out, $send_prune) = @_; + my ($pid, $objdir, $out, $run_prune) = @_; my $status = $?
>> 8; my $next_dir = shift(@PRUNE_QUEUE); - prep_alternate_start($next_dir, $send_prune) if defined($next_dir); + prep_alternate_start($next_dir, $run_prune) if defined($next_dir); my $fmt; if ($status == 1) { # unset, default is '' (SHA-1) $fmt = 'sha1'; @@ -797,17 +738,17 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat my $hexlen = $OFMT2HEXLEN{$fmt} // return warn <', $f or die "open($f): $!"; + open $ALT_FH{$hexlen}, '>', $f or die "open($f): $!"; } - say { $ALT_FH{$fmt} } $objdir or die "say: $!"; + say { $ALT_FH{$hexlen} } $objdir or die "say: $!"; } sub prep_alternate_start { - my ($git_dir, $send_prune) = @_; + my ($git_dir, $run_prune) = @_; my $o = $git_dir.'/objects'; while (!-d $o) { $git_dir = shift(@PRUNE_QUEUE) // return @@ -817,7 +758,12 @@ sub prep_alternate_start { qw(config extensions.objectFormat) ]; open my $out, '+>', undef or die "open(tmp): $!"; my $pid = spawn($cmd, undef, { 1 => $out }); - awaitpid($pid, \&prep_alternate_end, $o, $out, $send_prune); + awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune); +} + +sub prune_cmd_done { # awaitpid cb for sed, sort, awk, comm, xapian-delve failures + my ($pid, $cmd, $run_prune) = @_; # $run_prune (if any) presumably held only to defer run_prune() + $? and die "@$cmd failed: \$?=$?"; } sub init_prune ($) { @@ -827,24 +773,109 @@ sub init_prune ($) { require File::Temp; require PublicInbox::Import; $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1); - my $send_prune = PublicInbox::OnDestroy->new($$, \&send_prune, $self); + + # Dealing with millions of commits here at once, so use faster tools. + # xapian-delve is nearly an order-of-magnitude faster than Xapian Perl + # bindings.
sed/awk are faster than Perl for simple stream ops, and + # sort+comm are more memory-efficient with gigantic lists + my @delve = (undef, qw(-A Q -1)); + my @sed = (undef, '-ne', 's/^Q//p'); + @SORT = (undef, '-u'); + @COMM = (undef, qw(-2 -3 indexed_commits -)); + @AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output + my @x = ('xapian-delve' => \@delve, sed => \@sed, + sort => \@SORT, comm => \@COMM, awk => \@AWK); + while (my ($x, $argv) = splice(@x, 0, 2)) { + my $e = $x; + $e =~ tr/a-z-/A-Z_/; + my $c = $ENV{$e} // $x; + $argv->[0] = which($c) // die "E: `$x' required for --prune\n"; + } + for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" } + for (qw(parallel compress-program buffer-size)) { # GNU sort options + my $v = $self->{-opt}->{"sort-$_"}; + push @SORT, "--$_=$v" if defined $v; + } + my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self); + pipe(my ($sed_in, $delve_out)) or die "pipe: $!"; + pipe(my ($sort_in, $sed_out)) or die "pipe: $!"; + open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!"; + $PRUNE_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' }; + my $pid = spawn(\@SORT, $PRUNE_ENV, { 0 => $sort_in, 1 => $sort_out }); + awaitpid($pid, \&prune_cmd_done, \@SORT, $run_prune); + $pid = spawn(\@sed, $PRUNE_ENV, { 0 => $sed_in, 1 => $sed_out }); + awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune); + $pid = spawn(\@delve, undef, { 1 => $delve_out }); + awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune); @PRUNE_QUEUE = @{$self->{git_dirs}}; for (1..$LIVE_JOBS) { - prep_alternate_start(shift(@PRUNE_QUEUE) // last, $send_prune); + prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune); } } -sub send_prune { # OnDestroy when `git config extensions.objectFormat' are done +sub dump_git_commits { # awaitpid cb + my ($pid, $batch_out) = @_; + (defined($pid) && $?)
and die "E: @PRUNE_BATCH: \$?=$?"; + return if $DO_QUIT; + my ($hexlen) = keys(%ALT_FH) or return; # done + close(delete $ALT_FH{$hexlen}) or die "close: $!"; + + $PRUNE_BATCH[1] = "--git-dir=$TMPDIR/hexlen$hexlen.git"; + $pid = spawn(\@PRUNE_BATCH, undef, { 1 => $batch_out }); + awaitpid($pid, \&dump_git_commits, $batch_out); +} + +sub run_prune { # OnDestroy once `git config extensions.objectFormat', sort, sed and xapian-delve are done my ($self) = @_; - for (values %ALT_FH) { close $_ or die "close: $!" } - %ALT_FH = (); - my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE); + return if $DO_QUIT; + pipe(my ($awk_in, $batch_out)) or die "pipe: $!"; + pipe(my ($sort_in, $awk_out)) or die "pipe: $!"; + pipe(my ($comm_in, $sort_out)) or die "pipe: $!"; + my $awk_pid = spawn(\@AWK, $PRUNE_ENV, { 0 => $awk_in, 1 => $awk_out }); + my $sort_pid = spawn(\@SORT, $PRUNE_ENV, + { 0 => $sort_in, 1 => $sort_out }); + my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $PRUNE_ENV, + { 0 => $comm_in, -C => "$TMPDIR" }); + awaitpid($awk_pid, \&prune_cmd_done, \@AWK); + awaitpid($sort_pid, \&prune_cmd_done, \@SORT); + awaitpid($comm_pid, \&prune_cmd_done, \@COMM); + PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm + my $git_ver = PublicInbox::Git::git_version(); + push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6; + + # Yes, we pipe --unordered git output to sort(1) because sorting + # inside git leads to orders-of-magnitude slowdowns on rotational + # storage. GNU sort(1) also works well on larger-than-memory + # datasets, and it's not worth eliding sort(1) for old git.
+ push @PRUNE_BATCH, '--unordered' if $git_ver ge v2.19; + warn(sprintf(<pair; $c->{ops}->{prune_done} = [ $self ]; - for my $s (@IDX_SHARDS) { - $s->wq_io_do('prune_start', [ $p->{op_p} ], - "$TMPDIR", @active_git_dir) + my @gone; + for my $n (0..$#IDX_SHARDS) { + pipe(my ($r, $w)) or die "pipe: $!"; + push @gone, $w; + $IDX_SHARDS[$n]->wq_io_do('prune_do', [$r, $p->{op_p}]); + } + while (defined(my $cmt = <$comm_rd>)) { + chomp $cmt; + my $n = hex(substr($cmt, 0, 8)) % scalar(@gone); + print { $gone[$n] } 'Q', $cmt, "\0" or die "print: $!"; + last if $DO_QUIT; + } + for my $git_dir (@GIT_DIR_GONE) { + my $n = git_dir_hash($git_dir) % scalar(@gone); + print { $gone[$n] } 'P', $git_dir, "\0" or die "print: $!"; } + for (@gone) { close $_ or die "close: $!" }; } sub cidx_run { # main entry point @@ -858,7 +889,8 @@ sub cidx_run { # main entry point local $PRUNE_DONE = []; local $IDX_TODO = []; local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE, - $GIT_TODO, $REPO_CTX, %ALT_FH, $TMPDIR, %HEXLEN2TMPGIT); + $GIT_TODO, $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, @SORT, + $PRUNE_ENV); local $BATCH_BYTES = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; local @IDX_SHARDS = cidx_init($self); @@ -867,6 +899,7 @@ sub cidx_run { # main entry point CHLD => \&PublicInbox::DS::enqueue_reap, USR1 => \&kill_shards, }; + local @PRUNE_BATCH = @PRUNE_BATCH; $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT); my $cb = $SIG{__WARN__} || \&CORE::warn; local $SIG{__WARN__} = sub { diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex index 4c9136cf..2f7796e7 100755 --- a/script/public-inbox-cindex +++ b/script/public-inbox-cindex @@ -29,6 +29,7 @@ GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous indexlevel|index-level|L=s batch_size|batch-size=s max_size|max-size=s project-list=s exclude=s@ + sort-parallel=s sort-compress-program=s sort-buffer-size=s d=s update|u scan!
prune dry-run|n C=s@ help|h)) or die $help; if ($opt->{help}) { print $help; exit 0 };