dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH] cindex: rewrite prune, again...
Date: Sat, 22 Apr 2023 01:44:25 +0000	[thread overview]
Message-ID: <20230422014425.1642941-1-e@80x24.org> (raw)

This should be faster, since xapian-delve is nearly an order of magnitude faster than the Xapian Perl bindings.
---
 lib/PublicInbox/CodeSearchIdx.pm | 218 ++++++++++++++++---------------
 1 file changed, 113 insertions(+), 105 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index fa761ed0..8f3128d8 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -28,7 +28,7 @@ use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config qw(glob2re);
-use PublicInbox::Spawn qw(spawn popen_rd);
+use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::OnDestroy;
 use PublicInbox::CidxLogP;
 use PublicInbox::Git qw(%OFMT2HEXLEN);
@@ -47,21 +47,15 @@ our (
 	$MAX_SIZE,
 	$REINDEX, # PublicInbox::SharedKV
 	@GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
-	%TO_PRUNE, # (docid => docid) mapping (hash in case of retry_reopen)
-	$PRUNE_CUR, # per-shard document ID
-	$PRUNE_MAX, # per-shard document ID to stop at
-	$PRUNE_OP_P, # prune_done() notification socket
-	$PRUNE_NR, # total number pruned
 	$PRUNE_DONE, # marks off prune completions
 	$NCHANGE, # current number of changes
 	$REPO_CTX, # current repo being indexed in shards
-	%ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune
 	$IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
 	$GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...]
-	%HEXLEN2TMPGIT, # ((40|64) => PublicInbox::Git for prune)
-	%ALT_FH, # '', or 'sha256' => tmp IO for TMPGIT alternates
-	$TMPDIR, # File::Temp->newdir object
+	%ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
+	$TMPDIR, # File::Temp->newdir object for prune
 	@PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
+	@AWK, @COMM, # awk(1) and comm(1) commands
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -180,14 +174,6 @@ sub cidx_ckpoint ($;$) {
 	my ($self, $msg) = @_;
 	progress($self, $msg) if defined($msg);
 	$TXN_BYTES = $BATCH_BYTES; # reset
-	if (my @to_prune = values(%TO_PRUNE)) {
-		%TO_PRUNE = ();
-		$PRUNE_NR += scalar(@to_prune);
-		progress($self,
-		  "prune [$self->{shard}] $PRUNE_NR ($PRUNE_CUR/$PRUNE_MAX)");
-		$self->begin_txn_lazy;
-		$self->{xdb}->delete_document($_) for @to_prune;
-	}
 	return if $PublicInbox::Search::X{CLOEXEC_UNSET};
 	$self->commit_txn_lazy;
 	$self->begin_txn_lazy;
@@ -294,7 +280,7 @@ sub shard_done { # called via PktOp on shard_index completion
 	$repo_ctx->{shard_ok}->{$n} = 1;
 }
 
-sub prune_done { # called via PktOp->event_step completion
+sub prune_done { # called via prune_do completion
 	my ($self, $n) = @_;
 	return if $DO_QUIT || !$PRUNE_DONE;
 	die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n];
@@ -662,79 +648,31 @@ sub scan_git_dirs ($) {
 	index_next($self) for (1..$LIVE_JOBS);
 }
 
-sub prune_cb { # git->check_async callback
-	my ($hex, $type, undef, $self_id) = @_;
-	my ($self, $id) = @$self_id;
-	return if $type eq 'commit';
-	progress($self, "$hex $type #$id") if ($self->{-opt}->{verbose}//0) > 1;
-	my $len = $self->{xdb}->get_doclength($id);
-	$TO_PRUNE{$id} = $id;
-
-	# all math around TXN_BYTES calculation is pretty fuzzy,
-	# but need a way to regularly flush output to avoid OOM,
-	# so assume the average term + position overhead is the
-	# answer to everything: 42
-	cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0;
-}
-
-sub prune_git_dir ($$$) {
-	my ($self, $id, $doc) = @_;
-	my @P = xap_terms('P', $doc);
-	scalar(@P) == 1 or warn
-"BUG? shard[$self->{shard}] #$id has zero or multiple paths: @P";
-	for my $P (@P) {
-		next if exists($ACTIVE_GIT_DIR{$P}) && -d $P;
-		$TO_PRUNE{$id} = $id;
-		progress($self, "$P gone #$id");
-		my $len = $self->{xdb}->get_doclength($id);
-		cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0;
-	}
-}
-
-sub event_step { # may be requeued via DS
+sub prune_do { # via wq_io_do in IDX_SHARDS
 	my ($self) = @_;
-	my $PRUNE_BATCH = 1000;
+	my $gone = delete $self->{0} // die 'BUG: no {0} gone input';
+	my $prune_op_p = delete $self->{1} // die 'BUG: no {1} op_p';
+	sysseek($gone, 0, SEEK_SET) or die "seek: $!";
 	$TXN_BYTES = $BATCH_BYTES;
-	for (; --$PRUNE_BATCH && !$DO_QUIT && $PRUNE_CUR <= $PRUNE_MAX;
-			$PRUNE_CUR++) {
-		my $doc = $self->get_doc($PRUNE_CUR) // next;
-		my @cmt = xap_terms('Q', $doc);
-		if (scalar(@cmt) == 0) {
-			prune_git_dir($self, $PRUNE_CUR, $doc);
-		} else {
-			scalar(@cmt) == 1 or warn
-"BUG? shard[$self->{shard}] #$PRUNE_CUR has multiple commits: @cmt";
-			for my $o (@cmt) {
-				$HEXLEN2TMPGIT{length($o)}->check_async($o,
-						\&prune_cb, [$self, $PRUNE_CUR])
-			}
+	$self->begin_txn_lazy;
+	my $xdb = $self->{xdb};
+	my $nr = 0;
+	local $/ = "\0";
+	while (my $p = <$gone>) { # Q$cmt or P$git_dir
+		chomp $p;
+		my @docids = docids_by_postlist($self, $p) or warn <<EOM;
+W: no docids for `$p' [$self->{shard}]
+EOM
+		for (@docids) {
+			$TXN_BYTES -= $xdb->get_doclength($_) * 42;
+			$xdb->delete_document($_);
 		}
+		++$nr;
+		$TXN_BYTES < 0 and
+			cidx_ckpoint($self, "prune [$self->{shard}] $nr");
 	}
-	$_->async_wait_all for (values %HEXLEN2TMPGIT);
-	cidx_ckpoint($self);
-	return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX;
-	send($PRUNE_OP_P, "prune_done $self->{shard}", MSG_EOR);
-	$PRUNE_NR //= 0;
-	progress($self, "prune [$self->{shard}] $PRUNE_NR done");
-	$_->cleanup for (values %HEXLEN2TMPGIT);
-	$PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef;
-	undef %ACTIVE_GIT_DIR;
-	undef %HEXLEN2TMPGIT;
-}
-
-sub prune_start { # via wq_io_do in IDX_SHARDS
-	my ($self, $tmpdir, @active_git_dir) = @_;
-	$PRUNE_CUR = 1;
-	$PRUNE_OP_P = delete $self->{0} // die 'BUG: no {0} op_p';
-	%ACTIVE_GIT_DIR = map { $_ => undef } @active_git_dir;
-	for my $git_dir (<$tmpdir/*.git>) {
-		my ($hexlen) = ($git_dir =~ m!/hexlen([0-9]+)\.git\z!);
-		$hexlen or die "BUG: no hexlen in $git_dir";
-		$HEXLEN2TMPGIT{$hexlen} = PublicInbox::Git->new($git_dir);
-	}
-	$self->begin_txn_lazy;
-	$PRUNE_MAX = $self->{xdb}->get_lastdocid // 1;
-	event_step($self);
+	send($prune_op_p, "prune_done $self->{shard}", MSG_EOR);
+	cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
 }
 
 sub shards_active { # post_loop_do
@@ -781,10 +719,10 @@ sub prep_umask ($) {
 }
 
 sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
-	my ($pid, $objdir, $out, $send_prune) = @_;
+	my ($pid, $objdir, $out, $run_prune) = @_;
 	my $status = $? >> 8;
 	my $next_dir = shift(@PRUNE_QUEUE);
-	prep_alternate_start($next_dir, $send_prune) if defined($next_dir);
+	prep_alternate_start($next_dir, $run_prune) if defined($next_dir);
 	my $fmt;
 	if ($status == 1) { # unset, default is '' (SHA-1)
 		$fmt = 'sha1';
@@ -797,17 +735,17 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
 	my $hexlen = $OFMT2HEXLEN{$fmt} // return warn <<EOM;
 E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
 EOM
-	unless ($ALT_FH{$fmt}) {
+	unless ($ALT_FH{$hexlen}) {
 		my $git_dir = "$TMPDIR/hexlen$hexlen.git";
 		PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
 		my $f = "$git_dir/objects/info/alternates";
-		open $ALT_FH{$fmt}, '>', $f or die "open($f): $!";
+		open $ALT_FH{$hexlen}, '>', $f or die "open($f): $!";
 	}
-	say { $ALT_FH{$fmt} } $objdir or die "say: $!";
+	say { $ALT_FH{$hexlen} } $objdir or die "say: $!";
 }
 
 sub prep_alternate_start {
-	my ($git_dir, $send_prune) = @_;
+	my ($git_dir, $run_prune) = @_;
 	my $o = $git_dir.'/objects';
 	while (!-d $o) {
 		$git_dir = shift(@PRUNE_QUEUE) // return
@@ -817,7 +755,12 @@ sub prep_alternate_start {
 			qw(config extensions.objectFormat) ];
 	open my $out, '+>', undef or die "open(tmp): $!";
 	my $pid = spawn($cmd, undef, { 1 => $out });
-	awaitpid($pid, \&prep_alternate_end, $o, $out, $send_prune);
+	awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
+}
+
+sub prune_cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
+	my ($pid, $cmd, $run_prune) = @_;
+	$? and die "@$cmd failed: \$?=$?";
 }
 
 sub init_prune ($) {
@@ -827,23 +770,88 @@ sub init_prune ($) {
 	require File::Temp;
 	require PublicInbox::Import;
 	$TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
-	my $send_prune = PublicInbox::OnDestroy->new($$, \&send_prune, $self);
+
+	# Dealing with millions of commits here at once, so use faster tools.
+	# xapian-delve is nearly an order-of-magnitude faster than Xapian Perl
+	# bindings.  sed/awk are faster than Perl for simple stream ops, and
+	# sort+comm are more memory-efficient with gigantic lists
+	my @delve = (undef, qw(-A Q -1));
+	my @sed = (undef, '-ne', 's/^Q//p');
+	my @sort = (undef, '-u');
+	@COMM = (undef, qw(-2 -3 indexed_commits -));
+	@AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output
+	my @x = ('xapian-delve' => \@delve, sed => \@sed,
+		sort => \@sort, comm => \@COMM, awk => \@AWK);
+	while (my ($x, $argv) = splice(@x, 0, 2)) {
+		$argv->[0] = which($x) // die "E: `$x' required for --prune\n";
+	}
+	for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
+	for (qw(parallel compress-program)) { # GNU sort options
+		my $v = $self->{-opt}->{$_};
+		push @sort, "--$_=$v" if defined $v;
+	}
+	my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
+	pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
+	pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
+	open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
+	my $pid = spawn(\@sort, { TMPDIR => "$TMPDIR" },
+			{ 0 => $sort_in, 1 => $sort_out });
+	awaitpid($pid, \&prune_cmd_done, \@sort, $run_prune);
+	$pid = spawn(\@sed, undef, { 0 => $sed_in, 1 => $sed_out });
+	awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune);
+	$pid = spawn(\@delve, undef, { 1 => $delve_out });
+	awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune);
 	@PRUNE_QUEUE = @{$self->{git_dirs}};
 	for (1..$LIVE_JOBS) {
-		prep_alternate_start(shift(@PRUNE_QUEUE) // last, $send_prune);
+		prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune);
 	}
 }
 
-sub send_prune { # OnDestroy when `git config extensions.objectFormat' are done
+sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
 	my ($self) = @_;
-	for (values %ALT_FH) { close $_ or die "close: $!" }
-	%ALT_FH = ();
-	my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE);
+	return if $DO_QUIT;
+	pipe(my ($awk_in, $batch_out)) or die "pipe: $!";
+	pipe(my ($comm_in, $awk_out)) or die "pipe: $!";
+	my $awk_pid = spawn(\@AWK, undef, { 0 => $awk_in, 1 => $awk_out });
+	my $old = popen_rd(\@COMM, undef, { 0 => $comm_in, -C => "$TMPDIR" });
+	for ($awk_out, $awk_in, $comm_in) { close $_ or die "close: $!" }
+
+	# batch needs to write sequentially, shortest (sha1) to longest (sha256)
+	# and rely on git sorting by default (no --unordered)
+	my @batch = qw(git _ cat-file --batch-all-objects --batch-check);
+	push @batch, '--buffer' if PublicInbox::Git::git_version() ge v2.6;
+	for my $hexlen (sort { $a <=> $b } keys %ALT_FH) {
+		close($ALT_FH{$hexlen}) or die "close: $!";
+		return if $DO_QUIT;
+		$batch[1] = "--git-dir=$TMPDIR/hexlen$hexlen.git";
+		my $pid = spawn(\@batch, undef, { 1 => $batch_out });
+		waitpid($pid, 0) == $pid or die "waitpid: $! (\$?=$?)";
+		$? and die "E: @batch failed: \$?=$?";
+	}
+	close $batch_out or die "close: $!";
+	waitpid($awk_pid, 0) == $awk_pid or die "waitpid: $! (\$?=$?)";
+	$? and die "E: @AWK failed: \$?=$?";
+	my @gone = map {
+		open my $fh, '+>', undef or die "open: $!";
+		$fh;
+	} @RDONLY_XDB;
+	while (defined(my $cmt = <$old>)) {
+		chomp $cmt;
+		my $n = hex(substr($cmt, 0, 8)) % scalar(@gone);
+		print { $gone[$n] } 'Q', $cmt, "\0" or die "print: $!";
+		return if $DO_QUIT;
+	}
+	close $old or die "E: comm: \$?=$?";
+	for my $git_dir (@GIT_DIR_GONE) {
+		my $n = git_dir_hash($git_dir) % scalar(@gone);
+		print { $gone[$n] } 'P', $git_dir, "\0" or die "print: $!";
+	}
 	my ($c, $p) = PublicInbox::PktOp->pair;
 	$c->{ops}->{prune_done} = [ $self ];
-	for my $s (@IDX_SHARDS) {
-		$s->wq_io_do('prune_start', [ $p->{op_p} ],
-				"$TMPDIR", @active_git_dir)
+	for my $n (0..$#gone) {
+		return if $DO_QUIT;
+		$gone[$n]->flush or die "flush: $!";
+		$IDX_SHARDS[$n]->wq_io_do('prune_do', [$gone[$n], $p->{op_p}]);
 	}
 }
 
@@ -858,7 +866,7 @@ sub cidx_run { # main entry point
 	local $PRUNE_DONE = [];
 	local $IDX_TODO = [];
 	local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
-		$GIT_TODO, $REPO_CTX, %ALT_FH, $TMPDIR, %HEXLEN2TMPGIT);
+		$GIT_TODO, $REPO_CTX, %ALT_FH, $TMPDIR);
 	local $BATCH_BYTES = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
 	local @IDX_SHARDS = cidx_init($self);

             reply	other threads:[~2023-04-22  1:44 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-04-22  1:44 Eric Wong [this message]
  -- strict thread matches above, loose matches on Subject: below --
2023-04-22  9:07 [PATCH] cindex: rewrite prune, again Eric Wong
2023-04-22  2:51 Eric Wong
2023-04-21 12:49 Eric Wong
2023-04-21 12:45 Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230422014425.1642941-1-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).