diff options
Diffstat (limited to 'lib/PublicInbox/LeiStore.pm')
-rw-r--r-- | lib/PublicInbox/LeiStore.pm | 161 |
1 files changed, 116 insertions, 45 deletions
diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 66049dfe..b2da2bc3 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # # Local storage (cache/memo) for lei(1), suitable for personal/private @@ -27,12 +27,16 @@ use PublicInbox::MDA; use PublicInbox::Spawn qw(spawn); use PublicInbox::MdirReader; use PublicInbox::LeiToMail; -use File::Temp (); +use PublicInbox::Compat qw(uniqstr); +use PublicInbox::OnDestroy; +use File::Temp qw(tmpnam); use POSIX (); use IO::Handle (); # ->autoflush use Sys::Syslog qw(syslog openlog); use Errno qw(EEXIST ENOENT); use PublicInbox::Syscall qw(rename_noreplace); +use PublicInbox::LeiStoreErr; +use PublicInbox::DS qw(add_uniq_timer); sub new { my (undef, $dir, $opt) = @_; @@ -77,7 +81,7 @@ sub importer { delete $self->{im}; $im->done; undef $im; - $self->checkpoint; + $self->barrier; $max = $self->{priv_eidx}->{mg}->git_epochs + 1; } my (undef, $tl) = eidx_init($self); # acquire lock @@ -107,17 +111,32 @@ sub search { PublicInbox::LeiSearch->new($_[0]->{priv_eidx}->{topdir}); } +sub cat_blob { + my ($self, $oid) = @_; + $self->{im} ? $self->{im}->cat_blob($oid) : undef; +} + +sub schedule_commit { + my ($self, $sec) = @_; + add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&barrier, $self); +} + # follows the stderr file sub _tail_err { my ($self) = @_; - print { $self->{-err_wr} } readline($self->{-tmp_err}); + my $err = $self->{-tmp_err} // return; + $err->clearerr; # clear EOF marker + my @msg = readline($err); + PublicInbox::LeiStoreErr::emit($self->{-err_wr}, @msg) and return; + # syslog is the last resort if lei-daemon broke + syslog('warning', '%s', $_) for @msg; } sub eidx_init { my ($self) = @_; my $eidx = $self->{priv_eidx}; my $tl = wantarray && $self->{-err_wr} ? - PublicInbox::OnDestroy->new($$, \&_tail_err, $self) : + on_destroy(\&_tail_err, $self) : undef; $eidx->idx_init({-private => 1}); # acquires lock wantarray ? ($eidx, $tl) : $eidx; @@ -255,13 +274,13 @@ sub remove_eml_vmd { # remove just the VMD sub _lms_rw ($) { # it is important to have eidx processes open before lms my ($self) = @_; - my ($eidx, $tl) = eidx_init($self); - $self->{lms} //= do { + $self->{lms} // do { require PublicInbox::LeiMailSync; + my ($eidx, $tl) = eidx_init($self); my $f = "$self->{priv_eidx}->{topdir}/mail_sync.sqlite3"; my $lms = PublicInbox::LeiMailSync->new($f); $lms->lms_write_prepare; - $lms; + $self->{lms} = $lms; }; } @@ -324,15 +343,55 @@ sub _add_vmd ($$$$) { sub _docids_and_maybe_kw ($$) { my ($self, $docids) = @_; return $docids unless wantarray; - my $kw = {}; + my (@kw, $idx, @tmp); for my $num (@$docids) { # likely only 1, unless ContentHash changes # can't use ->search->msg_keywords on uncommitted docs - my $idx = $self->{priv_eidx}->idx_shard($num); - my $tmp = eval { $idx->ipc_do('get_terms', 'K', $num) }; - if ($@) { warn "#$num get_terms: $@" } - else { @$kw{keys %$tmp} = values(%$tmp) }; + $idx = $self->{priv_eidx}->idx_shard($num); + @tmp = eval { $idx->ipc_do('get_terms', 'K', $num) }; + $@ ? warn("#$num get_terms: $@") : push(@kw, @tmp); + } + @kw = sort(uniqstr(@kw)) if @$docids > 1; + ($docids, \@kw); +} + +sub _reindex_1 { # git->cat_async callback + my ($bref, $hex, $type, $size, $smsg) = @_; + my $self = delete $smsg->{-sto}; + my ($eidx, $tl) = eidx_init($self); + $bref //= _lms_rw($self)->local_blob($hex, 1); + if ($bref) { + my $eml = PublicInbox::Eml->new($bref); + $smsg->{-merge_vmd} = 1; # preserve existing keywords + $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg); + } elsif ($type eq 'missing') { + # pre-release/buggy lei may've indexed external-only msgs, + # try to correct that, here + warn("E: missing $hex, culling (ancient lei artifact?)\n"); + $smsg->{to} = $smsg->{cc} = $smsg->{from} = ''; + $smsg->{bytes} = 0; + $eidx->{oidx}->update_blob($smsg, ''); + my $eml = PublicInbox::Eml->new("\r\n\r\n"); + $eidx->idx_shard($smsg->{num})->index_eml($eml, $smsg); + } else { + warn("E: $type $hex\n"); } - ($docids, [ sort keys %$kw ]); +} + +sub reindex_art { + my ($self, $art) = @_; + my ($eidx, $tl) = eidx_init($self); + my $smsg = $eidx->{oidx}->get_art($art) // return; + return if $smsg->{bytes} == 0; # external-only message + $smsg->{-sto} = $self; + $eidx->git->cat_async($smsg->{blob} // die("no blob (#$art)"), + \&_reindex_1, $smsg); +} + +sub reindex_done { + my ($self) = @_; + my ($eidx, $tl) = eidx_init($self); + $eidx->git->async_wait_all; + # ->done to be called via sto_barrier_request } sub add_eml { @@ -347,8 +406,14 @@ sub add_eml { _lms_rw($self)->set_src($smsg->oidbin, @{$vmd->{sync_info}}); } unless ($im_mark) { # duplicate blob returns undef - return unless wantarray; + return unless wantarray || $vmd; my @docids = $oidx->blob_exists($smsg->{blob}); + if ($vmd) { + for my $docid (@docids) { + my $idx = $eidx->idx_shard($docid); + _add_vmd($self, $idx, $docid, $vmd); + } + } return _docids_and_maybe_kw $self, \@docids; } @@ -506,13 +571,11 @@ sub set_xvmd { sto_export_kw($self, $smsg->{num}, $vmd); } -sub checkpoint { - my ($self, $wait) = @_; - if (my $im = $self->{im}) { - $wait ? $im->barrier : $im->checkpoint; - } - delete $self->{lms}; - $self->{priv_eidx}->checkpoint($wait); +sub check_done { + my ($self) = @_; + $self->git->_active ? + add_uniq_timer("$self-check_done", 5, \&check_done, $self) : + done($self); } sub xchg_stderr { @@ -520,32 +583,42 @@ sub xchg_stderr { _tail_err($self) if $self->{-err_wr}; my $dir = $self->{priv_eidx}->{topdir}; return unless -e $dir; - my $old = delete $self->{-tmp_err}; - my $pfx = POSIX::strftime('%Y%m%d%H%M%S', gmtime(time)); - my $err = File::Temp->new(TEMPLATE => "$pfx.$$.err-XXXX", - SUFFIX => '.err', DIR => $dir); - open STDERR, '>>', $err->filename or die "dup2: $!"; + delete $self->{-tmp_err}; + my ($err, $name) = tmpnam(); + open STDERR, '>>', $name or die "dup2: $!"; + unlink($name); STDERR->autoflush(1); # shared with shard subprocesses $self->{-tmp_err} = $err; # separate file description for RO access undef; } -sub done { - my ($self, $sock_ref) = @_; - my $err = ''; - if (my $im = delete($self->{im})) { - eval { $im->done }; - if ($@) { - $err .= "import done: $@\n"; - warn $err; - } +sub _commit ($$) { + my ($self, $cmd) = @_; # cmd is 'done' or 'barrier' + my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request + my @err; + if ($self->{im}) { + eval { $self->{im}->$cmd }; + push(@err, "E: import $cmd: $@\n") if $@; } delete $self->{lms}; - $self->{priv_eidx}->done; # V2Writable::done + eval { $self->{priv_eidx}->$cmd }; + push(@err, "E: priv_eidx $cmd: $@\n") if $@; + print { $errfh // \*STDERR } @err; + send($lei_sock, 'child_error 256', 0) if @err && $lei_sock; xchg_stderr($self); - die $err if $err; + die @err if @err; + # $lei_sock goes out-of-scope and script/lei can terminate } +sub barrier { + my ($self) = @_; + _commit $self, 'barrier'; + add_uniq_timer("$self-check_done", 5, \&check_done, $self); + undef; +} + +sub done { _commit $_[0], 'done' } + sub ipc_atfork_child { my ($self) = @_; my $lei = $self->{lei}; @@ -564,16 +637,15 @@ sub recv_and_run { $self->SUPER::recv_and_run(@args); } -sub _sto_atexit { # dwaitpid callback - my ($args, $pid) = @_; - my $self = $args->[0]; +sub _sto_atexit { # awaitpid cb + my ($pid) = @_; warn "lei/store PID:$pid died \$?=$?\n" if $?; } sub write_prepare { my ($self, $lei) = @_; $lei // die 'BUG: $lei not passed'; - unless ($self->{-ipc_req}) { + unless ($self->{-wq_s1}) { my $dir = $lei->store_path; substr($dir, -length('/lei/store'), 10, ''); pipe(my ($r, $w)) or die "pipe: $!"; @@ -581,13 +653,12 @@ sub write_prepare { # Mail we import into lei are private, so headers filtered out # by -mda for public mail are not appropriate local @PublicInbox::MDA::BAD_HEADERS = (); + local $SIG{ALRM} = 'IGNORE'; $self->wq_workers_start("lei/store $dir", 1, $lei->oldset, { lei => $lei, -err_wr => $w, to_close => [ $r ], - }); - $self->wq_wait_async(\&_sto_atexit); # outlives $lei - require PublicInbox::LeiStoreErr; + }, \&_sto_atexit); PublicInbox::LeiStoreErr->new($r, $lei); } $lei->{sto} = $self; |