From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 4/5] cindex: share PktOp across indexing workers
Date: Tue, 4 Apr 2023 20:30:05 +0000 [thread overview]
Message-ID: <20230404203006.1717810-4-e@80x24.org> (raw)
In-Reply-To: <20230404203006.1717810-1-e@80x24.org>
Using fewer sockets simplifies completion checks, too.
---
lib/PublicInbox/CodeSearchIdx.pm | 54 ++++++++++++++++----------------
1 file changed, 27 insertions(+), 27 deletions(-)
diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm
index 14342683..05007afd 100644
--- a/lib/PublicInbox/CodeSearchIdx.pm
+++ b/lib/PublicInbox/CodeSearchIdx.pm
@@ -504,36 +504,35 @@ sub partition_refs ($$$) {
}
sub shard_commit { # via wq_io_do
- my ($self, $n) = @_;
+ my ($self) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
$self->commit_txn_lazy;
- send($op_p, "shard_done $n", MSG_EOR);
+ send($op_p, "shard_done $self->{shard}", MSG_EOR);
}
-sub consumers_open { # post_loop_do
- my (undef, $consumers) = @_;
- return if $DO_QUIT;
- scalar(grep { $_->{sock} } values %$consumers);
+sub consumer_open { # post_loop_do
+ my (undef, $c) = @_; # $c is PublicInbox::PktOp
+ $DO_QUIT ? undef : defined($c->{sock});
}
-sub wait_consumers ($$$) {
- my ($self, $git, $consumers) = @_;
- local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+sub wait_active ($$$$) {
+ my ($self, $git, $active, $c) = @_;
+ local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c);
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
- my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+ my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active;
die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
}
-sub commit_used_shards ($$$) {
- my ($self, $git, $consumers) = @_;
+sub commit_active_shards ($$$) {
+ my ($self, $git, $active) = @_;
local $self->{-shard_ok} = {};
- for my $n (keys %$consumers) {
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self ];
- $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
- $consumers->{$n} = $c;
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self ];
+ for my $n (keys %$active) {
+ $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
}
- wait_consumers($self, $git, $consumers);
+ undef $p;
+ wait_active($self, $git, $active, $c);
}
sub index_repo { # cidx_await cb
@@ -547,30 +546,31 @@ sub index_repo { # cidx_await cb
$repo->{roots} = \@roots;
local $self->{current_info} = $git->{git_dir};
my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
- local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
- my $consumers = {};
+ local $self->{-shard_ok} = {};
+ my $active = {};
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self ];
for my $n (0..$#shard_in) {
-s $shard_in[$n] or next;
last if $DO_QUIT;
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self ];
$IDX_SHARDS[$n]->wq_io_do('shard_index',
[ $shard_in[$n], $p->{op_p} ],
$git, \@roots);
- $consumers->{$n} = $c;
+ $active->{$n} = undef;
}
+ undef $p;
@shard_in = ();
- wait_consumers($self, $git, $consumers);
+ wait_active($self, $git, $active, $c);
if ($DO_QUIT) {
- commit_used_shards($self, $git, $consumers);
+ commit_active_shards($self, $git, $active);
progress($self, "$git->{git_dir}: done");
return;
}
$repo->{git_dir} = $git->{git_dir};
my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
if ($id > 0) {
- $consumers->{$repo->{shard_n}} = undef;
- commit_used_shards($self, $git, $consumers);
+ $active->{$repo->{shard_n}} = undef;
+ commit_active_shards($self, $git, $active);
progress($self, "$git->{git_dir}: done");
return run_deferred();
}
next prev parent reply other threads:[~2023-04-04 20:30 UTC|newest]
Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-04-04 20:30 [PATCH 1/5] ipc: support awaitpid in WQ workers Eric Wong
2023-04-04 20:30 ` [PATCH 2/5] cindex: do prune work while waiting for `git log -p' Eric Wong
2023-04-04 20:30 ` [PATCH 3/5] cindex: share PktOp socket across prune workers Eric Wong
2023-04-04 20:30 ` Eric Wong [this message]
2023-04-04 20:30 ` [PATCH 5/5] cindex: enter event loop once per run Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20230404203006.1717810-4-e@80x24.org \
--to=e@80x24.org \
--cc=spew@80x24.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox,
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).