From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 55D921F487 for ; Tue, 4 Apr 2023 20:30:07 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1680640207; bh=N62ua6ALHz4pyxiYAcJkQolLb+LY+Tia+M5ydjtsXm0=; h=From:To:Subject:Date:In-Reply-To:References:From; b=qZjuac2NdxZwaB5eAQ1Onq6JpW97yPVtEnv4/NIgwvoKdaE85pPGTnNzlMIw3712g xamgYqg8WSmnwLTX11rOz8qlZYeM/KeO5jCD5dBslNRCf3Sye78BddrY48X8/8KCJO WxTiekX2Q2PAMYD+m8RENjR2MnMBx0BEKNdKSgu4= From: Eric Wong To: spew@80x24.org Subject: [PATCH 5/5] cindex: enter event loop once per run Date: Tue, 4 Apr 2023 20:30:06 +0000 Message-Id: <20230404203006.1717810-5-e@80x24.org> In-Reply-To: <20230404203006.1717810-1-e@80x24.org> References: <20230404203006.1717810-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This avoids needing to alter the sigmask for systems without signalfd or EVFILT_SIGNAL. --- lib/PublicInbox/CodeSearchIdx.pm | 207 ++++++++++++++----------------- 1 file changed, 94 insertions(+), 113 deletions(-) diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 05007afd..1000dc6f 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -35,7 +35,6 @@ use Socket qw(MSG_EOR); use Carp (); our ( $LIVE, # pid => cmd - $DEFER, # [ [ cb, @args ], ... ] $LIVE_JOBS, # integer $MY_SIG, # like %SIG $SIGSET, @@ -55,7 +54,10 @@ our ( $PRUNE_NR, # total number pruned $PRUNE_DONE, # marks off prune completions $NCHANGE, # current number of changes + $REPO_CTX, # current repo being indexed in shards %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune + $IDX_TODO, # [ $git0, $root0, $git1, $root1, ...] + $GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...] ); # stop walking history if we see >$SEEN_MAX existing commits, this assumes @@ -285,8 +287,8 @@ sub cidx_read_log_p { } sub shard_done { # called via PktOp on shard_index completion - my ($self, $n) = @_; - $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok}); + my ($self, $repo_ctx, $on_destroy, $n) = @_; + $repo_ctx->{shard_ok}->{$n} = 1; } sub prune_done { # called via PktOp->event_step completion @@ -298,16 +300,6 @@ sub prune_done { # called via PktOp->event_step completion progress($self, 'prune done') } -sub prune_busy { # post_loop_do - return if $DO_QUIT; - grep(defined, @$PRUNE_DONE) != @IDX_SHARDS; -} - -sub await_prune () { - local @PublicInbox::DS::post_loop_do = (\&prune_busy); - PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if prune_busy(); -} - sub seen ($$) { my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH" for (1..100) { @@ -336,32 +328,6 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search @ids; } -sub run_deferred () { - my $n; - while (defined(my $x = shift(@{$DEFER // []}))) { - my $cb = shift @$x; - $cb->(@$x); - ++$n; - } - $n; -} - -sub need_reap { # post_loop_do - my (undef, $jobs) = @_; - return if !$LIVE || $DO_QUIT; - scalar(keys(%$LIVE)) > $jobs; -} - -sub cidx_reap ($$) { - my ($self, $jobs) = @_; - while (run_deferred()) {} - local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs); - while (need_reap(undef, $jobs)) { - PublicInbox::DS::event_loop($MY_SIG, $SIGSET); - } - while (!$jobs && run_deferred()) {} -} - sub cidx_await_cb { # awaitpid cb my ($pid, $cb, $self, $git, @args) = @_; return if !$LIVE || $DO_QUIT; @@ -371,7 +337,7 @@ sub cidx_await_cb { # awaitpid cb $git->{-cidx_err} = 1; return warn("@$cmd error: \$?=$?\n"); } - push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER; + $cb->($self, $git, @args); } sub cidx_await ($$$$$@) { @@ -385,7 +351,6 @@ sub cidx_await ($$$$$@) { sub fp_start ($$$) { my ($self, $git, $prep_repo) = @_; return if !$LIVE || $DO_QUIT; - cidx_reap($self, $LIVE_JOBS); open my $refs, '+>', undef or die "open: $!"; my $cmd = ['git', "--git-dir=$git->{git_dir}", qw(show-ref --heads --tags --hash)]; @@ -407,7 +372,6 @@ sub fp_fini { # cidx_await cb sub ct_start ($$$) { my ($self, $git, $prep_repo) = @_; return if !$LIVE || $DO_QUIT; - cidx_reap($self, $LIVE_JOBS); my $cmd = [ 'git', "--git-dir=$git->{git_dir}", qw[for-each-ref --sort=-committerdate --format=%(committerdate:raw) --count=1 @@ -426,12 +390,13 @@ sub ct_fini { # cidx_await cb # TODO: also index gitweb.owner and the full fingerprint for grokmirror? sub prep_repo ($$) { my ($self, $git) = @_; - return if !$LIVE || $DO_QUIT || $git->{-cidx_err}; + return if !$LIVE || $DO_QUIT; + return index_next($self) if $git->{-cidx_err}; my $repo = $git->{-repo} // die 'BUG: no {-repo}'; if (!defined($repo->{ct})) { warn "W: $git->{git_dir} has no commits, skipping\n"; delete $git->{-repo}; - return; + return index_next($self); } my $n = git_dir_hash($git->{git_dir}) % $self->{nshard}; my $shard = bless { %$self, shard => $n }, ref($self); @@ -450,7 +415,7 @@ sub check_existing { # retry_reopen callback my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data; if ($old_fp eq $git->{-repo}->{fp}) { # no change delete $git->{-repo}; - return; + return index_next($self); } $git->{-repo}->{docid} = $docid; if (@docids) { @@ -510,71 +475,88 @@ sub shard_commit { # via wq_io_do send($op_p, "shard_done $self->{shard}", MSG_EOR); } -sub consumer_open { # post_loop_do - my (undef, $c) = @_; # $c is PublicInbox::PktOp - $DO_QUIT ? undef : defined($c->{sock}); +sub index_next ($) { + my ($self) = @_; + return if $DO_QUIT; + if ($IDX_TODO && @$IDX_TODO) { + index_repo($self, shift @$IDX_TODO); + } elsif ($GIT_TODO && @$GIT_TODO) { + my $git = PublicInbox::Git->new(shift @$GIT_TODO); + my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo, + $self, $git); + fp_start($self, $git, $prep_repo); + ct_start($self, $git, $prep_repo); + } + # else: wait for shards_active (post_loop_do) callback } -sub wait_active ($$$$) { - my ($self, $git, $active, $c) = @_; - local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c); - PublicInbox::DS::event_loop($MY_SIG, $SIGSET); - my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active; - die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT; +sub next_repos { # OnDestroy cb + my ($repo_ctx) = @_; + progress($repo_ctx->{self}, "$repo_ctx->{repo}->{git_dir}: done"); + return if $DO_QUIT; + if ($REPO_CTX) { + $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx"; + $REPO_CTX = undef; + index_next($repo_ctx->{self}); + } } -sub commit_active_shards ($$$) { - my ($self, $git, $active) = @_; - local $self->{-shard_ok} = {}; +sub commit_shard { # OnDestroy cb + my ($repo_ctx) = @_; + my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; + + my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active; + die "E: $repo->{git_dir} $n shards failed" if $n && !$DO_QUIT; + + $repo_ctx->{shard_ok} = {}; + if (!$DO_QUIT) { + my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', + $repo); + (!defined($id) || $id <= 0) and + die "E: store_repo $repo->{git_dir}: id=$id"; + $active->{$repo->{shard_n}} = undef; + } + my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx); my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self ]; + $c->{ops}->{shard_done} = [ $repo_ctx->{self}, $repo_ctx, $next ]; for my $n (keys %$active) { $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]); } - undef $p; - wait_active($self, $git, $active, $c); + undef $p; # shard_done fires when all shards are committed } sub index_repo { # cidx_await cb - my ($self, $git, $roots) = @_; - return if $git->{-cidx_err} || $DO_QUIT; - my $repo = delete $git->{-repo} or return; - seek($roots, 0, SEEK_SET) or die "seek: $!"; - chomp(my @roots = <$roots>); - close($roots) or die "close: $!"; - @roots or return warn("E: $git->{git_dir} has no root commits\n"); + my ($self, $git) = @_; + return if $DO_QUIT; + return index_next($self) if $git->{-cidx_err}; + return push(@$IDX_TODO, $git) if $REPO_CTX; # busy + my $repo = delete $git->{-repo} or return index_next($self); + my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}'; + seek($roots_fh, 0, SEEK_SET) or die "seek: $!"; + chomp(my @roots = <$roots_fh>); + close($roots_fh) or die "close: $!"; + if (!@roots) { + warn("E: $git->{git_dir} has no root commits\n"); + return index_next($self); + } $repo->{roots} = \@roots; local $self->{current_info} = $git->{git_dir}; my @shard_in = partition_refs($self, $git, delete($repo->{refs})); - local $self->{-shard_ok} = {}; - my $active = {}; + $repo->{git_dir} = $git->{git_dir}; + my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo }; + my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard, + $repo_ctx); my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self ]; + $c->{ops}->{shard_done} = [ $self, $repo_ctx, $commit_shard ]; for my $n (0..$#shard_in) { -s $shard_in[$n] or next; last if $DO_QUIT; $IDX_SHARDS[$n]->wq_io_do('shard_index', [ $shard_in[$n], $p->{op_p} ], $git, \@roots); - $active->{$n} = undef; + $repo_ctx->{active}->{$n} = undef; } - undef $p; - @shard_in = (); - wait_active($self, $git, $active, $c); - if ($DO_QUIT) { - commit_active_shards($self, $git, $active); - progress($self, "$git->{git_dir}: done"); - return; - } - $repo->{git_dir} = $git->{git_dir}; - my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo); - if ($id > 0) { - $active->{$repo->{shard_n}} = undef; - commit_active_shards($self, $git, $active); - progress($self, "$git->{git_dir}: done"); - return run_deferred(); - } - die "E: store_repo $git->{git_dir}: id=$id"; + # shard_done fires when shard_index is done } sub get_roots ($$) { @@ -582,11 +564,12 @@ sub get_roots ($$) { return if !$LIVE || $DO_QUIT; my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; sysseek($refs, 0, SEEK_SET) or die "seek: $!"; - open my $roots, '+>', undef or die "open: $!"; + open my $roots_fh, '+>', undef or die "open: $!"; my $cmd = [ 'git', "--git-dir=$git->{git_dir}", qw(rev-list --stdin --max-parents=0) ]; - my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots }); - cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots); + my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots_fh }); + $git->{-repo}->{roots_fh} = $roots_fh; + cidx_await($pid, $cmd, \&index_repo, $self, $git); } # for PublicInbox::SearchIdx::patch_id and with_umask @@ -653,15 +636,8 @@ EOM sub scan_git_dirs ($) { my ($self) = @_; - for (@{$self->{git_dirs}}) { - my $git = PublicInbox::Git->new($_); - my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo, - $self, $git); - fp_start($self, $git, $prep_repo); - ct_start($self, $git, $prep_repo); - last if $DO_QUIT; - } - cidx_reap($self, 0); + @$GIT_TODO = @{$self->{git_dirs}}; + index_next($self) for (1..$LIVE_JOBS); } sub prune_cb { # git->check_async callback @@ -734,7 +710,15 @@ sub prune_start { # via wq_io_do in IDX_SHARDS sub shards_active { # post_loop_do return if $DO_QUIT; - scalar(grep { $_->{-cidx_quit} } (@IDX_SHARDS)); + return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4; + return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS; + return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX; + return 1 if keys(%$LIVE); + for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) { + $s->{-cidx_quit} = 1; + $s->wq_close; + } + scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS); } # signal handlers @@ -792,6 +776,7 @@ sub prep_umask ($) { sub start_prune ($) { my ($self) = @_; + return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune}; init_tmp_git_dir($self); my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE); my ($c, $p) = PublicInbox::PktOp->pair; @@ -805,14 +790,15 @@ sub start_prune ($) { sub cidx_run { # main entry point my ($self) = @_; my $restore_umask = prep_umask($self); - local $DEFER = []; local $SIGSET = PublicInbox::DS::block_signals( POSIX::SIGTSTP, POSIX::SIGCONT); my $restore = PublicInbox::OnDestroy->new($$, \&PublicInbox::DS::sig_setmask, $SIGSET); local $LIVE = {}; local $PRUNE_DONE = []; - local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE); + local $IDX_TODO = []; + local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, + $GIT_TODO, $REPO_CTX); local $BATCH_BYTES = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; local @IDX_SHARDS = cidx_init($self); @@ -858,14 +844,8 @@ sub cidx_run { # main entry point local $LIVE_JOBS = $self->{-opt}->{jobs} || PublicInbox::IPC::detect_nproc() || 2; local @RDONLY_XDB = $self->xdb_shards_flat; - start_prune($self) if $self->{-opt}->{prune}; + start_prune($self); scan_git_dirs($self) if $self->{-opt}->{scan} // 1; - await_prune if $self->{-opt}->{prune}; - - for my $s (@IDX_SHARDS) { - $s->{-cidx_quit} = 1; - $s->wq_close; - } local @PublicInbox::DS::post_loop_do = (\&shards_active); PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active(); @@ -888,10 +868,11 @@ sub shard_done_wait { # awaitpid cb via ipc_worker_reap return if $DO_QUIT || !$LIVE; if ($? == 0) { # success $quit_req // warn 'BUG: {-cidx_quit} unset'; - return; + } else { + warn "PID:$pid $shard->{shard} exited with \$?=$?\n"; + ++$self->{shard_err} if defined($self->{shard_err}); } - warn "PID:$pid $shard->{shard} exited with \$?=$?\n"; - ++$self->{shard_err} if defined($self->{shard_err}); + PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC } sub with_umask { # TODO get rid of this treewide and rely on OnDestroy