dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH 2/2] wip
Date: Thu, 14 Jan 2021 22:24:19 -1200	[thread overview]
Message-ID: <20210115102419.968-2-e@80x24.org> (raw)
In-Reply-To: <20210115102419.968-1-e@80x24.org>

---
 lib/PublicInbox/IPC.pm         |  9 +++-
 lib/PublicInbox/LEI.pm         | 15 ++++--
 lib/PublicInbox/LeiDedupe.pm   |  3 +-
 lib/PublicInbox/LeiOverview.pm | 10 +++-
 lib/PublicInbox/LeiQuery.pm    | 15 ++----
 lib/PublicInbox/LeiToMail.pm   |  1 -
 lib/PublicInbox/LeiXSearch.pm  | 83 +++++++++++++++++++++++-----------
 lib/PublicInbox/OpPipe.pm      | 38 ++++++++++++++++
 8 files changed, 127 insertions(+), 47 deletions(-)
 create mode 100644 lib/PublicInbox/OpPipe.pm

diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm
index fbc91f6f..eabb894a 100644
--- a/lib/PublicInbox/IPC.pm
+++ b/lib/PublicInbox/IPC.pm
@@ -242,7 +242,10 @@ sub wq_worker_loop ($) {
 	my $len = $self->{wq_req_len} // (4096 * 33);
 	my $s2 = $self->{-wq_s2} // die 'BUG: no -wq_s2';
 	while (1) {
-		my @fds = $recv_cmd->($s2, my $buf, $len) or return; # EOF
+		warn "$$ WAITING $recv_cmd";
+		my @fds = $recv_cmd->($s2, my $buf, $len);
+		use Data::Dumper; warn "$$ GOT ".length($buf), Dumper(\@fds);
+		@fds or return; # EOF
 		my @m = @{$self->{-wq_recv_modes} // [qw( +<&= >&= >&= )]};
 		my $nfd = 0;
 		for my $fd (@fds) {
@@ -291,8 +294,9 @@ sub _wq_worker_start ($$) {
 		PublicInbox::DS::sig_setmask($oldset);
 		# ensure we properly exit even if warn() dies:
 		my $end = PublicInbox::OnDestroy->new($$, sub { exit(!!$@) });
-		my $on_destroy = $self->ipc_atfork_child;
+		my $on_destroy = eval {$self->ipc_atfork_child };
 		eval { wq_worker_loop($self) };
+		warn "$$ WORKER DONE";
 		warn "worker $self->{-wq_ident} PID:$$ died: $@" if $@;
 		undef $on_destroy;
 		undef $end; # trigger exit
@@ -385,6 +389,7 @@ sub wq_close {
 sub wq_kill {
 	my ($self, $sig) = @_;
 	my $workers = $self->{-wq_workers} or return;
+	warn Carp::longmess("KILLING $$ $self: ".join(' ', keys %$workers));
 	kill($sig // 'TERM', keys %$workers);
 }
 
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 1f4a3082..7680f5c6 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -270,8 +270,11 @@ sub fail ($$;$) {
 
 sub atfork_prepare_wq {
 	my ($self, $wq) = @_;
-	push @{$wq->{-ipc_atfork_child_close}}, @TO_CLOSE_ATFORK_CHILD,
-				grep { defined } @$self{qw(0 1 2 sock)}
+	my $tcafc = $wq->{-ipc_atfork_child_close};
+	push @$tcafc, @TO_CLOSE_ATFORK_CHILD;
+	if (my $sock = $self->{sock}) {
+		push @$tcafc, @$self{qw(0 1 2 sock)}
+	}
 }
 
 # usage: my %sig = $lei->atfork_child_wq($wq);
@@ -705,7 +708,7 @@ sub accept_dispatch { # Listener {post_accept} callback
 sub dclose {
 	my ($self) = @_;
 	delete $self->{lxs}; # stops LeiXSearch queries
-	$self->close; # PublicInbox::DS::close
+	$self->close if $self->{sock}; # PublicInbox::DS::close
 }
 
 # for long-running results
@@ -736,8 +739,10 @@ sub event_step {
 
 sub event_step_init {
 	my ($self) = @_;
-	$self->{sock}->blocking(0);
-	$self->SUPER::new($self->{sock}, EPOLLIN|EPOLLET);
+	if (my $sock = $self->{sock}) { # using DS->EventLoop
+		$sock->blocking(0);
+		$self->SUPER::new($sock, EPOLLIN|EPOLLET);
+	}
 }
 
 sub noop {}
diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm
index 81754361..3f478aa4 100644
--- a/lib/PublicInbox/LeiDedupe.pm
+++ b/lib/PublicInbox/LeiDedupe.pm
@@ -89,8 +89,9 @@ sub true { 1 }
 sub dedupe_none ($) { (\&true, \&true) }
 
 sub new {
-	my ($cls, $lei, $dst) = @_;
+	my ($cls, $lei) = @_;
 	my $dd = $lei->{opt}->{dedupe} // 'content';
+	my $dst = $lei->{ovv}->{dst};
 
 	# allow "none" to bypass Eml->new if writing to directory:
 	return if ($dd eq 'none' && substr($dst // '', -1) eq '/');
diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm
index 6b601edb..4afe73a3 100644
--- a/lib/PublicInbox/LeiOverview.pm
+++ b/lib/PublicInbox/LeiOverview.pm
@@ -15,6 +15,8 @@ use PublicInbox::MID qw($MID_EXTRACT);
 use PublicInbox::Address qw(pairs);
 use PublicInbox::Config;
 use PublicInbox::Search qw(get_pct);
+use PublicInbox::LeiDedupe;
+use PublicInbox::LeiToMail;
 
 # cf. https://en.wikipedia.org/wiki/JSON_streaming
 my $JSONL = 'ldjson|ndjson|jsonl'; # 3 names for the same thing
@@ -54,13 +56,14 @@ sub new {
 
 	}
 	$fmt //= 'json' if $dst eq '/dev/stdout';
-	$fmt //= 'maildir'; # TODO
+	$fmt //= 'maildir';
 
 	if (index($dst, '://') < 0) { # not a URL, so assume path
 		 $dst = File::Spec->canonpath($dst);
 	} # else URL
 
 	my $self = bless { fmt => $fmt, dst => $dst }, $class;
+	$lei->{ovv} = $self;
 	my $json;
 	if ($fmt =~ /\A($JSONL|(?:concat)?json)\z/) {
 		$json = $self->{json} = ref(PublicInbox::Config->json);
@@ -80,8 +83,11 @@ sub new {
 	} elsif ($json) {
 		return $lei->fail('JSON formats only output to stdout');
 	} else {
-		return $lei->fail("TODO: $dst -f $fmt");
+		# default to the cheapest sort since the MUA usually re-sorts anyway
+		$lei->{opt}->{'sort'} //= 'docid';
+		$lei->{l2m} = PublicInbox::LeiToMail->new($lei);
 	}
+	$lei->{dedupe} = PublicInbox::LeiDedupe->new($lei);
 	$self;
 }
 
diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm
index 69d2f9a6..2598d85b 100644
--- a/lib/PublicInbox/LeiQuery.pm
+++ b/lib/PublicInbox/LeiQuery.pm
@@ -23,8 +23,6 @@ sub _vivify_external { # _externals_each callback
 # the main "lei q SEARCH_TERMS" method
 sub lei_q {
 	my ($self, @argv) = @_;
-	my $sto = $self->_lei_store(1);
-	my $cfg = $self->_lei_cfg(1);
 	my $opt = $self->{opt};
 
 	# --local is enabled by default
@@ -32,7 +30,6 @@ sub lei_q {
 	my @srcs;
 	require PublicInbox::LeiXSearch;
 	require PublicInbox::LeiOverview;
-	require PublicInbox::LeiDedupe;
 	my $lxs = PublicInbox::LeiXSearch->new;
 
 	# --external is enabled by default, but allow --no-external
@@ -46,10 +43,10 @@ sub lei_q {
 	$lxs->wq_workers_start('lei_xsearch', $j, $self->oldset)
 		// $lxs->wq_workers($j);
 
-	unshift(@srcs, $sto->search) if $opt->{'local'};
 	# no forking workers after this
-	$self->{ovv} = PublicInbox::LeiOverview->new($self);
-	$self->{dd} = PublicInbox::LeiDedupe->new($self);
+	my $ovv = PublicInbox::LeiOverview->new($self) or return;
+	my $sto = $self->_lei_store(1);
+	unshift(@srcs, $sto->search) if $opt->{'local'};
 	my %mset_opt = map { $_ => $opt->{$_} } qw(thread limit offset);
 	$mset_opt{asc} = $opt->{'reverse'} ? 1 : 0;
 	$mset_opt{qstr} = join(' ', map {;
@@ -66,15 +63,13 @@ sub lei_q {
 		} elsif ($sort =~ /\Areceived(?:-?[aA]t)?\z/) {
 			# the default
 		} else {
-			die "unrecognized --sort=$sort\n";
+			return $self->fail("unrecognized --sort=$sort");
 		}
 	}
-	# $self->out($json->encode(\%mset_opt));
 	# descending docid order
 	$mset_opt{relevance} //= -2 if $opt->{thread};
-	# my $wcb = PublicInbox::LeiToMail->write_cb($out, $self);
 	$self->{mset_opt} = \%mset_opt;
-	$self->{ovv}->ovv_begin($self);
+	$ovv->ovv_begin($self);
 	$lxs->do_query($self, \@srcs);
 }
 
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 5d4b7978..31ac33ad 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -321,7 +321,6 @@ sub new {
 	} else {
 		die "bad mail --format=$fmt\n";
 	}
-	my $dedupe = $lei->{dedupe} //= PublicInbox::LeiDedupe->new($lei, $dst);
 	$self;
 }
 
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 25ded544..562e2351 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -8,6 +8,7 @@ package PublicInbox::LeiXSearch;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::LeiSearch PublicInbox::IPC);
+use PublicInbox::OpPipe;
 
 sub new {
 	my ($class) = @_;
@@ -103,8 +104,9 @@ sub query_thread_mset { # for --thread
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
 	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
-	my $dd = $lei->{dd};
-	$dd->prepare_dedupe;
+	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+	$dedupe->prepare_dedupe;
+	warn "HI";
 	do {
 		$mset = $srch->mset($mo->{qstr}, $mo);
 		my $ids = $srch->mset_to_artnums($mset, $mo);
@@ -114,7 +116,7 @@ sub query_thread_mset { # for --thread
 		while ($over->expand_thread($ctx)) {
 			for my $n (@{$ctx->{xids}}) {
 				my $smsg = $over->get_art($n) or next;
-				next if $dd->is_smsg_dup($smsg);
+				next if $dedupe->is_smsg_dup($smsg);
 				my $mitem = delete $n2item{$smsg->{num}};
 				$each_smsg->($smsg, $mitem);
 			}
@@ -126,19 +128,21 @@ sub query_thread_mset { # for --thread
 
 sub query_mset { # non-parallel for non-"--thread" users
 	my ($self, $lei, $srcs) = @_;
+	warn "AAAAAAA";
 	my %sig = $lei->atfork_child_wq($self);
 	local @SIG{keys %sig} = values %sig;
 	my $mo = { %{$lei->{mset_opt}} };
 	my $mset;
 	$self->attach_external($_) for @$srcs;
 	my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
-	my $dd = $lei->{dd};
-	$dd->prepare_dedupe;
+	my $dedupe = $lei->{dedupe} // die 'BUG: {dedupe} missing';
+	$dedupe->prepare_dedupe;
+	warn "HI";
 	do {
 		$mset = $self->mset($mo->{qstr}, $mo);
 		for my $it ($mset->items) {
 			my $smsg = smsg_for($self, $it) or next;
-			next if $dd->is_smsg_dup($smsg);
+			next if $dedupe->is_smsg_dup($smsg);
 			$each_smsg->($smsg, $it);
 		}
 	} while (_mset_more($mset, $mo));
@@ -151,40 +155,67 @@ sub query_done { # PublicInbox::EOFpipe callback
 	$lei->dclose;
 }
 
-sub do_query {
-	my ($self, $lei_orig, $srcs) = @_;
-	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+sub start_query {
+	my ($self, $io, $lei, $srcs) = @_;
 	my $remotes = $self->{remotes} // [];
-	pipe(my ($eof_wait, $qry_done)) or die "pipe $!";
-	$io[0] = $qry_done; # don't need stdin
-
 	if ($lei->{opt}->{thread}) {
 		$lei->{-parallel} = scalar(@$remotes) + scalar(@$srcs) - 1;
 		for my $ibxish (@$srcs) {
-			$self->wq_do('query_thread_mset', \@io, $lei, $ibxish);
+			$self->wq_do('query_thread_mset', $io, $lei, $ibxish);
 		}
 	} else {
 		$lei->{-parallel} = scalar(@$remotes);
-		$self->wq_do('query_mset', \@io, $lei, $srcs);
+		$self->wq_do('query_mset', $io, $lei, $srcs);
 	}
 	# TODO
 	for my $rmt (@$remotes) {
-		$self->wq_do('query_thread_mbox', \@io, $lei, $rmt);
+		$self->wq_do('query_thread_mbox', $io, $lei, $rmt);
 	}
-	@io = ();
-	close $qry_done; # fully closed when children are done
-
-	# query_done will run when query_*mset close $qry_done
-	if ($lei_orig->{sock}) { # watch for client premature exit
-		require PublicInbox::EOFpipe;
-		PublicInbox::EOFpipe->new($eof_wait, \&query_done, $lei_orig);
-		$lei_orig->{lxs} = $self;
-		$lei_orig->event_step_init;
+	close $io->[0]; # qry_status_wr
+	@$io = ();
+}
+
+sub query_prepare { # wq_do target: run l2m do_prepare, then write the dot op byte
+	my ($self, $lei) = @_;
+	eval { $lei->{l2m}->do_prepare($lei) };
+	return $lei->fail($@) if $@; # hand the do_prepare exception to $lei->fail
+	# trigger PublicInbox::OpPipe->event_step (do_query maps the dot byte to start_query)
+	my $qry_status_wr = $self->{0} or # NOTE(review): read from $self, not $lei -- confirm
+		return $lei->fail('BUG: qry_status_wr missing');
+	$qry_status_wr->autoflush(1); # flush the single byte on print
+	print $qry_status_wr '.' or # this should never fail...
+		return $lei->fail("BUG? print qry_status_wr: $!");
+}
+
+sub do_query {
+	my ($self, $lei_orig, $srcs) = @_;
+	my ($lei, @io) = $lei_orig->atfork_parent_wq($self);
+	use Data::Dumper; warn Dumper($lei);
+	pipe(my ($qry_status_rd, $qry_status_wr)) or die "pipe $!";
+	$io[0] = $qry_status_wr; # don't need stdin
+	$qry_status_wr = undef;
+
+	$lei_orig->{lxs} = $self;
+	$lei_orig->event_step_init; # wait for shutdowns
+	my $op_map = {
+		'' => [ \&query_done, $lei_orig ],
+	};
+	my $in_loop = exists $lei_orig->{sock};
+	my $opp = PublicInbox::OpPipe->new($qry_status_rd, $op_map, $in_loop);
+	if ($lei->{l2m}) {
+		$op_map->{'.'} = [ \&start_query, $self, \@io, $lei, $srcs ];
+		$self->wq_do('query_prepare', \@io, $lei);
+		$opp->event_step if !$in_loop;
 	} else {
+		start_query($self, \@io, $lei, $srcs);
+	}
+	unless ($in_loop) {
 		$self->wq_close;
-		read($eof_wait, my $buf, 1); # wait for close($lei->{0})
-		query_done($lei_orig); # may SIGPIPE
+		warn "DONE";
+		$opp->event_step;
+		warn "DONE";
 	}
+	warn "DONE";
 }
 
 sub ipc_atfork_prepare {
diff --git a/lib/PublicInbox/OpPipe.pm b/lib/PublicInbox/OpPipe.pm
new file mode 100644
index 00000000..5364b525
--- /dev/null
+++ b/lib/PublicInbox/OpPipe.pm
@@ -0,0 +1,38 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# bytecode dispatch pipe: each event_step reads one byte and runs the sub mapped to it
+# op_map is { byte => [ sub, @operands ] }; the empty-string key is used on a zero-byte read (EOF)
+package PublicInbox::OpPipe;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN);
+
+sub new { # args: ($cls, $rd, $op_map, $in_loop); returns the blessed { sock, op_map } hash
+	my ($cls, $rd, $op_map, $in_loop) = @_;
+	my $self = bless { sock => $rd, op_map => $op_map }, $cls;
+	# 1031: F_SETPIPE_SZ, 4096: page size
+	fcntl($rd, 1031, 4096) if $^O eq 'linux'; # the fcntl result is not checked
+	if ($in_loop) { # iff using DS->EventLoop
+		$rd->blocking(0); # lets event_step return on EAGAIN instead of blocking
+		$self->SUPER::new($rd, EPOLLIN);
+	}
+	$self;
+}
+
+sub event_step { # read one byte from {sock} and call its op_map entry
+	my ($self) = @_;
+	my $rd = $self->{sock};
+	my $byte;
+	until (defined(sysread($rd, $byte, 1))) { # undef means the read failed
+		return if $!{EAGAIN}; # nothing readable yet, dispatch nothing
+		next if $!{EINTR}; # interrupted, retry the read
+		die "read \$rd: $!";
+	}
+	my $op = $self->{op_map}->{$byte} or die "BUG: unknown byte `$byte'";
+	my ($sub, @args) = @$op;
+	$sub->(@args); # last statement, so its value is what event_step returns
+}
+
+1;

  reply	other threads:[~2021-01-15 10:24 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-15 10:24 [PATCH 1/2] lei_to_mail: prepare Eric Wong
2021-01-15 10:24 ` Eric Wong [this message]
  -- strict thread matches above, loose matches on Subject: below --
2023-05-13 22:16 [PATCH 1/2] search: add comments wrt codesearch, reduce ops Eric Wong
2023-05-13 22:16 ` [PATCH 2/2] WIP Eric Wong
2023-05-04 11:07 [PATCH 1/2] reuse open(2) from rb_file_load_ok on POSIX-like system Eric Wong
2023-05-04 11:07 ` [PATCH 2/2] wip Eric Wong
2023-04-17 23:06 [PATCH 1/2] reuse open(2) from rb_file_load_ok on POSIX-like system Eric Wong
2023-04-17 23:06 ` [PATCH 2/2] wip Eric Wong
2020-03-31  6:42 [PATCH 1/2] v2writable: index Message-IDs w/ spaces properly Eric Wong
2020-03-31  6:42 ` [PATCH 2/2] WIP Eric Wong
2019-11-03  3:17 [PATCH 1/2] edit: hoist out prompt functions into sub functions Eric Wong
2019-11-03  3:17 ` [PATCH 2/2] wip Eric Wong
2019-09-23  3:16 [PATCH 1/2] edit: hoist out prompt functions into sub functions ew
2019-09-23  3:16 ` [PATCH 2/2] wip ew
2019-01-07  8:02 [PATCH 1/2] hoist out resolve_repo_dir from -index Eric Wong
2019-01-07  8:02 ` [PATCH 2/2] wip Eric Wong
2018-09-09  8:35 [PATCH 1/2] cont.c (fiber_memsize): do not rely on ROOT_FIBER_CONTEXT Eric Wong
2018-09-09  8:35 ` [PATCH 2/2] wip Eric Wong
2018-09-09  8:00 [PATCH 1/2] test_fiber: show crash on GC after forking Eric Wong
2018-09-09  8:00 ` [PATCH 2/2] wip Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210115102419.968-2-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).