diff options
-rw-r--r-- | lib/PublicInbox/CodeSearchIdx.pm | 99 |
1 files changed, 53 insertions, 46 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 555a1efe..ec0fc6e3 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -105,6 +105,7 @@ our ( @JOIN_DT, # YYYYmmddHHMMSS for dt: $QRY_STR, # common query string for both code and inbox associations $DUMP_IBX_WPIPE, # goes to sort(1) + $ANY_SHARD, # shard round-robin for scan fingerprinting @OFF2ROOT, ); @@ -416,51 +417,42 @@ sub run_git { # this is different from the grokmirror-compatible fingerprint since we # only care about --heads (branches) and --tags, and not even their names -sub fp_start ($$$) { - my ($self, $git, $prep_repo) = @_; +sub fp_start ($$) { + my ($self, $git) = @_; return if $DO_QUIT; open my $refs, '+>', undef; $git->{-repo}->{refs} = $refs; - run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs }, - \&fp_fini, $self, $git, $prep_repo); -} - -sub fp_fini { # run_git cb - my (undef, $self, $git, $prep_repo) = @_; - my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; - sysseek($refs, 0, SEEK_SET); - $git->{-repo}->{fp} = sha_all(256, $refs)->hexdigest; + my ($c, $p) = PublicInbox::PktOp->pair; + my $next_on_err = PublicInbox::OnDestroy->new(\&index_next, $self); + $c->{ops}->{fp_done} = [ $self, $git, $next_on_err ]; + $IDX_SHARDS[++$ANY_SHARD % scalar(@IDX_SHARDS)]->wq_io_do('fp_async', + [ $p->{op_p}, $refs ], $git->{git_dir}) } -sub ct_start ($$$) { - my ($self, $git, $prep_repo) = @_; - return if $DO_QUIT; - run_git([ qw[for-each-ref --sort=-committerdate - --format=%(committerdate:raw) --count=1 - refs/heads/ refs/tags/] ], undef, # capture like qx - \&ct_fini, $self, $git, $prep_repo); +sub fp_async { # via wq_io_do in worker + my ($self, $git_dir) = @_; + my $op_p = delete $self->{0} // die 'BUG: no {0} op_p'; + my $refs = delete $self->{1} // die 'BUG: no {1} refs'; + my $git = PublicInbox::Git->new($git_dir); + run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs }, + \&fp_async_done, $self, $git, $op_p); } -sub ct_fini { # run_git cb - my ($opt, $self, $git, $prep_repo) = @_; - my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF - $git->{-repo}->{ct} = $ct + 0; +sub fp_async_done { # run_git cb from worker + my ($opt, $self, $git, $op_p) = @_; + my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}'; + sysseek($refs, 0, SEEK_SET); + send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0); } -# TODO: also index gitweb.owner and the full fingerprint for grokmirror? -sub prep_repo ($$) { - my ($self, $git) = @_; +sub fp_done { # called parent via PktOp by fp_async_done + my ($self, $git, $next_on_err, $hex) = @_; + $next_on_err->cancel; return if $DO_QUIT; - return index_next($self) if $git->{-cidx_err}; - my $repo = $git->{-repo} // die 'BUG: no {-repo}'; - if (!defined($repo->{ct})) { - warn "W: $git->{git_dir} has no commits, skipping\n"; - delete $git->{-repo}; - return index_next($self); - } + $git->{-repo}->{fp} = $hex; my $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB); my $shard = bless { %$self, shard => $n }, ref($self); - $repo->{shard_n} = $n; + $git->{-repo}->{shard_n} = $n; delete @$shard{qw(lockfh lock_path)}; local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef"; $shard->retry_reopen(\&check_existing, $self, $git); @@ -469,7 +461,7 @@ sub prep_repo ($$) { sub check_existing { # retry_reopen callback my ($shard, $self, $git) = @_; my @docids = $shard->docids_of_git_dir($git->{git_dir}); - my $docid = shift(@docids) // return get_roots($self, $git); + my $docid = shift(@docids) // return prep_repo($self, $git); # new repo my $doc = $shard->get_doc($docid) // die "BUG: no #$docid ($git->{git_dir})"; my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data; @@ -482,7 +474,7 @@ sub check_existing { # retry_reopen callback warn "BUG: $git->{git_dir} indexed multiple times, culling\n"; $git->{-repo}->{to_delete} = \@docids; # XXX needed? } - get_roots($self, $git); + prep_repo($self, $git); } sub partition_refs ($$$) { @@ -604,13 +596,9 @@ sub index_next ($) { my ($self) = @_; return if $DO_QUIT; if ($IDXQ && @$IDXQ) { - index_repo(undef, $self, shift @$IDXQ); + index_repo($self, shift @$IDXQ); } elsif ($SCANQ && @$SCANQ) { - my $git = shift @$SCANQ; - my $prep_repo = PublicInbox::OnDestroy->new(\&prep_repo, - $self, $git); - fp_start($self, $git, $prep_repo); - ct_start($self, $git, $prep_repo); + fp_start $self, shift @$SCANQ; } elsif ($TMPDIR) { delete $TODO{dump_roots_start}; delete $TODO{dump_ibx_start}; # runs OnDestroy once @@ -649,12 +637,17 @@ sub index_done { # OnDestroy cb called when done indexing each code repo # repo_stored will fire once store_repo is done } -sub index_repo { # run_git cb - my (undef, $self, $git) = @_; +sub index_repo { + my ($self, $git) = @_; return if $DO_QUIT; + my $repo = $git->{-repo} // die 'BUG: no {-repo}'; return index_next($self) if $git->{-cidx_err}; + if (!defined($repo->{ct})) { + warn "W: $git->{git_dir} has no commits, skipping\n"; + return index_next($self); + } return push(@$IDXQ, $git) if $REPO_CTX; # busy - my $repo = delete $git->{-repo} or return index_next($self); + delete $git->{-repo}; my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}'; seek($roots_fh, 0, SEEK_SET); chomp(my @roots = PublicInbox::IO::read_all $roots_fh); @@ -685,15 +678,28 @@ sub index_repo { # run_git cb # shard_done fires when shard_index is done } -sub get_roots ($$) { +sub ct_fini { # run_git cb + my ($opt, $self, $git, $index_repo) = @_; + my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF + $git->{-repo}->{ct} = $ct + 0; +} + +# TODO: also index gitweb.owner and the full fingerprint for grokmirror? +sub prep_repo ($$) { my ($self, $git) = @_; return if $DO_QUIT; + my $index_repo = PublicInbox::OnDestroy->new(\&index_repo, $self, $git); my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; sysseek($refs, 0, SEEK_SET); open my $roots_fh, '+>', undef; $git->{-repo}->{roots_fh} = $roots_fh; run_git([ qw(rev-list --stdin --max-parents=0) ], - { 0 => $refs, 1 => $roots_fh }, \&index_repo, $self, $git) + { 0 => $refs, 1 => $roots_fh }, \&PublicInbox::Config::noop, + $self, $git, $index_repo); + run_git([ qw[for-each-ref --sort=-committerdate + --format=%(committerdate:raw) --count=1 + refs/heads/ refs/tags/] ], undef, # capture like qx + \&ct_fini, $self, $git, $index_repo); } # for PublicInbox::SearchIdx `git patch-id' call and with_umask @@ -1295,6 +1301,7 @@ sub cidx_run { # main entry point init_join_prefork($self) } local @IDX_SHARDS = cidx_init($self); # forks workers + local $ANY_SHARD = -1; local $self->{current_info} = ''; local $MY_SIG = { CHLD => \&PublicInbox::DS::enqueue_reap, |