about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/LeiConvert.pm7
-rw-r--r--lib/PublicInbox/LeiToMail.pm19
-rw-r--r--lib/PublicInbox/LeiXSearch.pm6
-rw-r--r--lib/PublicInbox/V2Writable.pm2
4 files changed, 12 insertions, 22 deletions
diff --git a/lib/PublicInbox/LeiConvert.pm b/lib/PublicInbox/LeiConvert.pm
index 4a1f8323..9d2479b0 100644
--- a/lib/PublicInbox/LeiConvert.pm
+++ b/lib/PublicInbox/LeiConvert.pm
@@ -35,12 +35,9 @@ sub process_inputs { # via wq_do
         my $lei = $self->{lei};
         delete $lei->{1};
         my $l2m = delete $lei->{l2m};
-        my $nr_w = delete($l2m->{-nr_write}) // 0;
         delete $self->{wcb}; # commit
-        if (my $v2w = delete $lei->{v2w}) {
-                $nr_w = $v2w->wq_do('done'); # may die
-                $v2w->wq_close;
-        }
+        if (my $v2w = delete $lei->{v2w}) { $v2w->done } # may die
+        my $nr_w = delete($l2m->{-nr_write}) // 0;
         my $d = (delete($l2m->{-nr_seen}) // 0) - $nr_w;
         $d = $d ? " ($d duplicates)" : '';
         $lei->qerr("# converted $nr_w messages$d");
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm
index 2d9b7061..0d62888d 100644
--- a/lib/PublicInbox/LeiToMail.pm
+++ b/lib/PublicInbox/LeiToMail.pm
@@ -369,12 +369,14 @@ sub _v2_write_cb ($$) {
         my ($self, $lei) = @_;
         my $dedupe = $lei->{dedupe};
         $dedupe->prepare_dedupe if $dedupe;
+        # only call in worker
+        $PublicInbox::Import::DROP_UNIQUE_UNSUB = $lei->{-drop_unique_unsub};
         sub { # for git_to_mail
                 my ($bref, $smsg, $eml) = @_;
                 $eml //= PublicInbox::Eml->new($bref);
                 ++$self->{-nr_seen};
                 return if $dedupe && $dedupe->is_dup($eml, $smsg);
-                $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
+                $lei->{v2w}->add($eml) and ++$self->{-nr_write};
         }
 }
 
@@ -647,11 +649,6 @@ sub _do_augment_mbox {
         $dedupe->pause_dedupe if $dedupe;
 }
 
-sub v2w_done_wait { # awaitpid cb
-        my ($pid, $v2w, $lei) = @_;
-        $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
-}
-
 sub _pre_augment_v2 {
         my ($self, $lei) = @_;
         my $dir = $self->{dst};
@@ -677,11 +674,9 @@ sub _pre_augment_v2 {
                 $lei->x_it(shift);
                 die "E: can't write v2 inbox with broken config\n";
         });
+        $lei->{-drop_unique_unsub} = $PublicInbox::Import::DROP_UNIQUE_UNSUB;
         $ibx->init_inbox if @creat;
-        my $v2w = $ibx->importer;
-        $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei},
-                                \&v2w_done_wait, $lei);
-        $lei->{v2w} = $v2w;
+        $lei->{v2w} = $ibx->importer;
         return if !$lei->{opt}->{shared};
         my $d = "$lei->{ale}->{git}->{git_dir}/objects";
         open my $fh, '+>>', my $f = "$dir/git/0.git/objects/info/alternates";
@@ -806,6 +801,10 @@ sub wq_atexit_child {
         my $lei = $self->{lei};
         $lei->{ale}->git->async_wait_all;
         my ($nr_w, $nr_s) = delete(@$self{qw(-nr_write -nr_seen)});
+        if (my $v2w = delete $lei->{v2w}) {
+                eval { $v2w->done };
+                $lei->child_error($?, "E: $@ ($v2w->{ibx}->{inboxdir})") if $@;
+        }
         delete $self->{wcb};
         (($nr_w //= 0) + ($nr_s //= 0)) or return;
         return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p};
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 7eda6f9e..5e36c11a 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -391,11 +391,6 @@ sub query_done { # EOF callback for main daemon
         ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
                 warn "BUG: {sto} missing with --mail-sync";
         $lei->sto_done_request;
-        my $nr_w = delete($lei->{-nr_write}) // 0;
-        if (my $v2w = delete $lei->{v2w}) {
-                $nr_w = $v2w->wq_do('done'); # may die
-                $v2w->wq_close;
-        }
         $lei->{ovv}->ovv_end($lei);
         if ($l2m) { # close() calls LeiToMail reap_compress
                 if (my $out = delete $lei->{old_1}) {
@@ -413,6 +408,7 @@ Error closing $lei->{ovv}->{dst}: \$!=$! \$?=$?
                         delete $l2m->{mbl}; # drop dotlock
                 }
         }
+        my $nr_w = delete($lei->{-nr_write}) // 0;
         my $nr_dup = (delete($lei->{-nr_seen}) // 0) - $nr_w;
         if ($lei->{-progress}) {
                 my $tot = $lei->{-mset_total} // 0;
diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index 231ed516..fb259396 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -135,7 +135,6 @@ sub add {
         if (do_idx($self, $mime, $smsg)) {
                 $self->checkpoint;
         }
-        ++$self->{-nr_add}; # for lei convert
         $cmt;
 }
 
@@ -611,7 +610,6 @@ sub done {
         $self->lock_release(!!$nbytes) if $shards;
         $self->git->cleanup;
         die $err if $err;
-        delete $self->{-nr_add}; # for lei-convert
 }
 
 sub importer {