about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/PublicInbox/CodeSearchIdx.pm99
1 files changed, 53 insertions, 46 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 555a1efe..ec0fc6e3 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -105,6 +105,7 @@ our (
         @JOIN_DT, # YYYYmmddHHMMSS for dt:
         $QRY_STR, # common query string for both code and inbox associations
         $DUMP_IBX_WPIPE, # goes to sort(1)
+        $ANY_SHARD, # shard round-robin for scan fingerprinting
         @OFF2ROOT,
 );
 
@@ -416,51 +417,42 @@ sub run_git {
 
 # this is different from the grokmirror-compatible fingerprint since we
 # only care about --heads (branches) and --tags, and not even their names
-sub fp_start ($$$) {
-        my ($self, $git, $prep_repo) = @_;
+sub fp_start ($$) {
+        my ($self, $git) = @_;
         return if $DO_QUIT;
         open my $refs, '+>', undef;
         $git->{-repo}->{refs} = $refs;
-        run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs },
-                \&fp_fini, $self, $git, $prep_repo);
-}
-
-sub fp_fini { # run_git cb
-        my (undef, $self, $git, $prep_repo) = @_;
-        my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
-        sysseek($refs, 0, SEEK_SET);
-        $git->{-repo}->{fp} = sha_all(256, $refs)->hexdigest;
+        my ($c, $p) = PublicInbox::PktOp->pair;
+        my $next_on_err = PublicInbox::OnDestroy->new(\&index_next, $self);
+        $c->{ops}->{fp_done} = [ $self, $git, $next_on_err ];
+        $IDX_SHARDS[++$ANY_SHARD % scalar(@IDX_SHARDS)]->wq_io_do('fp_async',
+                                        [ $p->{op_p}, $refs ], $git->{git_dir})
 }
 
-sub ct_start ($$$) {
-        my ($self, $git, $prep_repo) = @_;
-        return if $DO_QUIT;
-        run_git([ qw[for-each-ref --sort=-committerdate
-                --format=%(committerdate:raw) --count=1
-                refs/heads/ refs/tags/] ], undef, # capture like qx
-                \&ct_fini, $self, $git, $prep_repo);
+sub fp_async { # via wq_io_do in worker
+        my ($self, $git_dir) = @_;
+        my $op_p = delete $self->{0} // die 'BUG: no {0} op_p';
+        my $refs = delete $self->{1} // die 'BUG: no {1} refs';
+        my $git = PublicInbox::Git->new($git_dir);
+        run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs },
+                \&fp_async_done, $self, $git, $op_p);
 }
 
-sub ct_fini { # run_git cb
-        my ($opt, $self, $git, $prep_repo) = @_;
-        my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF
-        $git->{-repo}->{ct} = $ct + 0;
+sub fp_async_done { # run_git cb from worker
+        my ($opt, $self, $git, $op_p) = @_;
+        my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}';
+        sysseek($refs, 0, SEEK_SET);
+        send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0);
 }
 
-# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
-sub prep_repo ($$) {
-        my ($self, $git) = @_;
+sub fp_done { # called parent via PktOp by fp_async_done
+        my ($self, $git, $next_on_err, $hex) = @_;
+        $next_on_err->cancel;
         return if $DO_QUIT;
-        return index_next($self) if $git->{-cidx_err};
-        my $repo = $git->{-repo} // die 'BUG: no {-repo}';
-        if (!defined($repo->{ct})) {
-                warn "W: $git->{git_dir} has no commits, skipping\n";
-                delete $git->{-repo};
-                return index_next($self);
-        }
+        $git->{-repo}->{fp} = $hex;
         my $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB);
         my $shard = bless { %$self, shard => $n }, ref($self);
-        $repo->{shard_n} = $n;
+        $git->{-repo}->{shard_n} = $n;
         delete @$shard{qw(lockfh lock_path)};
         local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef";
         $shard->retry_reopen(\&check_existing, $self, $git);
@@ -469,7 +461,7 @@ sub prep_repo ($$) {
 sub check_existing { # retry_reopen callback
         my ($shard, $self, $git) = @_;
         my @docids = $shard->docids_of_git_dir($git->{git_dir});
-        my $docid = shift(@docids) // return get_roots($self, $git);
+        my $docid = shift(@docids) // return prep_repo($self, $git); # new repo
         my $doc = $shard->get_doc($docid) //
                         die "BUG: no #$docid ($git->{git_dir})";
         my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
@@ -482,7 +474,7 @@ sub check_existing { # retry_reopen callback
                 warn "BUG: $git->{git_dir} indexed multiple times, culling\n";
                 $git->{-repo}->{to_delete} = \@docids; # XXX needed?
         }
-        get_roots($self, $git);
+        prep_repo($self, $git);
 }
 
 sub partition_refs ($$$) {
@@ -604,13 +596,9 @@ sub index_next ($) {
         my ($self) = @_;
         return if $DO_QUIT;
         if ($IDXQ && @$IDXQ) {
-                index_repo(undef, $self, shift @$IDXQ);
+                index_repo($self, shift @$IDXQ);
         } elsif ($SCANQ && @$SCANQ) {
-                my $git = shift @$SCANQ;
-                my $prep_repo = PublicInbox::OnDestroy->new(\&prep_repo,
-                                                                $self, $git);
-                fp_start($self, $git, $prep_repo);
-                ct_start($self, $git, $prep_repo);
+                fp_start $self, shift @$SCANQ;
         } elsif ($TMPDIR) {
                 delete $TODO{dump_roots_start};
                 delete $TODO{dump_ibx_start}; # runs OnDestroy once
@@ -649,12 +637,17 @@ sub index_done { # OnDestroy cb called when done indexing each code repo
         # repo_stored will fire once store_repo is done
 }
 
-sub index_repo { # run_git cb
-        my (undef, $self, $git) = @_;
+sub index_repo {
+        my ($self, $git) = @_;
         return if $DO_QUIT;
+        my $repo = $git->{-repo} // die 'BUG: no {-repo}';
         return index_next($self) if $git->{-cidx_err};
+        if (!defined($repo->{ct})) {
+                warn "W: $git->{git_dir} has no commits, skipping\n";
+                return index_next($self);
+        }
         return push(@$IDXQ, $git) if $REPO_CTX; # busy
-        my $repo = delete $git->{-repo} or return index_next($self);
+        delete $git->{-repo};
         my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}';
         seek($roots_fh, 0, SEEK_SET);
         chomp(my @roots = PublicInbox::IO::read_all $roots_fh);
@@ -685,15 +678,28 @@ sub index_repo { # run_git cb
         # shard_done fires when shard_index is done
 }
 
-sub get_roots ($$) {
+sub ct_fini { # run_git cb
+        my ($opt, $self, $git, $index_repo) = @_;
+        my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF
+        $git->{-repo}->{ct} = $ct + 0;
+}
+
+# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
+sub prep_repo ($$) {
         my ($self, $git) = @_;
         return if $DO_QUIT;
+        my $index_repo = PublicInbox::OnDestroy->new(\&index_repo, $self, $git);
         my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
         sysseek($refs, 0, SEEK_SET);
         open my $roots_fh, '+>', undef;
         $git->{-repo}->{roots_fh} = $roots_fh;
         run_git([ qw(rev-list --stdin --max-parents=0) ],
-                { 0 => $refs, 1 => $roots_fh }, \&index_repo, $self, $git)
+                { 0 => $refs, 1 => $roots_fh }, \&PublicInbox::Config::noop,
+                $self, $git, $index_repo);
+        run_git([ qw[for-each-ref --sort=-committerdate
+                --format=%(committerdate:raw) --count=1
+                refs/heads/ refs/tags/] ], undef, # capture like qx
+                \&ct_fini, $self, $git, $index_repo);
 }
 
 # for PublicInbox::SearchIdx `git patch-id' call and with_umask
@@ -1295,6 +1301,7 @@ sub cidx_run { # main entry point
                 init_join_prefork($self)
         }
         local @IDX_SHARDS = cidx_init($self); # forks workers
+        local $ANY_SHARD = -1;
         local $self->{current_info} = '';
         local $MY_SIG = {
                 CHLD => \&PublicInbox::DS::enqueue_reap,