dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH] WIPcheckpoint
@ 2024-04-11 21:09 Eric Wong
  0 siblings, 0 replies; 2+ messages in thread
From: Eric Wong @ 2024-04-11 21:09 UTC (permalink / raw)
  To: spew

---
 lib/PublicInbox/LEI.pm                |  6 +++---
 lib/PublicInbox/LeiInput.pm           |  2 +-
 lib/PublicInbox/LeiRefreshMailSync.pm |  2 +-
 lib/PublicInbox/LeiRemote.pm          |  2 +-
 lib/PublicInbox/LeiStore.pm           | 27 ++++++++++++++++++++-------
 lib/PublicInbox/LeiXSearch.pm         |  4 ++--
 t/lei-store-fail.t                    |  2 +-
 7 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1ad101ec..6782581f 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1442,7 +1442,7 @@ sub wq_eof { # EOF callback for main daemon
 	my ($lei, $wq_fld) = @_;
 	local $current_lei = $lei;
 	my $wq = delete $lei->{$wq_fld // 'wq1'};
-	$lei->sto_done_request($wq);
+	$lei->sto_checkpoint($wq);
 	$wq // $lei->fail; # already failed
 }
 
@@ -1547,7 +1547,7 @@ sub lms {
 	(-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef;
 }
 
-sub sto_done_request {
+sub sto_checkpoint {
 	my ($lei, $wq) = @_;
 	return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
 	local $current_lei = $lei;
@@ -1557,7 +1557,7 @@ sub sto_done_request {
 		my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
 		my $errfh = $lei->{2} // *STDERR{GLOB};
 		my @io = $s ? ($errfh, $s) : ($errfh);
-		eval { $lei->{sto}->wq_io_do('done', \@io) };
+		eval { $lei->{sto}->wq_io_do('checkpoint', \@io) };
 	}
 	warn($@) if $@;
 }
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index d003d983..d46c9ad5 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -499,7 +499,7 @@ sub process_inputs {
 	}
 	# always commit first, even on error partial work is acceptable for
 	# lei <import|tag|convert>
-	$self->{lei}->sto_done_request;
+	$self->{lei}->sto_checkpoint;
 	$self->{lei}->fail($err) if $err;
 }
 
diff --git a/lib/PublicInbox/LeiRefreshMailSync.pm b/lib/PublicInbox/LeiRefreshMailSync.pm
index a60a9a5e..3d8ab408 100644
--- a/lib/PublicInbox/LeiRefreshMailSync.pm
+++ b/lib/PublicInbox/LeiRefreshMailSync.pm
@@ -60,7 +60,7 @@ sub input_path_url { # overrides PublicInbox::LeiInput::input_path_url
 			$self->folder_missing($$uri);
 		}
 	} else { die "BUG: $input not supported" }
-	$self->{lei}->sto_done_request;
+	$self->{lei}->sto_checkpoint;
 }
 
 sub lei_refresh_mail_sync {
diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm
index 559fb8d5..3a42b7de 100644
--- a/lib/PublicInbox/LeiRemote.pm
+++ b/lib/PublicInbox/LeiRemote.pm
@@ -51,7 +51,7 @@ sub mset {
 	$fh = IO::Uncompress::Gunzip->new($fh, MultiStream=>1, AutoClose=>1);
 	eval { PublicInbox::MboxReader->mboxrd($fh, \&each_mboxrd_eml, $self) };
 	my $err = $@ ? ": $@" : '';
-	my $wait = $self->{lei}->{sto}->wq_do('done');
+	my $wait = $self->{lei}->{sto}->wq_do('checkpoint');
 	$lei->child_error($?, "@$cmd failed$err") if $err || $?;
 	$self; # we are the mset (and $ibx, and $self)
 }
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 2eb09eca..037a228b 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -118,7 +118,7 @@ sub cat_blob {
 
 sub schedule_commit {
 	my ($self, $sec) = @_;
-	add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&done, $self);
+	add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&checkpoint, $self);
 }
 
 # follows the stderr file
@@ -391,7 +391,7 @@ sub reindex_done {
 	my ($self) = @_;
 	my ($eidx, $tl) = eidx_init($self);
 	$eidx->git->async_wait_all;
-	# ->done to be called via sto_done_request
+	# ->checkpoint to be called via sto_checkpoint
 }
 
 sub add_eml {
@@ -573,11 +573,24 @@ sub set_xvmd {
 
 sub checkpoint {
 	my ($self, $wait) = @_;
-	if (my $im = $self->{im}) {
-		$wait ? $im->barrier : $im->checkpoint;
+	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_checkpoint
+	my $cmd = $wait ? 'barrier' : 'checkpoint';
+	if ($errfh) {
+		my @err;
+		if ($self->{im}) {
+			eval { $self->{im}->$cmd };
+			push(@err, "E: import $cmd: $@\n") if $@;
+		}
+		delete $self->{lms};
+		eval { $self->{priv_eidx}->checkpoint($wait) };
+		push(@err, "E: priv_eidx checkpoint: $@\n") if $@;
+		print { $errfh // \*STDERR } @err;
+		send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
+	} else {
+		$self->{im}->$cmd if $self->{im};
+		delete $self->{lms};
+		$self->{priv_eidx}->checkpoint($wait);
 	}
-	delete $self->{lms};
-	$self->{priv_eidx}->checkpoint($wait);
 }
 
 sub xchg_stderr {
@@ -596,7 +609,7 @@ sub xchg_stderr {
 
 sub done {
 	my ($self) = @_;
-	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request
+	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_checkpoint
 	my @err;
 	if (my $im = delete($self->{im})) {
 		eval { $im->done };
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index fc95d401..597933ff 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -355,7 +355,7 @@ print STDERR $_;
 						$self, $lei, $each_smsg);
 		};
 		my ($exc, $code) = ($@, $?);
-		$lei->sto_done_request if delete($self->{-sto_imported});
+		$lei->sto_checkpoint if delete($self->{-sto_imported});
 		die "E: $exc" if $exc && !$code;
 		my $nr = delete $lei->{-nr_remote_eml} // 0;
 		if (!$code) { # don't update if no results, maybe MTA is down
@@ -391,7 +391,7 @@ sub query_done { # EOF callback for main daemon
 	delete $lei->{lxs};
 	($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
 		warn "BUG: {sto} missing with --mail-sync";
-	$lei->sto_done_request;
+	$lei->sto_checkpoint;
 	$lei->{ovv}->ovv_end($lei);
 	if ($l2m) { # close() calls LeiToMail reap_compress
 		$l2m->finish_output($lei);
diff --git a/t/lei-store-fail.t b/t/lei-store-fail.t
index c2f03148..df6e975a 100644
--- a/t/lei-store-fail.t
+++ b/t/lei-store-fail.t
@@ -39,7 +39,7 @@ EOM
 	lei_ok qw(q m:testmessage@example.com);
 	is($lei_out, "[null]\n", 'delayed commit is unindexed');
 
-	# make immediate ->sto_done_request fail from mboxrd import:
+	# make immediate ->sto_checkpoint fail from mboxrd import:
 	remove_tree("$ENV{HOME}/.local/share/lei/store");
 	# subsequent lei commands are undefined behavior,
 	# but we need to make sure the current lei command fails:

^ permalink raw reply related	[flat|nested] 2+ messages in thread

* [PATCH] WIPcheckpoint
@ 2024-04-12 19:32 Eric Wong
  0 siblings, 0 replies; 2+ messages in thread
From: Eric Wong @ 2024-04-12 19:32 UTC (permalink / raw)
  To: spew

xchg-lei-store
---
 lib/PublicInbox/LEI.pm                |  6 +++---
 lib/PublicInbox/LeiInput.pm           |  2 +-
 lib/PublicInbox/LeiRefreshMailSync.pm |  2 +-
 lib/PublicInbox/LeiRemote.pm          |  2 +-
 lib/PublicInbox/LeiStore.pm           | 20 +++++++++++++++-----
 lib/PublicInbox/LeiXSearch.pm         |  4 ++--
 t/lei-store-fail.t                    |  2 +-
 7 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 5b46686a..a5bc9367 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -1443,7 +1443,7 @@ sub wq_eof { # EOF callback for main daemon
 	my ($lei, $wq_fld) = @_;
 	local $current_lei = $lei;
 	my $wq = delete $lei->{$wq_fld // 'wq1'};
-	$lei->sto_done_request($wq);
+	$lei->sto_checkpoint($wq);
 	$wq // $lei->fail; # already failed
 }
 
@@ -1548,7 +1548,7 @@ sub lms {
 	(-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef;
 }
 
-sub sto_done_request {
+sub sto_checkpoint {
 	my ($lei, $wq) = @_;
 	return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
 	local $current_lei = $lei;
@@ -1558,7 +1558,7 @@ sub sto_done_request {
 		my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
 		my $errfh = $lei->{2} // *STDERR{GLOB};
 		my @io = $s ? ($errfh, $s) : ($errfh);
-		eval { $lei->{sto}->wq_io_do('done', \@io) };
+		eval { $lei->{sto}->wq_io_do('checkpoint', \@io, 1) };
 	}
 	warn($@) if $@;
 }
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index d003d983..d46c9ad5 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -499,7 +499,7 @@ sub process_inputs {
 	}
 	# always commit first, even on error partial work is acceptable for
 	# lei <import|tag|convert>
-	$self->{lei}->sto_done_request;
+	$self->{lei}->sto_checkpoint;
 	$self->{lei}->fail($err) if $err;
 }
 
diff --git a/lib/PublicInbox/LeiRefreshMailSync.pm b/lib/PublicInbox/LeiRefreshMailSync.pm
index a60a9a5e..3d8ab408 100644
--- a/lib/PublicInbox/LeiRefreshMailSync.pm
+++ b/lib/PublicInbox/LeiRefreshMailSync.pm
@@ -60,7 +60,7 @@ sub input_path_url { # overrides PublicInbox::LeiInput::input_path_url
 			$self->folder_missing($$uri);
 		}
 	} else { die "BUG: $input not supported" }
-	$self->{lei}->sto_done_request;
+	$self->{lei}->sto_checkpoint;
 }
 
 sub lei_refresh_mail_sync {
diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm
index ddcaf2c9..1e287f9f 100644
--- a/lib/PublicInbox/LeiRemote.pm
+++ b/lib/PublicInbox/LeiRemote.pm
@@ -51,7 +51,7 @@ sub mset {
 	$fh = IO::Uncompress::Gunzip->new($fh, MultiStream=>1, AutoClose=>1);
 	eval { PublicInbox::MboxReader->mboxrd($fh, \&each_mboxrd_eml, $self) };
 	my $err = $@ ? ": $@" : '';
-	my $wait = $self->{lei}->{sto}->wq_do('done');
+	my $wait = $self->{lei}->{sto}->wq_do('checkpoint');
 	$lei->child_error($?, "@$cmd failed$err") if $err || $?;
 	$self; # we are the mset (and $ibx, and $self)
 }
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 0df2352c..311055a5 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -118,7 +118,7 @@ sub cat_blob {
 
 sub schedule_commit {
 	my ($self, $sec) = @_;
-	add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&done, $self);
+	add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&checkpoint, $self);
 }
 
 # follows the stderr file
@@ -391,7 +391,7 @@ sub reindex_done {
 	my ($self) = @_;
 	my ($eidx, $tl) = eidx_init($self);
 	$eidx->git->async_wait_all;
-	# ->done to be called via sto_done_request
+	# ->checkpoint to be called via sto_checkpoint
 }
 
 sub add_eml {
@@ -573,9 +573,19 @@ sub set_xvmd {
 
 sub checkpoint {
 	my ($self, $wait) = @_;
-	$self->{im}->barrier if $self->{im};
+	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_checkpoint
+	my @err;
+	if ($self->{im}) {
+		eval { $self->{im}->barrier };
+		push(@err, "E: import barrier: $@\n") if $@;
+	}
 	delete $self->{lms};
-	$self->{priv_eidx}->checkpoint($wait);
+	eval { $self->{priv_eidx}->checkpoint($wait) };
+	push(@err, "E: priv_eidx checkpoint: $@\n") if $@;
+	print { $errfh // \*STDERR } @err;
+	send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
+	xchg_stderr($self);
+	die @err if @err;
 }
 
 sub xchg_stderr {
@@ -594,7 +604,7 @@ sub xchg_stderr {
 
 sub done {
 	my ($self) = @_;
-	my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request
+	my ($errfh, $lei_sock) = @$self{0, 1};
 	my @err;
 	if (my $im = delete($self->{im})) {
 		eval { $im->done };
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index d4f34733..60137cb4 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -363,7 +363,7 @@ print STDERR $_;
 						$self, $lei, $each_smsg);
 		};
 		my ($exc, $code) = ($@, $?);
-		$lei->sto_done_request if delete($self->{-sto_imported});
+		$lei->sto_checkpoint if delete($self->{-sto_imported});
 		die "E: $exc" if $exc && !$code;
 		my $nr = delete $lei->{-nr_remote_eml} // 0;
 		if (!$code) { # don't update if no results, maybe MTA is down
@@ -399,7 +399,7 @@ sub query_done { # EOF callback for main daemon
 	delete $lei->{lxs};
 	($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
 		warn "BUG: {sto} missing with --mail-sync";
-	$lei->sto_done_request;
+	$lei->sto_checkpoint;
 	$lei->{ovv}->ovv_end($lei);
 	if ($l2m) { # close() calls LeiToMail reap_compress
 		$l2m->finish_output($lei);
diff --git a/t/lei-store-fail.t b/t/lei-store-fail.t
index c2f03148..df6e975a 100644
--- a/t/lei-store-fail.t
+++ b/t/lei-store-fail.t
@@ -39,7 +39,7 @@ EOM
 	lei_ok qw(q m:testmessage@example.com);
 	is($lei_out, "[null]\n", 'delayed commit is unindexed');
 
-	# make immediate ->sto_done_request fail from mboxrd import:
+	# make immediate ->sto_checkpoint fail from mboxrd import:
 	remove_tree("$ENV{HOME}/.local/share/lei/store");
 	# subsequent lei commands are undefined behavior,
 	# but we need to make sure the current lei command fails:

^ permalink raw reply related	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2024-04-12 19:32 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-04-12 19:32 [PATCH] WIPcheckpoint Eric Wong
  -- strict thread matches above, loose matches on Subject: below --
2024-04-11 21:09 Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).