From: Eric Wong <e@80x24.org>
To: spew@80x24.org
Subject: [PATCH] wip
Date: Sat, 5 Jun 2021 19:58:57 +0000 [thread overview]
Message-ID: <20210605195857.30889-1-e@80x24.org> (raw)
---
MANIFEST | 1 +
lib/PublicInbox/LEI.pm | 7 ++--
lib/PublicInbox/LeiImport.pm | 33 ++++++++++++-------
lib/PublicInbox/LeiInput.pm | 29 +++++++++++------
lib/PublicInbox/LeiMailSync.pm | 10 ++++++
lib/PublicInbox/LeiPmdir.pm | 59 ++++++++++++++++++++++++++++++++++
lib/PublicInbox/LeiTag.pm | 2 +-
lib/PublicInbox/MdirReader.pm | 22 +++++++------
t/lei-import-maildir.t | 2 +-
9 files changed, 130 insertions(+), 35 deletions(-)
create mode 100644 lib/PublicInbox/LeiPmdir.pm
diff --git a/MANIFEST b/MANIFEST
index 5a70a144..7bdbf252 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -221,6 +221,7 @@ lib/PublicInbox/LeiMailSync.pm
lib/PublicInbox/LeiMirror.pm
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiP2q.pm
+lib/PublicInbox/LeiPmdir.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiRediff.pm
lib/PublicInbox/LeiRemote.pm
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index 4b1a6da7..ee9f1f80 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -421,7 +421,7 @@ my %CONFIG_KEYS = (
'leistore.dir' => 'top-level storage location',
);
-my @WQ_KEYS = qw(lxs l2m wq1 ikw); # internal workers
+my @WQ_KEYS = qw(lxs l2m wq1 ikw pmd); # internal workers
sub _drop_wq {
my ($self) = @_;
@@ -565,7 +565,7 @@ sub pkt_op_pair {
}
sub workers_start {
- my ($lei, $wq, $jobs, $ops) = @_;
+ my ($lei, $wq, $jobs, $ops, $flds) = @_;
$ops = {
'!' => [ \&fail_handler, $lei ],
'|' => [ \&sigpipe_handler, $lei ],
@@ -576,7 +576,8 @@ sub workers_start {
$ops->{''} //= [ $wq->can('_lei_wq_eof') || \&wq_eof, $lei ];
my $end = $lei->pkt_op_pair;
my $ident = $wq->{-wq_ident} // "lei-$lei->{cmd} worker";
- $wq->wq_workers_start($ident, $jobs, $lei->oldset, { lei => $lei });
+ $flds->{lei} = $lei;
+ $wq->wq_workers_start($ident, $jobs, $lei->oldset, $flds);
delete $lei->{pkt_op_p};
my $op_c = delete $lei->{pkt_op_c};
@$end = ();
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 60f3241b..14b98480 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -6,6 +6,7 @@ package PublicInbox::LeiImport;
use strict;
use v5.10.1;
use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
# /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
@@ -28,17 +29,36 @@ sub input_mbox_cb { # MboxReader callback
input_eml_cb($self, $eml, $vmd);
}
-sub input_maildir_cb { # maildir_each_eml cb
- my ($f, $kw, $eml, $self) = @_;
+# Handles one Maildir message file, called via wq_io_do from
+# LeiPmdir->each_mdir_fn (i.e. inside a LeiPmdir worker).
+# $f is a path inside a Maildir "new" or "cur" directory.  If
+# {-lms_ro} already maps this basename to stored docids, only their
+# keywords are updated; otherwise the file is read and imported.
+sub pmdir_cb {
+ my ($self, $f, @args) = @_;
+ my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+ die "BUG: $f was not from a Maildir?\n";
+ # undef presumably means an invalid Maildir basename; skip it
+ my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn) // return;
+ return if index($fl, 'T') >= 0; # no Trashed messages
+ my $kw = PublicInbox::MdirReader::flags2kw($fl); # scalar ctx: arrayref
+ substr($folder, 0, 0) = 'maildir:'; # add prefix
+ my $lms = $self->{-lms_ro};
+ my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef;
+ my @docids = defined($oidbin) ?
+ $self->{over}->oidbin_exists($oidbin) : ();
my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
- if ($self->{-mail_sync}) {
- if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
- $vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
- } else {
- warn "E: $f was not from a Maildir?\n";
- }
+ if (scalar @docids) {
+ # already stored: only keywords can change, and only when
+ # keyword import is enabled ($vmd is undef otherwise)
+ $vmd or return;
+ $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+ $self->{sto}->ipc_do('set_eml_vmd', undef, $vmd, \@docids);
+ } elsif (my $eml = eml_from_path($f)) {
+ # pass the name as a scalar ref, as the old input_maildir_cb did
+ $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
+ $self->input_eml_cb($eml, $vmd);
}
- $self->input_eml_cb($eml, $vmd);
}
sub input_net_cb { # imap_each / nntp_each
@@ -62,10 +72,10 @@ sub do_import_index ($$@) {
my $vmd_mod = $self->vmd_mod_extract(\@inputs);
return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
$self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
- $self->prepare_inputs($lei, \@inputs) or return;
+ $lei->ale; # initialize for workers to read (before LeiPmdir->new)
$self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+ $self->prepare_inputs($lei, \@inputs) or return;
- $lei->ale; # initialize for workers to read
my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
my $ikw;
if (my $net = $lei->{net}) {
@@ -93,6 +103,7 @@ sub do_import_index ($$@) {
$lei->{wq1} = $self;
$lei->{-err_type} = 'non-fatal';
$ikw->wq_close(1) if $ikw;
+ $lei->{pmd}->wq_close(1) if $lei->{pmd};
net_merge_all_done($self) unless $lei->{auth};
$op_c->op_wait_event($ops);
}
diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm
index 4ff7a379..f0c141c8 100644
--- a/lib/PublicInbox/LeiInput.pm
+++ b/lib/PublicInbox/LeiInput.pm
@@ -151,9 +151,16 @@ sub input_path_url {
return $lei->fail(<<EOM) if $ifmt && $ifmt ne 'maildir';
$input appears to be a maildir, not $ifmt
EOM
- PublicInbox::MdirReader->new->maildir_each_eml($input,
- $self->can('input_maildir_cb'),
- $self, @args);
+ my $mdr = PublicInbox::MdirReader->new;
+ if (my $pmd = $self->{pmd}) {
+ $mdr->maildir_each_file($input,
+ $pmd->can('each_mdir_fn'),
+ $pmd, @args);
+ } else {
+ $mdr->maildir_each_eml($input,
+ $self->can('input_maildir_cb'),
+ $self, @args);
+ }
} else {
$lei->fail("$input unsupported (TODO)");
}
@@ -215,7 +222,7 @@ sub prepare_inputs { # returns undef on error
push @{$sync->{no}}, '/dev/stdin' if $sync;
}
my $net = $lei->{net}; # NetWriter may be created by l2m
- my (@f, @d);
+ my (@f, @md);
# e.g. Maildir:/home/user/Mail/ or imaps://example.com/INBOX
for my $input (@$inputs) {
my $input_path = $input;
@@ -247,11 +254,11 @@ sub prepare_inputs { # returns undef on error
PublicInbox::MboxReader->reads($ifmt) or return
$lei->fail("$ifmt not supported");
} elsif (-d $input_path) {
- require PublicInbox::MdirReader;
$ifmt eq 'maildir' or return
$lei->fail("$ifmt not supported");
$sync and $input = 'maildir:'.
$lei->abs_path($input_path);
+ push @md, $input;
} else {
return $lei->fail("Unable to handle $input");
}
@@ -266,21 +273,18 @@ $input is `eml', not --in-format=$in_fmt
if ($devfd >= 0 || -f $input || -p _) {
push @{$sync->{no}}, $input if $sync;
push @f, $input;
- } elsif (-d $input) {
+ } elsif (-d "$input/new" && -d "$input/cur") {
if ($sync) {
$input = $lei->abs_path($input);
push @{$sync->{ok}}, $input;
}
- push @d, $input;
+ push @md, $input;
} else {
return $lei->fail("Unable to handle $input")
}
}
}
if (@f) { check_input_format($lei, \@f) or return }
- if (@d) { # TODO: check for MH vs Maildir, here
- require PublicInbox::MdirReader;
- }
if ($sync && $sync->{no}) {
return $lei->fail(<<"") if !$sync->{ok};
--mail-sync specified but no inputs support it
@@ -299,6 +303,11 @@ $input is `eml', not --in-format=$in_fmt
$lei->{auth} //= PublicInbox::LeiAuth->new;
$lei->{net} //= $net;
}
+ if (scalar(@md)) {
+ require PublicInbox::MdirReader;
+ require PublicInbox::LeiPmdir;
+ $self->{pmd} = PublicInbox::LeiPmdir->new($lei, $self);
+ }
$self->{inputs} = $inputs;
}
diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm
index 75603d89..c87685a0 100644
--- a/lib/PublicInbox/LeiMailSync.pm
+++ b/lib/PublicInbox/LeiMailSync.pm
@@ -375,6 +375,21 @@ EOM
$sth->fetchrow_array;
}
+# Returns the oidbin recorded in blob2name for basename $nm in folder
+# $mdir (e.g. "maildir:/path/to/Maildir"), or nothing when the folder
+# has no fid or no row matches.
+sub name_oidbin ($$$) {
+ my ($self, $mdir, $nm) = @_;
+ # fids are memoized in {fmap}; bail out early for unknown folders
+ my $fid = $self->{fmap}->{$mdir} //= fid_for($self, $mdir) // return;
+ my $sql = <<EOM;
+SELECT oidbin FROM blob2name WHERE fid = ? AND name = ?
+EOM
+ my $sth = $self->{dbh}->prepare_cached($sql, undef, 1);
+ $sth->execute($fid, $nm);
+ $sth->fetchrow_array; # single column; undef/empty when no row
+}
+
sub imap_oid {
my ($self, $lei, $uid_uri) = @_;
my $mailbox_uri = $uid_uri->clone;
diff --git a/lib/PublicInbox/LeiPmdir.pm b/lib/PublicInbox/LeiPmdir.pm
new file mode 100644
index 00000000..3f9c9e4a
--- /dev/null
+++ b/lib/PublicInbox/LeiPmdir.pm
@@ -0,0 +1,59 @@
+# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# WQ worker pool for parallel Maildir reads: each_mdir_fn hands each
+# file path to a worker (via wq_io_do), which calls ->pmdir_cb on {ipt}.
+# This does NOT use the {shard_info} field of LeiToMail (and we may
+# remove {shard_info}).  WQ key: {pmd}
+package PublicInbox::LeiPmdir;
+use strict;
+use v5.10.1;
+use parent qw(PublicInbox::IPC);
+use PublicInbox::MdirReader;
+
+sub new { # start workers; returns $self (also saved as $lei->{pmd})
+ my ($cls, $lei, $ipt) = @_;
+ my $self = bless { -wq_ident => 'lei Maildir worker' }, $cls;
+ my ($op_c, $ops) = $lei->workers_start($self, $self->detect_nproc,
+ undef, { ipt => $ipt }); # LeiInput subclass providing ->pmdir_cb
+ $op_c->{ops} = $ops; # for PktOp->event_step
+ $lei->{pmd} = $self;
+}
+
+sub ipc_atfork_child { # worker init: attach lei, sto, lse, over to {ipt}
+ my ($self) = @_;
+ my $lei = $self->{lei};
+ $lei->_lei_atfork_child;
+ my $ipt = $self->{ipt} // die 'BUG: no self->{ipt}';
+ $ipt->{lei} = $lei;
+ $ipt->{sto} = $lei->{sto} // die 'BUG: no lei->{sto}';
+ $ipt->{lse} = $ipt->{sto}->search;
+ $ipt->{over} = $ipt->{lse}->over;
+ $ipt->{-lms_ro} //= $ipt->{lse}->lms; # may be undef or '0'
+ $self->SUPER::ipc_atfork_child;
+}
+
+sub each_mdir_fn { # maildir_each_file callback: send $f to a worker
+ my ($f, $self, @args) = @_;
+ $self->wq_io_do('mdir_iter', [], $f, @args);
+}
+
+sub mdir_iter { # via wq_io_do (runs in a worker)
+ my ($self, $f, @args) = @_;
+ $self->{ipt}->pmdir_cb($f, @args);
+}
+
+sub pmd_done_wait { # wq_wait_old callback registered by _lei_wq_eof
+ my ($arg, $pid) = @_;
+ my ($self, $lei) = @$arg;
+ my $wait = $lei->{sto}->ipc_do('done'); # non-void ctx kept on purpose? presumably waits for the reply; confirm in PublicInbox::IPC
+ $lei->can('wq_done_wait')->($arg, $pid);
+}
+
+sub _lei_wq_eof { # EOF callback for main lei daemon
+ my ($lei) = @_;
+ my $pmd = delete $lei->{pmd} or return $lei->fail; # {pmd} may already be gone (pmd is in @WQ_KEYS; maybe dropped on failure) -- TODO confirm
+ $pmd->wq_wait_old(\&pmd_done_wait, $lei);
+}
+
+1;
diff --git a/lib/PublicInbox/LeiTag.pm b/lib/PublicInbox/LeiTag.pm
index b6abd533..0a8644be 100644
--- a/lib/PublicInbox/LeiTag.pm
+++ b/lib/PublicInbox/LeiTag.pm
@@ -117,6 +117,6 @@ sub _complete_tag {
no warnings 'once'; # the following works even when LeiAuth is lazy-loaded
*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all;
-*net_merge_all_done = \&PublicInbox::LeiInput::input_only_net_merge_all_done;
+*net_merge_all_done = \&PublicInbox::LeiInput::parallel_net_merge_all_done; # NOTE(review): not added by this patch's LeiInput.pm hunks -- confirm it exists
1;
diff --git a/lib/PublicInbox/MdirReader.pm b/lib/PublicInbox/MdirReader.pm
index 304be63d..484bf0a8 100644
--- a/lib/PublicInbox/MdirReader.pm
+++ b/lib/PublicInbox/MdirReader.pm
@@ -87,17 +87,20 @@ sub maildir_each_eml {
sub new { bless {}, __PACKAGE__ }
sub flags2kw ($) {
- my @unknown;
- my %kw;
- for (split(//, $_[0])) {
- my $k = $c2kw{$_};
- if (defined($k)) {
- $kw{$k} = 1;
- } else {
- push @unknown, $_;
+ # Maps Maildir flag characters to lei keywords via %c2kw.
+ # List context: (\%kw, \@unknown) -- %kw has a true value for each
+ # keyword found, @unknown holds flag characters with no mapping.
+ # Scalar context: arrayref of the keywords found, sorted and unique.
+ my (%kw, @unknown);
+ for my $c (split(//, $_[0])) {
+ my $k = $c2kw{$c};
+ if (defined($k)) {
+ $kw{$k} = 1;
+ } else {
+ push @unknown, $c;
}
}
- (\%kw, \@unknown);
+ wantarray ? (\%kw, \@unknown) : [ sort keys %kw ];
}
1;
diff --git a/t/lei-import-maildir.t b/t/lei-import-maildir.t
index 688b10ce..c81e7805 100644
--- a/t/lei-import-maildir.t
+++ b/t/lei-import-maildir.t
@@ -28,7 +28,7 @@ test_lei(sub {
is(scalar(keys %v), 1, 'inspect handles relative and absolute paths');
my $inspect = json_utf8->decode([ keys %v ]->[0]);
is_deeply($inspect, {"maildir:$md" => { 'name.count' => 1 }},
- 'inspect maildir: path had expected output');
+ 'inspect maildir: path had expected output') or xbail($inspect);
lei_ok(qw(q s:boolean));
my $res = json_utf8->decode($lei_out);
next reply other threads:[~2021-06-05 19:59 UTC|newest]
Thread overview: 23+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-06-05 19:58 Eric Wong [this message]
-- strict thread matches above, loose matches on Subject: below --
2021-10-27 20:16 [PATCH] wip Eric Wong
2021-04-05 7:42 Eric Wong
2021-03-08 7:11 Eric Wong
2021-01-21 4:24 [PATCH] WIP Eric Wong
2021-01-03 22:57 [PATCH] wip Eric Wong
2020-12-27 11:36 [PATCH] WIP Eric Wong
2020-11-15 7:35 [PATCH] wip Eric Wong
2020-04-23 4:27 Eric Wong
2020-04-20 7:14 Eric Wong
2020-01-13 9:24 [PATCH] WIP Eric Wong
2019-05-11 22:55 Eric Wong
2019-01-02 9:21 [PATCH] wip Eric Wong
2018-07-06 21:31 Eric Wong
2018-06-24 11:55 Eric Wong
2018-06-24 8:39 Eric Wong
2017-07-15 1:42 [PATCH] WIP Eric Wong
2017-04-12 20:17 [PATCH] wip Eric Wong
2017-04-05 18:40 Eric Wong
2016-08-23 20:07 Eric Wong
2016-08-18 2:16 Eric Wong
2016-06-26 3:46 Eric Wong
2015-12-22 0:15 Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20210605195857.30889-1-e@80x24.org \
--to=e@80x24.org \
--cc=spew@80x24.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).