dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 07/10] cindex: parallelize prep phases
Date: Thu, 16 Mar 2023 20:01:28 +0000	[thread overview]
Message-ID: <20230316200131.2113244-7-e@80x24.org> (raw)
In-Reply-To: <20230316200131.2113244-1-e@80x24.org>

Listing refs, fingerprinting and root scanning can all be
parallelized to reduce runtime on SMP systems.

We'll use DESTROY-based dependency management with
parallelization as in LeiMirror to handle ref listing and
fingerprinting before serializing Xapian DB access to check
against the existing fingerprint.

We'll also delay root listing until we get a fingerprint
mismatch to speed up no-op indexing.
---
 lib/PublicInbox/CodeSearchIdx.pm | 197 +++++++++++++++++++++----------
 1 file changed, 132 insertions(+), 65 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 218338da..a926886e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -26,7 +26,10 @@ use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
 use PublicInbox::Config;
-use PublicInbox::Spawn qw(run_die);
+use PublicInbox::Spawn qw(spawn);
+use PublicInbox::OnDestroy;
+our $LIVE; # pid => callback
+our $LIVE_JOBS;
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
 # branches don't diverge by more than this number of commits...
@@ -106,26 +109,27 @@ sub progress {
 	$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
 }
 
-sub store_repo ($$) {
-	my ($self, $repo) = @_;
+sub store_repo ($$$) {
+	my ($self, $git, $repo) = @_;
 	my $xdb = delete($repo->{shard})->idx_acquire;
 	$xdb->begin_transaction;
+	for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
 	if (defined $repo->{id}) {
 		my $doc = $xdb->get_document($repo->{id}) //
-			die "$self->{git}->{git_dir} doc #$repo->{id} gone";
+			die "$git->{git_dir} doc #$repo->{id} gone";
 		add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
-		my %new = map { $_ => undef } @{$self->{roots}};
+		my %new = map { $_ => undef } @{$repo->{roots}};
 		my $old = xap_terms('G', $doc);
 		delete @new{keys %$old};
 		$doc->add_boolean_term('G'.$_) for keys %new;
-		delete @$old{@{$self->{roots}}};
+		delete @$old{@{$repo->{roots}}};
 		$doc->remove_term('G'.$_) for keys %$old;
 		$doc->set_data($repo->{fp});
 		$xdb->replace_document($repo->{id}, $doc);
 	} else {
 		my $new = $PublicInbox::Search::X{Document}->new;
 		add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
-		$new->add_boolean_term("P$self->{git}->{git_dir}");
+		$new->add_boolean_term("P$git->{git_dir}");
 		$new->add_boolean_term('T'.'r');
 		$new->add_boolean_term('G'.$_) for @{$repo->{roots}};
 		$new->set_data($repo->{fp}); # \n delimited
@@ -201,75 +205,98 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
 	@ids;
 }
 
-sub get_roots ($$) {
-	my ($self, $refs) = @_;
-	my @roots = $self->{git}->qx([qw(rev-list --stdin --max-parents=0)],
-		undef, { 0 => $refs });
-	die "git rev-list \$?=$?" if $?;
-	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-	chomp(@roots);
-	scalar(@roots) ? \@roots : undef;
+sub cidx_reap ($$) {
+	my ($self, $jobs) = @_;
+	while (keys(%$LIVE) >= $jobs) {
+		my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
+		last if $pid < 0;
+		if (my $x = delete $LIVE->{$pid}) {
+			my $cb = shift @$x;
+			$cb->(@$x) if $cb;
+		} else {
+			warn "reaped unknown PID=$pid ($?)\n";
+		}
+	}
 }
 
 # this is different from the grokmirror-compatible fingerprint since we
 # only care about --heads (branches) and --tags, and not even their names
-sub cidx_fp ($) {
-	my ($self) = @_;
+sub fp_start ($$$) {
+	my ($self, $git, $prep_repo) = @_;
+	return if !$LIVE; # premature exit
+	cidx_reap($self, $LIVE_JOBS);
 	open my $refs, '+>', undef or die "open: $!";
-	run_die(['git', "--git-dir=$self->{git}->{git_dir}",
+	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
 		qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+	$git->{-repo}->{refs} = $refs;
+	$LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ];
+}
+
+sub fp_fini {
+	my ($self, $git, $prep_repo) = @_;
+	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
 	seek($refs, 0, SEEK_SET) or die "seek: $!";
 	my $buf;
 	my $dig = PublicInbox::SHA->new(256);
 	while (read($refs, $buf, 65536)) { $dig->add($buf) }
-	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
-	($dig->hexdigest, $refs);
+	$git->{-repo}->{fp} = $dig->hexdigest;
 }
 
-# TODO: should we also index gitweb.owner and the full fingerprint for grokmirror?
-sub prep_git_dir ($) {
-	my ($self) = @_;
-	my $git_dir = $self->{git}->{git_dir};
-	my $ct = $self->{git}->qx([qw[for-each-ref
-		--sort=-committerdate --format=%(committerdate:raw) --count=1
+sub ct_start ($$$) {
+	my ($self, $git, $prep_repo) = @_;
+	return if !$LIVE; # premature exit
+	cidx_reap($self, $LIVE_JOBS);
+	my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+		--format=%(committerdate:raw) --count=1
 		refs/heads/ refs/tags/]]);
-	my $repo = {};
-	@$repo{qw(fp refs)} = cidx_fp($self);
-	$repo->{roots} = get_roots($self, $repo->{refs});
-	if (!$repo->{roots} || !defined($ct)) {
-		warn "W: $git_dir has no root commits, skipping\n";
+	$LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ];
+}
+
+sub ct_fini {
+	my ($self, $git, $rd, $prep_repo) = @_;
+	defined(my $ct = <$rd>) or return;
+	$ct =~ s/\s+.*\z//s; # drop TZ + LF
+	$git->{-repo}->{ct} = $ct + 0;
+}
+
+# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
+sub prep_repo ($$) {
+	my ($self, $git) = @_;
+	return if !$LIVE; # premature exit
+	my $repo = $git->{-repo} // die 'BUG: no {-repo}';
+	my $git_dir = $git->{git_dir};
+	if (!defined($repo->{ct})) {
+		warn "W: $git_dir has no commits, skipping\n";
+		delete $git->{-repo};
 		return;
 	}
-	$ct =~ s/ .*\z//s; # drop TZ
-	$repo->{ct} = $ct + 0;
 	my $n = git_dir_hash($git_dir) % $self->{nshard};
 	my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
 	delete @$shard{qw(lockfh lock_path)};
 	local $shard->{xdb};
 	my $xdb = $shard->idx_acquire;
 	my @docids = docids_by_postlist($shard, 'P'.$git_dir);
-	my $docid = shift(@docids) // return $repo;
+	my $docid = shift(@docids) // return get_roots($self, $git);
 	if (@docids) {
 		warn "BUG: $git_dir indexed multiple times, culling\n";
-		$xdb->begin_transaction;
-		for (@docids) { $xdb->delete_document($_) }
-		$xdb->commit_transaction;
+		$repo->{to_delete} = \@docids; # XXX needed?
 	}
 	my $doc = $xdb->get_document($docid) //
 		die "BUG: no #$docid ($git_dir)";
 	my $old_fp = $doc->get_data;
 	if ($old_fp eq $repo->{fp}) { # no change
-		progress($self, 'unchanged');
+		progress($self, "$git_dir unchanged");
+		delete $git->{-repo};
 		return;
 	}
 	$repo->{id} = $docid;
-	$repo;
+	get_roots($self, $git);
 }
 
-sub partition_refs ($$) {
-	my ($self, $refs) = @_; # show-ref --heads --tags --hash output
-	my $fh = $self->{git}->popen(qw(rev-list --stdin), undef,
-					{ 0 => $refs });
+sub partition_refs ($$$) {
+	my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
+	sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
+	my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
 	close $refs or die "close: $!";
 	local $self->{xdb};
 	my $xdb = $self->{-opt}->{reindex} ? undef : $self->xdb;
@@ -292,22 +319,27 @@ sub partition_refs ($$) {
 	close($fh);
 	if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
 		$self->{nchange} += $nchange;
-		progress($self, "$nchange commits");
+		progress($self, "$git->{git_dir}: $nchange commits");
 		for my $fh (@shard_in) {
 			$fh->flush or die "flush: $!";
 			sysseek($fh, 0, SEEK_SET) or die "seek: $!";
 		}
 		return @shard_in;
 	}
-	die "git-rev-list: \$?=$?\n";
+	die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
 }
 
-sub index_git_dir ($$) {
-	my ($self, $git_dir) = @_;
-	local $self->{git} = PublicInbox::Git->new($git_dir); # for ->patch_id
-	my $repo = prep_git_dir($self) or return;
-	local $self->{current_info} = $git_dir;
-	my @shard_in = partition_refs($self, delete($repo->{refs}));
+sub index_repo {
+	my ($self, $git, $roots) = @_;
+	return if !$LIVE; # premature exit
+	my $repo = delete $git->{-repo} or return;
+	seek($roots, 0, SEEK_SET) or die "seek: $!";
+	chomp(my @roots = <$roots>);
+	close($roots) or die "close: $!";
+	@roots or return warn("E: $git->{git_dir} has no root commits\n");
+	$repo->{roots} = \@roots;
+	local $self->{current_info} = $git->{git_dir};
+	my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
 	my %pids;
 	my $fwd_kill = sub {
 		my ($sig) = @_;
@@ -323,12 +355,13 @@ sub index_git_dir ($$) {
 		my $pid = fork // die "fork: $!";
 		if ($pid == 0) { # no RNG use, here
 			$0 = "code index [$n]";
+			$self->{git} = $git;
 			$self->{shard} = $n;
 			$self->{current_info} = "$self->{current_info} [$n]";
 			delete @$self{qw(lockfh lock_path)};
 			my $in = $shard_in[$n];
 			@shard_in = ();
-			$self->{roots} = delete $repo->{roots};
+			$self->{roots} = \@roots;
 			undef $repo;
 			eval { shard_worker($self, $in, $sigset) };
 			warn "E: $@" if $@;
@@ -339,18 +372,41 @@ sub index_git_dir ($$) {
 	}
 	PublicInbox::DS::sig_setmask($sigset);
 	@shard_in = ();
-	my $err;
+	my ($err, @todo);
 	while (keys %pids) {
-		my $pid = waitpid(-1, 0) or last;
-		my $j = delete $pids{$pid} // "unknown PID:$pid";
-		next if $? == 0;
-		warn "PID:$pid $j exited with \$?=$?\n";
-		$err = 1;
+		my $pid = waitpid(-1, 0) // die "waitpid: $!";
+		if (my $j = delete $pids{$pid}) {
+			next if $? == 0;
+			warn "PID:$pid $j exited with \$?=$?\n";
+			$err = 1;
+		} elsif (my $todo = delete $LIVE->{$pid}) {
+			warn "PID:$pid exited with \$?=$?\n" if $?;
+			push @todo, $todo;
+		} else {
+			warn "reaped unknown PID=$pid ($?)\n";
+		}
 	}
 	die "subprocess(es) failed\n" if $err;
-	store_repo($self, $repo);
-	progress($self, 'done');
+	store_repo($self, $git, $repo);
+	progress($self, "$git->{git_dir}: done");
 	# TODO: check fp afterwards?
+	while (my $x = shift @todo) {
+		my $cb = shift @$x;
+		$cb->(@$x) if $cb;
+	}
+}
+
+sub get_roots ($$) {
+	my ($self, $git) = @_;
+	return if !$LIVE; # premature exit
+	cidx_reap($self, $LIVE_JOBS);
+	my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
+	sysseek($refs, 0, SEEK_SET) or die "seek: $!";
+	open my $roots, '+>', undef or die "open: $!";
+	my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
+			qw(rev-list --stdin --max-parents=0)],
+			undef, { 0 => $refs, 1 => $roots });
+	$LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ];
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -389,6 +445,21 @@ W: memory usage may be high for large indexing runs
 EOM
 }
 
+sub scan_git_dirs ($) {
+	my ($self) = @_;
+	local $LIVE_JOBS = $self->{-opt}->{jobs} //
+			PublicInbox::IPC::detect_nproc() // 2;
+	local $LIVE = {};
+	for (@{$self->{git_dirs}}) {
+		my $git = PublicInbox::Git->new($_);
+		my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
+							$self, $git);
+		fp_start($self, $git, $prep_repo);
+		ct_start($self, $git, $prep_repo);
+	}
+	cidx_reap($self, 0);
+}
+
 sub cidx_run {
 	my ($self) = @_;
 	cidx_init($self);
@@ -414,11 +485,7 @@ sub cidx_run {
 	}
 	local $self->{nchange} = 0;
 	# do_prune($self) if $self->{-opt}->{prune}; TODO
-	if ($self->{-opt}->{scan} // 1) {
-		for my $gd (@{$self->{git_dirs}}) {
-			index_git_dir($self, $gd);
-		}
-	}
+	scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
 	$self->lock_release(!!$self->{nchange});
 }
 

  parent reply	other threads:[~2023-03-16 20:01 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-03-16 20:01 [PATCH 01/10] ipc: move nproc_shards from v2writable Eric Wong
2023-03-16 20:01 ` [PATCH 02/10] search: relocate all_terms from lei_search Eric Wong
2023-03-16 20:01 ` [PATCH 03/10] admin: hoist out resolve_git_dir Eric Wong
2023-03-16 20:01 ` [PATCH 04/10] admin: ensure resolved GIT_DIR is absolute Eric Wong
2023-03-16 20:01 ` [PATCH 05/10] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
2023-03-16 20:01 ` [PATCH 06/10] codesearch: initial cut w/ -cindex tool Eric Wong
2023-03-16 20:01 ` Eric Wong [this message]
2023-03-16 20:01 ` [PATCH 08/10] cindex: use read-only shards during prep phases Eric Wong
2023-03-16 20:01 ` [PATCH 09/10] searchidxshard: improve comment wording Eric Wong
2023-03-16 20:01 ` [PATCH 10/10] cindex: use DS and workqueues for parallelism Eric Wong
  -- strict thread matches above, loose matches on Subject: below --
2023-03-15 10:10 [PATCH 01/10] ipc: move nproc_shards from v2writable Eric Wong
2023-03-15 10:11 ` [PATCH 07/10] cindex: parallelize prep phases Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230316200131.2113244-7-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).