about summary refs log tree commit homepage
path: root/lib/PublicInbox/LeiStore.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/LeiStore.pm')
-rw-r--r--lib/PublicInbox/LeiStore.pm161
1 files changed, 116 insertions, 45 deletions
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm
index 66049dfe..b2da2bc3 100644
--- a/lib/PublicInbox/LeiStore.pm
+++ b/lib/PublicInbox/LeiStore.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # Local storage (cache/memo) for lei(1), suitable for personal/private
@@ -27,12 +27,16 @@ use PublicInbox::MDA;
 use PublicInbox::Spawn qw(spawn);
 use PublicInbox::MdirReader;
 use PublicInbox::LeiToMail;
-use File::Temp ();
+use PublicInbox::Compat qw(uniqstr);
+use PublicInbox::OnDestroy;
+use File::Temp qw(tmpnam);
 use POSIX ();
 use IO::Handle (); # ->autoflush
 use Sys::Syslog qw(syslog openlog);
 use Errno qw(EEXIST ENOENT);
 use PublicInbox::Syscall qw(rename_noreplace);
+use PublicInbox::LeiStoreErr;
+use PublicInbox::DS qw(add_uniq_timer);
 
 sub new {
         my (undef, $dir, $opt) = @_;
@@ -77,7 +81,7 @@ sub importer {
                 delete $self->{im};
                 $im->done;
                 undef $im;
-                $self->checkpoint;
+                $self->barrier;
                 $max = $self->{priv_eidx}->{mg}->git_epochs + 1;
         }
         my (undef, $tl) = eidx_init($self); # acquire lock
@@ -107,17 +111,32 @@ sub search {
         PublicInbox::LeiSearch->new($_[0]->{priv_eidx}->{topdir});
 }
 
+sub cat_blob {
+        my ($self, $oid) = @_;
+        $self->{im} ? $self->{im}->cat_blob($oid) : undef;
+}
+
+sub schedule_commit {
+        my ($self, $sec) = @_;
+        add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&barrier, $self);
+}
+
 # follows the stderr file
 sub _tail_err {
         my ($self) = @_;
-        print { $self->{-err_wr} } readline($self->{-tmp_err});
+        my $err = $self->{-tmp_err} // return;
+        $err->clearerr; # clear EOF marker
+        my @msg = readline($err);
+        PublicInbox::LeiStoreErr::emit($self->{-err_wr}, @msg) and return;
+        # syslog is the last resort if lei-daemon broke
+        syslog('warning', '%s', $_) for @msg;
 }
 
 sub eidx_init {
         my ($self) = @_;
         my $eidx = $self->{priv_eidx};
         my $tl = wantarray && $self->{-err_wr} ?
-                        PublicInbox::OnDestroy->new($$, \&_tail_err, $self) :
+                        on_destroy(\&_tail_err, $self) :
                         undef;
         $eidx->idx_init({-private => 1}); # acquires lock
         wantarray ? ($eidx, $tl) : $eidx;
@@ -255,13 +274,13 @@ sub remove_eml_vmd { # remove just the VMD
 
 sub _lms_rw ($) { # it is important to have eidx processes open before lms
         my ($self) = @_;
-        my ($eidx, $tl) = eidx_init($self);
-        $self->{lms} //= do {
+        $self->{lms} // do {
                 require PublicInbox::LeiMailSync;
+                my ($eidx, $tl) = eidx_init($self);
                 my $f = "$self->{priv_eidx}->{topdir}/mail_sync.sqlite3";
                 my $lms = PublicInbox::LeiMailSync->new($f);
                 $lms->lms_write_prepare;
-                $lms;
+                $self->{lms} = $lms;
         };
 }
 
@@ -324,15 +343,55 @@ sub _add_vmd ($$$$) {
 sub _docids_and_maybe_kw ($$) {
         my ($self, $docids) = @_;
         return $docids unless wantarray;
-        my $kw = {};
+        my (@kw, $idx, @tmp);
         for my $num (@$docids) { # likely only 1, unless ContentHash changes
                 # can't use ->search->msg_keywords on uncommitted docs
-                my $idx = $self->{priv_eidx}->idx_shard($num);
-                my $tmp = eval { $idx->ipc_do('get_terms', 'K', $num) };
-                if ($@) { warn "#$num get_terms: $@" }
-                else { @$kw{keys %$tmp} = values(%$tmp) };
+                $idx = $self->{priv_eidx}->idx_shard($num);
+                @tmp = eval { $idx->ipc_do('get_terms', 'K', $num) };
+                $@ ? warn("#$num get_terms: $@") : push(@kw, @tmp);
+        }
+        @kw = sort(uniqstr(@kw)) if @$docids > 1;
+        ($docids, \@kw);
+}
+
+sub _reindex_1 { # git->cat_async callback
+        my ($bref, $hex, $type, $size, $smsg) = @_;
+        my $self = delete $smsg->{-sto};
+        my ($eidx, $tl) = eidx_init($self);
+        $bref //= _lms_rw($self)->local_blob($hex, 1);
+        if ($bref) {
+                my $eml = PublicInbox::Eml->new($bref);
+                $smsg->{-merge_vmd} = 1; # preserve existing keywords
+                $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg);
+        } elsif ($type eq 'missing') {
+                # pre-release/buggy lei may've indexed external-only msgs,
+                # try to correct that, here
+                warn("E: missing $hex, culling (ancient lei artifact?)\n");
+                $smsg->{to} = $smsg->{cc} = $smsg->{from} = '';
+                $smsg->{bytes} = 0;
+                $eidx->{oidx}->update_blob($smsg, '');
+                my $eml = PublicInbox::Eml->new("\r\n\r\n");
+                $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg);
+        } else {
+                warn("E: $type $hex\n");
         }
-        ($docids, [ sort keys %$kw ]);
+}
+
+sub reindex_art {
+        my ($self, $art) = @_;
+        my ($eidx, $tl) = eidx_init($self);
+        my $smsg = $eidx->{oidx}->get_art($art) // return;
+        return if $smsg->{bytes} == 0; # external-only message
+        $smsg->{-sto} = $self;
+        $eidx->git->cat_async($smsg->{blob} // die("no blob (#$art)"),
+                                \&_reindex_1, $smsg);
+}
+
+sub reindex_done {
+        my ($self) = @_;
+        my ($eidx, $tl) = eidx_init($self);
+        $eidx->git->async_wait_all;
+        # ->done to be called via sto_barrier_request
 }
 
 sub add_eml {
@@ -347,8 +406,14 @@ sub add_eml {
                 _lms_rw($self)->set_src($smsg->oidbin, @{$vmd->{sync_info}});
         }
         unless ($im_mark) { # duplicate blob returns undef
-                return unless wantarray;
+                return unless wantarray || $vmd;
                 my @docids = $oidx->blob_exists($smsg->{blob});
+                if ($vmd) {
+                        for my $docid (@docids) {
+                                my $idx = $eidx->idx_shard($docid);
+                                _add_vmd($self, $idx, $docid, $vmd);
+                        }
+                }
                 return _docids_and_maybe_kw $self, \@docids;
         }
 
@@ -506,13 +571,11 @@ sub set_xvmd {
         sto_export_kw($self, $smsg->{num}, $vmd);
 }
 
-sub checkpoint {
-        my ($self, $wait) = @_;
-        if (my $im = $self->{im}) {
-                $wait ? $im->barrier : $im->checkpoint;
-        }
-        delete $self->{lms};
-        $self->{priv_eidx}->checkpoint($wait);
+sub check_done {
+        my ($self) = @_;
+        $self->git->_active ?
+                add_uniq_timer("$self-check_done", 5, \&check_done, $self) :
+                done($self);
 }
 
 sub xchg_stderr {
@@ -520,32 +583,42 @@ sub xchg_stderr {
         _tail_err($self) if $self->{-err_wr};
         my $dir = $self->{priv_eidx}->{topdir};
         return unless -e $dir;
-        my $old = delete $self->{-tmp_err};
-        my $pfx = POSIX::strftime('%Y%m%d%H%M%S', gmtime(time));
-        my $err = File::Temp->new(TEMPLATE => "$pfx.$$.err-XXXX",
-                                SUFFIX => '.err', DIR => $dir);
-        open STDERR, '>>', $err->filename or die "dup2: $!";
+        delete $self->{-tmp_err};
+        my ($err, $name) = tmpnam();
+        open STDERR, '>>', $name or die "dup2: $!";
+        unlink($name);
         STDERR->autoflush(1); # shared with shard subprocesses
         $self->{-tmp_err} = $err; # separate file description for RO access
         undef;
 }
 
-sub done {
-        my ($self, $sock_ref) = @_;
-        my $err = '';
-        if (my $im = delete($self->{im})) {
-                eval { $im->done };
-                if ($@) {
-                        $err .= "import done: $@\n";
-                        warn $err;
-                }
+sub _commit ($$) {
+        my ($self, $cmd) = @_; # cmd is 'done' or 'barrier'
+        my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request
+        my @err;
+        if ($self->{im}) {
+                eval { $self->{im}->$cmd };
+                push(@err, "E: import $cmd: $@\n") if $@;
         }
         delete $self->{lms};
-        $self->{priv_eidx}->done; # V2Writable::done
+        eval { $self->{priv_eidx}->$cmd };
+        push(@err, "E: priv_eidx $cmd: $@\n") if $@;
+        print { $errfh // \*STDERR } @err;
+        send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
         xchg_stderr($self);
-        die $err if $err;
+        die @err if @err;
+        # $lei_sock goes out-of-scope and script/lei can terminate
 }
 
+sub barrier {
+        my ($self) = @_;
+        _commit $self, 'barrier';
+        add_uniq_timer("$self-check_done", 5, \&check_done, $self);
+        undef;
+}
+
+sub done { _commit $_[0], 'done' }
+
 sub ipc_atfork_child {
         my ($self) = @_;
         my $lei = $self->{lei};
@@ -564,16 +637,15 @@ sub recv_and_run {
         $self->SUPER::recv_and_run(@args);
 }
 
-sub _sto_atexit { # dwaitpid callback
-        my ($args, $pid) = @_;
-        my $self = $args->[0];
+sub _sto_atexit { # awaitpid cb
+        my ($pid) = @_;
         warn "lei/store PID:$pid died \$?=$?\n" if $?;
 }
 
 sub write_prepare {
         my ($self, $lei) = @_;
         $lei // die 'BUG: $lei not passed';
-        unless ($self->{-ipc_req}) {
+        unless ($self->{-wq_s1}) {
                 my $dir = $lei->store_path;
                 substr($dir, -length('/lei/store'), 10, '');
                 pipe(my ($r, $w)) or die "pipe: $!";
@@ -581,13 +653,12 @@ sub write_prepare {
                 # Mail we import into lei are private, so headers filtered out
                 # by -mda for public mail are not appropriate
                 local @PublicInbox::MDA::BAD_HEADERS = ();
+                local $SIG{ALRM} = 'IGNORE';
                 $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, {
                                         lei => $lei,
                                         -err_wr => $w,
                                         to_close => [ $r ],
-                                });
-                $self->wq_wait_async(\&_sto_atexit); # outlives $lei
-                require PublicInbox::LeiStoreErr;
+                                }, \&_sto_atexit);
                 PublicInbox::LeiStoreErr->new($r, $lei);
         }
         $lei->{sto} = $self;