dumping ground for random patches and texts
 help / color / mirror / Atom feed
From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH] wip
Date: Sat,  5 Jun 2021 19:58:57 +0000	[thread overview]
Message-ID: <20210605195857.30889-1-e@80x24.org> (raw)

---
 MANIFEST                       |  1 +
 lib/PublicInbox/LEI.pm         |  7 ++--
 lib/PublicInbox/LeiImport.pm   | 33 ++++++++++++-------
 lib/PublicInbox/LeiInput.pm    | 29 +++++++++++------
 lib/PublicInbox/LeiMailSync.pm | 10 ++++++
 lib/PublicInbox/LeiPmdir.pm    | 59 ++++++++++++++++++++++++++++++++++
 lib/PublicInbox/LeiTag.pm      |  2 +-
 lib/PublicInbox/MdirReader.pm  | 22 +++++++------
 t/lei-import-maildir.t         |  2 +-
 9 files changed, 130 insertions(+), 35 deletions(-)
 create mode 100644 lib/PublicInbox/LeiPmdir.pm

diff --git a/MANIFEST b/MANIFEST
index 5a70a144..7bdbf252 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -221,6 +221,7 @@ lib/PublicInbox/LeiMailSync.pm
 lib/PublicInbox/LeiMirror.pm
 lib/PublicInbox/LeiOverview.pm
 lib/PublicInbox/LeiP2q.pm
+lib/PublicInbox/LeiPmdir.pm
 lib/PublicInbox/LeiQuery.pm
 lib/PublicInbox/LeiRediff.pm
 lib/PublicInbox/LeiRemote.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 4b1a6da7..ee9f1f80 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -421,7 +421,7 @@ my %CONFIG_KEYS = (
 	'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
+my @WQ_KEYS = qw(lxs l2m wq1 ikw pmd); # internal workers
 
 sub _drop_wq {
 	my ($self) = @_;
@@ -565,7 +565,7 @@ sub pkt_op_pair {
 }
 
 sub workers_start {
-	my ($lei, $wq, $jobs, $ops) = @_;
+	my ($lei, $wq, $jobs, $ops, $flds) = @_;
 	$ops = {
 		'!' => [ \&fail_handler, $lei ],
 		'|' => [ \&sigpipe_handler, $lei ],
@@ -576,7 +576,8 @@ sub workers_start {
 	$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
 	my $end = $lei->pkt_op_pair;
 	my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
-	$wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+	$flds->{lei} = $lei;
+	$wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
 	delete $lei->{pkt_op_p};
 	my $op_c = delete $lei->{pkt_op_c};
 	@$end = ();
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 60f3241b..14b98480 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -6,6 +6,7 @@ package PublicInbox::LeiImport;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 # /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
 
@@ -28,17 +29,26 @@ sub input_mbox_cb { # MboxReader callback
 	input_eml_cb($self, $eml, $vmd);
 }
 
-sub input_maildir_cb { # maildir_each_eml cb
-	my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+	my ($self, $f, @args) = @_; # $f: path to one message file in a Maildir
+	my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+		die "BUG: $f was not from a Maildir?\n";
+	my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn);
+	return if index($fl, 'T') >= 0; # no Trashed messages
+	my $kw = PublicInbox::MdirReader::flags2kw($fl); # scalar ctx: arrayref
+	substr($folder, 0, 0) = 'maildir:'; # add prefix
+	my $lms = $self->{-lms_ro};
+	my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef; # falsy lms: no lookup
+	my @docids = defined($oidbin) ? # docids of an already-stored copy, if any
+			$self->{over}->oidbin_exists($oidbin) : ();
 	my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-	if ($self->{-mail_sync}) {
-		if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
-			$vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
-		} else {
-			warn "E: $f was not from a Maildir?\n";
-		}
+	if (scalar @docids) { # already stored: only update keywords if changed
+		$self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+		$self->{sto}->ipc_do('set_eml_vmd', undef, $vmd, \@docids); # NOTE(review): $vmd is undef w/o -import_kw
+	} elsif (my $eml = eml_from_path($f)) { # no known docids: read and import the file
+		$vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync}; # name as scalar ref, as input_maildir_cb did
+		$self->input_eml_cb($eml, $vmd);
 	}
-	$self->input_eml_cb($eml, $vmd);
 }
 
 sub input_net_cb { # imap_each / nntp_each
@@ -62,10 +72,10 @@ sub do_import_index ($$@) {
 	my $vmd_mod = $self->vmd_mod_extract(\@inputs);
 	return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
 	$self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
-	$self->prepare_inputs($lei, \@inputs) or return;
+	$lei->ale; # initialize for workers to read (before LeiPmdir->new)
 	$self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+	$self->prepare_inputs($lei, \@inputs) or return;
 
-	$lei->ale; # initialize for workers to read
 	my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
 	my $ikw;
 	if (my $net = $lei->{net}) {
@@ -93,6 +103,7 @@ sub do_import_index ($$@) {
 	$lei->{wq1} = $self;
 	$lei->{-err_type} = 'non-fatal';
 	$ikw->wq_close(1) if $ikw;
+	$lei->{pmd}->wq_close(1) if $lei->{pmd};
 	net_merge_all_done($self) unless $lei->{auth};
 	$op_c->op_wait_event($ops);
 }
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index 4ff7a379..f0c141c8 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -151,9 +151,16 @@ sub input_path_url {
 		return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
 $input appears to be a maildir, not $ifmt
 EOM
-		PublicInbox::MdirReader->new->maildir_each_eml($input,
-					$self->can('input_maildir_cb'),
-					$self, @args);
+		my $mdr = PublicInbox::MdirReader->new;
+		if (my $pmd = $self->{pmd}) {
+			$mdr->maildir_each_file($input,
+						$pmd->can('each_mdir_fn'),
+						$pmd, @args);
+		} else {
+			$mdr->maildir_each_eml($input,
+						$self->can('input_maildir_cb'),
+						$self, @args);
+		}
 	} else {
 		$lei->fail("$input unsupported (TODO)");
 	}
@@ -215,7 +222,7 @@ sub prepare_inputs { # returns undef on error
 		push @{$sync->{no}}, '/dev/stdin' if $sync;
 	}
 	my $net = $lei->{net}; # NetWriter may be created by l2m
-	my (@f, @d);
+	my (@f, @md);
 	# e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX
 	for my $input (@$inputs) {
 		my $input_path = $input;
@@ -247,11 +254,11 @@ sub prepare_inputs { # returns undef on error
 				PublicInbox::MboxReader->reads($ifmt) or return
 					$lei->fail("$ifmt not supported");
 			} elsif (-d $input_path) {
-				require PublicInbox::MdirReader;
 				$ifmt eq 'maildir' or return
 					$lei->fail("$ifmt not supported");
 				$sync and $input = 'maildir:'.
 						$lei->abs_path($input_path);
+				push @md, $input;
 			} else {
 				return $lei->fail("Unable to handle $input");
 			}
@@ -266,21 +273,18 @@ $input is `eml', not --in-format=$in_fmt
 			if ($devfd >= 0 || -f $input || -p _) {
 				push @{$sync->{no}}, $input if $sync;
 				push @f, $input;
-			} elsif (-d $input) {
+			} elsif (-d "$input/new" && -d "$input/cur") {
 				if ($sync) {
 					$input = $lei->abs_path($input);
 					push @{$sync->{ok}}, $input;
 				}
-				push @d, $input;
+				push @md, $input;
 			} else {
 				return $lei->fail("Unable to handle $input")
 			}
 		}
 	}
 	if (@f) { check_input_format($lei, \@f) or return }
-	if (@d) { # TODO: check for MH vs Maildir, here
-		require PublicInbox::MdirReader;
-	}
 	if ($sync && $sync->{no}) {
 		return $lei->fail(<<"") if !$sync->{ok};
 --mail-sync specified but no inputs support it
@@ -299,6 +303,11 @@ $input is `eml', not --in-format=$in_fmt
 		$lei->{auth} //= PublicInbox::LeiAuth->new;
 		$lei->{net} //= $net;
 	}
+	if (scalar(@md)) {
+		require PublicInbox::MdirReader;
+		require PublicInbox::LeiPmdir;
+		$self->{pmd} = PublicInbox::LeiPmdir->new($lei, $self);
+	}
 	$self->{inputs} = $inputs;
 }
 
diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm
index 75603d89..c87685a0 100644
--- a/lib/PublicInbox/LeiMailSync.pm
+++ b/lib/PublicInbox/LeiMailSync.pm
@@ -375,6 +375,16 @@ EOM
 	$sth->fetchrow_array;
 }
 
+sub name_oidbin ($$$) { # oidbin recorded for name $nm in folder $mdir
+	my ($self, $mdir, $nm) = @_;
+	my $fid = $self->{fmap}->{$mdir} //= fid_for($self, $mdir) // return; # unknown folder: return nothing
+	my $sth = $self->{dbh}->prepare_cached(<<EOM, undef, 1);
+SELECT oidbin FROM blob2name WHERE fid = ? AND name = ?
+EOM
+	$sth->execute($fid, $nm);
+	$sth->fetchrow_array; # first row only; NOTE(review): confirm (fid, name) is unique
+}
+
 sub imap_oid {
 	my ($self, $lei, $uid_uri) = @_;
 	my $mailbox_uri = $uid_uri->clone;
diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm
new file mode 100644
index 00000000..3f9c9e4a
--- /dev/null
+++ b/lib/PublicInbox/LeiPmdir.pm
@@ -0,0 +1,59 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker for dealing with parallel Maildir reads;
+# this does NOT use the {shard_info} field of LeiToMail
+# (and we may remove {shard_info})
+# WQ key: {pmd}
+package PublicInbox::LeiPmdir;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use PublicInbox::MdirReader;
+
+sub new { # start detect_nproc Maildir workers; object is stored as $lei->{pmd}
+	my ($cls, $lei, $ipt) = @_;
+	my $self = bless { -wq_ident => 'lei Maildir worker' }, $cls;
+	my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc,
+		undef, { ipt => $ipt }); # LeiInput subclass
+	$op_c->{ops} = $ops; # for PktOp->event_step
+	$lei->{pmd} = $self; # also serves as the return value
+}
+
+sub ipc_atfork_child { # per-worker setup: wire the LeiInput object to lei/store
+	my ($self) = @_;
+	my $lei = $self->{lei};
+	$lei->_lei_atfork_child;
+	my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
+	$ipt->{lei} = $lei;
+	$ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+	$ipt->{lse} = $ipt->{sto}->search; # sto/lse/over/-lms_ro are read by pmdir_cb
+	$ipt->{over} = $ipt->{lse}->over;
+	$ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0'
+	$self->SUPER::ipc_atfork_child;
+}
+
+sub each_mdir_fn { # maildir_each_file callback
+	my ($f, $self, @args) = @_;
+	$self->wq_io_do('mdir_iter', [], $f, @args); # hand $f to a worker (mdir_iter)
+}
+
+sub mdir_iter { # via wq_io_do
+	my ($self, $f, @args) = @_;
+	$self->{ipt}->pmdir_cb($f, @args); # dispatch to the LeiInput subclass (e.g. LeiImport)
+}
+
+sub pmd_done_wait { # wq_wait_old callback registered by _lei_wq_eof
+	my ($arg, $pid) = @_;
+	my ($self, $lei) = @$arg;
+	my $wait = $lei->{sto}->ipc_do('done'); # keep $wait: non-void ctx likely makes ipc_do wait for a reply
+	$lei->can('wq_done_wait')->($arg, $pid);
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+	my ($lei) = @_;
+	my $pmd = delete $lei->{pmd} or return $lei->fail; # no pmd registered: fail
+	$pmd->wq_wait_old(\&pmd_done_wait, $lei); # reap old workers via pmd_done_wait
+}
+
+1;
diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm
index b6abd533..0a8644be 100644
--- a/lib/PublicInbox/LeiTag.pm
+++ b/lib/PublicInbox/LeiTag.pm
@@ -117,6 +117,6 @@ sub _complete_tag {
 
 no warnings 'once'; # the following works even when LeiAuth is lazy-loaded
 *net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
-*net_merge_all_done = \&PublicInbox::LeiInput::input_only_net_merge_all_done;
+*net_merge_all_done = \&PublicInbox::LeiInput::parallel_net_merge_all_done; # NOTE(review): not defined in the LeiInput hunks of this patch; confirm it exists
 
 1;
diff --git a/lib/PublicInbox/MdirReader.pm b/lib/PublicInbox/MdirReader.pm
index 304be63d..484bf0a8 100644
--- a/lib/PublicInbox/MdirReader.pm
+++ b/lib/PublicInbox/MdirReader.pm
@@ -87,17 +87,21 @@ sub maildir_each_eml {
 sub new { bless {}, __PACKAGE__ }
 
 sub flags2kw ($) {
-	my @unknown;
-	my %kw;
-	for (split(//, $_[0])) {
-		my $k = $c2kw{$_};
-		if (defined($k)) {
-			$kw{$k} = 1;
-		} else {
-			push @unknown, $_;
+	if (wantarray) { # list ctx: (\%kw, \@unknown flag chars), as before
+		my @unknown;
+		my %kw;
+		for (split(//, $_[0])) {
+			my $k = $c2kw{$_};
+			if (defined($k)) {
+				$kw{$k} = 1;
+			} else {
+				push @unknown, $_;
+			}
 		}
+		(\%kw, \@unknown);
+	} else { # scalar ctx: sorted arrayref of known keywords; unknown flags dropped
+		[ sort(map { $c2kw{$_} // () } split(//, $_[0])) ];
 	}
-	(\%kw, \@unknown);
 }
 
 1;
diff --git a/t/lei-import-maildir.t b/t/lei-import-maildir.t
index 688b10ce..c81e7805 100644
--- a/t/lei-import-maildir.t
+++ b/t/lei-import-maildir.t
@@ -28,7 +28,7 @@ test_lei(sub {
 	is(scalar(keys %v), 1, 'inspect handles relative and absolute paths');
 	my $inspect = json_utf8->decode([ keys %v ]->[0]);
 	is_deeply($inspect, {"maildir:$md" => { 'name.count' => 1 }},
-		'inspect maildir: path had expected output');
+		'inspect maildir: path had expected output') or xbail($inspect);
 
 	lei_ok(qw(q s:boolean));
 	my $res = json_utf8->decode($lei_out);

             reply	other threads:[~2021-06-05 19:59 UTC|newest]

Thread overview: 23+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-06-05 19:58 Eric Wong [this message]
  -- strict thread matches above, loose matches on Subject: below --
2021-10-27 20:16 [PATCH] wip Eric Wong
2021-04-05  7:42 Eric Wong
2021-03-08  7:11 Eric Wong
2021-01-21  4:24 [PATCH] WIP Eric Wong
2021-01-03 22:57 [PATCH] wip Eric Wong
2020-12-27 11:36 [PATCH] WIP Eric Wong
2020-11-15  7:35 [PATCH] wip Eric Wong
2020-04-23  4:27 Eric Wong
2020-04-20  7:14 Eric Wong
2020-01-13  9:24 [PATCH] WIP Eric Wong
2019-05-11 22:55 Eric Wong
2019-01-02  9:21 [PATCH] wip Eric Wong
2018-07-06 21:31 Eric Wong
2018-06-24 11:55 Eric Wong
2018-06-24  8:39 Eric Wong
2017-07-15  1:42 [PATCH] WIP Eric Wong
2017-04-12 20:17 [PATCH] wip Eric Wong
2017-04-05 18:40 Eric Wong
2016-08-23 20:07 Eric Wong
2016-08-18  2:16 Eric Wong
2016-06-26  3:46 Eric Wong
2015-12-22  0:15 Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210605195857.30889-1-e@80x24.org \
    --to=e@80x24.org \
    --cc=spew@80x24.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).