diff options
author | Eric Wong <e@80x24.org> | 2021-09-19 12:50:22 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2021-09-19 19:52:45 +0000 |
commit | cbc2890cb89b81cb6b9e8fabf3f196d9a6110dce (patch) | |
tree | 5e8fc56e04b6b52d28c5853aca1bb5f9b98f36b7 /lib/PublicInbox/LeiToMail.pm | |
parent | 12775b5be53db1244b9cb32ae2ef90f105735e1b (diff) | |
download | public-inbox-cbc2890cb89b81cb6b9e8fabf3f196d9a6110dce.tar.gz |
This has several advantages: * no need to use ipc.lock to protect a pipe for non-atomic writes * ability to pass FDs. In another commit, this will let us simplify lei->sto_done_request and pass newly-created sockets to lei/store directly. disadvantages: - an extra pipe is required for rare messages over several hundred KB, this is probably a non-issue, though The performance delta is unknown, but I expect shards (which remain pipes) to be the primary bottleneck IPC-wise for lei/store.
Diffstat (limited to 'lib/PublicInbox/LeiToMail.pm')
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 22 |
1 files changed, 12 insertions, 10 deletions
diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 9f7171fb..a419b83f 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -215,14 +215,14 @@ sub update_kw_maybe ($$$$) { my $c = $lse->kw_changed($eml, $kw, my $docids = []); my $vmd = { kw => $kw }; if (scalar @$docids) { # already in lei/store - $lei->{sto}->ipc_do('set_eml_vmd', undef, $vmd, $docids) if $c; + $lei->{sto}->wq_do('set_eml_vmd', undef, $vmd, $docids) if $c; } elsif (my $xoids = $lei->{ale}->xoids_for($eml)) { # it's in an external, only set kw, here - $lei->{sto}->ipc_do('set_xvmd', $xoids, $eml, $vmd); + $lei->{sto}->wq_do('set_xvmd', $xoids, $eml, $vmd); } else { # never-before-seen, import the whole thing # XXX this is critical in protecting against accidental # data loss without --augment - $lei->{sto}->ipc_do('set_eml', $eml, $vmd); + $lei->{sto}->wq_do('set_eml', $eml, $vmd); } } @@ -296,7 +296,7 @@ sub _maildir_write_cb ($$) { $lse->xsmsg_vmd($smsg) if $lse; my $n = _buf2maildir($dst, $bref // \($eml->as_string), $smsg, $dir); - $sto->ipc_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; + $sto->wq_do('set_sync_info', $smsg->{blob}, $out, $n) if $sto; ++$lei->{-nr_write}; } } @@ -326,7 +326,7 @@ sub _imap_write_cb ($$) { } # imap_append returns UID if IMAP server has UIDPLUS extension ($sto && $uid =~ /\A[0-9]+\z/) and - $sto->ipc_do('set_sync_info', + $sto->wq_do('set_sync_info', $smsg->{blob}, $$uri, $uid + 0); ++$lei->{-nr_write}; } @@ -360,7 +360,7 @@ sub _v2_write_cb ($$) { my ($bref, $smsg, $eml) = @_; $eml //= PublicInbox::Eml->new($bref); return if $dedupe && $dedupe->is_dup($eml, $smsg); - $lei->{v2w}->ipc_do('add', $eml); # V2Writable->add + $lei->{v2w}->wq_do('add', $eml); # V2Writable->add ++$lei->{-nr_write}; } } @@ -658,9 +658,10 @@ sub _pre_augment_v2 { } PublicInbox::InboxWritable->new($ibx, @creat); $ibx->init_inbox if @creat; - my $v2w = $lei->{v2w} = $ibx->importer; - $v2w->ipc_lock_init("$dir/ipc.lock"); - $v2w->ipc_worker_spawn("lei/v2w $dir", $lei->oldset, { lei => $lei }); + my $v2w = $ibx->importer; + $v2w->{-wq_no_bcast} = 1; + $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei}); + $lei->{v2w} = $v2w; return if !$lei->{opt}->{shared}; my $d = "$lei->{ale}->{git}->{git_dir}/objects"; my $al = "$dir/git/0.git/objects/info/alternates"; @@ -689,7 +690,7 @@ sub do_augment { # slow, runs in wq worker sub post_augment { my ($self, $lei, @args) = @_; my $wait = $lei->{opt}->{'import-before'} ? - $lei->{sto}->ipc_do('checkpoint', 1) : 0; + $lei->{sto}->wq_do('checkpoint', 1) : 0; # _post_augment_mbox my $m = $self->can("_post_augment_$self->{base_type}") or return; $m->($self, $lei, @args); @@ -774,6 +775,7 @@ sub write_mail { # via ->wq_io_do sub wq_atexit_child { my ($self) = @_; + local $PublicInbox::DS::in_loop = 0; # waitpid synchronously my $lei = $self->{lei}; delete $self->{wcb}; $lei->{ale}->git->async_wait_all; |