about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiImport.pm
diff options
context:
space:
mode:
author Eric Wong <e@80x24.org> 2021-06-03 01:05:20 +0000
committer Eric Wong <e@80x24.org> 2021-06-03 01:09:43 +0000
commit bdecd7ed8e0dcf0b45491b947cd737ba8cfe38a3 (patch)
tree 33616d6248bf6b8d2a78d2a609f5ef8389b36b47 /lib/PublicInbox/LeiImport.pm
parent 6ff03ba2be9247f1ead26c2524fadc789de558f1 (diff)
download public-inbox-bdecd7ed8e0dcf0b45491b947cd737ba8cfe38a3.tar.gz
On a 4-core CPU, this speeds up "lei import" on a largish IMAP
inbox with 75K messages from ~21 minutes down to 40s.

Parallelizing with the new LeiImportKw WQ worker class gave a
near-linear speedup and brought the runtime down to ~5:40.

The new idx_fid_uid index on the "fid" and "uid" columns of
blob2num in mail_sync.sqlite3 brought us the final speedup.

An additional index on over.sqlite3#xref3(oidbin) did not help,
since idx_nntp already exists and speeds up the new ->oidbin_exists
internal API.

I initially experimented with a separate "lei import-kw" command
but decided against it since it's useless outside of IMAP+JMAP
and would require extra cognitive overhead for both users and
hackers.  So LeiImportKw is just a WQ worker used by "lei import"
and not its own user-visible command.

v2: fix ikw_done_wait arg handling (ugh, confusing API :x)
Diffstat (limited to 'lib/PublicInbox/LeiImport.pm')
-rw-r--r-- lib/PublicInbox/LeiImport.pm 25
1 files changed, 12 insertions, 13 deletions
diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm
index 860a2c98..2efd4935 100644
--- a/lib/PublicInbox/LeiImport.pm
+++ b/lib/PublicInbox/LeiImport.pm
@@ -43,18 +43,14 @@ sub input_maildir_cb { # maildir_each_eml cb
 
 sub input_net_cb { # imap_each / nntp_each
         my ($uri, $uid, $kw, $eml, $self) = @_;
-        my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
-        $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
         if (defined $eml) {
+                my $vmd = $self->{-import_kw} ? { kw => $kw } : undef;
+                $vmd->{sync_info} = [ $$uri, $uid ] if $self->{-mail_sync};
                 $self->input_eml_cb($eml, $vmd);
-        } elsif ($vmd) { # old message, kw only
-                my $oid = $self->{-lms_ro}->imap_oid2($uri, $uid) // return;
-                my @docids = $self->{lse}->over->blob_exists($oid) or return;
-                $self->{lse}->kw_changed(undef, $kw, \@docids) or return;
-                my $lei = $self->{lei};
-                $lei->qerr("# $oid => @$kw\n") if $lei->{opt}->{verbose};
-                $self->{lei}->{sto}->ipc_do('set_eml_vmd', undef,
-                                                $vmd, \@docids);
+        } elsif (my $ikw = $self->{lei}->{ikw}) { # old message, kw only
+                # we send $uri as a bare SCALAR and not a URIimap ref to
+                # reduce socket traffic:
+                $ikw->wq_io_do('ck_update_kw', [], $$uri, $uid, $kw);
         }
 }
 
@@ -71,15 +67,17 @@ sub do_import_index ($$@) {
 
         $lei->ale; # initialize for workers to read
         my $j = $lei->{opt}->{jobs} // scalar(@{$self->{inputs}}) || 1;
+        my $ikw;
         if (my $net = $lei->{net}) {
                 # $j = $net->net_concurrency($j); TODO
                 if ($lei->{opt}->{incremental} // 1) {
                         $net->{incremental} = 1;
                         $net->{-lms_ro} = $sto->search->lms // 0;
-                        if ($self->{-import_kw}) {
+                        if ($self->{-import_kw} && $net->{-lms_ro} &&
+                                        $net->{imap_order}) {
+                                require PublicInbox::LeiImportKw;
+                                $ikw = PublicInbox::LeiImportKw->new($lei);
                                 $net->{each_old} = 1;
-                                $self->{-lms_ro} = $net->{-lms_ro};
-                                $self->{lse} = $sto->search;
                         }
                 }
         } else {
@@ -93,6 +91,7 @@ sub do_import_index ($$@) {
         (my $op_c, $ops) = $lei->workers_start($self, $j, $ops);
         $lei->{wq1} = $self;
         $lei->{-err_type} = 'non-fatal';
+        $ikw->wq_close(1) if $ikw;
         net_merge_all_done($self) unless $lei->{auth};
         $op_c->op_wait_event($ops);
 }