dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH] cindex associate wip
Date: Tue, 15 Aug 2023 07:40:59 +0000	[thread overview]
Message-ID: <20230815074059.3034065-1-e@80x24.org> (raw)

---
 lib/PublicInbox/CodeSearchIdx.pm | 398 ++++++++++++++++++++++++++++---
 script/public-inbox-cindex       |   4 +-
 2 files changed, 363 insertions(+), 39 deletions(-)

diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index ba14e52a..e7c5cd17 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -11,6 +11,26 @@
 #
 # We shard repos using the first 32-bits of sha256($ABS_GIT_DIR)
 #
+# --associate joins root commits of coderepos to inboxes based on prefixes.
+#
+# Internally, each inbox is assigned a non-negative integer index ($IBX_ID),
+# and each root commit object ID (SHA-1/SHA-256 hex) is also assigned
+# a non-negative integer index ($ROOT_COMMIT_OID_ID).
+#
+# associate dumps to 2 intermediate files in $TMPDIR:
+#
+# * to_root_id - each line is of the format:
+#
+#	$PFX $ROOT_COMMIT_OID_ID
+#
+# * to_ibx_id - each line is of the format:
+#
+#	$PFX $IBX_ID
+#
+# In both cases, $PFX is typically the value of the patchid (XDFID) but it
+# can be configured to use any combination of patchid, dfpre, dfpost or
+# dfblob.
+#
 # See PublicInbox::CodeSearch (read-only API) for more
 package PublicInbox::CodeSearchIdx;
 use v5.12;
@@ -32,7 +52,7 @@ use PublicInbox::CidxLogP;
 use PublicInbox::CidxComm;
 use PublicInbox::Git qw(%OFMT2HEXLEN);
 use PublicInbox::Compat qw(uniqstr);
-use Socket qw(MSG_EOR);
+use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET);
 use Carp ();
 our (
 	$LIVE, # pid => cmd
@@ -55,8 +75,15 @@ our (
 	%ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
 	$TMPDIR, # File::Temp->newdir object for prune
 	@PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
-	$PRUNE_ENV, # env for awk(1), comm(1), sort(1) commands during prune
+	%TODO, @IBXDIRQ, @IBXDIRS,
+	@JOIN, # join(1) command for associate
+	$CMD_ENV, # env for awk(1), comm(1), join(1), sort(1) subprocesses
 	@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
+	@ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
+	$QRY_STR, # common query string for both code and inbox associations
+	$IBXDIR_FEED, # SOCK_SEQPACKET
+	@DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+	@ID2ROOT,
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -64,6 +91,9 @@ our (
 # git walks commits quickly if it doesn't have to read trees
 our $SEEN_MAX = 100000;
 
+# window for commits/emails to determine an inbox <-> coderepo association
+my $ASSOC_MAX = 50000;
+
 our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check);
 
 # TODO: do we care about committer name + email? or tree OID?
@@ -455,6 +485,186 @@ sub shard_commit { # via wq_io_do
 	send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
+sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
+	my ($self, $associate, $n) = @_;
+	return if $DO_QUIT;
+	progress($self, "dump_shard_roots [$n] done");
+	$DUMP_SHARD_ROOTS_OK[$n] = 1;
+	# may run associate()
+}
+
+sub assoc_max_init ($) {
+	my ($self) = @_;
+	my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
+	$max = $ASSOC_MAX if !$max;
+	$max < 0 ? ((2 ** 31) - 1) : $max;
+}
+
+# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
+sub dump_shard_roots { # via wq_io_do for associate
+	my ($self, $root2id, $qry_str) = @_;
+	my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+	my $w = delete($self->{1}) // die 'BUG: no {1} $w sort pipe';
+	local $0 = "$0 dump_shard_roots";
+	# sort lock is necessary if we have many root ids which cause a
+	# row length to exceed POSIX PIPE_BUF (via `$G' below)
+	my $sort_lock = bless { lock_path => "$TMPDIR/to_root_id.lock" },
+		'PublicInbox::Lock';
+	$w->autoflush(1);
+	my $xdb = $self->begin_txn_lazy;
+	my $qry = $self->cqparse_new->parse_query($qry_str, $self->{qp_flags});
+	$qry = $PublicInbox::Search::X{Query}->new(
+					PublicInbox::Search::OP_FILTER(),
+					$qry, 'T'.'c');
+	my $enq = $PublicInbox::Search::X{Enquire}->new($xdb);
+	$enq->set_query($qry);
+	$enq->set_weighting_scheme($PublicInbox::Search::X{BoolWeight}->new);
+	$enq->set_docid_order($PublicInbox::Search::ENQ_DESCENDING);
+	my ($off, $lim) = (0, 10000);
+	my $max = assoc_max_init($self);
+	my $buf;
+	while ($max > 0) {
+		$lim = $max if $max < $lim;
+		my $mset = $enq->get_mset($off, $lim);
+		my $size = $mset->size or last;
+		$buf = ''; # resets w/o free(3)-ing
+		for my $x ($mset->items) {
+			my $doc = $x->get_document;
+			my $G = join(' ', map {
+				$root2id->{pack('H*', $_)};
+			} xap_terms('G', $doc));
+			for my $p (@ASSOC_PFX) {
+				$buf .= "$_ $G\n" for (xap_terms($p, $doc));
+			}
+		}
+		$sort_lock->lock_acquire_fast;
+		print $w $buf or die "print: $!";
+		$sort_lock->lock_release_fast;
+		$max -= $size;
+		$off += $size;
+		progress($self, "dump_shard_roots [$self->{shard}] $off");
+	};
+	send($op_p, "dump_shard_roots_done $self->{shard}", MSG_EOR);
+}
+
+sub dump_roots_once {
+	my ($self, $associate) = @_;
+	$associate // die 'BUG: no $associate';
+	$TODO{associating} = 1;
+	progress($self, 'dumping IDs from coderepos');
+	local $self->{xdb};
+	my $x = $self->xdb->allterms_begin('G');
+	my $end = $self->xdb->allterms_end('G');
+	my $id = 0;
+	my (%root2id, $oidbin, $oidhex);
+	for (; $x != $end; $x++) {
+		$oidbin = pack('H*', $oidhex = substr($x->get_termname, 1));
+		progress($self, "root $oidhex => $id");
+		push @ID2ROOT, $oidbin;
+		$root2id{$oidbin} = $id++;
+	}
+	pipe(my ($r, $w)) or die "pipe: $!";
+	my @sort = (@SORT, '-k1,1');
+	my $dst = "$TMPDIR/to_root_id";
+	open my $fh, '>', $dst or die "open($dst): $!";
+	my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh });
+	warn "# $sort_pid = @sort";
+	close $r or die "close: $!";
+	awaitpid($sort_pid, \&cmd_done, ['dr', @sort], $associate);
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
+	my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR);
+	$_->wq_io_do(@arg) for @IDX_SHARDS;
+	progress($self, 'waiting on dump_shard_roots sort');
+}
+
+sub recv_ibx_done { # via PktOp on recv_ibx completion
+	my ($self, $pid, $n) = @_;
+	return if $DO_QUIT;
+	progress($self, "recv_ibx [$n] done");
+	$RECV_IBX_OK[$n] = 1;
+}
+
+# causes a worker to become a dumper for inbox/extindex
+sub recv_ibx { # wq_io_do
+	my ($self, $qry_str) = @_;
+	my ($op_p, $r_ibx, $sort_w) = delete @$self{0..2};
+	$op_p // die 'BUG: no $op_p';
+	$r_ibx // die 'BUG: no $r_ibx';
+	$sort_w // die 'BUG: no $sort_w';
+
+	# writes to this pipe are never longer than POSIX PIPE_BUF,
+	# so rely on POSIX atomicity guarantees
+	$sort_w->autoflush(1);
+	while (1) { # reads dump_ibx
+		recv($r_ibx, my $d, 4096 + 128, 0) // die "recv: $!";
+		warn "$$ recv: $d>\n";
+		last if $d eq ''; # EOF
+		send($op_p, 'index_next', MSG_EOR); # kick the parent for more
+		$d =~ s/\A([0-9]+)=// or die 'BUG: no ibx_idx';
+		my $ibx_id = $1;
+		my $ibx;
+		if (-f "$d/ei.lock") {
+			require PublicInbox::ExtSearch;
+			$ibx = PublicInbox::ExtSearch->new($d);
+		} elsif (-f "$d/inbox.lock" || -d "$d/public-inbox") {
+			$ibx = PublicInbox::Inbox->new({ inboxdir => $d });
+		}
+		my $srch = $ibx ? $ibx->search : do {
+			warn "W: no search index: $d (ignoring)\n";
+			undef;
+		};
+		my $max = $srch ? assoc_max_init($self) : 0;
+		progress($self, "dumping $d...") if $max;
+		my $opt = { limit => 10000, offset => 0, relevance => -2 };
+		while ($max > 0) {
+			$opt->{limit} = $max if $max < $opt->{limit};
+			my $mset = $srch->mset($qry_str, $opt);
+			for my $x ($mset->items) {
+				my $doc = $x->get_document;
+				for my $p (@ASSOC_PFX) {
+					for (xap_terms($p, $doc)) {
+						print $sort_w "$_ $ibx_id\n" or
+							die "print: $!";
+					}
+				}
+			}
+			my $size = $mset->size or last;
+			$max -= $size;
+			$opt->{offset} += $size;
+		}
+	}
+	send($op_p, "recv_ibx_done $self->{shard}", MSG_EOR);
+}
+
+sub dump_ibx { # sends to recv_ibx loop
+	my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR"
+	my $n = length($id_dir);
+	my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
+	$n == $w or die "send($id_dir) $w != $n";
+}
+
+# repurpose shard workers to dump inbox patchids with perfect balance
+sub dump_ibx_start {
+	my ($self, $associate) = @_;
+	pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+	my @sort = (@SORT, '-k1,1');
+	my $dst = "$TMPDIR/to_ibx_id";
+	open my $fh, '>', $dst or die "open($dst): $!";
+	my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
+	close $sort_r or die "close: $!";
+	awaitpid($sort_pid, \&cmd_done, [ 'dibx', @sort ], $associate);
+
+	my ($r, $w);
+	socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
+	my ($c, $p) = PublicInbox::PktOp->pair;
+	$c->{ops}->{recv_ibx_done} = [ $self, $associate ];
+	$c->{ops}->{index_next} = [ $self ];
+	my $io = [ $p->{op_p}, $r, $sort_w ];
+	$_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
+	$IBXDIR_FEED = $w;
+}
+
 sub index_next ($) {
 	my ($self) = @_;
 	return if $DO_QUIT;
@@ -466,6 +676,12 @@ sub index_next ($) {
 							$self, $git);
 		fp_start($self, $git, $prep_repo);
 		ct_start($self, $git, $prep_repo);
+	} elsif ($TMPDIR) {
+		delete $TODO{dump_ibx_start}; # runs OnDestroy once
+		return dump_ibx($self, shift @IBXDIRQ) if @IBXDIRQ;
+		progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
+		undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+		dump_roots_once($self, delete($TODO{associate}) // return);
 	}
 	# else: wait for shards_active (post_loop_do) callback
 }
@@ -502,7 +718,7 @@ sub commit_shard { # OnDestroy cb
 	for my $n (keys %$active) {
 		$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
 	}
-	undef $p; # shard_done fires when all shards are committed
+	# shard_done fires when all shards are committed
 }
 
 sub index_repo { # cidx_await cb
@@ -629,7 +845,6 @@ EOM
 sub scan_git_dirs ($) {
 	my ($self) = @_;
 	@$GIT_TODO = @{$self->{git_dirs}};
-	index_next($self) for (1..$LIVE_JOBS);
 }
 
 sub prune_do { # via wq_io_do in IDX_SHARDS
@@ -661,7 +876,7 @@ sub shards_active { # post_loop_do
 	return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
 	return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
 	return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
-	return 1 if keys(%$LIVE);
+	return 1 if keys(%$LIVE) || @IBXDIRQ || keys(%TODO);
 	for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
 		$s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
 		$s->wq_close; # may recurse via awaitpid outside of event_loop
@@ -674,6 +889,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
 
 sub parent_quit {
 	$DO_QUIT = POSIX->can("SIG$_[0]")->();
+	$IBXDIR_FEED = undef;
 	kill_shards(@_);
 	warn "# SIG$_[0] received, quitting...\n";
 }
@@ -717,6 +933,7 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
 E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
 EOM
 	unless ($ALT_FH{$hexlen}) {
+		require PublicInbox::Import;
 		my $git_dir = "$TMPDIR/hexlen$hexlen.git";
 		PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
 		my $f = "$git_dir/objects/info/alternates";
@@ -739,52 +956,124 @@ sub prep_alternate_start {
 	awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
 }
 
-sub prune_cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
-	my ($pid, $cmd, $run_prune) = @_;
+sub cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
+	my ($pid, $cmd, $run_on_destroy) = @_;
+	warn "# @$cmd pid=$pid \$?=$?";
 	$? and die "@$cmd failed: \$?=$?";
+	# $run_on_destroy calls associate or run_prune
+}
+
+# runs once all inboxes and shards are dumped via OnDestroy
+sub associate {
+	my ($self) = @_;
+	@IDX_SHARDS or return warn("# aborting on no shards\n");
+	grep(/1/, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
+		die "E: shards not dumped properly\n";
+	grep(/1/, @RECV_IBX_OK) == @IDX_SHARDS or
+		die "E: inboxes not dumped properly\n";
+	progress($self, 'associating...');
+	system "ls -Rl $TMPDIR >&2";
+	warn "# Waiting for $TMPDIR/cont @JOIN";
+	#sleep(1) until -f "$TMPDIR/cont";
+	my @join = (@JOIN, 'to_ibx_id', 'to_root_id');
+	my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
+	my %score; # "$ibx_id $root_id" => number of shared prefixes
+	while (<$rd>) { # PFX ibx_id root_id... (awk-mode split drops "\n")
+		my (undef, $ibx_id, @root_id) = split(' ', $_);
+		++$score{"$ibx_id $_"} for @root_id;
+	}
+	close $rd or die "@join failed: \$?=$?";
+	my $min = $self->{-opt}->{'assoc-min'} // 10;
+	progress($self, scalar(keys %score).' potential pairings...');
+	for my $k (keys %score) {
+		my $nr = $score{$k};
+		my ($ibx_id, $root) = split(/ /, $k);
+		my $d = $IBXDIRS[$ibx_id];
+		$root = unpack('H*', $ID2ROOT[$root]);
+		if ($nr < $min) {
+			progress($self,
+"$d => $root only has $nr matches of $min required");
+			next;
+		} else {
+			progress($self, "$d => $root has $nr matches");
+		}
+	}
+	delete $TODO{associating};
+	# PublicInbox::DS::enqueue_reap();
+	# TODO
+	system "ls -Rl $TMPDIR >&2";
+	# warn "# Waiting for $TMPDIR/cont";
+	# sleep(1) until -f "$TMPDIR/cont";
+}
+
+sub require_progs {
+	my $op = shift;
+	while (my ($x, $argv) = splice(@_, 0, 2)) {
+		my $e = $x;
+		$e =~ tr/a-z-/A-Z_/;
+		my $c = $ENV{$e} // $x;
+		$argv->[0] //= which($c) // die "E: `$x' required for --$op\n";
+	}
+}
+
+sub init_associate_postfork ($) {
+	my ($self) = @_;
+	return unless $self->{-opt}->{associate};
+	require PublicInbox::ExtSearch;
+	require PublicInbox::Inbox;
+	require_progs('associate', join => \@JOIN);
+	$QRY_STR = $self->{-opt}->{'associate-date-range'} // '1.year.ago..';
+	substr($QRY_STR, 0, 0) = 'dt:';
+	scalar(@{$self->{git_dirs} //  []}) or die <<EOM;
+E: no coderepos to associate
+EOM
+	my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh
+	$self->query_approxidate($approx_git, $QRY_STR); # in-place
+	$TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self);
+	$TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$,
+				\&dump_ibx_start, $self, $TODO{associate});
+	if ($self->{-opt}->{all}) {
+		my $cfg = PublicInbox::Config->new;
+		my @k = grep(/\Apublicinbox\.(?:.+)\.(?:inboxdir|mainrepo)\z/,
+				keys %$cfg);
+		push @k, grep(/\Aextindex\.(?:.+)\.topdir\z/, keys %$cfg);
+		@IBXDIRS = map {
+			ref ? () : (File::Spec->canonpath($_))
+		} @$cfg{@k};
+	}
+	for (@{$self->{-opt}->{include} // []}) {
+		push @IBXDIRS, File::Spec->canonpath($_);
+	}
+	@IBXDIRS = uniqstr @IBXDIRS;
+	my $id = -1;
+	@IBXDIRQ = map { ++$id; "$id=$_" } @IBXDIRS;
 }
 
 sub init_prune ($) {
 	my ($self) = @_;
 	return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};
 
-	require File::Temp;
-	require PublicInbox::Import;
-	$TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
-
 	# Dealing with millions of commits here at once, so use faster tools.
 	# xapian-delve is nearly an order-of-magnitude faster than Xapian Perl
 	# bindings.  sed/awk are faster than Perl for simple stream ops, and
 	# sort+comm are more memory-efficient with gigantic lists
 	my @delve = (undef, qw(-A Q -1));
 	my @sed = (undef, '-ne', 's/^Q//p');
-	@SORT = (undef, '-u');
 	@COMM = (undef, qw(-2 -3 indexed_commits -));
 	@AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output
-	my @x = ('xapian-delve' => \@delve, sed => \@sed,
-		sort => \@SORT, comm => \@COMM, awk => \@AWK);
-	while (my ($x, $argv) = splice(@x, 0, 2)) {
-		my $e = $x;
-		$e =~ tr/a-z-/A-Z_/;
-		my $c = $ENV{$e} // $x;
-		$argv->[0] = which($c) // die "E: `$x' required for --prune\n";
-	}
+	require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed,
+			comm => \@COMM, awk => \@AWK);
 	for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
-	for (qw(parallel compress-program buffer-size)) { # GNU sort options
-		my $v = $self->{-opt}->{"sort-$_"};
-		push @SORT, "--$_=$v" if defined $v;
-	}
 	my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
 	pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
 	pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
 	open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
-	$PRUNE_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
-	my $pid = spawn(\@SORT, $PRUNE_ENV, { 0 => $sort_in, 1 => $sort_out });
-	awaitpid($pid, \&prune_cmd_done, \@SORT, $run_prune);
-	$pid = spawn(\@sed, $PRUNE_ENV, { 0 => $sed_in, 1 => $sed_out });
-	awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune);
+	my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
+	awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
+	$pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
+	awaitpid($pid, \&cmd_done, \@sed, $run_prune);
 	$pid = spawn(\@delve, undef, { 1 => $delve_out });
-	awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune);
+	awaitpid($pid, \&cmd_done, \@delve, $run_prune);
 	@PRUNE_QUEUE = @{$self->{git_dirs}};
 	for (1..$LIVE_JOBS) {
 		prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune);
@@ -809,14 +1098,14 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
 	pipe(my ($awk_in, $batch_out)) or die "pipe: $!";
 	pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
 	pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
-	my $awk_pid = spawn(\@AWK, $PRUNE_ENV, { 0 => $awk_in, 1 => $awk_out });
-	my $sort_pid = spawn(\@SORT, $PRUNE_ENV,
+	my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
+	my $sort_pid = spawn(\@SORT, $CMD_ENV,
 				{ 0 => $sort_in, 1 => $sort_out });
-	my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $PRUNE_ENV,
+	my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
 				{ 0 => $comm_in, -C => "$TMPDIR" });
-	awaitpid($awk_pid, \&prune_cmd_done, \@AWK);
-	awaitpid($sort_pid, \&prune_cmd_done, \@SORT);
-	awaitpid($comm_pid, \&prune_cmd_done, \@COMM);
+	awaitpid($awk_pid, \&cmd_done, \@AWK);
+	awaitpid($sort_pid, \&cmd_done, \@SORT);
+	awaitpid($comm_pid, \&cmd_done, \@COMM);
 	PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
 	my $git_ver = PublicInbox::Git::git_version();
 	push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6;
@@ -856,6 +1145,22 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
 	for (@gone) { close $_ or die "close: $!" };
 }
 
+sub init_associate_prefork ($) { # fills @ASSOC_PFX before workers fork
+	my ($self) = @_;
+	return unless $self->{-opt}->{associate};
+	my @unknown;
+	my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
+	for (map { split(/\s*,\s*/) } @pfx) {
+		my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$_} //
+			do { push(@unknown, $_); next }; # report all at once
+		push(@ASSOC_PFX, split(/ /, $v));
+	}
+	die <<EOM if @unknown;
+--associate-prefixes contains unsupported prefixes: @unknown
+EOM
+	@ASSOC_PFX = uniqstr @ASSOC_PFX;
+}
+
 sub cidx_run { # main entry point
 	my ($self) = @_;
 	my $restore_umask = prep_umask($self);
@@ -868,10 +1173,24 @@ sub cidx_run { # main entry point
 	local $IDX_TODO = [];
 	local $GIT_TODO = [];
 	local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
-		$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, @SORT, $PRUNE_ENV);
+		$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
+		%TODO, @IBXDIRQ, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
+		@DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
 	local $BATCH_BYTES = $self->{-opt}->{batch_size} //
 				$PublicInbox::SearchIdx::BATCH_BYTES;
-	local @IDX_SHARDS = cidx_init($self);
+	local @SORT = (undef, '-u');
+	if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
+		require File::Temp;
+		$TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
+		$CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
+		require_progs('(prune|associate)', sort => \@SORT);
+		for (qw(parallel compress-program buffer-size)) { # GNU sort
+			my $v = $self->{-opt}->{"sort-$_"};
+			push @SORT, "--$_=$v" if defined $v;
+		}
+		init_associate_prefork($self)
+	}
+	local @IDX_SHARDS = cidx_init($self); # forks workers
 	local $self->{current_info} = '';
 	local $MY_SIG = {
 		CHLD => \&PublicInbox::DS::enqueue_reap,
@@ -919,7 +1238,9 @@ sub cidx_run { # main entry point
 			PublicInbox::IPC::detect_nproc() || 2;
 	local @RDONLY_XDB = $self->xdb_shards_flat;
 	init_prune($self);
+	init_associate_postfork($self);
 	scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+	index_next($self) for (1..$LIVE_JOBS);
 
 	# FreeBSD ignores/discards SIGCHLD while signals are blocked and
 	# EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
@@ -928,6 +1249,7 @@ sub cidx_run { # main entry point
 	local @PublicInbox::DS::post_loop_do = (\&shards_active);
 	PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
 	PublicInbox::DS->Reset;
+	warn "$$ OK DONE";
 	$self->lock_release(!!$NCHANGE);
 }
 
diff --git a/script/public-inbox-cindex b/script/public-inbox-cindex
index 2f7796e7..888c8b10 100755
--- a/script/public-inbox-cindex
+++ b/script/public-inbox-cindex
@@ -26,8 +26,10 @@ See public-inbox-cindex(1) man page for full documentation.
 EOF
 my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden
 GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
-		indexlevel|index-level|L=s
+		indexlevel|index-level|L=s associate associate-max=i
+		associate-date-range=s associate-prefixes=s@
 		batch_size|batch-size=s max_size|max-size=s
+		include|I=s@ only=s@ all
 		project-list=s exclude=s@
 		sort-parallel=s sort-compress-program=s sort-buffer-size=s
 		d=s update|u scan! prune dry-run|n C=s@ help|h))

                 reply	other threads:[~2023-08-15  7:41 UTC|newest]

Thread overview: [no followups] expand[flat|nested]  mbox.gz  Atom feed

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230815074059.3034065-1-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).