dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH 01/23] lei: more consistent dedupe and ovv_buf init
@ 2021-02-01  0:46 Eric Wong
  2021-02-01  0:46 ` [PATCH 02/23] ipc: switch wq to use the event loop Eric Wong
                   ` (21 more replies)
  0 siblings, 22 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

This fixes "--dedupe none" with Maildir, which doesn't
create the {dedupe} object at all.
---
 lib/PublicInbox/LeiDedupe.pm   |  4 ++--
 lib/PublicInbox/LeiOverview.pm | 18 ++++++++++--------
 lib/PublicInbox/LeiToMail.pm   |  3 +--
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 3f478aa4..e3ae8e33 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -103,8 +103,8 @@ sub new {
 	bless [ $skv, undef, undef, $m ], $cls;
 }
 
-# returns true on unseen messages according to the deduplication strategy,
-# returns false if seen
+# returns true on seen messages according to the deduplication strategy,
+# returns false if unseen
 sub is_dup {
 	my ($self, $eml, $oid) = @_;
 	!$self->[1]->($eml, $oid);
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index c67e2747..fa041457 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -92,13 +92,14 @@ sub new {
 			ovv_out_lk_init($self);
 		}
 	}
-	if (!$json) {
+	if ($json) {
+		$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
+	} else {
 		# default to the cheapest sort since MUA usually resorts
 		$lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
 		$lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) };
 		return $lei->fail($@) if $@;
 	}
-	$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
 	$self;
 }
 
@@ -201,15 +202,19 @@ sub _json_pretty {
 
 sub ovv_each_smsg_cb { # runs in wq worker usually
 	my ($self, $lei, $ibxish) = @_;
-	my $json;
+	my ($json, $dedupe);
 	$lei->{1}->autoflush(1);
-	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
 	if (my $pkg = $self->{json}) {
 		$json = $pkg->new;
 		$json->utf8->canonical;
 		$json->ascii(1) if $lei->{opt}->{ascii};
 	}
-	my $l2m = $lei->{l2m} or $dedupe->prepare_dedupe;
+	my $l2m = $lei->{l2m};
+	if (!$l2m) {
+		$dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+		$dedupe->prepare_dedupe;
+	}
+	$lei->{ovv_buf} = \(my $buf = '') if !$l2m;
 	if ($l2m && !$ibxish) { # remote https?:// mboxrd
 		delete $l2m->{-wq_s1};
 		my $g2m = $l2m->can('git_to_mail');
@@ -241,7 +246,6 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 		my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
 		$self->{git} = $git; # for ovv_atexit_child
 		my $g2m = $l2m->can('git_to_mail');
-		$dedupe->prepare_dedupe;
 		sub {
 			my ($smsg, $mitem) = @_;
 			$smsg->{pct} = get_pct($mitem) if $mitem;
@@ -249,7 +253,6 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 		};
 	} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
 		my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
-		$lei->{ovv_buf} = \(my $buf = '');
 		sub { # DIY prettiness :P
 			my ($smsg, $mitem) = @_;
 			return if $dedupe->is_smsg_dup($smsg);
@@ -273,7 +276,6 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 		}
 	} elsif ($json) {
 		my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
-		$lei->{ovv_buf} = \(my $buf = '');
 		sub {
 			my ($smsg, $mitem) = @_;
 			return if $dedupe->is_smsg_dup($smsg);
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 61b546b5..244bfb67 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -323,7 +323,7 @@ sub _buf2maildir {
 sub _maildir_write_cb ($$) {
 	my ($self, $lei) = @_;
 	my $dedupe = $lei->{dedupe};
-	$dedupe->prepare_dedupe;
+	$dedupe->prepare_dedupe if $dedupe;
 	my $dst = $lei->{ovv}->{dst};
 	sub { # for git_to_mail
 		my ($buf, $smsg, $eml) = @_;
@@ -464,7 +464,6 @@ sub write_mail { # via ->wq_do
 	my $wcb = $self->{wcb} //= do { # first message
 		my %sig = $lei->atfork_child_wq($self);
 		@SIG{keys %sig} = values %sig; # not local
-		$lei->{dedupe}->prepare_dedupe;
 		$self->write_cb($lei);
 	};
 	my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 02/23] ipc: switch wq to use the event loop
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 03/23] lei: remove per-child SIG{__WARN__} Eric Wong
                   ` (20 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

This will let us maximize the capability of our
asynchronous git API.

I tried using a synchronous API, but even with libgit2 in the
same process to avoid the IPC cost, it failed to match the
throughput afforded by this change.  This is because libgit2
is built (at least on Debian) with the slower SHA-1 collision
detection code.
---
 MANIFEST                     |  1 +
 lib/PublicInbox/IPC.pm       | 17 +++++++++++------
 lib/PublicInbox/LeiToMail.pm | 26 +++++++++++---------------
 lib/PublicInbox/WQWorker.pm  | 34 ++++++++++++++++++++++++++++++++++
 4 files changed, 57 insertions(+), 21 deletions(-)
 create mode 100644 lib/PublicInbox/WQWorker.pm

diff --git a/MANIFEST b/MANIFEST
index 2077ab12..c10775e4 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -228,6 +228,7 @@ lib/PublicInbox/V2Writable.pm
 lib/PublicInbox/View.pm
 lib/PublicInbox/ViewDiff.pm
 lib/PublicInbox/ViewVCS.pm
+lib/PublicInbox/WQWorker.pm
 lib/PublicInbox/WWW.pm
 lib/PublicInbox/WWW.pod
 lib/PublicInbox/Watch.pm
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index d2ff038d..479c4377 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -14,6 +14,7 @@ use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
+use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 use Errno qw(EMSGSIZE);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
@@ -151,6 +152,8 @@ sub wq_wait_old {
 # for base class, override in sub classes
 sub ipc_atfork_prepare {}
 
+sub wq_atexit_child {}
+
 sub ipc_atfork_child {
 	my ($self) = @_;
 	my $io = delete($self->{-ipc_atfork_child_close}) or return;
@@ -251,10 +254,11 @@ sub ipc_sibling_atfork_child {
 	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _recv_and_run {
+sub recv_and_run {
 	my ($self, $s2, $len, $full_stream) = @_;
 	my @fds = $recv_cmd->($s2, my $buf, $len);
-	my $n = length($buf // '') or return;
+	return if scalar(@fds) && !defined($fds[0]);
+	my $n = length($buf) or return 0;
 	my $nfd = 0;
 	for my $fd (@fds) {
 		if (open(my $cmdfh, '+<&=', $fd)) {
@@ -281,14 +285,15 @@ sub _recv_and_run {
 
 sub wq_worker_loop ($) {
 	my ($self) = @_;
-	my $len = $self->{wq_req_len} // (4096 * 33);
-	my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-	1 while (_recv_and_run($self, $s2, $len));
+	my $wqw = PublicInbox::WQWorker->new($self);
+	PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+	PublicInbox::DS->EventLoop;
+	PublicInbox::DS->Reset;
 }
 
 sub do_sock_stream { # via wq_do, for big requests
 	my ($self, $len) = @_;
-	_recv_and_run($self, delete $self->{0}, $len, 1);
+	recv_and_run($self, delete $self->{0}, $len, 1);
 }
 
 sub wq_do { # always async
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 244bfb67..1f6c2a3b 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -12,11 +12,16 @@ use PublicInbox::ProcessPipe;
 use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::LeiDedupe;
 use PublicInbox::OnDestroy;
+use PublicInbox::Git;
+use PublicInbox::GitAsyncCat;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT);
-use PublicInbox::Git;
+
+# struggles with short-lived repos, Gcf2Client makes little sense with lei;
+# but we may use in-process libgit2 in the future.
+$PublicInbox::GitAsyncCat::GCF2C = 0;
 
 my %kw2char = ( # Maildir characters
 	draft => 'D',
@@ -467,27 +472,18 @@ sub write_mail { # via ->wq_do
 		$self->write_cb($lei);
 	};
 	my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
-	$git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
+	git_async_cat($git, $smsg->{blob}, \&git_to_mail,
+				[$wcb, $smsg, $not_done]);
 }
 
-# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
-# ordering is unstable at worker exit and may cause segfaults
-sub reap_gits {
+sub wq_atexit_child {
 	my ($self) = @_;
 	delete $self->{wcb};
 	for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
 		$git->async_wait_all;
 	}
-}
-
-sub DESTROY { delete $_[0]->{wcb} }
-
-sub ipc_atfork_child { # runs after IPC::wq_worker_loop
-	my ($self) = @_;
-	$self->SUPER::ipc_atfork_child;
-	# reap_gits needs to run before $self->DESTROY,
-	# IPC.pm will ensure that.
-	PublicInbox::OnDestroy->new($$, \&reap_gits, $self);
+	$SIG{__WARN__} = 'DEFAULT';
+	$SIG{PIPE} = 'DEFAULT';
 }
 
 1;
diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm
new file mode 100644
index 00000000..25a5e4fb
--- /dev/null
+++ b/lib/PublicInbox/WQWorker.pm
@@ -0,0 +1,34 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for PublicInbox::IPC wq_* (work queue) workers
+package PublicInbox::WQWorker;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET);
+use Errno qw(EAGAIN ECONNRESET);
+use IO::Handle (); # blocking
+
+sub new {
+	my (undef, $wq) = @_;
+	my $s2 = $wq->{-wq_s2} // die 'BUG: no -wq_s2';
+	$s2->blocking(0);
+	my $self = bless { sock => $s2, wq => $wq }, __PACKAGE__;
+	$self->SUPER::new($s2, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET);
+	$self;
+}
+
+sub event_step {
+	my ($self) = @_;
+	my $n;
+	do {
+		$n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33);
+	} while ($n);
+	return if !defined($n) && $! == EAGAIN; # likely
+	warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET;
+	$self->{wq}->wq_atexit_child;
+	$self->close; # PublicInbox::DS::close
+}
+
+1;

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 03/23] lei: remove per-child SIG{__WARN__}
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
  2021-02-01  0:46 ` [PATCH 02/23] ipc: switch wq to use the event loop Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 04/23] lei: remove SIGPIPE handler Eric Wong
                   ` (19 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

The top-level $SIG{__WARN__} using $current_lei already does
the job.
---
 lib/PublicInbox/LEI.pm | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 3ed330f9..ceba16e4 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -359,8 +359,7 @@ sub atfork_child_wq {
 	%PATH2CFG = ();
 	undef $errors_log;
 	$quit = \&CORE::exit;
-	(__WARN__ => sub { err($self, @_) },
-	PIPE => sub {
+	(PIPE => sub {
 		$self->x_it(13); # SIGPIPE = 13
 		# we need to close explicitly to avoid Perl warning on SIGPIPE
 		for my $i (1, 2) {

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 04/23] lei: remove SIGPIPE handler
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
  2021-02-01  0:46 ` [PATCH 02/23] ipc: switch wq to use the event loop Eric Wong
  2021-02-01  0:46 ` [PATCH 03/23] lei: remove per-child SIG{__WARN__} Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 05/23] ipc: more helpful ETOOMANYREFS error messages Eric Wong
                   ` (18 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

It doesn't save us any code, and the action-at-a-distance
element was making it confusing to track down actual problems.
Another potential problem was keeping references alive too long.

So do what we would in a C100K server and check every write,
while still ensuring lei(1) exits with a proper SIGPIPE
iff needed.
---
 lib/PublicInbox/IPC.pm         | 10 +++---
 lib/PublicInbox/LEI.pm         | 56 +++++++++++++++++++++-------------
 lib/PublicInbox/LeiExternal.pm |  3 +-
 lib/PublicInbox/LeiOverview.pm | 33 ++++++++------------
 lib/PublicInbox/LeiToMail.pm   | 45 ++++++++++++---------------
 lib/PublicInbox/LeiXSearch.pm  | 17 ++++-------
 t/lei_to_mail.t                | 31 ++++++++++---------
 7 files changed, 96 insertions(+), 99 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 479c4377..172552b9 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -139,8 +139,10 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
 	my ($self, $pid) = @_;
-	# SIGTERM (15) is our default exit signal
-	warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
+	return if !$?;
+	# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
+	my $s = $? & 127;
+	warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
 }
 
 sub wq_wait_old {
@@ -278,7 +280,7 @@ sub recv_and_run {
 	undef $buf;
 	my $sub = shift @$args;
 	eval { $self->$sub(@$args) };
-	warn "$$ wq_worker: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+	warn "$$ wq_worker: $@" if $@;
 	delete @$self{0..($nfd-1)};
 	$n;
 }
@@ -320,7 +322,7 @@ sub wq_do { # always async
 	} else {
 		@$self{0..$#$ios} = @$ios;
 		eval { $self->$sub(@args) };
-		warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+		warn "wq_do: $@" if $@;
 		delete @$self{0..$#$ios}; # don't close
 	}
 }
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index ceba16e4..b915bb0c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -12,7 +12,7 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal
 	PublicInbox::LeiQuery);
 use Getopt::Long ();
 use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
-use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
+use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
 use Cwd qw(getcwd);
 use POSIX ();
 use IO::Handle ();
@@ -277,7 +277,11 @@ sub x_it ($$) {
 	dump_and_clear_log();
 	if (my $sock = $self->{sock}) {
 		send($sock, "x_it $code", MSG_EOR);
-	} elsif (!($code & 127)) { # oneshot, ignore signals
+	} elsif (my $signum = ($code & 127)) { # oneshot, usually SIGPIPE (13)
+		$SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work
+		kill $signum, $$;
+		sleep; # wait for signal
+	} else { # oneshot
 		# don't want to end up using $? from child processes
 		for my $f (qw(lxs l2m)) {
 			my $wq = delete $self->{$f} or next;
@@ -287,14 +291,15 @@ sub x_it ($$) {
 	}
 }
 
-sub puts ($;@) { print { shift->{1} } map { "$_\n" } @_ }
-
-sub out ($;@) { print { shift->{1} } @_ }
-
 sub err ($;@) {
 	my $self = shift;
-	my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{IO};
-	print $err @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
+	my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{GLOB};
+	my $eor = (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
+	print $err @_, $eor and return;
+	my $old_err = delete $self->{2};
+	close($old_err) if $! == EPIPE && $old_err;;
+	$err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB};
+	print $err @_, $eor or print STDERR @_, $eor;
 }
 
 sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
@@ -306,6 +311,17 @@ sub fail ($$;$) {
 	undef;
 }
 
+sub out ($;@) {
+	my $self = shift;
+	return if print { $self->{1} // return } @_; # likely
+	return note_sigpipe($self, 1) if $! == EPIPE;
+	my $err = "error writing to stdout: $!";
+	delete $self->{1};
+	fail($self, $err);
+}
+
+sub puts ($;@) { out(shift, map { "$_\n" } @_) }
+
 sub child_error { # passes non-fatal curl exit codes to user
 	my ($self, $child_error) = @_; # child_error is $?
 	if (my $sock = $self->{sock}) { # send to lei(1) client
@@ -350,27 +366,23 @@ sub io_restore ($$) {
 	}
 }
 
-# usage: my %sig = $lei->atfork_child_wq($wq);
-#	 local @SIG{keys %sig} = values %sig;
+# triggers sigpipe_handler
+sub note_sigpipe {
+	my ($self, $fd) = @_;
+	close(delete($self->{$fd})); # explicit close silences Perl warning
+	syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
+	x_it($self, 13);
+}
+
 sub atfork_child_wq {
 	my ($self, $wq) = @_;
 	io_restore($self, $wq);
+	-p $self->{op_pipe} or die 'BUG: {op_pipe} expected';
 	io_restore($self->{l2m}, $wq);
 	%PATH2CFG = ();
 	undef $errors_log;
 	$quit = \&CORE::exit;
-	(PIPE => sub {
-		$self->x_it(13); # SIGPIPE = 13
-		# we need to close explicitly to avoid Perl warning on SIGPIPE
-		for my $i (1, 2) {
-			next unless $self->{$i} && (-p $self->{$i} || -S _);
-			close(delete $self->{$i});
-		}
-		# trigger the LeiXSearch $done OpPipe:
-		syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
-		$SIG{PIPE} = 'DEFAULT';
-		die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
-	});
+	$current_lei = $self; # for SIG{__WARN__}
 }
 
 sub io_extract ($;@) {
diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm
index bf07c41c..b1176824 100644
--- a/lib/PublicInbox/LeiExternal.pm
+++ b/lib/PublicInbox/LeiExternal.pm
@@ -31,11 +31,10 @@ sub _externals_each {
 
 sub lei_ls_external {
 	my ($self, @argv) = @_;
-	my $out = $self->{1};
 	my ($OFS, $ORS) = $self->{opt}->{z} ? ("\0", "\0\0") : (" ", "\n");
 	$self->_externals_each(sub {
 		my ($loc, $boost_val) = @_;
-		print $out $loc, $OFS, 'boost=', $boost_val, $ORS;
+		$self->out($loc, $OFS, 'boost=', $boost_val, $ORS);
 	});
 }
 
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index fa041457..1d62ffe2 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -107,28 +107,22 @@ sub new {
 sub ovv_begin {
 	my ($self, $lei) = @_;
 	if ($self->{fmt} eq 'json') {
-		print { $lei->{1} } '[';
+		$lei->out('[');
 	} # TODO HTML/Atom/...
 }
 
 # called once by parent (via PublicInbox::EOFpipe)
 sub ovv_end {
 	my ($self, $lei) = @_;
-	my $out = $lei->{1} or return;
 	if ($self->{fmt} eq 'json') {
 		# JSON doesn't allow trailing commas, and preventing
 		# trailing commas is a PITA when parallelizing outputs
-		print $out "null]\n";
+		$lei->out("null]\n");
 	} elsif ($self->{fmt} eq 'concatjson') {
-		print $out "\n";
+		$lei->out("\n");
 	}
 }
 
-sub ovv_atfork_child {
-	my ($self) = @_;
-	# reopen dedupe here
-}
-
 # prepares an smsg for JSON
 sub _unbless_smsg {
 	my ($smsg, $mitem) = @_;
@@ -168,9 +162,8 @@ sub ovv_atexit_child {
 		$git->async_wait_all;
 	}
 	if (my $bref = delete $lei->{ovv_buf}) {
-		my $out = $lei->{1} or return;
 		my $lk = $self->lock_for_scope;
-		print $out $$bref;
+		$lei->out($$bref);
 	}
 }
 
@@ -268,11 +261,10 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 				}
 			} sort keys %$smsg);
 			$buf .= $EOR;
-			if (length($buf) > 65536) {
-				my $lk = $self->lock_for_scope;
-				print { $lei->{1} } $buf;
-				$buf = '';
-			}
+			return if length($buf) < 65536;
+			my $lk = $self->lock_for_scope;
+			$lei->out($buf);
+			$buf = '';
 		}
 	} elsif ($json) {
 		my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
@@ -280,11 +272,10 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 			my ($smsg, $mitem) = @_;
 			return if $dedupe->is_smsg_dup($smsg);
 			$buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
-			if (length($buf) > 65536) {
-				my $lk = $self->lock_for_scope;
-				print { $lei->{1} } $buf;
-				$buf = '';
-			}
+			return if length($buf) < 65536;
+			my $lk = $self->lock_for_scope;
+			$lei->out($buf);
+			$buf = '';
 		}
 	} # else { ...
 }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 1f6c2a3b..01e7cec5 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -17,7 +17,7 @@ use PublicInbox::GitAsyncCat;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
-use Errno qw(EEXIST ESPIPE ENOENT);
+use Errno qw(EEXIST ESPIPE ENOENT EPIPE);
 
 # struggles with short-lived repos, Gcf2Client makes little sense with lei;
 # but we may use in-process libgit2 in the future.
@@ -68,14 +68,16 @@ sub _mbox_hdr_buf ($$$) {
 }
 
 sub atomic_append { # for on-disk destinations (O_APPEND, or O_EXCL)
-	my ($fh, $buf) = @_;
-	defined(my $w = syswrite($fh, $$buf)) or die "write: $!";
-	$w == length($$buf) or die "short write: $w != ".length($$buf);
-}
-
-sub _print_full {
-	my ($fh, $buf) = @_;
-	print $fh $$buf or die "print: $!";
+	my ($lei, $buf) = @_;
+	if (defined(my $w = syswrite($lei->{1} // return, $$buf))) {
+		return if $w == length($$buf);
+		$buf = "short atomic write: $w != ".length($$buf);
+	} elsif ($! == EPIPE) {
+		return $lei->note_sigpipe(1);
+	} else {
+		$buf = "atomic write: $!";
+	}
+	$lei->fail($buf);
 }
 
 sub eml2mboxrd ($;$) {
@@ -248,24 +250,19 @@ sub _mbox_write_cb ($$) {
 	my $ovv = $lei->{ovv};
 	my $m = 'eml2'.$ovv->{fmt};
 	my $eml2mbox = $self->can($m) or die "$self->$m missing";
-	my $out = $lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier
-	$out->autoflush(1);
-	my $write = $ovv->{lock_path} ? \&_print_full : \&atomic_append;
+	$lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier
+	$lei->{1}->autoflush(1);
+	my $atomic_append = !defined($ovv->{lock_path});
 	my $dedupe = $lei->{dedupe};
 	$dedupe->prepare_dedupe;
 	sub { # for git_to_mail
 		my ($buf, $smsg, $eml) = @_;
-		return unless $out;
 		$eml //= PublicInbox::Eml->new($buf);
-		if (!$dedupe->is_dup($eml, $smsg->{blob})) {
-			$buf = $eml2mbox->($eml, $smsg);
-			my $lk = $ovv->lock_for_scope;
-			eval { $write->($out, $buf) };
-			if ($@) {
-				die $@ if ref($@) ne 'PublicInbox::SIGPIPE';
-				undef $out
-			}
-		}
+		return if $dedupe->is_dup($eml, $smsg->{blob});
+		$buf = $eml2mbox->($eml, $smsg);
+		return atomic_append($lei, $buf) if $atomic_append;
+		my $lk = $ovv->lock_for_scope;
+		$lei->out($$buf);
 	}
 }
 
@@ -467,8 +464,7 @@ sub write_mail { # via ->wq_do
 	my ($self, $git_dir, $smsg, $lei) = @_;
 	my $not_done = delete $self->{$lei->{each_smsg_not_done}};
 	my $wcb = $self->{wcb} //= do { # first message
-		my %sig = $lei->atfork_child_wq($self);
-		@SIG{keys %sig} = values %sig; # not local
+		$lei->atfork_child_wq($self);
 		$self->write_cb($lei);
 	};
 	my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
@@ -483,7 +479,6 @@ sub wq_atexit_child {
 		$git->async_wait_all;
 	}
 	$SIG{__WARN__} = 'DEFAULT';
-	$SIG{PIPE} = 'DEFAULT';
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e69b637c..de82a7da 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -109,8 +109,7 @@ sub wait_startq ($) {
 sub query_thread_mset { # for --thread
 	my ($self, $lei, $ibxish) = @_;
 	local $0 = "$0 query_thread_mset";
-	my %sig = $lei->atfork_child_wq($self);
-	local @SIG{keys %sig} = values %sig;
+	$lei->atfork_child_wq($self);
 	my $startq = delete $lei->{startq};
 
 	my ($srch, $over) = ($ibxish->search, $ibxish->over);
@@ -145,8 +144,7 @@ sub query_thread_mset { # for --thread
 sub query_mset { # non-parallel for non-"--thread" users
 	my ($self, $lei) = @_;
 	local $0 = "$0 query_mset";
-	my %sig = $lei->atfork_child_wq($self);
-	local @SIG{keys %sig} = values %sig;
+	$lei->atfork_child_wq($self);
 	my $startq = delete $lei->{startq};
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
@@ -187,8 +185,7 @@ sub kill_reap {
 sub query_remote_mboxrd {
 	my ($self, $lei, $uris) = @_;
 	local $0 = "$0 query_remote_mboxrd";
-	my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq
-	local @SIG{keys %sig} = values %sig;
+	$lei->atfork_child_wq($self);
 	my ($opt, $env) = @$lei{qw(opt env)};
 	my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
 	push(@qform, t => 1) if $opt->{thread};
@@ -351,9 +348,7 @@ sub start_query { # always runs in main (lei-daemon) process
 sub query_prepare { # called by wq_do
 	my ($self, $lei) = @_;
 	local $0 = "$0 query_prepare";
-	my %sig = $lei->atfork_child_wq($self);
-	-p $lei->{op_pipe} or die "BUG: \$done pipe expected";
-	local @SIG{keys %sig} = values %sig;
+	$lei->atfork_child_wq($self);
 	delete $lei->{l2m}->{-wq_s1};
 	eval { $lei->{l2m}->do_augment($lei) };
 	$lei->fail($@) if $@;
@@ -363,11 +358,11 @@ sub query_prepare { # called by wq_do
 sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
 	my ($lei) = @_;
 	my $lxs = delete $lei->{lxs};
-	if ($lxs && $lxs->wq_kill_old) {
-		kill 'PIPE', $$;
+	if ($lxs && $lxs->wq_kill_old) { # is this the daemon?
 		$lxs->wq_wait_old;
 	}
 	close(delete $lei->{1}) if $lei->{1};
+	$lei->x_it(13);
 }
 
 sub do_query {
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 47c0e3d4..f7535687 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -12,6 +12,7 @@ use List::Util qw(shuffle);
 require_mods(qw(DBD::SQLite));
 require PublicInbox::MboxReader;
 require PublicInbox::LeiOverview;
+require PublicInbox::LEI;
 use_ok 'PublicInbox::LeiToMail';
 my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n";
 my $noeol = "Subject: x\n\nFrom hell";
@@ -73,7 +74,11 @@ for my $mbox (@MBOX) {
 my ($tmpdir, $for_destroy) = tmpdir();
 local $ENV{TMPDIR} = $tmpdir;
 open my $err, '>>', "$tmpdir/lei.err" or BAIL_OUT $!;
-my $lei = { 2 => $err };
+my $lei = bless { 2 => $err }, 'PublicInbox::LEI';
+my $commit = sub {
+	$_[0] = undef; # wcb
+	delete $lei->{1};
+};
 my $buf = <<'EOM';
 From: x@example.com
 Subject: x
@@ -98,9 +103,7 @@ my $wcb_get = sub {
 	my $zpipe = $l2m->pre_augment($lei);
 	$l2m->do_augment($lei);
 	$l2m->post_augment($lei, $zpipe);
-	my $cb = $l2m->write_cb($lei);
-	delete $lei->{1};
-	$cb;
+	$l2m->write_cb($lei);
 };
 
 my $deadbeef = { blob => 'deadbeef', kw => [ qw(seen) ] };
@@ -109,7 +112,7 @@ my $orig = do {
 	is(ref $wcb, 'CODE', 'write_cb returned callback');
 	ok(-f $fn && !-s _, 'empty file created');
 	$wcb->(\(my $dup = $buf), $deadbeef);
-	undef $wcb;
+	$commit->($wcb);
 	open my $fh, '<', $fn or BAIL_OUT $!;
 	my $raw = do { local $/; <$fh> };
 	like($raw, qr/^blah\n/sm, 'wrote content');
@@ -119,7 +122,7 @@ my $orig = do {
 	$wcb = $wcb_get->($mbox, $fn);
 	ok(-f $fn && !-s _, 'truncated mbox destination');
 	$wcb->(\($dup = $buf), $deadbeef);
-	undef $wcb;
+	$commit->($wcb);
 	open $fh, '<', $fn or BAIL_OUT $!;
 	is(do { local $/; <$fh> }, $raw, 'jobs > 1');
 	$raw;
@@ -134,7 +137,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		my $f = "$fn.$zsfx";
 		my $wcb = $wcb_get->($mbox, $f);
 		$wcb->(\(my $dup = $buf), $deadbeef);
-		undef $wcb;
+		$commit->($wcb);
 		my $uncompressed = xqx([@$dc_cmd, $f]);
 		is($uncompressed, $orig, "$zsfx works unlocked");
 
@@ -142,13 +145,13 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		unlink $f or BAIL_OUT "unlink $!";
 		$wcb = $wcb_get->($mbox, $f);
 		$wcb->(\($dup = $buf), $deadbeef);
-		undef $wcb;
+		$commit->($wcb);
 		is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
 
 		local $lei->{opt} = { augment => 1 };
 		$wcb = $wcb_get->($mbox, $f);
 		$wcb->(\($dup = $buf . "\nx\n"), $deadbeef);
-		undef $wcb; # commit
+		$commit->($wcb);
 
 		my $cat = popen_rd([@$dc_cmd, $f]);
 		my @raw;
@@ -160,7 +163,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		local $lei->{opt} = { augment => 1, jobs => 2 };
 		$wcb = $wcb_get->($mbox, $f);
 		$wcb->(\($dup = $buf . "\ny\n"), $deadbeef);
-		undef $wcb; # commit
+		$commit->($wcb);
 
 		my @raw3;
 		$cat = popen_rd([@$dc_cmd, $f]);
@@ -183,7 +186,7 @@ if ('default deduplication uses content_hash') {
 	my $wcb = $wcb_get->('mboxo', $fn);
 	$deadbeef->{kw} = [];
 	$wcb->(\(my $x = $buf), $deadbeef) for (1..2);
-	undef $wcb; # undef to commit changes
+	$commit->($wcb);
 	my $cmp = '';
 	open my $fh, '<', $fn or BAIL_OUT $!;
 	PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= $as_orig->(@_) });
@@ -192,7 +195,7 @@ if ('default deduplication uses content_hash') {
 	local $lei->{opt} = { augment => 1 };
 	$wcb = $wcb_get->('mboxo', $fn);
 	$wcb->(\($x = $buf . "\nx\n"), $deadbeef) for (1..2);
-	undef $wcb; # undef to commit changes
+	$commit->($wcb);
 	open $fh, '<', $fn or BAIL_OUT $!;
 	my @x;
 	PublicInbox::MboxReader->mboxo($fh, sub { push @x, $as_orig->(@_) });
@@ -206,7 +209,7 @@ if ('default deduplication uses content_hash') {
 	local $lei->{1} = $tmp;
 	my $wcb = $wcb_get->('mboxrd', '/dev/stdout');
 	$wcb->(\(my $x = $buf), $deadbeef);
-	undef $wcb; # commit
+	$commit->($wcb);
 	seek($tmp, 0, SEEK_SET) or BAIL_OUT $!;
 	my $cmp = '';
 	PublicInbox::MboxReader->mboxrd($tmp, sub { $cmp .= $as_orig->(@_) });
@@ -220,7 +223,7 @@ SKIP: { # FIFO support
 	my $cat = popen_rd([which('cat'), $fn]);
 	my $wcb = $wcb_get->('mboxo', $fn);
 	$wcb->(\(my $x = $buf), $deadbeef);
-	undef $wcb; # commit
+	$commit->($wcb);
 	my $cmp = '';
 	PublicInbox::MboxReader->mboxo($cat, sub { $cmp .= $as_orig->(@_) });
 	is($cmp, $buf, 'message written to FIFO');

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 05/23] ipc: more helpful ETOOMANYREFS error messages
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (2 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 04/23] lei: remove SIGPIPE handler Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 06/23] lei: remove syslog dependency Eric Wong
                   ` (17 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

ETOOMANYREFS is probably an unfamiliar error to most users, so
give a hint about RLIMIT_NOFILE.  I can hit this on my system
when running 3 simultaneous queries with the system default
limit of 1024.

There's also no need to import Errno constants for uncommon
errors, so we'll stop using Errno here.

In lei, we'll also try to bump RLIMIT_NOFILE as much as possible
to avoid this error.
---
 lib/PublicInbox/IPC.pm | 6 +++---
 lib/PublicInbox/LEI.pm | 5 +++++
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 172552b9..37f02944 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -16,7 +16,6 @@ use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
-use Errno qw(EMSGSIZE);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
 my $WQ_MAX_WORKERS = 4096;
@@ -303,8 +302,9 @@ sub wq_do { # always async
 	if (my $s1 = $self->{-wq_s1}) { # run in worker
 		my $fds = [ map { fileno($_) } @$ios ];
 		my $n = $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR);
-		return if defined($n);
-		croak "sendmsg error: $!" if $! != EMSGSIZE;
+		return if defined($n); # likely
+		croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
+		croak "sendmsg: $!" if !$!{EMSGSIZE};
 		socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
 			croak "socketpair: $!";
 		my $buf = freeze([$sub, @args]);
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b915bb0c..22cd20f6 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -925,6 +925,11 @@ sub lazy_start {
 		$! = $errno; # allow interpolation to stringify in die
 		die "connect($path): $!";
 	}
+	if (eval { require BSD::Resource }) {
+		my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
+		my ($s, $h) = BSD::Resource::getrlimit($NOFILE);
+		BSD::Resource::setrlimit($NOFILE, $h, $h) if $s < $h;
+	}
 	umask(077) // die("umask(077): $!");
 	local $listener;
 	socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 06/23] lei: remove syslog dependency
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (3 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 05/23] ipc: more helpful ETOOMANYREFS error messages Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 07/23] sharedkv: release {dbh} before rmtree Eric Wong
                   ` (16 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

It doesn't seem necessary now that we redirect and write
stuff to errors.log.
---
 lib/PublicInbox/LEI.pm | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 22cd20f6..c0b90451 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -14,10 +14,9 @@ use Getopt::Long ();
 use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
 use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
 use Cwd qw(getcwd);
-use POSIX ();
+use POSIX qw(strftime);
 use IO::Handle ();
 use Fcntl qw(SEEK_SET);
-use Sys::Syslog qw(syslog openlog);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
 use PublicInbox::Sigfd;
@@ -1007,9 +1006,9 @@ sub lazy_start {
 				warn "$path dev/ino changed, quitting\n";
 				$path = undef;
 			}
-		} elsif (defined($path)) {
-			warn "stat($path): $!, quitting ...\n";
-			undef $path; # don't unlink
+		} elsif (defined($path)) { # ENOENT is common
+			warn "stat($path): $!, quitting ...\n" if $! != ENOENT;
+			undef $path;
 			$quit->();
 		}
 		return 1 if defined($path);
@@ -1029,18 +1028,14 @@ sub lazy_start {
 	# STDIN was redirected to /dev/null above, closing STDERR and
 	# STDOUT will cause the calling `lei' client process to finish
 	# reading the <$daemon> pipe.
-	openlog($path, 'pid', 'user');
 	local $SIG{__WARN__} = sub {
-		$current_lei ? err($current_lei, @_) : syslog('warning', "@_");
+		$current_lei ? err($current_lei, @_) : warn(
+		  strftime('%Y-%m-%dT%H:%M:%SZ', gmtime(time))," $$ ", @_);
 	};
-	my $on_destroy = PublicInbox::OnDestroy->new($$, sub {
-		syslog('crit', "$@") if $@;
-	});
 	open STDERR, '>&STDIN' or die "redirect stderr failed: $!";
 	open STDOUT, '>&STDIN' or die "redirect stdout failed: $!";
 	# $daemon pipe to `lei' closed, main loop begins:
 	PublicInbox::DS->EventLoop;
-	@$on_destroy = (); # cancel on_destroy if we get here
 	exit($exit_code // 0);
 }
 

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 07/23] sharedkv: release {dbh} before rmtree
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (4 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 06/23] lei: remove syslog dependency Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 08/23] sharedkv: retry rmtree in DESTROY Eric Wong
                   ` (15 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

This may be needed to avoid warnings/errors when
operating in single process mode.
---
 lib/PublicInbox/SharedKV.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index 94f2429f..e400c6cf 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -145,6 +145,7 @@ SELECT COUNT(k) FROM kv
 
 sub DESTROY {
 	my ($self) = @_;
+	delete $self->{dbh};
 	my $dir = delete $self->{"tmp$$.$self"} or return;
 	rmtree($dir);
 }

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 08/23] sharedkv: retry rmtree in DESTROY
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (5 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 07/23] sharedkv: release {dbh} before rmtree Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 09/23] lei: keep $lei around until workers are reaped Eric Wong
                   ` (14 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

And note the number of tries.
---
 lib/PublicInbox/SharedKV.pm | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index e400c6cf..f5d09cc1 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -147,7 +147,13 @@ sub DESTROY {
 	my ($self) = @_;
 	delete $self->{dbh};
 	my $dir = delete $self->{"tmp$$.$self"} or return;
-	rmtree($dir);
+	my $tries = 0;
+	do {
+		$! = 0;
+		eval { rmtree($dir) };
+	} while ($@ && $!{ENOENT} && $tries++ < 5);
+	warn "error removing $dir: $@" if $@;
+	warn "Took $tries tries to remove $dir\n" if $tries;
 }
 
 1;

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 09/23] lei: keep $lei around until workers are reaped
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (6 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 08/23] sharedkv: retry rmtree in DESTROY Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 10/23] lei_dedupe: use Digest::SHA Eric Wong
                   ` (13 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

This prevents SharedKV->DESTROY in lei-daemon from triggering
before DB handles are closed in lei2mail processes.  The
{each_smsg_not_done} pipe was not sufficient in this case:
that gets closed at the end of the last git_to_mail callback
invocation.
---
 lib/PublicInbox/IPC.pm        | 10 +++++-----
 lib/PublicInbox/LEI.pm        |  2 +-
 lib/PublicInbox/LeiXSearch.pm |  4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 37f02944..689f32d0 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -137,7 +137,7 @@ sub ipc_worker_spawn {
 }
 
 sub ipc_worker_reap { # dwaitpid callback
-	my ($self, $pid) = @_;
+	my ($args, $pid) = @_;
 	return if !$?;
 	# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
 	my $s = $? & 127;
@@ -145,9 +145,9 @@ sub ipc_worker_reap { # dwaitpid callback
 }
 
 sub wq_wait_old {
-	my ($self) = @_;
+	my ($self, $args) = @_;
 	my $pids = delete $self->{"-wq_old_pids.$$"} or return;
-	dwaitpid($_, \&ipc_worker_reap, $self) for @$pids;
+	dwaitpid($_, \&ipc_worker_reap, [$self, $args]) for @$pids;
 }
 
 # for base class, override in sub classes
@@ -164,7 +164,7 @@ sub ipc_atfork_child {
 
 # idempotent, can be called regardless of whether worker is active or not
 sub ipc_worker_stop {
-	my ($self) = @_;
+	my ($self, $args) = @_;
 	my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
 	my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
 	if (!$w_req && !$r_res) {
@@ -175,7 +175,7 @@ sub ipc_worker_stop {
 	$w_req = $r_res = undef;
 
 	return if $$ != $ppid;
-	dwaitpid($pid, \&ipc_worker_reap, $self);
+	dwaitpid($pid, \&ipc_worker_reap, [$self, $args]);
 }
 
 # use this if we have multiple readers reading curl or "pigz -dc"
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index c0b90451..4f7ed171 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -860,7 +860,7 @@ sub dclose {
 		if ($wq->wq_kill) {
 			$wq->wq_close
 		} elsif ($wq->wq_kill_old) {
-			$wq->wq_wait_old;
+			$wq->wq_wait_old($self);
 		}
 	}
 	close(delete $self->{1}) if $self->{1}; # may reap_compress
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index de82a7da..b4a9b89d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -283,7 +283,7 @@ sub query_done { # EOF callback
 	my $has_l2m = exists $lei->{l2m};
 	for my $f (qw(lxs l2m)) {
 		my $wq = delete $lei->{$f} or next;
-		$wq->wq_wait_old;
+		$wq->wq_wait_old($lei);
 	}
 	$lei->{ovv}->ovv_end($lei);
 	if ($has_l2m) { # close() calls LeiToMail reap_compress
@@ -359,7 +359,7 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
 	my ($lei) = @_;
 	my $lxs = delete $lei->{lxs};
 	if ($lxs && $lxs->wq_kill_old) { # is this the daemon?
-		$lxs->wq_wait_old;
+		$lxs->wq_wait_old($lei);
 	}
 	close(delete $lei->{1}) if $lei->{1};
 	$lei->x_it(13);

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 10/23] lei_dedupe: use Digest::SHA
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (7 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 09/23] lei: keep $lei around until workers are reaped Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 11/23] lei_xsearch: load smsg package Eric Wong
                   ` (12 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

While it's loaded by ContentHash, we use Digest::SHA directly in
this package for smsg and OID-only deduplication.
---
 lib/PublicInbox/LeiDedupe.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index e3ae8e33..55488376 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -5,6 +5,7 @@ use strict;
 use v5.10.1;
 use PublicInbox::SharedKV;
 use PublicInbox::ContentHash qw(content_hash);
+use Digest::SHA ();
 
 # n.b. mutt sets most of these headers not sure about Bytes
 our @OID_IGNORE = qw(Status X-Status Content-Length Lines Bytes);

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 11/23] lei_xsearch: load smsg package
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (8 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 10/23] lei_dedupe: use Digest::SHA Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 12/23] lei: deep clone {ovv} for l2m workers Eric Wong
                   ` (11 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

We use $smsg->populate here, so load PublicInbox::Smsg explicitly
even though PublicInbox::Search currently loads it.
---
 lib/PublicInbox/LeiXSearch.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b4a9b89d..4d390ee4 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -16,6 +16,7 @@ use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::Spawn qw(popen_rd spawn which);
 use PublicInbox::MID qw(mids);
+use PublicInbox::Smsg;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
 
 sub new {

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 12/23] lei: deep clone {ovv} for l2m workers
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (9 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 11/23] lei_xsearch: load smsg package Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:46 ` [PATCH 13/23] sharedkv: lock before unref {dbh} Eric Wong
                   ` (10 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

We don't need to send the temporary xsearch {git} object over to
workers, just the directory name.
---
 lib/PublicInbox/LEI.pm | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 4f7ed171..08554932 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -401,8 +401,9 @@ sub atfork_parent_wq {
 	my ($self, $wq) = @_;
 	my $env = delete $self->{env}; # env is inherited at fork
 	my $lei = bless { %$self }, ref($self);
-	if (my $dedupe = delete $lei->{dedupe}) {
-		$lei->{dedupe} = $wq->deep_clone($dedupe);
+	for my $f (qw(dedupe ovv)) {
+		my $tmp = delete($lei->{$f}) or next;
+		$lei->{$f} = $wq->deep_clone($tmp);
 	}
 	$self->{env} = $env;
 	delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 13/23] sharedkv: lock before unref {dbh}
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (10 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 12/23] lei: deep clone {ovv} for l2m workers Eric Wong
@ 2021-02-01  0:46 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 14/23] sharedkv: dbh_release Eric Wong
                   ` (9 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:46 UTC (permalink / raw)
  To: spew

---
 lib/PublicInbox/SharedKV.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index f5d09cc1..bd1fd6c8 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -145,6 +145,7 @@ SELECT COUNT(k) FROM kv
 
 sub DESTROY {
 	my ($self) = @_;
+	my $lock = $self->lock_for_scope; # for WAL (fixes SIGSEGV?)
 	delete $self->{dbh};
 	my $dir = delete $self->{"tmp$$.$self"} or return;
 	my $tries = 0;

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 14/23] sharedkv: dbh_release
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (11 preceding siblings ...)
  2021-02-01  0:46 ` [PATCH 13/23] sharedkv: lock before unref {dbh} Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 15/23] sharedkv: move lock_for_scope Eric Wong
                   ` (8 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

---
 lib/PublicInbox/LeiDedupe.pm |  1 +
 lib/PublicInbox/SharedKV.pm  | 10 ++++++++--
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 55488376..5c83fd80 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -126,6 +126,7 @@ sub prepare_dedupe {
 sub pause_dedupe {
 	my ($self) = @_;
 	my $skv = $self->[0];
+	$skv->dbh_release if $skv; # $skv may be undef, as the guard below shows
 	delete($skv->{dbh}) if $skv;
 }
 
diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index bd1fd6c8..a7d4de41 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -143,10 +143,16 @@ SELECT COUNT(k) FROM kv
 	$sth->fetchrow_array;
 }
 
+sub dbh_release {
+	my ($self, $lock) = @_;
+	$lock //= $self->lock_for_scope; # for WAL (fixes SIGSEGV?)
+	my $dbh = delete $self->{dbh} or return;
+	$dbh->disconnect;
+}
+
 sub DESTROY {
 	my ($self) = @_;
-	my $lock = $self->lock_for_scope; # for WAL (fixes SIGSEGV?)
-	delete $self->{dbh};
+	dbh_release($self);
 	my $dir = delete $self->{"tmp$$.$self"} or return;
 	my $tries = 0;
 	do {

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 15/23] sharedkv: move lock_for_scope
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (12 preceding siblings ...)
  2021-02-01  0:47 ` [PATCH 14/23] sharedkv: dbh_release Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 16/23] sharedkv: clear CachedKids before disconnect Eric Wong
                   ` (7 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

---
 lib/PublicInbox/SharedKV.pm | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index a7d4de41..65e0a8d4 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -145,8 +145,8 @@ SELECT COUNT(k) FROM kv
 
 sub dbh_release {
 	my ($self, $lock) = @_;
-	$lock //= $self->lock_for_scope; # for WAL (fixes SIGSEGV?)
 	my $dbh = delete $self->{dbh} or return;
+	$lock //= $self->lock_for_scope; # for WAL (fixes SIGSEGV?)
 	$dbh->disconnect;
 }
 

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 16/23] sharedkv: clear CachedKids before disconnect
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (13 preceding siblings ...)
  2021-02-01  0:47 ` [PATCH 15/23] sharedkv: move lock_for_scope Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 17/23] lei: increase initial timeout Eric Wong
                   ` (6 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

This is needed to avoid warnings like:

  ->disconnect invalidates 1 active statement handle
  (either destroy statement handles or call finish on
  them before disconnecting)
---
 lib/PublicInbox/SharedKV.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index 65e0a8d4..9bb93823 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -147,6 +147,7 @@ sub dbh_release {
 	my ($self, $lock) = @_;
 	my $dbh = delete $self->{dbh} or return;
 	$lock //= $self->lock_for_scope; # for WAL (fixes SIGSEGV?)
+	%{$dbh->{CachedKids}} = (); # cleanup prepare_cached
 	$dbh->disconnect;
 }
 

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 17/23] lei: increase initial timeout
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (14 preceding siblings ...)
  2021-02-01  0:47 ` [PATCH 16/23] sharedkv: clear CachedKids before disconnect Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 18/23] sharedkv: use lock_for_scope_fast Eric Wong
                   ` (5 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

PublicInbox::Listener unconditionally sets O_NONBLOCK upon
accept(), so we need a larger timeout under heavy load since
there's no "dataready" accept filter on the listener.

With O_NONBLOCK already set, we don't have to set it again in
->event_step_init.
---
 lib/PublicInbox/LEI.pm | 7 ++++---
 script/lei             | 3 ++-
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 08554932..e2f22a75 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -824,7 +824,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 	$sock->autoflush(1);
 	my $self = bless { sock => $sock }, __PACKAGE__;
 	vec(my $rvec = '', fileno($sock), 1) = 1;
-	select($rvec, undef, undef, 1) or
+	select($rvec, undef, undef, 60) or
 		return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
 	my @fds = $recv_cmd->($sock, my $buf, 4096 * 33); # >MAX_ARG_STRLEN
 	if (scalar(@fds) == 4) {
@@ -834,7 +834,9 @@ sub accept_dispatch { # Listener {post_accept} callback
 			send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR);
 		}
 	} else {
-		return send($sock, "recv_cmd failed: $!", MSG_EOR);
+		my $msg = "recv_cmd failed: $!";
+		warn $msg;
+		return send($sock, $msg, MSG_EOR);
 	}
 	$self->{2}->autoflush(1); # keep stdout buffered until x_it|DESTROY
 	# $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
@@ -898,7 +900,6 @@ sub event_step {
 sub event_step_init {
 	my ($self) = @_;
 	if (my $sock = $self->{sock}) { # using DS->EventLoop
-		$sock->blocking(0);
 		$self->SUPER::new($sock, EPOLLIN|EPOLLET);
 	}
 }
diff --git a/script/lei b/script/lei
index 006c1180..f92dd302 100755
--- a/script/lei
+++ b/script/lei
@@ -79,7 +79,8 @@ Falling back to (slow) one-shot mode
 	my $buf = join("\0", scalar(@ARGV), @ARGV);
 	while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
 	$buf .= "\0\0";
-	$send_cmd->($sock, [ 0, 1, 2, fileno($dh) ], $buf, MSG_EOR);
+	$send_cmd->($sock, [ 0, 1, 2, fileno($dh) ], $buf, MSG_EOR) or
+		die "sendmsg: $!";
 	my $x_it_code = 0;
 	while (1) {
 		my (@fds) = $recv_cmd->($sock, $buf, 4096 * 33);

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 18/23] sharedkv: use lock_for_scope_fast
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (15 preceding siblings ...)
  2021-02-01  0:47 ` [PATCH 17/23] lei: increase initial timeout Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 19/23] lei_to_mail: reduce spew on Maildir removal Eric Wong
                   ` (4 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

This allows us to avoid repeated open() and close() syscalls
and speeds up the new xt/stress-sharedkv.t maintainer test
by roughly 7%.
---
 MANIFEST                    |  1 +
 lib/PublicInbox/Lock.pm     | 17 +++++++++++++
 lib/PublicInbox/SharedKV.pm | 14 +++++------
 xt/stress-sharedkv.t        | 50 +++++++++++++++++++++++++++++++++++++
 4 files changed, 75 insertions(+), 7 deletions(-)
 create mode 100644 xt/stress-sharedkv.t

diff --git a/MANIFEST b/MANIFEST
index c10775e4..a715214e 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -444,3 +444,4 @@ xt/perf-msgview.t
 xt/perf-nntpd.t
 xt/perf-threading.t
 xt/solver.t
+xt/stress-sharedkv.t
diff --git a/lib/PublicInbox/Lock.pm b/lib/PublicInbox/Lock.pm
index bb213de4..c0c4c15c 100644
--- a/lib/PublicInbox/Lock.pm
+++ b/lib/PublicInbox/Lock.pm
@@ -41,6 +41,23 @@ sub lock_for_scope {
 	PublicInbox::OnDestroy->new(@single_pid, \&lock_release, $self);
 }
 
+sub lock_acquire_fast {
+	$_[0]->{lockfh} or return lock_acquire($_[0]);
+	flock($_[0]->{lockfh}, LOCK_EX) or croak "lock (fast) failed: $!";
+}
+
+sub lock_release_fast {
+	flock($_[0]->{lockfh} // return, LOCK_UN) or
+			croak "unlock (fast) $_[0]->{lock_path}: $!";
+}
+
+# caller must use return value
+sub lock_for_scope_fast {
+	my ($self, @single_pid) = @_;
+	lock_acquire_fast($self) or return; # lock_path not set
+	PublicInbox::OnDestroy->new(@single_pid, \&lock_release_fast, $self);
+}
+
 sub new_tmp {
 	my ($cls, $ident) = @_;
 	my $tmp = File::Temp->new("$ident.lock-XXXXXX", TMPDIR => 1);
diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index 9bb93823..0f25e6ca 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -17,7 +17,7 @@ sub dbh {
 	my ($self, $lock) = @_;
 	$self->{dbh} //= do {
 		my $f = $self->{filename};
-		$lock //= $self->lock_for_scope;
+		$lock //= $self->lock_for_scope_fast;
 		my $dbh = DBI->connect("dbi:SQLite:dbname=$f", '', '', {
 			AutoCommit => 1,
 			RaiseError => 1,
@@ -58,13 +58,13 @@ sub new {
 
 sub index_values {
 	my ($self) = @_;
-	my $lock = $self->lock_for_scope;
+	my $lock = $self->lock_for_scope_fast;
 	$self->dbh($lock)->do('CREATE INDEX IF NOT EXISTS idx_v ON kv (v)');
 }
 
 sub set_maybe {
 	my ($self, $key, $val, $lock) = @_;
-	$lock //= $self->lock_for_scope;
+	$lock //= $self->lock_for_scope_fast;
 	my $e = $self->{dbh}->prepare_cached(<<'')->execute($key, $val);
 INSERT OR IGNORE INTO kv (k,v) VALUES (?, ?)
 
@@ -83,7 +83,7 @@ SELECT k,v FROM kv
 
 sub delete_by_val {
 	my ($self, $val, $lock) = @_;
-	$lock //= $self->lock_for_scope;
+	$lock //= $self->lock_for_scope_fast;
 	$self->{dbh}->prepare_cached(<<'')->execute($val) + 0;
 DELETE FROM kv WHERE v = ?
 
@@ -91,7 +91,7 @@ DELETE FROM kv WHERE v = ?
 
 sub replace_values {
 	my ($self, $oldval, $newval, $lock) = @_;
-	$lock //= $self->lock_for_scope;
+	$lock //= $self->lock_for_scope_fast;
 	$self->{dbh}->prepare_cached(<<'')->execute($newval, $oldval) + 0;
 UPDATE kv SET v = ? WHERE v = ?
 
@@ -122,7 +122,7 @@ SELECT v FROM kv WHERE k = ?
 
 sub xchg {
 	my ($self, $key, $newval, $lock) = @_;
-	$lock //= $self->lock_for_scope;
+	$lock //= $self->lock_for_scope_fast;
 	my $oldval = get($self, $key);
 	if (defined $newval) {
 		set($self, $key, $newval);
@@ -146,7 +146,7 @@ SELECT COUNT(k) FROM kv
 sub dbh_release {
 	my ($self, $lock) = @_;
 	my $dbh = delete $self->{dbh} or return;
-	$lock //= $self->lock_for_scope; # for WAL (fixes SIGSEGV?)
+	$lock //= $self->lock_for_scope_fast; # for WAL (fixes SIGSEGV?)
 	%{$dbh->{CachedKids}} = (); # cleanup prepare_cached
 	$dbh->disconnect;
 }
diff --git a/xt/stress-sharedkv.t b/xt/stress-sharedkv.t
new file mode 100644
index 00000000..70de9ffc
--- /dev/null
+++ b/xt/stress-sharedkv.t
@@ -0,0 +1,50 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use v5.10.1;
+use Test::More;
+use Benchmark qw(:all);
+use PublicInbox::TestCommon;
+require_ok 'PublicInbox::SharedKV';
+my ($tmpdir, $for_destroy) = tmpdir();
+local $ENV{TMPDIR} = $tmpdir;
+my $skv = PublicInbox::SharedKV->new;
+my $ipc = bless {}, 'StressSharedKV';
+$ipc->wq_workers_start('stress-sharedkv', $ENV{TEST_NPROC}//4);
+my $nr = $ENV{TEST_STRESS_NR} // 100_000;
+my $ios = [];
+my $t = timeit(1, sub {
+	for my $i (1..$nr) {
+		$ipc->wq_do('test_set_maybe', $ios, $skv, $i);
+		$ipc->wq_do('test_set_maybe', $ios, $skv, $i);
+	}
+});
+diag "$nr sets done ".timestr($t);
+
+for my $w ($ipc->wq_workers) {
+	$ipc->wq_do('test_skv_done', $ios);
+}
+diag "done requested";
+
+$ipc->wq_close;
+done_testing;
+
+package StressSharedKV;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use Digest::SHA qw(sha1);
+
+sub test_set_maybe {
+	my ($self, $skv, $i) = @_;
+	my $wcb = $self->{wcb} //= do {
+		$skv->dbh;
+		sub { $skv->set_maybe(sha1($_[0]), '') };
+	};
+	$wcb->($i + time);
+}
+
+sub test_skv_done {
+	my ($self) = @_;
+	delete $self->{wcb};
+}

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 19/23] lei_to_mail: reduce spew on Maildir removal
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (16 preceding siblings ...)
  2021-02-01  0:47 ` [PATCH 18/23] sharedkv: use lock_for_scope_fast Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 20/23] sharedkv: do not set cache_size by default Eric Wong
                   ` (3 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

At most, we'll only warn once per worker when a Maildir
disappears from under us.  We'll also use the '!' OpPipe
to note the exceptional condition, and use '|' (the pipe
character) for SIGPIPE so it's a bit easier for hackers to remember.
---
 lib/PublicInbox/LEI.pm        |  8 ++++----
 lib/PublicInbox/LeiToMail.pm  | 12 +++++++-----
 lib/PublicInbox/LeiXSearch.pm | 20 ++++++++++++--------
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index e2f22a75..17ad18b9 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -305,7 +305,8 @@ sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
 
 sub fail ($$;$) {
 	my ($self, $buf, $exit_code) = @_;
-	err($self, $buf);
+	err($self, $buf) if defined $buf;
+	syswrite($self->{op_pipe}, '!') if $self->{op_pipe}; # fail_handler
 	x_it($self, ($exit_code // 1) << 8);
 	undef;
 }
@@ -365,11 +366,10 @@ sub io_restore ($$) {
 	}
 }
 
-# triggers sigpipe_handler
-sub note_sigpipe {
+sub note_sigpipe { # triggers sigpipe_handler
 	my ($self, $fd) = @_;
 	close(delete($self->{$fd})); # explicit close silences Perl warning
-	syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
+	syswrite($self->{op_pipe}, '|') if $self->{op_pipe};
 	x_it($self, 13);
 }
 
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 01e7cec5..c6c5f84b 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -296,14 +296,14 @@ sub _buf2maildir {
 	my $kw = $smsg->{kw} // [];
 	my $sfx = join('', sort(map { $kw2char{$_} // () } @$kw));
 	my $rand = ''; # chosen by die roll :P
-	my ($tmp, $fh, $final);
+	my ($tmp, $fh, $final, $ok);
 	my $common = $smsg->{blob} // _rand;
 	if (defined(my $pct = $smsg->{pct})) { $common .= "=$pct" }
 	do {
 		$tmp = $dst.'tmp/'.$rand.$common;
-	} while (!sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY) &&
+	} while (!($ok = sysopen($fh, $tmp, O_CREAT|O_EXCL|O_WRONLY)) &&
 		$! == EEXIST && ($rand = _rand.','));
-	if (print $fh $$buf and close($fh)) {
+	if ($ok && print $fh $$buf and close($fh)) {
 		# ignore new/ and write only to cur/, otherwise MUAs
 		# with R/W access to the Maildir will end up doing
 		# a mass rename which can take a while with thousands
@@ -316,9 +316,10 @@ sub _buf2maildir {
 			($rand = _rand.','));
 		unlink($tmp) or warn "W: failed to unlink $tmp: $!\n";
 	} else {
-		my $err = $!;
+		my $err = "Error writing $smsg->{blob} to $dst: $!\n";
+		$_[0] = undef; # clobber dst
 		unlink($tmp);
-		die "Error writing $smsg->{blob} to $dst: $err";
+		die $err;
 	}
 }
 
@@ -329,6 +330,7 @@ sub _maildir_write_cb ($$) {
 	my $dst = $lei->{ovv}->{dst};
 	sub { # for git_to_mail
 		my ($buf, $smsg, $eml) = @_;
+		$dst // return $lei->fail; # dst may be undef-ed in last run
 		$buf //= \($eml->as_string);
 		return _buf2maildir($dst, $buf, $smsg) if !$dedupe;
 		$eml //= PublicInbox::Eml->new($$buf); # copy buf
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 4d390ee4..f630e79a 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -356,14 +356,17 @@ sub query_prepare { # called by wq_do
 	syswrite($lei->{op_pipe}, '.') == 1 or die "do_post_augment trigger: $!"
 }
 
-sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
-	my ($lei) = @_;
-	my $lxs = delete $lei->{lxs};
-	if ($lxs && $lxs->wq_kill_old) { # is this the daemon?
-		$lxs->wq_wait_old($lei);
+sub fail_handler ($;$$) { # '!' OpPipe callback; also used by sigpipe_handler
+	my ($lei, $code, $io) = @_;
+	if (my $lxs = delete $lei->{lxs}) {
+		$lxs->wq_wait_old($lei) if $lxs->wq_kill_old; # lei-daemon
 	}
+	close($io) if $io; # needed to avoid warnings on SIGPIPE
+	$lei->x_it($code // (1 << 8)); # default exit 1, shifted like fail() does
+}
+
+sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
+	fail_handler($_[0], 13, delete $_[0]->{1});
 }
 
 sub do_query {
@@ -384,7 +387,8 @@ sub do_query {
 	$lei->event_step_init; # wait for shutdowns
 	my $done_op = {
 		'' => [ \&query_done, $lei ],
-		'!' => [ \&sigpipe_handler, $lei ]
+		'|' => [ \&sigpipe_handler, $lei ],
+		'!' => [ \&fail_handler, $lei ]
 	};
 	my $in_loop = exists $lei->{sock};
 	$done = PublicInbox::OpPipe->new($done, $done_op, $in_loop);

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 20/23] sharedkv: do not set cache_size by default
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (17 preceding siblings ...)
  2021-02-01  0:47 ` [PATCH 19/23] lei_to_mail: reduce spew on Maildir removal Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 21/23] import: reap git-config(1) synchronously Eric Wong
                   ` (2 subsequent siblings)
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

These DBs will probably be too small for a larger cache_size
to be worthwhile.
---
 lib/PublicInbox/SharedKV.pm | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index 0f25e6ca..c71a5b4d 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -27,7 +27,9 @@ sub dbh {
 		});
 		my $opt = $self->{opt} // {};
 		$dbh->do('PRAGMA synchronous = OFF') if !$opt->{fsync};
-		$dbh->do('PRAGMA cache_size = '.($opt->{cache_size} || 80000));
+		if (my $s = $opt->{cache_size}) {
+			$dbh->do("PRAGMA cache_size = $s");
+		}
 		$dbh->do('PRAGMA journal_mode = '.
 				($opt->{journal_mode} // 'WAL'));
 		$dbh->do(<<'');

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 21/23] import: reap git-config(1) synchronously
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (18 preceding siblings ...)
  2021-02-01  0:47 ` [PATCH 20/23] sharedkv: do not set cache_size by default Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 22/23] lei_dedupe: avoid anonymous subs Eric Wong
  2021-02-01  0:47 ` [PATCH 23/23] lei_to_mail: avoid anonymous sub Eric Wong
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

This avoids a zombie if another step of the event loop
takes too long.
---
 lib/PublicInbox/Import.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm
index 8a06a661..a070aa1e 100644
--- a/lib/PublicInbox/Import.pm
+++ b/lib/PublicInbox/Import.pm
@@ -24,6 +24,7 @@ sub default_branch () {
 		delete local $ENV{GIT_CONFIG};
 		my $r = popen_rd([qw(git config --global init.defaultBranch)]);
 		chomp(my $h = <$r> // '');
+		close $r;
 		$h eq '' ? 'refs/heads/master' : $h;
 	}
 }

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 22/23] lei_dedupe: avoid anonymous subs
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (19 preceding siblings ...)
  2021-02-01  0:47 ` [PATCH 21/23] import: reap git-config(1) synchronously Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  2021-02-01  0:47 ` [PATCH 23/23] lei_to_mail: avoid anonymous sub Eric Wong
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

It doesn't save us much in parameter passing, and anonymous
subs can tickle stack refcounting bugs in Perl when Carp
is generating a stack trace:

https://rt.perl.org/Public/Bug/Display.html?id=131046
https://rt.perl.org/Public/Bug/Display.html?id=52610
---
 lib/PublicInbox/LeiDedupe.pm | 86 ++++++++++++++++++------------------
 t/lei_dedupe.t               |  2 +-
 2 files changed, 44 insertions(+), 44 deletions(-)

diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 5c83fd80..6f12ec3d 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -44,50 +44,48 @@ sub smsg_hash ($) {
 }
 
 # the paranoid option
-sub dedupe_oid ($) {
-	my ($skv) = @_;
-	(sub { # may be called in a child process
-		my ($eml, $oid) = @_;
-		$skv->set_maybe(_oidbin($oid) // _regen_oid($eml), '');
-	}, sub {
-		my ($smsg) = @_;
-		$skv->set_maybe(_oidbin($smsg->{blob}), '');
-	});
+sub dedupe_eml_oid {
+	my ($self, $eml, $oid) = @_;
+	$self->[0]->set_maybe(_oidbin($oid) // _regen_oid($eml), '');
+}
+
+sub dedupe_smsg_oid {
+	my ($self, $smsg) = @_;
+	$self->[0]->set_maybe(_oidbin($smsg->{blob}), '');
 }
 
 # dangerous if there's duplicate messages with different Message-IDs
-sub dedupe_mid ($) {
-	my ($skv) = @_;
-	(sub { # may be called in a child process
-		my ($eml, $oid) = @_;
-		# TODO: lei will support non-public messages w/o Message-ID
-		my $mid = $eml->header_raw('Message-ID') // _oidbin($oid) //
-			content_hash($eml);
-		$skv->set_maybe($mid, '');
-	}, sub {
-		my ($smsg) = @_;
-		my $mid = $smsg->{mid};
-		$mid = undef if $mid eq '';
-		$mid //= smsg_hash($smsg) // _oidbin($smsg->{blob});
-		$skv->set_maybe($mid, '');
-	});
+sub dedupe_eml_mid {
+	my ($self, $eml, $oid) = @_;
+	# TODO: lei will support non-public messages w/o Message-ID
+	my $mid = $eml->header_raw('Message-ID') // _oidbin($oid) //
+		content_hash($eml);
+	$self->[0]->set_maybe($mid, '');
+}
+
+sub dedupe_smsg_mid {
+	my ($self, $smsg) = @_;
+	my $mid = $smsg->{mid};
+	$mid = undef if $mid eq '';
+	$mid //= smsg_hash($smsg) // _oidbin($smsg->{blob});
+	$self->[0]->set_maybe($mid, '');
 }
 
 # our default deduplication strategy (used by v2, also)
-sub dedupe_content ($) {
-	my ($skv) = @_;
-	(sub { # may be called in a child process
-		my ($eml) = @_; # oid = $_[1], ignored
-		$skv->set_maybe(content_hash($eml), '');
-	}, sub {
-		my ($smsg) = @_;
-		$skv->set_maybe(smsg_hash($smsg), '');
-	});
+sub dedupe_eml_content {
+	my ($self, $eml) = @_; # oid = $_[1], ignored
+	$self->[0]->set_maybe(content_hash($eml), '');
+}
+
+sub dedupe_smsg_content {
+	my ($self, $smsg) = @_;
+	$self->[0]->set_maybe(smsg_hash($smsg), '');
 }
 
 # no deduplication at all
-sub true { 1 }
-sub dedupe_none ($) { (\&true, \&true) }
+sub dedupe_eml_none { 1 }
+
+sub dedupe_smsg_none { 1 }
 
 sub new {
 	my ($cls, $lei) = @_;
@@ -96,30 +94,32 @@ sub new {
 
 	# allow "none" to bypass Eml->new if writing to directory:
 	return if ($dd eq 'none' && substr($dst // '', -1) eq '/');
-	my $m = "dedupe_$dd";
-	$cls->can($m) or die "unsupported dedupe strategy: $dd\n";
 	my $skv = $dd eq 'none' ? undef : PublicInbox::SharedKV->new;
-
-	# [ $skv, $eml_cb, $smsg_cb, "dedupe_$dd" ]
-	bless [ $skv, undef, undef, $m ], $cls;
+	my $self = bless [ $skv, "dedupe_eml_$dd", "dedupe_smsg_$dd" ], $cls;
+	for my $i (1, 2) {
+		$self->can($self->[$i]) or
+			die "unsupported dedupe strategy: $dd\n";
+	}
+	$self
 }
 
 # returns true on seen messages according to the deduplication strategy,
 # returns false if unseen
 sub is_dup {
 	my ($self, $eml, $oid) = @_;
-	!$self->[1]->($eml, $oid);
+	my $m = $self->[1];
+	!$self->$m($eml, $oid);
 }
 
 sub is_smsg_dup {
 	my ($self, $smsg) = @_;
-	!$self->[2]->($smsg);
+	my $m = $self->[2];
+	!$self->$m($smsg);
 }
 
 sub prepare_dedupe {
 	my ($self) = @_;
 	my $skv = $self->[0];
-	$self->[1] or @$self[1,2] = $self->can($self->[3])->($skv);
 	$skv ? $skv->dbh : undef;
 }
 
diff --git a/t/lei_dedupe.t b/t/lei_dedupe.t
index bcb06a0a..39c004a9 100644
--- a/t/lei_dedupe.t
+++ b/t/lei_dedupe.t
@@ -22,7 +22,7 @@ my $check_storable = sub {
 	SKIP: {
 		require_mods('Storable', 1);
 		my $dup = Storable::thaw(Storable::freeze($x));
-		is_deeply($dup, $x, "$x->[3] round-trips through storable");
+		is_deeply($dup, $x, "$x->[2] round-trips through storable");
 	}
 };
 

^ permalink raw reply related	[flat|nested] 23+ messages in thread

* [PATCH 23/23] lei_to_mail: avoid anonymous sub
  2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
                   ` (20 preceding siblings ...)
  2021-02-01  0:47 ` [PATCH 22/23] lei_dedupe: avoid anonymous subs Eric Wong
@ 2021-02-01  0:47 ` Eric Wong
  21 siblings, 0 replies; 23+ messages in thread
From: Eric Wong @ 2021-02-01  0:47 UTC (permalink / raw)
  To: spew

It doesn't save us much in parameter passing, and anonymous
subs can tickle stack refcounting bugs in Perl when Carp
is generating a stack trace:

https://rt.perl.org/Public/Bug/Display.html?id=131046
https://rt.perl.org/Public/Bug/Display.html?id=52610
---
 lib/PublicInbox/LeiToMail.pm | 78 ++++++++++++++++++------------------
 t/lei_to_mail.t              |  7 +++-
 2 files changed, 45 insertions(+), 40 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index c6c5f84b..52eb89ac 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -142,7 +142,7 @@ sub eml2mboxcl2 {
 }
 
 sub git_to_mail { # git->cat_async callback
-	my ($bref, $oid, $type, $size, $arg) = @_;
+	my ($bref, $oid, $type, $size, $x) = @_;
 	if ($type ne 'blob') {
 		if ($type eq 'missing') {
 			warn "missing $oid\n";
@@ -150,11 +150,12 @@ sub git_to_mail { # git->cat_async callback
 			warn "unexpected type=$type for $oid\n";
 		}
 	}
-	my ($write_cb, $smsg) = @$arg;
+	# cb is _git_to_(maildir|mbox)
+	my ($smsg, $not_done, $cb, @args) = @$x;
 	if ($smsg->{blob} ne $oid) {
 		die "BUG: expected=$smsg->{blob} got=$oid";
 	}
-	$write_cb->($bref, $smsg) if $size > 0;
+	$cb->($bref, $smsg, @args) if $size > 0;
 }
 
 sub reap_compress { # dwaitpid callback
@@ -245,25 +246,26 @@ sub _augment { # MboxReader eml_cb
 	$lei->{dedupe}->is_dup($eml);
 }
 
-sub _mbox_write_cb ($$) {
+sub _mbox_write_prepare {
 	my ($self, $lei) = @_;
 	my $ovv = $lei->{ovv};
 	my $m = 'eml2'.$ovv->{fmt};
 	my $eml2mbox = $self->can($m) or die "$self->$m missing";
 	$lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier
 	$lei->{1}->autoflush(1);
-	my $atomic_append = !defined($ovv->{lock_path});
-	my $dedupe = $lei->{dedupe};
-	$dedupe->prepare_dedupe;
-	sub { # for git_to_mail
-		my ($buf, $smsg, $eml) = @_;
-		$eml //= PublicInbox::Eml->new($buf);
-		return if $dedupe->is_dup($eml, $smsg->{blob});
-		$buf = $eml2mbox->($eml, $smsg);
-		return atomic_append($lei, $buf) if $atomic_append;
-		my $lk = $ovv->lock_for_scope;
-		$lei->out($$buf);
-	}
+	$lei->{-atomic_append} = !defined($ovv->{lock_path});
+	$lei->{dedupe}->prepare_dedupe;
+	($lei, $eml2mbox);
+}
+
+sub _git_to_mbox { # from git_to_mail
+	my ($bref, $smsg, $lei, $eml2mbox) = @_;
+	my $eml = PublicInbox::Eml->new($bref);
+	return if $lei->{dedupe}->is_dup($eml, $smsg->{blob});
+	$bref = $eml2mbox->($eml, $smsg);
+	return atomic_append($lei, $bref) if $lei->{-atomic_append};
+	my $lk = $lei->{ovv}->lock_for_scope;
+	$lei->out($$bref);
 }
 
 sub _maildir_each_file ($$;@) {
@@ -292,7 +294,8 @@ sub _rand () {
 }
 
 sub _buf2maildir {
-	my ($dst, $buf, $smsg) = @_;
+	my ($lei, $buf, $smsg) = @_;
+	my $dst = $lei->{-dst};
 	my $kw = $smsg->{kw} // [];
 	my $sfx = join('', sort(map { $kw2char{$_} // () } @$kw));
 	my $rand = ''; # chosen by die roll :P
@@ -317,34 +320,31 @@ sub _buf2maildir {
 		unlink($tmp) or warn "W: failed to unlink $tmp: $!\n";
 	} else {
 		my $err = "Error writing $smsg->{blob} to $dst: $!\n";
-		$_[0] = undef; # clobber dst
 		unlink($tmp);
-		die $err;
+		$lei->fail($err);
 	}
 }
 
-sub _maildir_write_cb ($$) {
+sub _maildir_write_prepare {
 	my ($self, $lei) = @_;
 	my $dedupe = $lei->{dedupe};
 	$dedupe->prepare_dedupe if $dedupe;
-	my $dst = $lei->{ovv}->{dst};
-	sub { # for git_to_mail
-		my ($buf, $smsg, $eml) = @_;
-		$dst // return $lei->fail; # dst may be undef-ed in last run
-		$buf //= \($eml->as_string);
-		return _buf2maildir($dst, $buf, $smsg) if !$dedupe;
-		$eml //= PublicInbox::Eml->new($$buf); # copy buf
-		return if $dedupe->is_dup($eml, $smsg->{blob});
-		undef $eml;
-		_buf2maildir($dst, $buf, $smsg);
-	}
+	$lei->{-dst} = $lei->{ovv}->{dst}; # shortcut
+	($lei);
+}
+
+sub _git_to_maildir { # for git_to_mail
+	my ($bref, $smsg, $lei) = @_;
+	my $dedupe = $lei->{dedupe} // return _buf2maildir($lei, $bref, $smsg);
+	return if $dedupe->is_dup(PublicInbox::Eml->new($$bref), $smsg->{blob});
+	_buf2maildir($lei, $bref, $smsg);
 }
 
-sub write_cb { # returns a callback for git_to_mail
+sub write_prepare {
 	my ($self, $lei) = @_;
-	# _mbox_write_cb or _maildir_write_cb
-	my $m = "_$self->{base_type}_write_cb";
-	$self->$m($lei);
+	# _mbox_write_prepare or _maildir_write_prepare
+	my $m = "_$self->{base_type}_write_prepare";
+	[ $self->can("_git_to_$self->{base_type}"), $self->$m($lei) ];
 }
 
 sub new {
@@ -465,18 +465,18 @@ sub post_augment { # fast (spawn compressor or mkdir), runs in main daemon
 sub write_mail { # via ->wq_do
 	my ($self, $git_dir, $smsg, $lei) = @_;
 	my $not_done = delete $self->{$lei->{each_smsg_not_done}};
-	my $wcb = $self->{wcb} //= do { # first message
+	my $m_args = $self->{cb_args} //= do { # first message
 		$lei->atfork_child_wq($self);
-		$self->write_cb($lei);
+		write_prepare($self, $lei);
 	};
 	my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
 	git_async_cat($git, $smsg->{blob}, \&git_to_mail,
-				[$wcb, $smsg, $not_done]);
+				[ $smsg, $not_done, @$m_args ]);
 }
 
 sub wq_atexit_child {
 	my ($self) = @_;
-	delete $self->{wcb};
+	delete $self->{cb_args};
 	for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
 		$git->async_wait_all;
 	}
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index f7535687..917aca3a 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -103,7 +103,12 @@ my $wcb_get = sub {
 	my $zpipe = $l2m->pre_augment($lei);
 	$l2m->do_augment($lei);
 	$l2m->post_augment($lei, $zpipe);
-	$l2m->write_cb($lei);
+	# n.b.: this internal API is unstable and changed several times...
+	my ($cb, @args) = @{$l2m->write_prepare($lei)};
+	sub {
+		my ($bref, $smsg) = @_;
+		$cb->($bref, $smsg, @args);
+	}
 };
 
 my $deadbeef = { blob => 'deadbeef', kw => [ qw(seen) ] };

^ permalink raw reply related	[flat|nested] 23+ messages in thread

end of thread, other threads:[~2021-02-01  0:47 UTC | newest]

Thread overview: 23+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2021-02-01  0:46 [PATCH 01/23] lei: more consistent dedupe and ovv_buf init Eric Wong
2021-02-01  0:46 ` [PATCH 02/23] ipc: switch wq to use the event loop Eric Wong
2021-02-01  0:46 ` [PATCH 03/23] lei: remove per-child SIG{__WARN__} Eric Wong
2021-02-01  0:46 ` [PATCH 04/23] lei: remove SIGPIPE handler Eric Wong
2021-02-01  0:46 ` [PATCH 05/23] ipc: more helpful ETOOMANYREFS error messages Eric Wong
2021-02-01  0:46 ` [PATCH 06/23] lei: remove syslog dependency Eric Wong
2021-02-01  0:46 ` [PATCH 07/23] sharedkv: release {dbh} before rmtree Eric Wong
2021-02-01  0:46 ` [PATCH 08/23] sharedkv: retry rmtree in DESTROY Eric Wong
2021-02-01  0:46 ` [PATCH 09/23] lei: keep $lei around until workers are reaped Eric Wong
2021-02-01  0:46 ` [PATCH 10/23] lei_dedupe: use Digest::SHA Eric Wong
2021-02-01  0:46 ` [PATCH 11/23] lei_xsearch: load smsg package Eric Wong
2021-02-01  0:46 ` [PATCH 12/23] lei: deep clone {ovv} for l2m workers Eric Wong
2021-02-01  0:46 ` [PATCH 13/23] sharedkv: lock before unref {dbh} Eric Wong
2021-02-01  0:47 ` [PATCH 14/23] sharedkv: dbh_release Eric Wong
2021-02-01  0:47 ` [PATCH 15/23] sharedkv: move lock_for_scope Eric Wong
2021-02-01  0:47 ` [PATCH 16/23] sharedkv: clear CachedKids before disconnect Eric Wong
2021-02-01  0:47 ` [PATCH 17/23] lei: increase initial timeout Eric Wong
2021-02-01  0:47 ` [PATCH 18/23] sharedkv: use lock_for_scope_fast Eric Wong
2021-02-01  0:47 ` [PATCH 19/23] lei_to_mail: reduce spew on Maildir removal Eric Wong
2021-02-01  0:47 ` [PATCH 20/23] sharedkv: do not set cache_size by default Eric Wong
2021-02-01  0:47 ` [PATCH 21/23] import: reap git-config(1) synchronously Eric Wong
2021-02-01  0:47 ` [PATCH 22/23] lei_dedupe: avoid anonymous subs Eric Wong
2021-02-01  0:47 ` [PATCH 23/23] lei_to_mail: avoid anonymous sub Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).