about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2023-03-21 23:07:30 +0000
committer Eric Wong <e@80x24.org> 2023-03-25 09:37:52 +0000
commit 6cd1c4cb956e4e44144bc71f316aa92ff12ddddf (patch)
tree 0791a6643b33cc754d7b6bc5223af045d46b71e6 /lib
parent 9076a420f4e023361c5a0b6edec399c6862721d2 (diff)
download public-inbox-6cd1c4cb956e4e44144bc71f316aa92ff12ddddf.tar.gz
While individual Xapian shards are consistent due to the use of
Xapian transactions, the data across shards still needs to be
in a consistent state for our search to work.
Diffstat (limited to 'lib')
-rw-r--r-- lib/PublicInbox/CodeSearchIdx.pm 71
1 file changed, 48 insertions, 23 deletions
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 1a472b64..82f90368 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -38,6 +38,8 @@ our (
         $LIVE_JOBS, # integer
         $MY_SIG, # like %SIG
         $SIGSET,
+        $TXN_BYTES, # number of bytes in current shard transaction
+        $DO_QUIT, # signal number
         @RDONLY_SHARDS, # Xapian::Database
         @IDX_SHARDS # clones of self
 );
@@ -153,18 +155,14 @@ sub store_repo { # wq_do - returns docid
 sub shard_index { # via wq_io_do
         my ($self, $git, $n, $roots) = @_;
         local $self->{current_info} = "$git->{git_dir} [$n]";
-        my ($quit, $cmt);
+        my $cmt;
         local $self->{roots} = $roots;
         my $in = delete($self->{0}) // die 'BUG: no {0} input';
         my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
         my $batch_bytes = $self->{-opt}->{batch_size} //
                                 $PublicInbox::SearchIdx::BATCH_BYTES;
-        my $max = $batch_bytes;
-        my $set_quit = sub { $quit = shift };
-        local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
-        local $SIG{QUIT} = $set_quit;
-        local $SIG{TERM} = $set_quit;
-        local $SIG{INT} = $set_quit;
+        # local-ized in parent before fork
+        $TXN_BYTES = $batch_bytes;
         local $self->{git} = $git; # for patchid
         my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
         close $in or die "close: $!";
@@ -179,22 +177,23 @@ sub shard_index { # via wq_io_do
         $self->begin_txn_lazy;
         while (defined($buf = <$rd>)) {
                 chomp($buf);
-                $max -= length($buf);
+                $TXN_BYTES -= length($buf);
                 @$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
                 $/ = "\n";
                 add_commit($self, $cmt);
-                last if $quit; # likely SIGPIPE
+                last if $DO_QUIT;
                 ++$nr;
-                if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
+                if ($TXN_BYTES <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
                         progress($self, "[$n] $nr");
                         $self->{xdb}->commit_transaction;
-                        $max = $batch_bytes;
+                        $TXN_BYTES = $batch_bytes;
                         $self->{xdb}->begin_transaction;
                 }
                 $/ = $FS;
         }
         close($rd);
-        if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
+        if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+                                ($? & 127) == POSIX::SIGPIPE))) {
                 send($op_p, "shard_done $n", MSG_EOR);
         } else {
                 warn "E: git @LOG_STDIN: \$?=$?\n";
@@ -254,7 +253,7 @@ sub need_reap { # post_loop_do
 sub cidx_reap ($$) {
         my ($self, $jobs) = @_;
         while (run_todo($self)) {}
-        local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs);
+        local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
         while (need_reap(undef, $jobs)) {
                 PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
         }
@@ -263,7 +262,7 @@ sub cidx_reap ($$) {
 
 sub cidx_await_cb { # awaitpid cb
         my ($pid, $cb, $self, $git, @args) = @_;
-        return if !$LIVE; # premature shutdown
+        return if !$LIVE || $DO_QUIT;
         my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
         PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
         if ($?) {
@@ -283,7 +282,7 @@ sub cidx_await ($$$$$@) {
 # only care about --heads (branches) and --tags, and not even their names
 sub fp_start ($$$) {
         my ($self, $git, $prep_repo) = @_;
-        return if !$LIVE; # premature exit
+        return if !$LIVE || $DO_QUIT;
         cidx_reap($self, $LIVE_JOBS);
         open my $refs, '+>', undef or die "open: $!";
         my $cmd = ['git', "--git-dir=$git->{git_dir}",
@@ -305,7 +304,7 @@ sub fp_fini { # cidx_await cb
 
 sub ct_start ($$$) {
         my ($self, $git, $prep_repo) = @_;
-        return if !$LIVE; # premature exit
+        return if !$LIVE || $DO_QUIT;
         cidx_reap($self, $LIVE_JOBS);
         my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
                 qw[for-each-ref --sort=-committerdate
@@ -325,7 +324,7 @@ sub ct_fini { # cidx_await cb
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
         my ($self, $git) = @_;
-        return if !$LIVE || $git->{-cidx_err}; # premature exit
+        return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
         my $repo = $git->{-repo} // die 'BUG: no {-repo}';
         if (!defined($repo->{ct})) {
                 warn "W: $git->{git_dir} has no commits, skipping\n";
@@ -449,6 +448,11 @@ sub index_repo { # cidx_await cb
         PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
         my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
         die "E: $git->{git_dir} $n shards failed" if $n;
+        if ($DO_QUIT) {
+                commit_used_shards($self, $git, \%CONSUMERS);
+                progress($self, "$git->{git_dir}: done");
+                return;
+        }
         $repo->{git_dir} = $git->{git_dir};
         my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
         if ($id > 0) {
@@ -462,7 +466,7 @@ sub index_repo { # cidx_await cb
 
 sub get_roots ($$) {
         my ($self, $git) = @_;
-        return if !$LIVE; # premature exit
+        return if !$LIVE || $DO_QUIT;
         my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
         sysseek($refs, 0, SEEK_SET) or die "seek: $!";
         open my $roots, '+>', undef or die "open: $!";
@@ -489,6 +493,10 @@ sub load_existing ($) { # for -u/--update
         @$dirs = grep { !$uniq{$_}++ } @$dirs;
 }
 
+# SIG handlers:
+sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() }
+sub shard_usr1 { $TXN_BYTES = -1 }
+
 sub cidx_init ($) {
         my ($self) = @_;
         my $dir = $self->{cidx_dir};
@@ -498,12 +506,13 @@ sub cidx_init ($) {
         }
         $self->lock_acquire;
         my @shards;
+        local $TXN_BYTES;
         for my $n (0..($self->{nshard} - 1)) {
                 my $shard = bless { %$self, shard => $n }, ref($self);
                 delete @$shard{qw(lockfh lock_path)};
                 $shard->idx_acquire;
                 $shard->idx_release;
-                $shard->wq_workers_start("shard[$n]", 1, undef, {
+                $shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
                         siblings => \@shards, # for ipc_atfork_child
                 }, \&shard_done_wait, $self);
                 push @shards, $shard;
@@ -533,6 +542,15 @@ sub shards_active { # post_loop_do
         scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
 }
 
+# signal handlers
+sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+
+sub parent_quit {
+        $DO_QUIT = $_[0];
+        kill_shards(@_);
+        warn "# SIG$_[0] received, quitting...\n";
+}
+
 sub cidx_run { # main entry point
         my ($self) = @_;
         local $self->{todo} = [];
@@ -541,13 +559,15 @@ sub cidx_run { # main entry point
         my $restore = PublicInbox::OnDestroy->new($$,
                 \&PublicInbox::DS::sig_setmask, $SIGSET);
         local $LIVE = {};
+        local $DO_QUIT;
         local @IDX_SHARDS = cidx_init($self);
         local $self->{current_info} = '';
-        my $cb = $SIG{__WARN__} || \&CORE::warn;
         local $MY_SIG = {
                 CHLD => \&PublicInbox::DS::enqueue_reap,
-                INT => sub { exit },
+                USR1 => \&kill_shards,
         };
+        $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT);
+        my $cb = $SIG{__WARN__} || \&CORE::warn;
         local $SIG{__WARN__} = sub {
                 my $m = shift @_;
                 $self->{current_info} eq '' or
@@ -594,14 +614,19 @@ sub cidx_run { # main entry point
 sub ipc_atfork_child {
         my ($self) = @_;
         $self->SUPER::ipc_atfork_child;
+        $SIG{USR1} = \&shard_usr1;
+        $SIG{$_} = \&shard_quit for qw(INT TERM QUIT);
         my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
         $_->wq_close for @$x;
+        undef;
 }
 
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
         my ($pid, $shard, $self) = @_;
-        delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
-        return unless $?;
+        if ($? == 0) { # success
+                delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+                return;
+        }
         warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
         ++$self->{shard_err} if defined($self->{shard_err});
 }