* [PATCH 1/2] v2 + lei/store: always wait for fast-import checkpoint
@ 2024-04-15 5:14 Eric Wong
2024-04-15 5:14 ` [PATCH 2/2] lei: use ->barrier to commit to lei/store Eric Wong
0 siblings, 1 reply; 2+ messages in thread
From: Eric Wong @ 2024-04-15 5:14 UTC (permalink / raw)
To: spew
Since the data going to git is the most important, always ensure
it is written to git before attempting to write anything to
SQLite or Xapian.
---
lib/PublicInbox/LeiStore.pm | 4 +---
lib/PublicInbox/V2Writable.pm | 8 +-------
2 files changed, 2 insertions(+), 10 deletions(-)
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 2eb09eca..0df2352c 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -573,9 +573,7 @@ sub set_xvmd {
sub checkpoint {
my ($self, $wait) = @_;
- if (my $im = $self->{im}) {
- $wait ? $im->barrier : $im->checkpoint;
- }
+ $self->{im}->barrier if $self->{im};
delete $self->{lms};
$self->{priv_eidx}->checkpoint($wait);
}
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index fb259396..43f37f60 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -507,13 +507,7 @@ sub set_last_commits ($) { # this is NOT for ExtSearchIdx
sub checkpoint ($;$) {
my ($self, $wait) = @_;
- if (my $im = $self->{im}) {
- if ($wait) {
- $im->barrier;
- } else {
- $im->checkpoint;
- }
- }
+ $self->{im}->barrier if $self->{im};
my $shards = $self->{idx_shards};
if ($shards) {
my $dbh = $self->{mm}->{dbh} if $self->{mm};
^ permalink raw reply related [flat|nested] 2+ messages in thread
* [PATCH 2/2] lei: use ->barrier to commit to lei/store
2024-04-15 5:14 [PATCH 1/2] v2 + lei/store: always wait for fast-import checkpoint Eric Wong
@ 2024-04-15 5:14 ` Eric Wong
0 siblings, 0 replies; 2+ messages in thread
From: Eric Wong @ 2024-04-15 5:14 UTC (permalink / raw)
To: spew
->barrier (a synchronous checkpoint) is better than ->done when
parallel lei commands are being issued (via '&' or from different
terminals), since repeatedly stopping and restarting processes
doesn't play nicely with expensive tasks like `lei reindex'.
---
lib/PublicInbox/ExtSearchIdx.pm | 1 +
lib/PublicInbox/LEI.pm | 6 +++---
lib/PublicInbox/LeiInput.pm | 2 +-
lib/PublicInbox/LeiRefreshMailSync.pm | 2 +-
lib/PublicInbox/LeiRemote.pm | 4 ++--
lib/PublicInbox/LeiStore.pm | 26 ++++++++++++++++++--------
lib/PublicInbox/LeiToMail.pm | 3 ++-
lib/PublicInbox/LeiXSearch.pm | 4 ++--
t/lei-store-fail.t | 2 +-
9 files changed, 31 insertions(+), 19 deletions(-)
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm
index ebbffffc..763a124c 100644
--- a/lib/PublicInbox/ExtSearchIdx.pm
+++ b/lib/PublicInbox/ExtSearchIdx.pm
@@ -1424,5 +1424,6 @@ no warnings 'once';
*idx_shard = \&PublicInbox::V2Writable::idx_shard;
*reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
*checkpoint = \&PublicInbox::V2Writable::checkpoint;
+*barrier = \&PublicInbox::V2Writable::barrier;
1;
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 5b46686a..e9a0de6c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1443,7 +1443,7 @@ sub wq_eof { # EOF callback for main daemon
my ($lei, $wq_fld) = @_;
local $current_lei = $lei;
my $wq = delete $lei->{$wq_fld // 'wq1'};
- $lei->sto_done_request($wq);
+ $lei->sto_barrier_request($wq);
$wq // $lei->fail; # already failed
}
@@ -1548,7 +1548,7 @@ sub lms {
(-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef;
}
-sub sto_done_request {
+sub sto_barrier_request {
my ($lei, $wq) = @_;
return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
local $current_lei = $lei;
@@ -1558,7 +1558,7 @@ sub sto_done_request {
my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
my $errfh = $lei->{2} // *STDERR{GLOB};
my @io = $s ? ($errfh, $s) : ($errfh);
- eval { $lei->{sto}->wq_io_do('done', \@io) };
+ eval { $lei->{sto}->wq_io_do('barrier', \@io, 1) };
}
warn($@) if $@;
}
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index d003d983..c388f7dc 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -499,7 +499,7 @@ sub process_inputs {
}
# always commit first, even on error partial work is acceptable for
# lei <import|tag|convert>
- $self->{lei}->sto_done_request;
+ $self->{lei}->sto_barrier_request;
$self->{lei}->fail($err) if $err;
}
diff --git a/lib/PublicInbox/LeiRefreshMailSync.pm b/lib/PublicInbox/LeiRefreshMailSync.pm
index a60a9a5e..dde23274 100644
--- a/lib/PublicInbox/LeiRefreshMailSync.pm
+++ b/lib/PublicInbox/LeiRefreshMailSync.pm
@@ -60,7 +60,7 @@ sub input_path_url { # overrides PublicInbox::LeiInput::input_path_url
$self->folder_missing($$uri);
}
} else { die "BUG: $input not supported" }
- $self->{lei}->sto_done_request;
+ $self->{lei}->sto_barrier_request;
}
sub lei_refresh_mail_sync {
diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm
index ddcaf2c9..d6fc40a4 100644
--- a/lib/PublicInbox/LeiRemote.pm
+++ b/lib/PublicInbox/LeiRemote.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# Make remote externals HTTP(S) inboxes behave like
@@ -51,7 +51,7 @@ sub mset {
$fh = IO::Uncompress::Gunzip->new($fh, MultiStream=>1, AutoClose=>1);
eval { PublicInbox::MboxReader->mboxrd($fh, \&each_mboxrd_eml, $self) };
my $err = $@ ? ": $@" : '';
- my $wait = $self->{lei}->{sto}->wq_do('done');
+ my $wait = $self->{lei}->{sto}->wq_do('barrier');
$lei->child_error($?, "@$cmd failed$err") if $err || $?;
$self; # we are the mset (and $ibx, and $self)
}
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 0df2352c..162c915f 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -81,7 +81,7 @@ sub importer {
delete $self->{im};
$im->done;
undef $im;
- $self->checkpoint;
+ $self->barrier;
$max = $self->{priv_eidx}->{mg}->git_epochs + 1;
}
my (undef, $tl) = eidx_init($self); # acquire lock
@@ -118,7 +118,7 @@ sub cat_blob {
sub schedule_commit {
my ($self, $sec) = @_;
- add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&done, $self);
+ add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&barrier, $self);
}
# follows the stderr file
@@ -391,7 +391,7 @@ sub reindex_done {
my ($self) = @_;
my ($eidx, $tl) = eidx_init($self);
$eidx->git->async_wait_all;
- # ->done to be called via sto_done_request
+ # ->barrier to be called via sto_barrier_request
}
sub add_eml {
@@ -571,11 +571,21 @@ sub set_xvmd {
sto_export_kw($self, $smsg->{num}, $vmd);
}
-sub checkpoint {
- my ($self, $wait) = @_;
- $self->{im}->barrier if $self->{im};
+sub barrier {
+ my ($self) = @_;
+ my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request
+ my @err;
+ if ($self->{im}) {
+ eval { $self->{im}->barrier };
+ push(@err, "E: import barrier: $@\n") if $@;
+ }
delete $self->{lms};
- $self->{priv_eidx}->checkpoint($wait);
+ eval { $self->{priv_eidx}->barrier };
+ push(@err, "E: priv_eidx barrier: $@\n") if $@;
+ print { $errfh // \*STDERR } @err;
+ send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
+ xchg_stderr($self);
+ die @err if @err;
}
sub xchg_stderr {
@@ -594,7 +604,7 @@ sub xchg_stderr {
sub done {
my ($self) = @_;
- my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request
+ my ($errfh, $lei_sock) = @$self{0, 1};
my @err;
if (my $im = delete($self->{im})) {
eval { $im->done };
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index dfae29e9..593547f6 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -724,8 +724,9 @@ sub post_augment {
my ($self, $lei, @args) = @_;
$self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
+ # FIXME: this synchronous wait can be slow w/ parallel callers
my $wait = $lei->{opt}->{'import-before'} ?
- $lei->{sto}->wq_do('checkpoint', 1) : 0;
+ $lei->{sto}->wq_do('barrier') : 0;
# _post_augment_mbox
my $m = $self->can("_post_augment_$self->{base_type}") or return;
$m->($self, $lei, @args);
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index d4f34733..5a5a1adc 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -363,7 +363,7 @@ print STDERR $_;
$self, $lei, $each_smsg);
};
my ($exc, $code) = ($@, $?);
- $lei->sto_done_request if delete($self->{-sto_imported});
+ $lei->sto_barrier_request if delete($self->{-sto_imported});
die "E: $exc" if $exc && !$code;
my $nr = delete $lei->{-nr_remote_eml} // 0;
if (!$code) { # don't update if no results, maybe MTA is down
@@ -399,7 +399,7 @@ sub query_done { # EOF callback for main daemon
delete $lei->{lxs};
($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
warn "BUG: {sto} missing with --mail-sync";
- $lei->sto_done_request;
+ $lei->sto_barrier_request;
$lei->{ovv}->ovv_end($lei);
if ($l2m) { # close() calls LeiToMail reap_compress
$l2m->finish_output($lei);
diff --git a/t/lei-store-fail.t b/t/lei-store-fail.t
index c2f03148..1e83e383 100644
--- a/t/lei-store-fail.t
+++ b/t/lei-store-fail.t
@@ -39,7 +39,7 @@ EOM
lei_ok qw(q m:testmessage@example.com);
is($lei_out, "[null]\n", 'delayed commit is unindexed');
- # make immediate ->sto_done_request fail from mboxrd import:
+ # make immediate ->sto_barrier_request fail from mboxrd import:
remove_tree("$ENV{HOME}/.local/share/lei/store");
# subsequent lei commands are undefined behavior,
# but we need to make sure the current lei command fails:
^ permalink raw reply related [flat|nested] 2+ messages in thread
end of thread, other threads:[~2024-04-15 5:14 UTC | newest]
Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-04-15 5:14 [PATCH 1/2] v2 + lei/store: always wait for fast-import checkpoint Eric Wong
2024-04-15 5:14 ` [PATCH 2/2] lei: use ->barrier to commit to lei/store Eric Wong
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).