dumping ground for random patches and texts
 help / color / mirror / Atom feed
* [PATCH 01/15] lei: more consistent dedupe initialization
@ 2021-01-30 10:19 Eric Wong
  2021-01-30 10:19 ` [PATCH 02/15] lei_to_mail: mbox*: reduce scope of lock Eric Wong
                   ` (13 more replies)
  0 siblings, 14 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

This fixes "--dedupe none" with Maildir output.
---
 lib/PublicInbox/LeiDedupe.pm   |  4 ++--
 lib/PublicInbox/LeiOverview.pm | 18 ++++++++++--------
 lib/PublicInbox/LeiToMail.pm   |  3 +--
 3 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 3f478aa4..e3ae8e33 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -103,8 +103,8 @@ sub new {
 	bless [ $skv, undef, undef, $m ], $cls;
 }
 
-# returns true on unseen messages according to the deduplication strategy,
-# returns false if seen
+# returns true on seen messages according to the deduplication strategy,
+# returns false if unseen
 sub is_dup {
 	my ($self, $eml, $oid) = @_;
 	!$self->[1]->($eml, $oid);
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index c67e2747..fa041457 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -92,13 +92,14 @@ sub new {
 			ovv_out_lk_init($self);
 		}
 	}
-	if (!$json) {
+	if ($json) {
+		$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
+	} else {
 		# default to the cheapest sort since MUA usually resorts
 		$lei->{opt}->{'sort'} //= 'docid' if $dst ne '/dev/stdout';
 		$lei->{l2m} = eval { PublicInbox::LeiToMail->new($lei) };
 		return $lei->fail($@) if $@;
 	}
-	$lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei);
 	$self;
 }
 
@@ -201,15 +202,19 @@ sub _json_pretty {
 
 sub ovv_each_smsg_cb { # runs in wq worker usually
 	my ($self, $lei, $ibxish) = @_;
-	my $json;
+	my ($json, $dedupe);
 	$lei->{1}->autoflush(1);
-	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
 	if (my $pkg = $self->{json}) {
 		$json = $pkg->new;
 		$json->utf8->canonical;
 		$json->ascii(1) if $lei->{opt}->{ascii};
 	}
-	my $l2m = $lei->{l2m} or $dedupe->prepare_dedupe;
+	my $l2m = $lei->{l2m};
+	if (!$l2m) {
+		$dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+		$dedupe->prepare_dedupe;
+	}
+	$lei->{ovv_buf} = \(my $buf = '') if !$l2m;
 	if ($l2m && !$ibxish) { # remote https?:// mboxrd
 		delete $l2m->{-wq_s1};
 		my $g2m = $l2m->can('git_to_mail');
@@ -241,7 +246,6 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 		my $git = $ibxish->git; # (LeiXSearch|Inbox|ExtSearch)->git
 		$self->{git} = $git; # for ovv_atexit_child
 		my $g2m = $l2m->can('git_to_mail');
-		$dedupe->prepare_dedupe;
 		sub {
 			my ($smsg, $mitem) = @_;
 			$smsg->{pct} = get_pct($mitem) if $mitem;
@@ -249,7 +253,6 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 		};
 	} elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
 		my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
-		$lei->{ovv_buf} = \(my $buf = '');
 		sub { # DIY prettiness :P
 			my ($smsg, $mitem) = @_;
 			return if $dedupe->is_smsg_dup($smsg);
@@ -273,7 +276,6 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 		}
 	} elsif ($json) {
 		my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
-		$lei->{ovv_buf} = \(my $buf = '');
 		sub {
 			my ($smsg, $mitem) = @_;
 			return if $dedupe->is_smsg_dup($smsg);
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 61b546b5..244bfb67 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -323,7 +323,7 @@ sub _buf2maildir {
 sub _maildir_write_cb ($$) {
 	my ($self, $lei) = @_;
 	my $dedupe = $lei->{dedupe};
-	$dedupe->prepare_dedupe;
+	$dedupe->prepare_dedupe if $dedupe;
 	my $dst = $lei->{ovv}->{dst};
 	sub { # for git_to_mail
 		my ($buf, $smsg, $eml) = @_;
@@ -464,7 +464,6 @@ sub write_mail { # via ->wq_do
 	my $wcb = $self->{wcb} //= do { # first message
 		my %sig = $lei->atfork_child_wq($self);
 		@SIG{keys %sig} = values %sig; # not local
-		$lei->{dedupe}->prepare_dedupe;
 		$self->write_cb($lei);
 	};
 	my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 02/15] lei_to_mail: mbox*: reduce scope of lock
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 03/15] ipc: switch wq to use the event loop Eric Wong
                   ` (12 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

---
 lib/PublicInbox/LeiToMail.pm | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 244bfb67..bf597ee7 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -254,8 +254,10 @@ sub _mbox_write_cb ($$) {
 		$eml //= PublicInbox::Eml->new($buf);
 		if (!$dedupe->is_dup($eml, $smsg->{blob})) {
 			$buf = $eml2mbox->($eml, $smsg);
-			my $lk = $ovv->lock_for_scope;
-			eval { $write->($out, $buf) };
+			eval {
+				my $lk = $ovv->lock_for_scope;
+				$write->($out, $buf);
+			};
 			if ($@) {
 				die $@ if ref($@) ne 'PublicInbox::SIGPIPE';
 				undef $out

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 03/15] ipc: switch wq to use the event loop
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
  2021-01-30 10:19 ` [PATCH 02/15] lei_to_mail: mbox*: reduce scope of lock Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 04/15] lei: no per-child SIG{__WARN__} Eric Wong
                   ` (11 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

This lets us maximize the capability of our
asynchronous git API.

I tried using a synchronous API, but even libgit2 running in the
same process (avoiding the IPC cost) failed to match the
throughput afforded by this change.  This is because libgit2
is built (at least on Debian) with the SHA-1 collision detection
code.
---
 MANIFEST                     |  1 +
 lib/PublicInbox/IPC.pm       | 17 +++++++++++------
 lib/PublicInbox/LeiToMail.pm | 26 +++++++++++---------------
 lib/PublicInbox/WQWorker.pm  | 34 ++++++++++++++++++++++++++++++++++
 4 files changed, 57 insertions(+), 21 deletions(-)
 create mode 100644 lib/PublicInbox/WQWorker.pm

diff --git a/MANIFEST b/MANIFEST
index 2077ab12..c10775e4 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -228,6 +228,7 @@ lib/PublicInbox/V2Writable.pm
 lib/PublicInbox/View.pm
 lib/PublicInbox/ViewDiff.pm
 lib/PublicInbox/ViewVCS.pm
+lib/PublicInbox/WQWorker.pm
 lib/PublicInbox/WWW.pm
 lib/PublicInbox/WWW.pod
 lib/PublicInbox/Watch.pm
diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index d2ff038d..479c4377 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -14,6 +14,7 @@ use Carp qw(confess croak);
 use PublicInbox::DS qw(dwaitpid);
 use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
+use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
 use Errno qw(EMSGSIZE);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
@@ -151,6 +152,8 @@ sub wq_wait_old {
 # for base class, override in sub classes
 sub ipc_atfork_prepare {}
 
+sub wq_atexit_child {}
+
 sub ipc_atfork_child {
 	my ($self) = @_;
 	my $io = delete($self->{-ipc_atfork_child_close}) or return;
@@ -251,10 +254,11 @@ sub ipc_sibling_atfork_child {
 	$pid == $$ and die "BUG: $$ ipc_atfork_child called on itself";
 }
 
-sub _recv_and_run {
+sub recv_and_run {
 	my ($self, $s2, $len, $full_stream) = @_;
 	my @fds = $recv_cmd->($s2, my $buf, $len);
-	my $n = length($buf // '') or return;
+	return if scalar(@fds) && !defined($fds[0]);
+	my $n = length($buf) or return 0;
 	my $nfd = 0;
 	for my $fd (@fds) {
 		if (open(my $cmdfh, '+<&=', $fd)) {
@@ -281,14 +285,15 @@ sub _recv_and_run {
 
 sub wq_worker_loop ($) {
 	my ($self) = @_;
-	my $len = $self->{wq_req_len} // (4096 * 33);
-	my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
-	1 while (_recv_and_run($self, $s2, $len));
+	my $wqw = PublicInbox::WQWorker->new($self);
+	PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+	PublicInbox::DS->EventLoop;
+	PublicInbox::DS->Reset;
 }
 
 sub do_sock_stream { # via wq_do, for big requests
 	my ($self, $len) = @_;
-	_recv_and_run($self, delete $self->{0}, $len, 1);
+	recv_and_run($self, delete $self->{0}, $len, 1);
 }
 
 sub wq_do { # always async
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index bf597ee7..6d2ce7fc 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -12,11 +12,16 @@ use PublicInbox::ProcessPipe;
 use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::LeiDedupe;
 use PublicInbox::OnDestroy;
+use PublicInbox::Git;
+use PublicInbox::GitAsyncCat;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use Errno qw(EEXIST ESPIPE ENOENT);
-use PublicInbox::Git;
+
+# struggles with short-lived repos, Gcf2Client makes little sense with lei;
+# but we may use in-process libgit2 in the future.
+$PublicInbox::GitAsyncCat::GCF2C = 0;
 
 my %kw2char = ( # Maildir characters
 	draft => 'D',
@@ -469,27 +474,18 @@ sub write_mail { # via ->wq_do
 		$self->write_cb($lei);
 	};
 	my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
-	$git->cat_async($smsg->{blob}, \&git_to_mail, [$wcb, $smsg, $not_done]);
+	git_async_cat($git, $smsg->{blob}, \&git_to_mail,
+				[$wcb, $smsg, $not_done]);
 }
 
-# We rely on OnDestroy to run this before ->DESTROY, since ->DESTROY
-# ordering is unstable at worker exit and may cause segfaults
-sub reap_gits {
+sub wq_atexit_child {
 	my ($self) = @_;
 	delete $self->{wcb};
 	for my $git (delete @$self{grep(/\A$$\0/, keys %$self)}) {
 		$git->async_wait_all;
 	}
-}
-
-sub DESTROY { delete $_[0]->{wcb} }
-
-sub ipc_atfork_child { # runs after IPC::wq_worker_loop
-	my ($self) = @_;
-	$self->SUPER::ipc_atfork_child;
-	# reap_gits needs to run before $self->DESTROY,
-	# IPC.pm will ensure that.
-	PublicInbox::OnDestroy->new($$, \&reap_gits, $self);
+	$SIG{__WARN__} = 'DEFAULT';
+	$SIG{PIPE} = 'DEFAULT';
 }
 
 1;
diff --git a/lib/PublicInbox/WQWorker.pm b/lib/PublicInbox/WQWorker.pm
new file mode 100644
index 00000000..25a5e4fb
--- /dev/null
+++ b/lib/PublicInbox/WQWorker.pm
@@ -0,0 +1,34 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# for PublicInbox::IPC wq_* (work queue) workers
+package PublicInbox::WQWorker;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET);
+use Errno qw(EAGAIN ECONNRESET);
+use IO::Handle (); # blocking
+
+sub new {
+	my (undef, $wq) = @_;
+	my $s2 = $wq->{-wq_s2} // die 'BUG: no -wq_s2';
+	$s2->blocking(0);
+	my $self = bless { sock => $s2, wq => $wq }, __PACKAGE__;
+	$self->SUPER::new($s2, EPOLLEXCLUSIVE|EPOLLIN|EPOLLET);
+	$self;
+}
+
+sub event_step {
+	my ($self) = @_;
+	my $n;
+	do {
+		$n = $self->{wq}->recv_and_run($self->{sock}, 4096 * 33);
+	} while ($n);
+	return if !defined($n) && $! == EAGAIN; # likely
+	warn "wq worker error: $!\n" if !defined($n) && $! != ECONNRESET;
+	$self->{wq}->wq_atexit_child;
+	$self->close; # PublicInbox::DS::close
+}
+
+1;

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 04/15] lei: no per-child SIG{__WARN__}
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
  2021-01-30 10:19 ` [PATCH 02/15] lei_to_mail: mbox*: reduce scope of lock Eric Wong
  2021-01-30 10:19 ` [PATCH 03/15] ipc: switch wq to use the event loop Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 05/15] lei: NO SIGPIPE Eric Wong
                   ` (10 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

$current_lei already routes warnings for us, so a per-child
SIG{__WARN__} handler is unnecessary.
---
 lib/PublicInbox/LEI.pm | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 3ed330f9..ceba16e4 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -359,8 +359,7 @@ sub atfork_child_wq {
 	%PATH2CFG = ();
 	undef $errors_log;
 	$quit = \&CORE::exit;
-	(__WARN__ => sub { err($self, @_) },
-	PIPE => sub {
+	(PIPE => sub {
 		$self->x_it(13); # SIGPIPE = 13
 		# we need to close explicitly to avoid Perl warning on SIGPIPE
 		for my $i (1, 2) {

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 05/15] lei: NO SIGPIPE
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (2 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 04/15] lei: no per-child SIG{__WARN__} Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 06/15] ipc: more helpful ETOOMANYREFS error messages Eric Wong
                   ` (9 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

---
 lib/PublicInbox/IPC.pm         | 10 +++---
 lib/PublicInbox/LEI.pm         | 56 +++++++++++++++++++++-------------
 lib/PublicInbox/LeiExternal.pm |  3 +-
 lib/PublicInbox/LeiOverview.pm | 33 ++++++++------------
 lib/PublicInbox/LeiToMail.pm   | 47 ++++++++++++----------------
 lib/PublicInbox/LeiXSearch.pm  | 17 ++++-------
 t/lei_to_mail.t                | 31 ++++++++++---------
 7 files changed, 96 insertions(+), 101 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 479c4377..172552b9 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -139,8 +139,10 @@ sub ipc_worker_spawn {
 
 sub ipc_worker_reap { # dwaitpid callback
 	my ($self, $pid) = @_;
-	# SIGTERM (15) is our default exit signal
-	warn "PID:$pid died with \$?=$?\n" if $? && ($? & 127) != 15;
+	return if !$?;
+	# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
+	my $s = $? & 127;
+	warn "PID:$pid died with \$?=$?\n" if $s != 15 && $s != 13;
 }
 
 sub wq_wait_old {
@@ -278,7 +280,7 @@ sub recv_and_run {
 	undef $buf;
 	my $sub = shift @$args;
 	eval { $self->$sub(@$args) };
-	warn "$$ wq_worker: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+	warn "$$ wq_worker: $@" if $@;
 	delete @$self{0..($nfd-1)};
 	$n;
 }
@@ -320,7 +322,7 @@ sub wq_do { # always async
 	} else {
 		@$self{0..$#$ios} = @$ios;
 		eval { $self->$sub(@args) };
-		warn "wq_do: $@" if $@ && ref($@) ne 'PublicInbox::SIGPIPE';
+		warn "wq_do: $@" if $@;
 		delete @$self{0..$#$ios}; # don't close
 	}
 }
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index ceba16e4..b915bb0c 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -12,7 +12,7 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal
 	PublicInbox::LeiQuery);
 use Getopt::Long ();
 use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
-use Errno qw(EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
+use Errno qw(EPIPE EAGAIN EINTR ECONNREFUSED ENOENT ECONNRESET);
 use Cwd qw(getcwd);
 use POSIX ();
 use IO::Handle ();
@@ -277,7 +277,11 @@ sub x_it ($$) {
 	dump_and_clear_log();
 	if (my $sock = $self->{sock}) {
 		send($sock, "x_it $code", MSG_EOR);
-	} elsif (!($code & 127)) { # oneshot, ignore signals
+	} elsif (my $signum = ($code & 127)) { # oneshot, usually SIGPIPE (13)
+		$SIG{PIPE} = 'DEFAULT'; # $SIG{$signum} doesn't work
+		kill $signum, $$;
+		sleep; # wait for signal
+	} else { # oneshot
 		# don't want to end up using $? from child processes
 		for my $f (qw(lxs l2m)) {
 			my $wq = delete $self->{$f} or next;
@@ -287,14 +291,15 @@ sub x_it ($$) {
 	}
 }
 
-sub puts ($;@) { print { shift->{1} } map { "$_\n" } @_ }
-
-sub out ($;@) { print { shift->{1} } @_ }
-
 sub err ($;@) {
 	my $self = shift;
-	my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{IO};
-	print $err @_, (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
+	my $err = $self->{2} // ($self->{pgr} // [])->[2] // *STDERR{GLOB};
+	my $eor = (substr($_[-1], -1, 1) eq "\n" ? () : "\n");
+	print $err @_, $eor and return;
+	my $old_err = delete $self->{2};
+	close($old_err) if $! == EPIPE && $old_err;;
+	$err = $self->{2} = ($self->{pgr} // [])->[2] // *STDERR{GLOB};
+	print $err @_, $eor or print STDERR @_, $eor;
 }
 
 sub qerr ($;@) { $_[0]->{opt}->{quiet} or err(shift, @_) }
@@ -306,6 +311,17 @@ sub fail ($$;$) {
 	undef;
 }
 
+sub out ($;@) {
+	my $self = shift;
+	return if print { $self->{1} // return } @_; # likely
+	return note_sigpipe($self, 1) if $! == EPIPE;
+	my $err = "error writing to stdout: $!";
+	delete $self->{1};
+	fail($self, $err);
+}
+
+sub puts ($;@) { out(shift, map { "$_\n" } @_) }
+
 sub child_error { # passes non-fatal curl exit codes to user
 	my ($self, $child_error) = @_; # child_error is $?
 	if (my $sock = $self->{sock}) { # send to lei(1) client
@@ -350,27 +366,23 @@ sub io_restore ($$) {
 	}
 }
 
-# usage: my %sig = $lei->atfork_child_wq($wq);
-#	 local @SIG{keys %sig} = values %sig;
+# triggers sigpipe_handler
+sub note_sigpipe {
+	my ($self, $fd) = @_;
+	close(delete($self->{$fd})); # explicit close silences Perl warning
+	syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
+	x_it($self, 13);
+}
+
 sub atfork_child_wq {
 	my ($self, $wq) = @_;
 	io_restore($self, $wq);
+	-p $self->{op_pipe} or die 'BUG: {op_pipe} expected';
 	io_restore($self->{l2m}, $wq);
 	%PATH2CFG = ();
 	undef $errors_log;
 	$quit = \&CORE::exit;
-	(PIPE => sub {
-		$self->x_it(13); # SIGPIPE = 13
-		# we need to close explicitly to avoid Perl warning on SIGPIPE
-		for my $i (1, 2) {
-			next unless $self->{$i} && (-p $self->{$i} || -S _);
-			close(delete $self->{$i});
-		}
-		# trigger the LeiXSearch $done OpPipe:
-		syswrite($self->{op_pipe}, '!') if $self->{op_pipe};
-		$SIG{PIPE} = 'DEFAULT';
-		die bless(\"$_[0]", 'PublicInbox::SIGPIPE'),
-	});
+	$current_lei = $self; # for SIG{__WARN__}
 }
 
 sub io_extract ($;@) {
diff --git a/lib/PublicInbox/LeiExternal.pm b/lib/PublicInbox/LeiExternal.pm
index bf07c41c..b1176824 100644
--- a/lib/PublicInbox/LeiExternal.pm
+++ b/lib/PublicInbox/LeiExternal.pm
@@ -31,11 +31,10 @@ sub _externals_each {
 
 sub lei_ls_external {
 	my ($self, @argv) = @_;
-	my $out = $self->{1};
 	my ($OFS, $ORS) = $self->{opt}->{z} ? ("\0", "\0\0") : (" ", "\n");
 	$self->_externals_each(sub {
 		my ($loc, $boost_val) = @_;
-		print $out $loc, $OFS, 'boost=', $boost_val, $ORS;
+		$self->out($loc, $OFS, 'boost=', $boost_val, $ORS);
 	});
 }
 
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index fa041457..1d62ffe2 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -107,28 +107,22 @@ sub new {
 sub ovv_begin {
 	my ($self, $lei) = @_;
 	if ($self->{fmt} eq 'json') {
-		print { $lei->{1} } '[';
+		$lei->out('[');
 	} # TODO HTML/Atom/...
 }
 
 # called once by parent (via PublicInbox::EOFpipe)
 sub ovv_end {
 	my ($self, $lei) = @_;
-	my $out = $lei->{1} or return;
 	if ($self->{fmt} eq 'json') {
 		# JSON doesn't allow trailing commas, and preventing
 		# trailing commas is a PITA when parallelizing outputs
-		print $out "null]\n";
+		$lei->out("null]\n");
 	} elsif ($self->{fmt} eq 'concatjson') {
-		print $out "\n";
+		$lei->out("\n");
 	}
 }
 
-sub ovv_atfork_child {
-	my ($self) = @_;
-	# reopen dedupe here
-}
-
 # prepares an smsg for JSON
 sub _unbless_smsg {
 	my ($smsg, $mitem) = @_;
@@ -168,9 +162,8 @@ sub ovv_atexit_child {
 		$git->async_wait_all;
 	}
 	if (my $bref = delete $lei->{ovv_buf}) {
-		my $out = $lei->{1} or return;
 		my $lk = $self->lock_for_scope;
-		print $out $$bref;
+		$lei->out($$bref);
 	}
 }
 
@@ -268,11 +261,10 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 				}
 			} sort keys %$smsg);
 			$buf .= $EOR;
-			if (length($buf) > 65536) {
-				my $lk = $self->lock_for_scope;
-				print { $lei->{1} } $buf;
-				$buf = '';
-			}
+			return if length($buf) < 65536;
+			my $lk = $self->lock_for_scope;
+			$lei->out($buf);
+			$buf = '';
 		}
 	} elsif ($json) {
 		my $ORS = $self->{fmt} eq 'json' ? ",\n" : "\n"; # JSONL
@@ -280,11 +272,10 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
 			my ($smsg, $mitem) = @_;
 			return if $dedupe->is_smsg_dup($smsg);
 			$buf .= $json->encode(_unbless_smsg(@_)) . $ORS;
-			if (length($buf) > 65536) {
-				my $lk = $self->lock_for_scope;
-				print { $lei->{1} } $buf;
-				$buf = '';
-			}
+			return if length($buf) < 65536;
+			my $lk = $self->lock_for_scope;
+			$lei->out($buf);
+			$buf = '';
 		}
 	} # else { ...
 }
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 6d2ce7fc..01e7cec5 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -17,7 +17,7 @@ use PublicInbox::GitAsyncCat;
 use Symbol qw(gensym);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
-use Errno qw(EEXIST ESPIPE ENOENT);
+use Errno qw(EEXIST ESPIPE ENOENT EPIPE);
 
 # struggles with short-lived repos, Gcf2Client makes little sense with lei;
 # but we may use in-process libgit2 in the future.
@@ -68,14 +68,16 @@ sub _mbox_hdr_buf ($$$) {
 }
 
 sub atomic_append { # for on-disk destinations (O_APPEND, or O_EXCL)
-	my ($fh, $buf) = @_;
-	defined(my $w = syswrite($fh, $$buf)) or die "write: $!";
-	$w == length($$buf) or die "short write: $w != ".length($$buf);
-}
-
-sub _print_full {
-	my ($fh, $buf) = @_;
-	print $fh $$buf or die "print: $!";
+	my ($lei, $buf) = @_;
+	if (defined(my $w = syswrite($lei->{1} // return, $$buf))) {
+		return if $w == length($$buf);
+		$buf = "short atomic write: $w != ".length($$buf);
+	} elsif ($! == EPIPE) {
+		return $lei->note_sigpipe(1);
+	} else {
+		$buf = "atomic write: $!";
+	}
+	$lei->fail($buf);
 }
 
 sub eml2mboxrd ($;$) {
@@ -248,26 +250,19 @@ sub _mbox_write_cb ($$) {
 	my $ovv = $lei->{ovv};
 	my $m = 'eml2'.$ovv->{fmt};
 	my $eml2mbox = $self->can($m) or die "$self->$m missing";
-	my $out = $lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier
-	$out->autoflush(1);
-	my $write = $ovv->{lock_path} ? \&_print_full : \&atomic_append;
+	$lei->{1} // die "no stdout ($m, $ovv->{dst})"; # redirected earlier
+	$lei->{1}->autoflush(1);
+	my $atomic_append = !defined($ovv->{lock_path});
 	my $dedupe = $lei->{dedupe};
 	$dedupe->prepare_dedupe;
 	sub { # for git_to_mail
 		my ($buf, $smsg, $eml) = @_;
-		return unless $out;
 		$eml //= PublicInbox::Eml->new($buf);
-		if (!$dedupe->is_dup($eml, $smsg->{blob})) {
-			$buf = $eml2mbox->($eml, $smsg);
-			eval {
-				my $lk = $ovv->lock_for_scope;
-				$write->($out, $buf);
-			};
-			if ($@) {
-				die $@ if ref($@) ne 'PublicInbox::SIGPIPE';
-				undef $out
-			}
-		}
+		return if $dedupe->is_dup($eml, $smsg->{blob});
+		$buf = $eml2mbox->($eml, $smsg);
+		return atomic_append($lei, $buf) if $atomic_append;
+		my $lk = $ovv->lock_for_scope;
+		$lei->out($$buf);
 	}
 }
 
@@ -469,8 +464,7 @@ sub write_mail { # via ->wq_do
 	my ($self, $git_dir, $smsg, $lei) = @_;
 	my $not_done = delete $self->{$lei->{each_smsg_not_done}};
 	my $wcb = $self->{wcb} //= do { # first message
-		my %sig = $lei->atfork_child_wq($self);
-		@SIG{keys %sig} = values %sig; # not local
+		$lei->atfork_child_wq($self);
 		$self->write_cb($lei);
 	};
 	my $git = $self->{"$$\0$git_dir"} //= PublicInbox::Git->new($git_dir);
@@ -485,7 +479,6 @@ sub wq_atexit_child {
 		$git->async_wait_all;
 	}
 	$SIG{__WARN__} = 'DEFAULT';
-	$SIG{PIPE} = 'DEFAULT';
 }
 
 1;
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index e69b637c..de82a7da 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -109,8 +109,7 @@ sub wait_startq ($) {
 sub query_thread_mset { # for --thread
 	my ($self, $lei, $ibxish) = @_;
 	local $0 = "$0 query_thread_mset";
-	my %sig = $lei->atfork_child_wq($self);
-	local @SIG{keys %sig} = values %sig;
+	$lei->atfork_child_wq($self);
 	my $startq = delete $lei->{startq};
 
 	my ($srch, $over) = ($ibxish->search, $ibxish->over);
@@ -145,8 +144,7 @@ sub query_thread_mset { # for --thread
 sub query_mset { # non-parallel for non-"--thread" users
 	my ($self, $lei) = @_;
 	local $0 = "$0 query_mset";
-	my %sig = $lei->atfork_child_wq($self);
-	local @SIG{keys %sig} = values %sig;
+	$lei->atfork_child_wq($self);
 	my $startq = delete $lei->{startq};
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
@@ -187,8 +185,7 @@ sub kill_reap {
 sub query_remote_mboxrd {
 	my ($self, $lei, $uris) = @_;
 	local $0 = "$0 query_remote_mboxrd";
-	my %sig = $lei->atfork_child_wq($self); # keep $self->{5} startq
-	local @SIG{keys %sig} = values %sig;
+	$lei->atfork_child_wq($self);
 	my ($opt, $env) = @$lei{qw(opt env)};
 	my @qform = (q => $lei->{mset_opt}->{qstr}, x => 'm');
 	push(@qform, t => 1) if $opt->{thread};
@@ -351,9 +348,7 @@ sub start_query { # always runs in main (lei-daemon) process
 sub query_prepare { # called by wq_do
 	my ($self, $lei) = @_;
 	local $0 = "$0 query_prepare";
-	my %sig = $lei->atfork_child_wq($self);
-	-p $lei->{op_pipe} or die "BUG: \$done pipe expected";
-	local @SIG{keys %sig} = values %sig;
+	$lei->atfork_child_wq($self);
 	delete $lei->{l2m}->{-wq_s1};
 	eval { $lei->{l2m}->do_augment($lei) };
 	$lei->fail($@) if $@;
@@ -363,11 +358,11 @@ sub query_prepare { # called by wq_do
 sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
 	my ($lei) = @_;
 	my $lxs = delete $lei->{lxs};
-	if ($lxs && $lxs->wq_kill_old) {
-		kill 'PIPE', $$;
+	if ($lxs && $lxs->wq_kill_old) { # is this the daemon?
 		$lxs->wq_wait_old;
 	}
 	close(delete $lei->{1}) if $lei->{1};
+	$lei->x_it(13);
 }
 
 sub do_query {
diff --git a/t/lei_to_mail.t b/t/lei_to_mail.t
index 47c0e3d4..f7535687 100644
--- a/t/lei_to_mail.t
+++ b/t/lei_to_mail.t
@@ -12,6 +12,7 @@ use List::Util qw(shuffle);
 require_mods(qw(DBD::SQLite));
 require PublicInbox::MboxReader;
 require PublicInbox::LeiOverview;
+require PublicInbox::LEI;
 use_ok 'PublicInbox::LeiToMail';
 my $from = "Content-Length: 10\nSubject: x\n\nFrom hell\n";
 my $noeol = "Subject: x\n\nFrom hell";
@@ -73,7 +74,11 @@ for my $mbox (@MBOX) {
 my ($tmpdir, $for_destroy) = tmpdir();
 local $ENV{TMPDIR} = $tmpdir;
 open my $err, '>>', "$tmpdir/lei.err" or BAIL_OUT $!;
-my $lei = { 2 => $err };
+my $lei = bless { 2 => $err }, 'PublicInbox::LEI';
+my $commit = sub {
+	$_[0] = undef; # wcb
+	delete $lei->{1};
+};
 my $buf = <<'EOM';
 From: x@example.com
 Subject: x
@@ -98,9 +103,7 @@ my $wcb_get = sub {
 	my $zpipe = $l2m->pre_augment($lei);
 	$l2m->do_augment($lei);
 	$l2m->post_augment($lei, $zpipe);
-	my $cb = $l2m->write_cb($lei);
-	delete $lei->{1};
-	$cb;
+	$l2m->write_cb($lei);
 };
 
 my $deadbeef = { blob => 'deadbeef', kw => [ qw(seen) ] };
@@ -109,7 +112,7 @@ my $orig = do {
 	is(ref $wcb, 'CODE', 'write_cb returned callback');
 	ok(-f $fn && !-s _, 'empty file created');
 	$wcb->(\(my $dup = $buf), $deadbeef);
-	undef $wcb;
+	$commit->($wcb);
 	open my $fh, '<', $fn or BAIL_OUT $!;
 	my $raw = do { local $/; <$fh> };
 	like($raw, qr/^blah\n/sm, 'wrote content');
@@ -119,7 +122,7 @@ my $orig = do {
 	$wcb = $wcb_get->($mbox, $fn);
 	ok(-f $fn && !-s _, 'truncated mbox destination');
 	$wcb->(\($dup = $buf), $deadbeef);
-	undef $wcb;
+	$commit->($wcb);
 	open $fh, '<', $fn or BAIL_OUT $!;
 	is(do { local $/; <$fh> }, $raw, 'jobs > 1');
 	$raw;
@@ -134,7 +137,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		my $f = "$fn.$zsfx";
 		my $wcb = $wcb_get->($mbox, $f);
 		$wcb->(\(my $dup = $buf), $deadbeef);
-		undef $wcb;
+		$commit->($wcb);
 		my $uncompressed = xqx([@$dc_cmd, $f]);
 		is($uncompressed, $orig, "$zsfx works unlocked");
 
@@ -142,13 +145,13 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		unlink $f or BAIL_OUT "unlink $!";
 		$wcb = $wcb_get->($mbox, $f);
 		$wcb->(\($dup = $buf), $deadbeef);
-		undef $wcb;
+		$commit->($wcb);
 		is(xqx([@$dc_cmd, $f]), $orig, "$zsfx matches with lock");
 
 		local $lei->{opt} = { augment => 1 };
 		$wcb = $wcb_get->($mbox, $f);
 		$wcb->(\($dup = $buf . "\nx\n"), $deadbeef);
-		undef $wcb; # commit
+		$commit->($wcb);
 
 		my $cat = popen_rd([@$dc_cmd, $f]);
 		my @raw;
@@ -160,7 +163,7 @@ for my $zsfx (qw(gz bz2 xz)) { # XXX should we support zst, zz, lzo, lzma?
 		local $lei->{opt} = { augment => 1, jobs => 2 };
 		$wcb = $wcb_get->($mbox, $f);
 		$wcb->(\($dup = $buf . "\ny\n"), $deadbeef);
-		undef $wcb; # commit
+		$commit->($wcb);
 
 		my @raw3;
 		$cat = popen_rd([@$dc_cmd, $f]);
@@ -183,7 +186,7 @@ if ('default deduplication uses content_hash') {
 	my $wcb = $wcb_get->('mboxo', $fn);
 	$deadbeef->{kw} = [];
 	$wcb->(\(my $x = $buf), $deadbeef) for (1..2);
-	undef $wcb; # undef to commit changes
+	$commit->($wcb);
 	my $cmp = '';
 	open my $fh, '<', $fn or BAIL_OUT $!;
 	PublicInbox::MboxReader->mboxo($fh, sub { $cmp .= $as_orig->(@_) });
@@ -192,7 +195,7 @@ if ('default deduplication uses content_hash') {
 	local $lei->{opt} = { augment => 1 };
 	$wcb = $wcb_get->('mboxo', $fn);
 	$wcb->(\($x = $buf . "\nx\n"), $deadbeef) for (1..2);
-	undef $wcb; # undef to commit changes
+	$commit->($wcb);
 	open $fh, '<', $fn or BAIL_OUT $!;
 	my @x;
 	PublicInbox::MboxReader->mboxo($fh, sub { push @x, $as_orig->(@_) });
@@ -206,7 +209,7 @@ if ('default deduplication uses content_hash') {
 	local $lei->{1} = $tmp;
 	my $wcb = $wcb_get->('mboxrd', '/dev/stdout');
 	$wcb->(\(my $x = $buf), $deadbeef);
-	undef $wcb; # commit
+	$commit->($wcb);
 	seek($tmp, 0, SEEK_SET) or BAIL_OUT $!;
 	my $cmp = '';
 	PublicInbox::MboxReader->mboxrd($tmp, sub { $cmp .= $as_orig->(@_) });
@@ -220,7 +223,7 @@ SKIP: { # FIFO support
 	my $cat = popen_rd([which('cat'), $fn]);
 	my $wcb = $wcb_get->('mboxo', $fn);
 	$wcb->(\(my $x = $buf), $deadbeef);
-	undef $wcb; # commit
+	$commit->($wcb);
 	my $cmp = '';
 	PublicInbox::MboxReader->mboxo($cat, sub { $cmp .= $as_orig->(@_) });
 	is($cmp, $buf, 'message written to FIFO');

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 06/15] ipc: more helpful ETOOMANYREFS error messages
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (3 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 05/15] lei: NO SIGPIPE Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 07/15] lei: remove syslog dependency Eric Wong
                   ` (8 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

ETOOMANYREFS is probably an unfamiliar error to most users, so
give a hint about RLIMIT_NOFILE.  I can hit it on my system by
running 3 simultaneous queries with the system's default limit
of 1024.

There's also no need to import Errno constants for uncommon
errors, so we'll check the %! hash instead of importing from
Errno here.

In lei, we'll also try to bump RLIMIT_NOFILE as much as possible
to avoid this error.
---
 lib/PublicInbox/IPC.pm | 6 +++---
 lib/PublicInbox/LEI.pm | 5 +++++
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 172552b9..37f02944 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -16,7 +16,6 @@ use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
 use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
-use Errno qw(EMSGSIZE);
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 use constant PIPE_BUF => $^O eq 'linux' ? 4096 : POSIX::_POSIX_PIPE_BUF();
 my $WQ_MAX_WORKERS = 4096;
@@ -303,8 +302,9 @@ sub wq_do { # always async
 	if (my $s1 = $self->{-wq_s1}) { # run in worker
 		my $fds = [ map { fileno($_) } @$ios ];
 		my $n = $send_cmd->($s1, $fds, freeze([$sub, @args]), MSG_EOR);
-		return if defined($n);
-		croak "sendmsg error: $!" if $! != EMSGSIZE;
+		return if defined($n); # likely
+		croak "sendmsg: $! (check RLIMIT_NOFILE)" if $!{ETOOMANYREFS};
+		croak "sendmsg: $!" if !$!{EMSGSIZE};
 		socketpair(my $r, my $w, AF_UNIX, SOCK_STREAM, 0) or
 			croak "socketpair: $!";
 		my $buf = freeze([$sub, @args]);
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b915bb0c..22cd20f6 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -925,6 +925,11 @@ sub lazy_start {
 		$! = $errno; # allow interpolation to stringify in die
 		die "connect($path): $!";
 	}
+	if (eval { require BSD::Resource }) {
+		my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
+		my ($s, $h) = BSD::Resource::getrlimit($NOFILE);
+		BSD::Resource::setrlimit($NOFILE, $h, $h) if $s < $h;
+	}
 	umask(077) // die("umask(077): $!");
 	local $listener;
 	socket($listener, AF_UNIX, SOCK_SEQPACKET, 0) or die "socket: $!";

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 07/15] lei: remove syslog dependency
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (4 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 06/15] ipc: more helpful ETOOMANYREFS error messages Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 08/15] sharedkv: release {dbh} before rmtree Eric Wong
                   ` (7 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

It doesn't seem necessary now that we redirect and write
stuff to errors.log.
---
 lib/PublicInbox/LEI.pm | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 22cd20f6..b1a5ae43 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -17,7 +17,6 @@ use Cwd qw(getcwd);
 use POSIX ();
 use IO::Handle ();
 use Fcntl qw(SEEK_SET);
-use Sys::Syslog qw(syslog openlog);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(SFD_NONBLOCK EPOLLIN EPOLLET);
 use PublicInbox::Sigfd;
@@ -1007,9 +1006,9 @@ sub lazy_start {
 				warn "$path dev/ino changed, quitting\n";
 				$path = undef;
 			}
-		} elsif (defined($path)) {
-			warn "stat($path): $!, quitting ...\n";
-			undef $path; # don't unlink
+		} elsif (defined($path)) { # ENOENT is common
+			warn "stat($path): $!, quitting ...\n" if $! != ENOENT;
+			undef $path;
 			$quit->();
 		}
 		return 1 if defined($path);
@@ -1029,18 +1028,13 @@ sub lazy_start {
 	# STDIN was redirected to /dev/null above, closing STDERR and
 	# STDOUT will cause the calling `lei' client process to finish
 	# reading the <$daemon> pipe.
-	openlog($path, 'pid', 'user');
 	local $SIG{__WARN__} = sub {
-		$current_lei ? err($current_lei, @_) : syslog('warning', "@_");
+		$current_lei ? err($current_lei, @_) : warn(@_);
 	};
-	my $on_destroy = PublicInbox::OnDestroy->new($$, sub {
-		syslog('crit', "$@") if $@;
-	});
 	open STDERR, '>&STDIN' or die "redirect stderr failed: $!";
 	open STDOUT, '>&STDIN' or die "redirect stdout failed: $!";
 	# $daemon pipe to `lei' closed, main loop begins:
 	PublicInbox::DS->EventLoop;
-	@$on_destroy = (); # cancel on_destroy if we get here
 	exit($exit_code // 0);
 }
 

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 08/15] sharedkv: release {dbh} before rmtree
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (5 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 07/15] lei: remove syslog dependency Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 09/15] content_hash: skip Sender for cross posted messages Eric Wong
                   ` (6 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

This may be needed to avoid warnings/errors when
operating in single process mode.
---
 lib/PublicInbox/SharedKV.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index 94f2429f..e400c6cf 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -145,6 +145,7 @@ SELECT COUNT(k) FROM kv
 
 sub DESTROY {
 	my ($self) = @_;
+	delete $self->{dbh};
 	my $dir = delete $self->{"tmp$$.$self"} or return;
 	rmtree($dir);
 }

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 09/15] content_hash: skip Sender for cross posted messages
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (6 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 08/15] sharedkv: release {dbh} before rmtree Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 10/15] sharedkv: retry rmtree Eric Wong
                   ` (5 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

This fixes a regression introduced long ago and restores the
behavior originally specified in the comments: Sender is only
used when From is absent.  It makes a noticeable improvement
with search results using -extindex ("all") and lei results
with multiple inboxes.

Update some style bits at the top of the test case while
we're at it.

Fixes: f0ef0a56a8957d6f ("v2: improve deduplication checks")
---
 lib/PublicInbox/ContentHash.pm |  7 +++----
 t/content_hash.t               | 14 +++++++++++++-
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/lib/PublicInbox/ContentHash.pm b/lib/PublicInbox/ContentHash.pm
index 838fdd6f..4dbe7b50 100644
--- a/lib/PublicInbox/ContentHash.pm
+++ b/lib/PublicInbox/ContentHash.pm
@@ -68,10 +68,9 @@ sub content_digest ($) {
 
 	# Only use Sender: if From is not present
 	foreach my $h (qw(From Sender)) {
-		my @v = $eml->header($h);
-		if (@v) {
-			digest_addr($dig, $h, $_) foreach @v;
-		}
+		my @v = $eml->header($h) or next;
+		digest_addr($dig, $h, $_) foreach @v;
+		last;
 	}
 	foreach my $h (qw(Subject Date)) {
 		my @v = $eml->header($h);
diff --git a/t/content_hash.t b/t/content_hash.t
index 3f02b1b3..060665f6 100644
--- a/t/content_hash.t
+++ b/t/content_hash.t
@@ -1,7 +1,8 @@
+#!perl -w
 # Copyright (C) 2018-2021 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use strict;
-use warnings;
+use v5.10.1;
 use Test::More;
 use PublicInbox::ContentHash qw(content_hash);
 use PublicInbox::Eml;
@@ -19,6 +20,17 @@ EOF
 my $orig = content_hash($mime);
 my $reload = content_hash(PublicInbox::Eml->new($mime->as_string));
 is($orig, $reload, 'content_hash matches after serialization');
+{
+	my $s1 = PublicInbox::Eml->new($mime->as_string);
+	$s1->header_set('Sender', 's@example.com');
+	is(content_hash($s1), $orig, "Sender ignored when 'From' present");
+	my $s2 = PublicInbox::Eml->new($s1->as_string);
+	$s1->header_set('Sender', 'sender@example.com');
+	is(content_hash($s2), $orig, "Sender really ignored 'From'");
+	$_->header_set('From') for ($s1, $s2);
+	isnt(content_hash($s1), content_hash($s2),
+		'sender accounted when From missing');
+}
 
 foreach my $h (qw(From To Cc)) {
 	my $n = q("Quoted N'Ame" <foo@EXAMPLE.com>);

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 10/15] sharedkv: retry rmtree
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (7 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 09/15] content_hash: skip Sender for cross posted messages Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 11/15] lei: keep $lei around until workers are reaped Eric Wong
                   ` (4 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

---
 lib/PublicInbox/SharedKV.pm | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index e400c6cf..2d1bf53c 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -147,7 +147,12 @@ sub DESTROY {
 	my ($self) = @_;
 	delete $self->{dbh};
 	my $dir = delete $self->{"tmp$$.$self"} or return;
-	rmtree($dir);
+	my $tries = 5;
+	do {
+		$! = 0;
+		eval { rmtree($dir) };
+	} while ($@ && $!{ENOENT} && $tries--);
+	warn "error removing $dir: $@" if $@;
 }
 
 1;

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 11/15] lei: keep $lei around until workers are reaped
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (8 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 10/15] sharedkv: retry rmtree Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 12/15] lei_dedupe: use Digest::SHA Eric Wong
                   ` (3 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

This prevents SharedKV->DESTROY in lei-daemon from triggering
before DB handles are closed in lei2mail processes.  The
{each_smsg_not_done} pipe was not sufficient in this case,
since it gets closed at the end of the last git_to_mail
callback invocation.
---
 lib/PublicInbox/IPC.pm        | 10 +++++-----
 lib/PublicInbox/LEI.pm        |  2 +-
 lib/PublicInbox/LeiXSearch.pm |  4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index 37f02944..689f32d0 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -137,7 +137,7 @@ sub ipc_worker_spawn {
 }
 
 sub ipc_worker_reap { # dwaitpid callback
-	my ($self, $pid) = @_;
+	my ($args, $pid) = @_;
 	return if !$?;
 	# TERM(15) is our default exit signal, PIPE(13) is likely w/ pager
 	my $s = $? & 127;
@@ -145,9 +145,9 @@ sub ipc_worker_reap { # dwaitpid callback
 }
 
 sub wq_wait_old {
-	my ($self) = @_;
+	my ($self, $args) = @_;
 	my $pids = delete $self->{"-wq_old_pids.$$"} or return;
-	dwaitpid($_, \&ipc_worker_reap, $self) for @$pids;
+	dwaitpid($_, \&ipc_worker_reap, [$self, $args]) for @$pids;
 }
 
 # for base class, override in sub classes
@@ -164,7 +164,7 @@ sub ipc_atfork_child {
 
 # idempotent, can be called regardless of whether worker is active or not
 sub ipc_worker_stop {
-	my ($self) = @_;
+	my ($self, $args) = @_;
 	my ($pid, $ppid) = delete(@$self{qw(-ipc_pid -ipc_ppid)});
 	my ($w_req, $r_res) = delete(@$self{qw(-ipc_req -ipc_res)});
 	if (!$w_req && !$r_res) {
@@ -175,7 +175,7 @@ sub ipc_worker_stop {
 	$w_req = $r_res = undef;
 
 	return if $$ != $ppid;
-	dwaitpid($pid, \&ipc_worker_reap, $self);
+	dwaitpid($pid, \&ipc_worker_reap, [$self, $args]);
 }
 
 # use this if we have multiple readers reading curl or "pigz -dc"
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index b1a5ae43..e2e692e9 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -860,7 +860,7 @@ sub dclose {
 		if ($wq->wq_kill) {
 			$wq->wq_close
 		} elsif ($wq->wq_kill_old) {
-			$wq->wq_wait_old;
+			$wq->wq_wait_old($self);
 		}
 	}
 	close(delete $self->{1}) if $self->{1}; # may reap_compress
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index de82a7da..b4a9b89d 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -283,7 +283,7 @@ sub query_done { # EOF callback
 	my $has_l2m = exists $lei->{l2m};
 	for my $f (qw(lxs l2m)) {
 		my $wq = delete $lei->{$f} or next;
-		$wq->wq_wait_old;
+		$wq->wq_wait_old($lei);
 	}
 	$lei->{ovv}->ovv_end($lei);
 	if ($has_l2m) { # close() calls LeiToMail reap_compress
@@ -359,7 +359,7 @@ sub sigpipe_handler { # handles SIGPIPE from l2m/lxs workers
 	my ($lei) = @_;
 	my $lxs = delete $lei->{lxs};
 	if ($lxs && $lxs->wq_kill_old) { # is this the daemon?
-		$lxs->wq_wait_old;
+		$lxs->wq_wait_old($lei);
 	}
 	close(delete $lei->{1}) if $lei->{1};
 	$lei->x_it(13);

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 12/15] lei_dedupe: use Digest::SHA
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (9 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 11/15] lei: keep $lei around until workers are reaped Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 13/15] sharedkv: note number of tries Eric Wong
                   ` (2 subsequent siblings)
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

Although ContentHash already loads it, we use Digest::SHA
directly in this package for smsg and OID-only deduplication.
---
 lib/PublicInbox/LeiDedupe.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index e3ae8e33..55488376 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -5,6 +5,7 @@ use strict;
 use v5.10.1;
 use PublicInbox::SharedKV;
 use PublicInbox::ContentHash qw(content_hash);
+use Digest::SHA ();
 
 # n.b. mutt sets most of these headers not sure about Bytes
 our @OID_IGNORE = qw(Status X-Status Content-Length Lines Bytes);

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 13/15] sharedkv: note number of tries
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (10 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 12/15] lei_dedupe: use Digest::SHA Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 14/15] lei_xsearch: load smsg package Eric Wong
  2021-01-30 10:19 ` [PATCH 15/15] lei: deep clone {ovv} Eric Wong
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

---
 lib/PublicInbox/SharedKV.pm | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/lib/PublicInbox/SharedKV.pm b/lib/PublicInbox/SharedKV.pm
index 2d1bf53c..f5d09cc1 100644
--- a/lib/PublicInbox/SharedKV.pm
+++ b/lib/PublicInbox/SharedKV.pm
@@ -147,12 +147,13 @@ sub DESTROY {
 	my ($self) = @_;
 	delete $self->{dbh};
 	my $dir = delete $self->{"tmp$$.$self"} or return;
-	my $tries = 5;
+	my $tries = 0;
 	do {
 		$! = 0;
 		eval { rmtree($dir) };
-	} while ($@ && $!{ENOENT} && $tries--);
+	} while ($@ && $!{ENOENT} && $tries++ < 5);
 	warn "error removing $dir: $@" if $@;
+	warn "Took $tries tries to remove $dir\n" if $tries;
 }
 
 1;

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 14/15] lei_xsearch: load smsg package
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (11 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 13/15] sharedkv: note number of tries Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  2021-01-30 10:19 ` [PATCH 15/15] lei: deep clone {ovv} Eric Wong
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

We use $smsg->populate here, so load PublicInbox::Smsg
explicitly, even though PublicInbox::Search currently loads it.
---
 lib/PublicInbox/LeiXSearch.pm | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index b4a9b89d..4d390ee4 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -16,6 +16,7 @@ use File::Spec ();
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::Spawn qw(popen_rd spawn which);
 use PublicInbox::MID qw(mids);
+use PublicInbox::Smsg;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
 
 sub new {

^ permalink raw reply related	[flat|nested] 15+ messages in thread

* [PATCH 15/15] lei: deep clone {ovv}
  2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
                   ` (12 preceding siblings ...)
  2021-01-30 10:19 ` [PATCH 14/15] lei_xsearch: load smsg package Eric Wong
@ 2021-01-30 10:19 ` Eric Wong
  13 siblings, 0 replies; 15+ messages in thread
From: Eric Wong @ 2021-01-30 10:19 UTC (permalink / raw)
  To: spew

---
 lib/PublicInbox/LEI.pm | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index e2e692e9..1fb67eef 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -401,8 +401,9 @@ sub atfork_parent_wq {
 	my ($self, $wq) = @_;
 	my $env = delete $self->{env}; # env is inherited at fork
 	my $lei = bless { %$self }, ref($self);
-	if (my $dedupe = delete $lei->{dedupe}) {
-		$lei->{dedupe} = $wq->deep_clone($dedupe);
+	for my $f (qw(dedupe ovv)) {
+		my $tmp = delete($lei->{$f}) or next;
+		$lei->{$f} = $wq->deep_clone($tmp);
 	}
 	$self->{env} = $env;
 	delete @$lei{qw(3 -lei_store cfg old_1 pgr lxs)}; # keep l2m

^ permalink raw reply related	[flat|nested] 15+ messages in thread

end of thread, other threads:[~2021-01-30 10:20 UTC | newest]

Thread overview: 15+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2021-01-30 10:19 [PATCH 01/15] lei: more consistent dedupe initialization Eric Wong
2021-01-30 10:19 ` [PATCH 02/15] lei_to_mail: mbox*: reduce scope of lock Eric Wong
2021-01-30 10:19 ` [PATCH 03/15] ipc: switch wq to use the event loop Eric Wong
2021-01-30 10:19 ` [PATCH 04/15] lei: no per-child SIG{__WARN__} Eric Wong
2021-01-30 10:19 ` [PATCH 05/15] lei: NO SIGPIPE Eric Wong
2021-01-30 10:19 ` [PATCH 06/15] ipc: more helpful ETOOMANYREFS error messages Eric Wong
2021-01-30 10:19 ` [PATCH 07/15] lei: remove syslog dependency Eric Wong
2021-01-30 10:19 ` [PATCH 08/15] sharedkv: release {dbh} before rmtree Eric Wong
2021-01-30 10:19 ` [PATCH 09/15] content_hash: skip Sender for cross posted messages Eric Wong
2021-01-30 10:19 ` [PATCH 10/15] sharedkv: retry rmtree Eric Wong
2021-01-30 10:19 ` [PATCH 11/15] lei: keep $lei around until workers are reaped Eric Wong
2021-01-30 10:19 ` [PATCH 12/15] lei_dedupe: use Digest::SHA Eric Wong
2021-01-30 10:19 ` [PATCH 13/15] sharedkv: note number of tries Eric Wong
2021-01-30 10:19 ` [PATCH 14/15] lei_xsearch: load smsg package Eric Wong
2021-01-30 10:19 ` [PATCH 15/15] lei: deep clone {ovv} Eric Wong

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).