about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiImport.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-06-08 09:50:21 +0000
committerEric Wong <e@80x24.org>2021-06-08 16:50:47 +0000
commit10b523eb017162240b1ac3647f8dcbbf2be348a7 (patch)
tree9ea63ea4c4919556a1bf5b335f365372dfa1c84a /lib/PublicInbox/LeiImport.pm
parentba34a69490dce6ea3ba85ee5416b6590fa0c0a39 (diff)
downloadpublic-inbox-10b523eb017162240b1ac3647f8dcbbf2be348a7.tar.gz
On a 4-core CPU, this speeds up "lei import" on a largish
Maildir inbox with 75K messages from ~8 minutes down to ~40s.

Parallelizing alone did not bring any improvement and may
even hurt performance slightly, depending on CPU availability.
However, creating the index on the "fid" and "name" columns in
blob2name yields us the same speedup we got.

Parallelizing IMAP makes more sense due to the fact most IMAP
stores are non-local and subject to network latency.

Followup-to: bdecd7ed8e0dcf0b45491b947cd737ba8cfe38a3 ("lei import: speed up kw updates for old IMAP messages")
Diffstat (limited to 'lib/PublicInbox/LeiImport.pm')
-rw-r--r--lib/PublicInbox/LeiImport.pm36
1 files changed, 24 insertions, 12 deletions
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 222f75c8..b0e7ba6b 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -6,6 +6,7 @@ package PublicInbox::LeiImport;
 use strict;
 use v5.10.1;
 use parent qw(PublicInbox::IPC PublicInbox::LeiInput);
+use PublicInbox::InboxWritable qw(eml_from_path);
 
 # /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass
 
@@ -28,17 +29,26 @@ sub input_mbox_cb { # MboxReader callback
         input_eml_cb($self, $eml, $vmd);
 }
 
-sub input_maildir_cb { # maildir_each_eml cb
-        my ($f, $kw, $eml, $self) = @_;
+sub pmdir_cb { # called via wq_io_do from LeiPmdir->each_mdir_fn
+        my ($self, $f, @args) = @_;
+        my ($folder, $bn) = ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) or
+                die "BUG: $f was not from a Maildir?\n";
+        my $fl = PublicInbox::MdirReader::maildir_basename_flags($bn);
+        return if index($fl, 'T') >= 0; # no Trashed messages
+        my $kw = PublicInbox::MdirReader::flags2kw($fl);
+        substr($folder, 0, 0) = 'maildir:'; # add prefix
+        my $lms = $self->{-lms_ro};
+        my $oidbin = $lms ? $lms->name_oidbin($folder, $bn) : undef;
+        my @docids = defined($oidbin) ?
+                        $self->{over}->oidbin_exists($oidbin) : ();
         my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-        if ($self->{-mail_sync}) {
-                if ($f =~ m!\A(.+?)/(?:new|cur)/([^/]+)\z!) { # ugh...
-                        $vmd->{sync_info} = [ "maildir:$1", \(my $n = $2) ];
-                } else {
-                        warn "E: $f was not from a Maildir?\n";
-                }
+        if (scalar @docids) {
+                $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
+        }
+        if (my $eml = eml_from_path($f)) {
+                $vmd->{sync_info} = [ $folder, \$bn ] if $self->{-mail_sync};
+                $self->input_eml_cb($eml, $vmd);
         }
-        $self->input_eml_cb($eml, $vmd);
 }
 
 sub input_net_cb { # imap_each / nntp_each
@@ -62,11 +72,13 @@ sub do_import_index ($$@) {
         my $vmd_mod = $self->vmd_mod_extract(\@inputs);
         return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err};
         $self->{all_vmd} = $vmd_mod if scalar keys %$vmd_mod;
-        $self->prepare_inputs($lei, \@inputs) or return;
+        $lei->ale; # initialize for workers to read (before LeiPmdir->new)
         $self->{-mail_sync} = $lei->{opt}->{'mail-sync'} // 1;
+        $self->prepare_inputs($lei, \@inputs) or return;
 
-        $lei->ale; # initialize for workers to read
-        my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+        my $j = $lei->{opt}->{jobs} // 0;
+        $j =~ /\A([0-9]+),[0-9]+\z/ and $j = $1 + 0;
+        $j ||= scalar(@{$self->{inputs}}) || 1;
         my $ikw;
         if (my $net = $lei->{net}) {
                 # $j = $net->net_concurrency($j); TODO