From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 10/10] cindex: use DS and workqueues for parallelism
Date: Thu, 16 Mar 2023 20:01:31 +0000 [thread overview]
Message-ID: <20230316200131.2113244-10-e@80x24.org> (raw)
In-Reply-To: <20230316200131.2113244-1-e@80x24.org>
This avoids forking new shard processes for each repo we scan,
but we still can't avoid committing more often than we'd like,
since we need to ensure the `seen()' sub can avoid redundant work.
---
lib/PublicInbox/CodeSearchIdx.pm | 374 ++++++++++++++++++++-----------
1 file changed, 240 insertions(+), 134 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 02c9ed84..13fe1c28 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -14,9 +14,11 @@
# See PublicInbox::CodeSearch (read-only API) for more
package PublicInbox::CodeSearchIdx;
use v5.12;
-use parent qw(PublicInbox::Lock PublicInbox::CodeSearch PublicInbox::SearchIdx);
+# parent order matters, we want ->DESTROY from IPC, not SearchIdx
+use parent qw(PublicInbox::CodeSearch PublicInbox::IPC PublicInbox::SearchIdx);
use PublicInbox::Eml;
-use PublicInbox::DS ();
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::PktOp;
use PublicInbox::IPC qw(nproc_shards);
use PublicInbox::Admin;
use POSIX qw(WNOHANG SEEK_SET);
@@ -26,11 +28,19 @@ use PublicInbox::SHA qw(sha256_hex);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx qw(add_val);
use PublicInbox::Config;
-use PublicInbox::Spawn qw(spawn);
+use PublicInbox::Spawn qw(spawn popen_rd);
use PublicInbox::OnDestroy;
-our $LIVE; # pid => callback
-our $LIVE_JOBS;
-our @XDB_SHARDS_FLAT;
+use Socket qw(MSG_EOR);
+use Carp ();
+our (
+ $LIVE, # pid => cmd
+ $DEFER, # [ [ cb, @args ], ... ]
+ $LIVE_JOBS, # integer
+ $MY_SIG, # like %SIG
+ $SIGSET,
+ @RDONLY_SHARDS, # Xapian::Database
+ @IDX_SHARDS # clones of self
+);
# stop walking history if we see >$SEEN_MAX existing commits, this assumes
# branches don't diverge by more than this number of commits...
@@ -110,14 +120,14 @@ sub progress {
$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
}
-sub store_repo ($$$) {
- my ($self, $git, $repo) = @_;
- my $xdb = delete($repo->{shard})->idx_acquire;
- $xdb->begin_transaction;
+sub store_repo { # wq_do - returns docid
+ my ($self, $repo) = @_;
+ $self->begin_txn_lazy;
+ my $xdb = $self->{xdb};
for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
- if (defined $repo->{id}) {
- my $doc = $xdb->get_document($repo->{id}) //
- die "$git->{git_dir} doc #$repo->{id} gone";
+ if (defined $repo->{docid}) {
+ my $doc = $xdb->get_document($repo->{docid}) //
+ die "$repo->{git_dir} doc #$repo->{docid} gone";
add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
my %new = map { $_ => undef } @{$repo->{roots}};
my $old = xap_terms('G', $doc);
@@ -126,34 +136,38 @@ sub store_repo ($$$) {
delete @$old{@{$repo->{roots}}};
$doc->remove_term('G'.$_) for keys %$old;
$doc->set_data($repo->{fp});
- $xdb->replace_document($repo->{id}, $doc);
+ $xdb->replace_document($repo->{docid}, $doc);
+ $repo->{docid}
} else {
my $new = $PublicInbox::Search::X{Document}->new;
add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
- $new->add_boolean_term("P$git->{git_dir}");
+ $new->add_boolean_term("P$repo->{git_dir}");
$new->add_boolean_term('T'.'r');
$new->add_boolean_term('G'.$_) for @{$repo->{roots}};
$new->set_data($repo->{fp}); # \n delimited
$xdb->add_document($new);
}
- $xdb->commit_transaction;
}
# sharded reader for `git log --pretty=format: --stdin'
-sub shard_worker ($$$) {
- my ($self, $r, $sigset) = @_;
+sub shard_index { # via wq_io_do
+ my ($self, $git, $n, $roots) = @_;
+ local $self->{current_info} = "$git->{git_dir} [$n]";
my ($quit, $cmt);
+ local $self->{roots} = $roots;
+ my $in = delete($self->{0}) // die 'BUG: no {0} input';
+ my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
my $batch_bytes = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
my $max = $batch_bytes;
- $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
- $SIG{QUIT} = $SIG{TERM} = $SIG{INT} = sub { $quit = shift };
- PublicInbox::DS::sig_setmask($sigset);
-
- # the parent process of this shard process writes directly to
- # the stdin of `git log', we consume git log's stdout:
- my $rd = $self->{git}->popen(@LOG_STDIN, undef, { 0 => $r });
- close $r or die "close: $!";
+ my $set_quit = sub { $quit = shift };
+ local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
+ local $SIG{QUIT} = $set_quit;
+ local $SIG{TERM} = $set_quit;
+ local $SIG{INT} = $set_quit;
+ local $self->{git} = $git; # for patchid
+ my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+ close $in or die "close: $!";
my $nr = 0;
# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
@@ -162,8 +176,7 @@ sub shard_worker ($$$) {
local $/ = $FS;
my $buf = <$rd> // return; # leading $FS
$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
- my $xdb = $self->idx_acquire;
- $xdb->begin_transaction;
+ $self->begin_txn_lazy;
while (defined($buf = <$rd>)) {
chomp($buf);
$max -= length($buf);
@@ -174,24 +187,40 @@ sub shard_worker ($$$) {
++$nr;
if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
progress($self, $nr);
- $xdb->commit_transaction;
+ $self->{xdb}->commit_transaction;
$max = $batch_bytes;
- $xdb->begin_transaction;
+ $self->{xdb}->begin_transaction;
}
$/ = $FS;
}
close($rd);
if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
- $xdb->commit_transaction;
+ send($op_p, "shard_done $n", MSG_EOR);
} else {
warn "E: git @LOG_STDIN: \$?=$?\n";
- $xdb->cancel_transaction;
+ $self->{xdb}->cancel_transaction;
}
}
+sub shard_done { # called via PktOp on shard_index completion
+ my ($self, $n) = @_;
+ $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
+}
+
sub seen ($$) {
my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
- $xdb->postlist_begin($q) != $xdb->postlist_end($q)
+ for (1..100) {
+ my $ret = eval {
+ $xdb->postlist_begin($q) != $xdb->postlist_end($q);
+ };
+ return $ret unless $@;
+ if (ref($@) =~ /\bDatabaseModifiedError\b/) {
+ $xdb->reopen;
+ } else {
+ Carp::croak($@);
+ }
+ }
+ Carp::croak('too many Xapian DB modifications in progress');
}
# used to select the shard for a GIT_DIR
@@ -206,18 +235,42 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
@ids;
}
+sub run_todo ($) {
+ my ($self) = @_;
+ my $n;
+ while (defined(my $x = shift(@{$self->{todo} // []}))) {
+ my $cb = shift @$x;
+ $cb->(@$x);
+ ++$n;
+ }
+ $n;
+}
+
sub cidx_reap ($$) {
my ($self, $jobs) = @_;
- while (keys(%$LIVE) >= $jobs) {
- my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
- last if $pid < 0;
- if (my $x = delete $LIVE->{$pid}) {
- my $cb = shift @$x;
- $cb->(@$x) if $cb;
- } else {
- warn "reaped unknown PID=$pid ($?)\n";
- }
+ while (run_todo($self)) {}
+ my $cb = sub { keys(%$LIVE) > $jobs };
+ PublicInbox::DS->SetPostLoopCallback($cb);
+ PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->();
+ while (!$jobs && run_todo($self)) {}
+}
+
+sub cidx_await_cb { # awaitpid cb
+ my ($pid, $cb, $self, $git, @args) = @_;
+ return if !$LIVE; # premature shutdown
+ my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
+ PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
+ if ($?) {
+ $git->{-cidx_err} = 1;
+ return warn("@$cmd error: \$?=$?\n");
}
+ push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER;
+}
+
+sub cidx_await ($$$$$@) {
+ my ($pid, $cmd, $cb, $self, $git, @args) = @_;
+ $LIVE->{$pid} = $cmd;
+ awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args);
}
# this is different from the grokmirror-compatible fingerprint since we
@@ -227,13 +280,14 @@ sub fp_start ($$$) {
return if !$LIVE; # premature exit
cidx_reap($self, $LIVE_JOBS);
open my $refs, '+>', undef or die "open: $!";
- my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
- qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+ my $cmd = ['git', "--git-dir=$git->{git_dir}",
+ qw(show-ref --heads --tags --hash)];
+ my $pid = spawn($cmd, undef, { 1 => $refs });
$git->{-repo}->{refs} = $refs;
- $LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ];
+ cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo);
}
-sub fp_fini {
+sub fp_fini { # cidx_await cb
my ($self, $git, $prep_repo) = @_;
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
seek($refs, 0, SEEK_SET) or die "seek: $!";
@@ -247,13 +301,15 @@ sub ct_start ($$$) {
my ($self, $git, $prep_repo) = @_;
return if !$LIVE; # premature exit
cidx_reap($self, $LIVE_JOBS);
- my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+ my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+ qw[for-each-ref --sort=-committerdate
--format=%(committerdate:raw) --count=1
- refs/heads/ refs/tags/]]);
- $LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ];
+ refs/heads/ refs/tags/] ];
+ my ($rd, $pid) = popen_rd($cmd);
+ cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo);
}
-sub ct_fini {
+sub ct_fini { # cidx_await cb
my ($self, $git, $rd, $prep_repo) = @_;
defined(my $ct = <$rd>) or return;
$ct =~ s/\s+.*\z//s; # drop TZ + LF
@@ -263,34 +319,38 @@ sub ct_fini {
# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
sub prep_repo ($$) {
my ($self, $git) = @_;
- return if !$LIVE; # premature exit
+ return if !$LIVE || $git->{-cidx_err}; # premature exit
my $repo = $git->{-repo} // die 'BUG: no {-repo}';
- my $git_dir = $git->{git_dir};
if (!defined($repo->{ct})) {
- warn "W: $git_dir has no commits, skipping\n";
+ warn "W: $git->{git_dir} has no commits, skipping\n";
delete $git->{-repo};
return;
}
- my $n = git_dir_hash($git_dir) % $self->{nshard};
- my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
+ my $n = git_dir_hash($git->{git_dir}) % $self->{nshard};
+ my $shard = bless { %$self, shard => $n }, ref($self);
+ $repo->{shard_n} = $n;
delete @$shard{qw(lockfh lock_path)};
- my $xdb = $XDB_SHARDS_FLAT[$n] // die "BUG: shard[$n] undef";
- $xdb->reopen;
- my @docids = docids_by_postlist({ xdb => $xdb }, 'P'.$git_dir);
+ local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef";
+ $shard->retry_reopen(\&check_existing, $self, $git);
+}
+
+sub check_existing { # retry_reopen callback
+ my ($shard, $self, $git) = @_;
+ my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir});
my $docid = shift(@docids) // return get_roots($self, $git);
- if (@docids) {
- warn "BUG: $git_dir indexed multiple times, culling\n";
- $repo->{to_delete} = \@docids; # XXX needed?
- }
- my $doc = $xdb->get_document($docid) //
- die "BUG: no #$docid ($git_dir)";
+ my $doc = $shard->{xdb}->get_document($docid) //
+ die "BUG: no #$docid ($git->{git_dir})";
my $old_fp = $doc->get_data;
- if ($old_fp eq $repo->{fp}) { # no change
- progress($self, "$git_dir unchanged");
+ if ($old_fp eq $git->{-repo}->{fp}) { # no change
+ progress($self, "$git->{git_dir} unchanged");
delete $git->{-repo};
return;
}
- $repo->{id} = $docid;
+ $git->{-repo}->{docid} = $docid;
+ if (@docids) {
+ warn "BUG: $git->{git_dir} indexed multiple times, culling\n";
+ $git->{-repo}->{to_delete} = \@docids; # XXX needed?
+ }
get_roots($self, $git);
}
@@ -304,12 +364,12 @@ sub partition_refs ($$$) {
$_->reopen;
open my $fh, '+>', undef or die "open: $!";
$fh;
- } @XDB_SHARDS_FLAT;
+ } @RDONLY_SHARDS;
while (defined(my $cmt = <$fh>)) {
chomp $cmt;
- my $n = hex(substr($cmt, 0, 8)) % scalar(@XDB_SHARDS_FLAT);
- if (seen($XDB_SHARDS_FLAT[$n], 'Q'.$cmt)) {
+ my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
+ if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
last if ++$seen > $SEEN_MAX;
} else {
say { $shard_in[$n] } $cmt or die "say: $!";
@@ -330,9 +390,33 @@ sub partition_refs ($$$) {
die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
}
-sub index_repo {
+sub shard_commit { # via wq_io_do
+ my ($self, $n) = @_;
+ my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+ $self->commit_txn_lazy;
+ send($op_p, "shard_done $n", MSG_EOR);
+}
+
+sub commit_used_shards ($$$) {
+ my ($self, $git, $consumers) = @_;
+ local $self->{-shard_ok} = {};
+ for my $n (keys %$consumers) {
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self ];
+ $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
+ $consumers->{$n} = $c;
+ }
+ PublicInbox::DS->SetPostLoopCallback(sub {
+ scalar(grep { $_->{sock} } values %$consumers);
+ });
+ PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+ my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+ die "E: $git->{git_dir} $n shards failed" if $n;
+}
+
+sub index_repo { # cidx_await cb
my ($self, $git, $roots) = @_;
- return if !$LIVE; # premature exit
+ return if $git->{-cidx_err};
my $repo = delete $git->{-repo} or return;
seek($roots, 0, SEEK_SET) or die "seek: $!";
chomp(my @roots = <$roots>);
@@ -341,73 +425,45 @@ sub index_repo {
$repo->{roots} = \@roots;
local $self->{current_info} = $git->{git_dir};
my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
- my %pids;
- my $fwd_kill = sub {
- my ($sig) = @_;
- kill($sig, $_) for keys %pids;
- };
- local $SIG{USR1} = $fwd_kill;
- local $SIG{QUIT} = $fwd_kill;
- local $SIG{INT} = $fwd_kill;
- local $SIG{TERM} = $fwd_kill;
- my $sigset = PublicInbox::DS::block_signals();
- for (my $n = 0; $n <= $#shard_in; $n++) {
+ local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
+ my %CONSUMERS;
+ for my $n (0..$#shard_in) {
-s $shard_in[$n] or next;
- my $pid = fork // die "fork: $!";
- if ($pid == 0) { # no RNG use, here
- $0 = "code index [$n]";
- $self->{git} = $git;
- $self->{shard} = $n;
- $self->{current_info} = "$self->{current_info} [$n]";
- delete @$self{qw(lockfh lock_path)};
- my $in = $shard_in[$n];
- @shard_in = ();
- $self->{roots} = \@roots;
- undef $repo;
- eval { shard_worker($self, $in, $sigset) };
- warn "E: $@" if $@;
- POSIX::_exit($@ ? 1 : 0);
- } else {
- $pids{$pid} = "code index [$n]";
- }
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self ];
+ $IDX_SHARDS[$n]->wq_io_do('shard_index',
+ [ $shard_in[$n], $p->{op_p} ],
+ $git, $n, \@roots);
+ $CONSUMERS{$n} = $c;
}
- PublicInbox::DS::sig_setmask($sigset);
@shard_in = ();
- my ($err, @todo);
- while (keys %pids) {
- my $pid = waitpid(-1, 0) // die "waitpid: $!";
- if (my $j = delete $pids{$pid}) {
- next if $? == 0;
- warn "PID:$pid $j exited with \$?=$?\n";
- $err = 1;
- } elsif (my $todo = delete $LIVE->{$pid}) {
- warn "PID:$pid exited with \$?=$?\n" if $?;
- push @todo, $todo;
- } else {
- warn "reaped unknown PID=$pid ($?)\n";
- }
- }
- die "subprocess(es) failed\n" if $err;
- store_repo($self, $git, $repo);
- progress($self, "$git->{git_dir}: done");
- # TODO: check fp afterwards?
- while (my $x = shift @todo) {
- my $cb = shift @$x;
- $cb->(@$x) if $cb;
+ PublicInbox::DS->SetPostLoopCallback(sub {
+ scalar(grep { $_->{sock} } values %CONSUMERS);
+ });
+ PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+ my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
+ die "E: $git->{git_dir} $n shards failed" if $n;
+ $repo->{git_dir} = $git->{git_dir};
+ my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
+ if ($id > 0) {
+ $CONSUMERS{$repo->{shard_n}} = undef;
+ commit_used_shards($self, $git, \%CONSUMERS);
+ progress($self, "$git->{git_dir}: done");
+ return run_todo($self);
}
+ die "E: store_repo $git->{git_dir}: id=$id";
}
sub get_roots ($$) {
my ($self, $git) = @_;
return if !$LIVE; # premature exit
- cidx_reap($self, $LIVE_JOBS);
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET) or die "seek: $!";
open my $roots, '+>', undef or die "open: $!";
- my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
- qw(rev-list --stdin --max-parents=0)],
- undef, { 0 => $refs, 1 => $roots });
- $LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ];
+ my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
+ qw(rev-list --stdin --max-parents=0) ];
+ my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots });
+ cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots);
}
# for PublicInbox::SearchIdx::patch_id and with_umask
@@ -434,9 +490,17 @@ sub cidx_init ($) {
warn "# creating $dir\n" if !$self->{-opt}->{quiet};
File::Path::mkpath($dir);
}
+ $self->lock_acquire;
+ my @shards;
for my $n (0..($self->{nshard} - 1)) {
my $shard = bless { %$self, shard => $n }, ref($self);
+ delete @$shard{qw(lockfh lock_path)};
$shard->idx_acquire;
+ $shard->idx_release;
+ $shard->wq_workers_start("shard[$n]", 1, undef, {
+ siblings => \@shards, # for ipc_atfork_child
+ }, \&shard_done_wait, $self);
+ push @shards, $shard;
}
# this warning needs to happen after idx_acquire
state $once;
@@ -444,14 +508,11 @@ sub cidx_init ($) {
W: Xapian v1.2.21..v1.2.24 were missing close-on-exec on OFD locks,
W: memory usage may be high for large indexing runs
EOM
+ @shards;
}
sub scan_git_dirs ($) {
my ($self) = @_;
- local $LIVE_JOBS = $self->{-opt}->{jobs} //
- PublicInbox::IPC::detect_nproc() // 2;
- local $LIVE = {};
- local @XDB_SHARDS_FLAT = $self->xdb_shards_flat;
for (@{$self->{git_dirs}}) {
my $git = PublicInbox::Git->new($_);
my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
@@ -462,18 +523,31 @@ sub scan_git_dirs ($) {
cidx_reap($self, 0);
}
-sub cidx_run {
+sub shards_active { # PostLoopCallback
+ scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
+}
+
+sub cidx_run { # main entry point
my ($self) = @_;
- cidx_init($self);
+ local $self->{todo} = [];
+ local $DEFER = $self->{todo};
+ local $SIGSET = PublicInbox::DS::block_signals();
+ my $restore = PublicInbox::OnDestroy->new($$,
+ \&PublicInbox::DS::sig_setmask, $SIGSET);
+ local $LIVE = {};
+ local @IDX_SHARDS = cidx_init($self);
local $self->{current_info} = '';
my $cb = $SIG{__WARN__} || \&CORE::warn;
+ local $MY_SIG = {
+ CHLD => \&PublicInbox::DS::enqueue_reap,
+ INT => sub { exit },
+ };
local $SIG{__WARN__} = sub {
my $m = shift @_;
$self->{current_info} eq '' or
$m =~ s/\A(#?\s*)/$1$self->{current_info}: /;
$cb->($m, @_);
};
- $self->lock_acquire;
load_existing($self);
my @nc = grep { File::Spec->canonpath($_) ne $_ } @{$self->{git_dirs}};
if (@nc) {
@@ -486,9 +560,41 @@ sub cidx_run {
warn "E: canonicalized and attempting to continue\n";
}
local $self->{nchange} = 0;
+ local $LIVE_JOBS = $self->{-opt}->{jobs} ||
+ PublicInbox::IPC::detect_nproc() || 2;
+ local @RDONLY_SHARDS = $self->xdb_shards_flat;
+
# do_prune($self) if $self->{-opt}->{prune}; TODO
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+
+ for my $s (@IDX_SHARDS) {
+ $s->{-cidx_quit} = 1;
+ $s->wq_close;
+ }
+
+ PublicInbox::DS->SetPostLoopCallback(\&shards_active);
+ PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
$self->lock_release(!!$self->{nchange});
}
+sub ipc_atfork_child {
+ my ($self) = @_;
+ $self->SUPER::ipc_atfork_child;
+ my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
+ $_->wq_close for @$x;
+}
+
+sub shard_done_wait { # awaitpid cb via ipc_worker_reap
+ my ($pid, $shard, $self) = @_;
+ delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+ return unless $?;
+ warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
+ ++$self->{shard_err} if defined($self->{shard_err});
+}
+
+sub with_umask { # TODO
+ my ($self, $cb, @arg) = @_;
+ $cb->(@arg);
+}
+
1;
prev parent reply other threads:[~2023-03-16 20:01 UTC|newest]
Thread overview: 10+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-03-16 20:01 [PATCH 01/10] ipc: move nproc_shards from v2writable Eric Wong
2023-03-16 20:01 ` [PATCH 02/10] search: relocate all_terms from lei_search Eric Wong
2023-03-16 20:01 ` [PATCH 03/10] admin: hoist out resolve_git_dir Eric Wong
2023-03-16 20:01 ` [PATCH 04/10] admin: ensure resolved GIT_DIR is absolute Eric Wong
2023-03-16 20:01 ` [PATCH 05/10] test_common: create_inbox: use `$!' properly on mkdir failure Eric Wong
2023-03-16 20:01 ` [PATCH 06/10] codesearch: initial cut w/ -cindex tool Eric Wong
2023-03-16 20:01 ` [PATCH 07/10] cindex: parallelize prep phases Eric Wong
2023-03-16 20:01 ` [PATCH 08/10] cindex: use read-only shards during " Eric Wong
2023-03-16 20:01 ` [PATCH 09/10] searchidxshard: improve comment wording Eric Wong
2023-03-16 20:01 ` Eric Wong [this message]
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20230316200131.2113244-10-e@80x24.org \
--to=e@80x24.org \
--cc=spew@80x24.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).