about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiToMail.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2023-11-15 09:21:44 +0000
committerEric Wong <e@80x24.org>2023-11-16 10:56:53 +0000
commit72e0fcd979719f62bce89a029875e15a4cb497bb (patch)
tree9bf0501ee02fdf41a76f282f212563937a7fc0a9 /lib/PublicInbox/LeiToMail.pm
parent2c472f1f571ae55155c78bcbc4d420d06266ba63 (diff)
downloadpublic-inbox-72e0fcd979719f62bce89a029875e15a4cb497bb.tar.gz
We've always forced LeiToMail to only have one process for v2
outputs anyways since v2 has its own sharding and IPC.  Thus we
can use the single LeiToMail process directly to avoid extra IPC
overhead.
Diffstat (limited to 'lib/PublicInbox/LeiToMail.pm')
-rw-r--r--lib/PublicInbox/LeiToMail.pm19
1 files changed, 9 insertions, 10 deletions
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 2d9b7061..0d62888d 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -369,12 +369,14 @@ sub _v2_write_cb ($$) {
         my ($self, $lei) = @_;
         my $dedupe = $lei->{dedupe};
         $dedupe->prepare_dedupe if $dedupe;
+        # only call in worker
+        $PublicInbox::Import::DROP_UNIQUE_UNSUB = $lei->{-drop_unique_unsub};
         sub { # for git_to_mail
                 my ($bref, $smsg, $eml) = @_;
                 $eml //= PublicInbox::Eml->new($bref);
                 ++$self->{-nr_seen};
                 return if $dedupe && $dedupe->is_dup($eml, $smsg);
-                $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
+                $lei->{v2w}->add($eml) and ++$self->{-nr_write};
         }
 }
 
@@ -647,11 +649,6 @@ sub _do_augment_mbox {
         $dedupe->pause_dedupe if $dedupe;
 }
 
-sub v2w_done_wait { # awaitpid cb
-        my ($pid, $v2w, $lei) = @_;
-        $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
-}
-
 sub _pre_augment_v2 {
         my ($self, $lei) = @_;
         my $dir = $self->{dst};
@@ -677,11 +674,9 @@ sub _pre_augment_v2 {
                 $lei->x_it(shift);
                 die "E: can't write v2 inbox with broken config\n";
         });
+        $lei->{-drop_unique_unsub} = $PublicInbox::Import::DROP_UNIQUE_UNSUB;
         $ibx->init_inbox if @creat;
-        my $v2w = $ibx->importer;
-        $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei},
-                                \&v2w_done_wait, $lei);
-        $lei->{v2w} = $v2w;
+        $lei->{v2w} = $ibx->importer;
         return if !$lei->{opt}->{shared};
         my $d = "$lei->{ale}->{git}->{git_dir}/objects";
         open my $fh, '+>>', my $f = "$dir/git/0.git/objects/info/alternates";
@@ -806,6 +801,10 @@ sub wq_atexit_child {
         my $lei = $self->{lei};
         $lei->{ale}->git->async_wait_all;
         my ($nr_w, $nr_s) = delete(@$self{qw(-nr_write -nr_seen)});
+        if (my $v2w = delete $lei->{v2w}) {
+                eval { $v2w->done };
+                $lei->child_error($?, "E: $@ ($v2w->{ibx}->{inboxdir})") if $@;
+        }
         delete $self->{wcb};
         (($nr_w //= 0) + ($nr_s //= 0)) or return;
         return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p};