about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r-- lib/PublicInbox/CodeSearchIdx.pm 237
-rw-r--r-- lib/PublicInbox/SearchIdx.pm 12
2 files changed, 147 insertions, 102 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 9e70087e..035fab3e 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -39,12 +39,22 @@ our (
         $MY_SIG, # like %SIG
         $SIGSET,
         $TXN_BYTES, # number of bytes in current shard transaction
+        $BATCH_BYTES,
         $DO_QUIT, # signal number
-        @RDONLY_SHARDS, # Xapian::Database
+        @RDONLY_XDB, # Xapian::Database
         @IDX_SHARDS, # clones of self
         $MAX_SIZE,
         $TMP_GIT, # PublicInbox::Git object for --prune
         $REINDEX, # PublicInbox::SharedKV
+        @GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
+        %TO_PRUNE, # (docid => docid) mapping (hash in case of retry_reopen)
+        $PRUNE_CUR, # per-shard document ID
+        $PRUNE_MAX, # per-shard document ID to stop at
+        $PRUNE_OP_P, # prune_done() notification socket
+        $PRUNE_NR, # total number pruned
+        @PRUNE_DONE, # marks off prune completions
+        $NCHANGE, # current number of changes
+        %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -137,7 +147,7 @@ sub store_repo { # wq_do - returns docid
         my $xdb = $self->{xdb};
         for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
         if (defined $repo->{docid}) {
-                my $doc = $xdb->get_document($repo->{docid}) //
+                my $doc = $self->get_doc($repo->{docid}) //
                         die "$repo->{git_dir} doc #$repo->{docid} gone";
                 add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
                 my %new = map { $_ => undef } @{$repo->{roots}};
@@ -160,12 +170,21 @@ sub store_repo { # wq_do - returns docid
         }
 }
 
-sub cidx_ckpoint ($$) {
+sub cidx_ckpoint ($;$) {
         my ($self, $msg) = @_;
-        progress($self, $msg);
+        progress($self, $msg) if defined($msg);
+        $TXN_BYTES = $BATCH_BYTES; # reset
+        if (my @to_prune = values(%TO_PRUNE)) {
+                %TO_PRUNE = ();
+                $PRUNE_NR += scalar(@to_prune);
+                progress($self,
+                  "prune [$self->{shard}] $PRUNE_NR ($PRUNE_CUR/$PRUNE_MAX)");
+                $self->begin_txn_lazy;
+                $self->{xdb}->delete_document($_) for @to_prune;
+        }
         return if $PublicInbox::Search::X{CLOEXEC_UNSET};
-        $self->{xdb}->commit_transaction;
-        $self->{xdb}->begin_transaction;
+        $self->commit_txn_lazy;
+        $self->begin_txn_lazy;
 }
 
 sub truncate_cmt ($$) {
@@ -198,17 +217,15 @@ EOM
 }
 
 # sharded reader for `git log --pretty=format: --stdin'
-sub shard_index { # via wq_io_do
+sub shard_index { # via wq_io_do in IDX_SHARDS
         my ($self, $git, $n, $roots) = @_;
         local $self->{current_info} = "$git->{git_dir} [$n]";
         local $self->{roots} = $roots;
         my $in = delete($self->{0}) // die 'BUG: no {0} input';
         my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
-        my $batch_bytes = $self->{-opt}->{batch_size} //
-                                $PublicInbox::SearchIdx::BATCH_BYTES;
         local $MAX_SIZE = $self->{-opt}->{max_size};
         # local-ized in parent before fork
-        $TXN_BYTES = $batch_bytes;
+        $TXN_BYTES = $BATCH_BYTES;
         local $self->{git} = $git; # for patchid
         return if $DO_QUIT;
         my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
@@ -233,17 +250,13 @@ sub shard_index { # via wq_io_do
                 } else {
                         @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
                 }
-                $TXN_BYTES -= $len;
-                if ($TXN_BYTES <= 0) {
+                if (($TXN_BYTES -= $len) <= 0) {
                         cidx_ckpoint($self, "[$n] $nr");
-                        $TXN_BYTES = $batch_bytes - $len;
+                        $TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
                 }
                 update_commit($self, $cmt);
                 ++$nr;
-                if ($TXN_BYTES <= 0) {
-                        cidx_ckpoint($self, "[$n] $nr");
-                        $TXN_BYTES = $batch_bytes;
-                }
+                cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0;
                 $/ = $FS;
         }
         close($rd);
@@ -261,6 +274,21 @@ sub shard_done { # called via PktOp on shard_index completion
         $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
 }
 
+sub prune_done { # called via PktOp->event_step completion
+        my ($shard) = @_; # only argument; its {shard} field is the shard number
+        $PRUNE_DONE[$shard->{shard}] = 1; # mark this shard's slot as finished
+}
+
+sub prune_busy { # post_loop_do callback installed by await_prune
+        return if $DO_QUIT; # a quit signal was recorded, report not busy
+        grep(defined, @PRUNE_DONE) != @IDX_SHARDS; # marked slots vs. shard count
+}
+
+sub await_prune () { # enter the DS event loop while prune_busy() is true
+        local @PublicInbox::DS::post_loop_do = (\&prune_busy); # restored on return
+        PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if prune_busy(); # else no-op
+}
+
 sub seen ($$) {
         my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
         for (1..100) {
@@ -390,7 +418,7 @@ sub prep_repo ($$) {
         my $shard = bless { %$self, shard => $n }, ref($self);
         $repo->{shard_n} = $n;
         delete @$shard{qw(lockfh lock_path)};
-        local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef";
+        local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef";
         $shard->retry_reopen(\&check_existing, $self, $git);
 }
 
@@ -398,7 +426,7 @@ sub check_existing { # retry_reopen callback
         my ($shard, $self, $git) = @_;
         my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir});
         my $docid = shift(@docids) // return get_roots($self, $git);
-        my $doc = $shard->{xdb}->get_document($docid) //
+        my $doc = $shard->get_doc($docid) //
                         die "BUG: no #$docid ($git->{git_dir})";
         my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
         if ($old_fp eq $git->{-repo}->{fp}) { # no change
@@ -418,24 +446,24 @@ sub partition_refs ($$$) {
         sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
         my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
         close $refs or die "close: $!";
-        my ($seen, $nchange) = (0, 0);
+        my $seen = 0;
         my @shard_in = map {
                 $_->reopen;
                 open my $fh, '+>', undef or die "open: $!";
                 $fh;
-        } @RDONLY_SHARDS;
+        } @RDONLY_XDB;
 
         while (defined(my $cmt = <$rfh>)) {
                 chomp $cmt;
-                my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
+                my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB);
                 if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) {
                         say { $shard_in[$n] } $cmt or die "say: $!";
-                        ++$nchange;
-                } elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
+                        ++$NCHANGE;
+                } elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) {
                         last if ++$seen > $SEEN_MAX;
                 } else {
                         say { $shard_in[$n] } $cmt or die "say: $!";
-                        ++$nchange;
+                        ++$NCHANGE;
                         $seen = 0;
                 }
                 if ($DO_QUIT) {
@@ -446,8 +474,7 @@ sub partition_refs ($$$) {
         close($rfh);
         return () if $DO_QUIT;
         if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
-                $self->{nchange} += $nchange;
-                progress($self, "$git->{git_dir}: $nchange commits");
+                progress($self, "$git->{git_dir}: $NCHANGE commits");
                 for my $fh (@shard_in) {
                         $fh->flush or die "flush: $!";
                         sysseek($fh, 0, SEEK_SET) or die "seek: $!";
@@ -548,25 +575,25 @@ sub git { $_[0]->{git} }
 
 sub load_existing ($) { # for -u/--update
         my ($self) = @_;
-        my $dirs = $self->{git_dirs} // [];
+        my $dirs = $self->{git_dirs} //= [];
         if ($self->{-opt}->{update} || $self->{-opt}->{prune}) {
                 local $self->{xdb};
                 $self->xdb or
                         die "E: $self->{cidx_dir} non-existent for --update\n";
-                my @missing;
                 my @cur = grep {
                         if (-e $_) {
                                 1;
                         } else {
-                                push @missing, $_;
+                                push @GIT_DIR_GONE, $_;
                                 undef;
                         }
                 } $self->all_terms('P');
-                @missing = () if $self->{-opt}->{prune};
-                @missing and warn "W: the following repos no longer exist:\n",
-                                (map { "W:\t$_\n" } @missing),
+                if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) {
+                        warn "W: the following repos no longer exist:\n",
+                                (map { "W:\t$_\n" } @GIT_DIR_GONE),
                                 "W: use --prune to remove them from ",
                                 $self->{cidx_dir}, "\n";
+                }
                 push @$dirs, @cur;
         }
         my %uniq; # List::Util::uniq requires Perl 5.26+
@@ -586,13 +613,12 @@ sub cidx_init ($) {
         }
         $self->lock_acquire;
         my @shards;
-        local $TXN_BYTES;
         for my $n (0..($self->{nshard} - 1)) {
                 my $shard = bless { %$self, shard => $n }, ref($self);
                 delete @$shard{qw(lockfh lock_path)};
                 $shard->idx_acquire;
                 $shard->idx_release;
-                $shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
+                $shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, {
                         siblings => \@shards, # for ipc_atfork_child
                 }, \&shard_done_wait, $self);
                 push @shards, $shard;
@@ -621,80 +647,79 @@ sub scan_git_dirs ($) {
 
 sub prune_cb { # git->check_async callback
         my ($hex, $type, undef, $self_id) = @_;
-        return if $type eq 'commit';
         my ($self, $id) = @$self_id;
+        return if $type eq 'commit';
+        progress($self, "$hex $type #$id") if ($self->{-opt}->{verbose}//0) > 1;
         my $len = $self->{xdb}->get_doclength($id);
-        progress($self, "$hex $type (doclength=$len)");
-        ++$self->{pruned};
-        $self->{xdb}->delete_document($id);
+        $TO_PRUNE{$id} = $id;
 
-        # all math around batch_bytes calculation is pretty fuzzy,
+        # all math around TXN_BYTES calculation is pretty fuzzy,
         # but need a way to regularly flush output to avoid OOM,
         # so assume the average term + position overhead is the
         # answer to everything: 42
-        return if ($self->{batch_bytes} -= ($len * 42)) > 0;
-        cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}");
-        $self->{batch_bytes} = $self->{-opt}->{batch_size} //
-                        $PublicInbox::SearchIdx::BATCH_BYTES;
-}
-
-sub shard_prune { # via wq_io_do
-        my ($self, $n, $git_dir) = @_;
-        my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
-        my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy
-        $self->begin_txn_lazy;
-        my $xdb = $self->{xdb};
-        my $cur = $xdb->postlist_begin('Tc');
-        my $end = $xdb->postlist_end('Tc');
-        my ($id, @cmt, $oid);
-        local $self->{batch_bytes} = $self->{-opt}->{batch_size} //
-                                $PublicInbox::SearchIdx::BATCH_BYTES;
-        local $self->{pruned} = 0;
-        for (; $cur != $end && !$DO_QUIT; $cur++) {
-                @cmt = xap_terms('Q', $xdb, $id = $cur->get_docid);
-                scalar(@cmt) == 1 or
-                        warn "BUG? shard[$n] #$id has multiple commits: @cmt";
-                for $oid (@cmt) {
-                        $git->check_async($oid, \&prune_cb, [ $self, $id ]);
-                }
+        cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0;
+}
+
+sub prune_git_dir ($$$) {
+        my ($self, $id, $doc) = @_;
+        my @P = xap_terms('P', $doc);
+        scalar(@P) == 1 or warn
+"BUG? shard[$self->{shard}] #$id has zero or multiple paths: @P";
+        for my $P (@P) {
+                next if exists($ACTIVE_GIT_DIR{$P}) && -d $P;
+                $TO_PRUNE{$id} = $id;
+                progress($self, "$P gone #$id");
+                my $len = $self->{xdb}->get_doclength($id);
+                cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0;
         }
-        $git->async_wait_all;
-        for my $d ($self->all_terms('P')) { # GIT_DIR paths
-                last if $DO_QUIT;
-                next if -d $d;
-                for $id (docids_by_postlist($self, 'P'.$d)) {
-                        progress($self, "$d gone #$id");
-                        $xdb->delete_document($id);
-                }
-        }
-        $self->commit_txn_lazy;
-        $self->{pruned} and
-                progress($self, "[$n] pruned $self->{pruned} commits");
-        send($op_p, "shard_done $n", MSG_EOR);
 }
 
-sub do_prune ($) {
+sub event_step { # may be requeued via DS
         my ($self) = @_;
-        my $consumers = {};
-        my $git_dir = $TMP_GIT->{git_dir};
-        my $n = 0;
-        local $self->{-shard_ok} = {};
-        for my $s (@IDX_SHARDS) {
-                my ($c, $p) = PublicInbox::PktOp->pair;
-                $c->{ops}->{shard_done} = [ $self ];
-                $s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir);
-                $consumers->{$n++} = $c;
+        my $PRUNE_BATCH = 1000;
+        $TXN_BYTES = $BATCH_BYTES;
+        for (; --$PRUNE_BATCH && !$DO_QUIT && $PRUNE_CUR <= $PRUNE_MAX;
+                        $PRUNE_CUR++) {
+                my $doc = $self->get_doc($PRUNE_CUR) // next;
+                my @cmt = xap_terms('Q', $doc);
+                if (scalar(@cmt) == 0) {
+                        prune_git_dir($self, $PRUNE_CUR, $doc);
+                } else {
+                        scalar(@cmt) == 1 or warn
+"BUG? shard[$self->{shard}] #$PRUNE_CUR has multiple commits: @cmt";
+                        for my $o (@cmt) {
+                                $TMP_GIT->check_async($o, \&prune_cb,
+                                                        [$self, $PRUNE_CUR])
+                        }
+                }
         }
-        wait_consumers($self, $TMP_GIT, $consumers);
+        $TMP_GIT->async_wait_all;
+        cidx_ckpoint($self);
+        return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX;
+        send($PRUNE_OP_P, 'prune_done', MSG_EOR);
+        $TMP_GIT->cleanup;
+        $TMP_GIT = $PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef;
+        %ACTIVE_GIT_DIR = ();
+}
+
+sub prune_start { # via wq_io_do in IDX_SHARDS
+        my ($self, $git_dir, @active_git_dir) = @_;
+        $PRUNE_CUR = 1; # per-shard document ID to examine next
+        $PRUNE_OP_P = delete $self->{0} // die 'BUG: no {0} op_p';
+        %ACTIVE_GIT_DIR = map { $_ => undef } @active_git_dir; # keys only
+        $TMP_GIT = PublicInbox::Git->new($git_dir); # TMP_GIT copy
+        $self->begin_txn_lazy;
+        $PRUNE_MAX = $self->{xdb}->get_lastdocid // 1; # document ID to stop at
+        event_step($self); # first batch; it requeues itself until past PRUNE_MAX
+}
 
 sub shards_active { # post_loop_do
         return if $DO_QUIT;
-        scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
+        scalar(grep { $_->{-cidx_quit} } (@IDX_SHARDS));
 }
 
 # signal handlers
-sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
 
 sub parent_quit {
         $DO_QUIT = POSIX->can("SIG$_[0]")->();
@@ -704,7 +729,6 @@ sub parent_quit {
 
 sub init_tmp_git_dir ($) {
         my ($self) = @_;
-        return unless $self->{-opt}->{prune};
         require File::Temp;
         require PublicInbox::Import;
         my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
@@ -747,6 +771,18 @@ sub prep_umask ($) {
                 undef;
 }
 
+sub start_prune ($) { # queue a prune_start job on every entry of @IDX_SHARDS
+        my ($self) = @_;
+        init_tmp_git_dir($self); # NOTE(review): presumably sets $TMP_GIT - confirm
+        my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE);
+        for my $s (@IDX_SHARDS) {
+                my ($c, $p) = PublicInbox::PktOp->pair;
+                $c->{ops}->{prune_done} = [ $s ]; # handler args for the prune_done op
+                $s->wq_io_do('prune_start', [ $p->{op_p} ], # read as {0} by prune_start
+                                $TMP_GIT->{git_dir}, @active_git_dir)
+        }
+}
+
 sub cidx_run { # main entry point
         my ($self) = @_;
         my $restore_umask = prep_umask($self);
@@ -756,7 +792,10 @@ sub cidx_run { # main entry point
         my $restore = PublicInbox::OnDestroy->new($$,
                 \&PublicInbox::DS::sig_setmask, $SIGSET);
         local $LIVE = {};
-        local ($DO_QUIT, $TMP_GIT, $REINDEX);
+        local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE,
+                @PRUNE_DONE);
+        local $BATCH_BYTES = $self->{-opt}->{batch_size} //
+                                $PublicInbox::SearchIdx::BATCH_BYTES;
         local @IDX_SHARDS = cidx_init($self);
         local $self->{current_info} = '';
         local $MY_SIG = {
@@ -796,13 +835,13 @@ sub cidx_run { # main entry point
                         $_ =~ /$re/ ? (warn("# excluding $_\n"), 0) : 1;
                 } @{$self->{git_dirs}};
         }
-        local $self->{nchange} = 0;
+        local $NCHANGE = 0;
         local $LIVE_JOBS = $self->{-opt}->{jobs} ||
                         PublicInbox::IPC::detect_nproc() || 2;
-        local @RDONLY_SHARDS = $self->xdb_shards_flat;
-        init_tmp_git_dir($self);
-        do_prune($self) if $self->{-opt}->{prune};
+        local @RDONLY_XDB = $self->xdb_shards_flat;
+        start_prune($self) if $self->{-opt}->{prune};
         scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+        await_prune if $self->{-opt}->{prune};
 
         for my $s (@IDX_SHARDS) {
                 $s->{-cidx_quit} = 1;
@@ -811,10 +850,10 @@ sub cidx_run { # main entry point
 
         local @PublicInbox::DS::post_loop_do = (\&shards_active);
         PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
-        $self->lock_release(!!$self->{nchange});
+        $self->lock_release(!!$NCHANGE);
 }
 
-sub ipc_atfork_child {
+sub ipc_atfork_child { # @IDX_SHARDS
         my ($self) = @_;
         $self->SUPER::ipc_atfork_child;
         $SIG{USR1} = \&shard_usr1;
diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm
index 3baeaa9c..b907772e 100644
--- a/lib/PublicInbox/SearchIdx.pm
+++ b/lib/PublicInbox/SearchIdx.pm
@@ -552,11 +552,17 @@ sub add_message {
         $smsg->{num};
 }
 
+sub get_doc ($$) { # fetch document $docid from {xdb}; undef when it is not found
+        my ($self, $docid) = @_;
+        eval { $self->{xdb}->get_document($docid) } // do { # eval gave undef
+                die $@ if $@ && ref($@) !~ /\bDocNotFoundError\b/; # rethrow others
+                undef;
+        }
+}
+
 sub _get_doc ($$) {
         my ($self, $docid) = @_;
-        my $doc = eval { $self->{xdb}->get_document($docid) };
-        $doc // do {
-                warn "E: $@\n" if $@;
+        get_doc($self, $docid) // do {
                 warn "E: #$docid missing in Xapian\n";
                 undef;
         }