diff options
-rw-r--r-- | MANIFEST | 3 | ||||
-rw-r--r-- | lib/PublicInbox/FakeImport.pm | 23 | ||||
-rw-r--r-- | lib/PublicInbox/Import.pm | 26 | ||||
-rw-r--r-- | lib/PublicInbox/LeiBlob.pm | 38 | ||||
-rw-r--r-- | lib/PublicInbox/LeiImport.pm | 15 | ||||
-rw-r--r-- | lib/PublicInbox/LeiIndex.pm | 48 | ||||
-rw-r--r-- | lib/PublicInbox/LeiInput.pm | 30 | ||||
-rw-r--r-- | lib/PublicInbox/LeiMailSync.pm | 27 | ||||
-rw-r--r-- | lib/PublicInbox/LeiStore.pm | 10 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 10 | ||||
-rw-r--r-- | lib/PublicInbox/OverIdx.pm | 18 | ||||
-rw-r--r-- | t/lei-index.t | 58 |
12 files changed, 253 insertions, 53 deletions
@@ -148,6 +148,7 @@ lib/PublicInbox/EmlContentFoo.pm lib/PublicInbox/ExtMsg.pm lib/PublicInbox/ExtSearch.pm lib/PublicInbox/ExtSearchIdx.pm +lib/PublicInbox/FakeImport.pm lib/PublicInbox/FakeInotify.pm lib/PublicInbox/Feed.pm lib/PublicInbox/Filter/Base.pm @@ -197,6 +198,7 @@ lib/PublicInbox/LeiExternal.pm lib/PublicInbox/LeiForgetSearch.pm lib/PublicInbox/LeiHelp.pm lib/PublicInbox/LeiImport.pm +lib/PublicInbox/LeiIndex.pm lib/PublicInbox/LeiInit.pm lib/PublicInbox/LeiInput.pm lib/PublicInbox/LeiInspect.pm @@ -403,6 +405,7 @@ t/lei-import-imap.t t/lei-import-maildir.t t/lei-import-nntp.t t/lei-import.t +t/lei-index.t t/lei-lcat.t t/lei-mirror.t t/lei-p2q.t diff --git a/lib/PublicInbox/FakeImport.pm b/lib/PublicInbox/FakeImport.pm new file mode 100644 index 00000000..dea25cbe --- /dev/null +++ b/lib/PublicInbox/FakeImport.pm @@ -0,0 +1,23 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# pretend to do PublicInbox::Import::add for "lei index" +package PublicInbox::FakeImport; +use strict; +use PublicInbox::ContentHash qw(git_sha); + +sub new { bless { bytes_added => 0 }, __PACKAGE__ } + +sub add { + my ($self, $eml, $check_cb, $smsg) = @_; + $smsg->populate($eml); + my $raw = $eml->as_string; + $smsg->{blob} = git_sha(1, \$raw)->hexdigest; + $smsg->set_bytes($raw, length($raw)); + if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore + $oidx->vivify_xvmd($smsg) or return; + } + 1; +} + +1; diff --git a/lib/PublicInbox/Import.pm b/lib/PublicInbox/Import.pm index 3adf9dec..362cdc47 100644 --- a/lib/PublicInbox/Import.pm +++ b/lib/PublicInbox/Import.pm @@ -413,19 +413,19 @@ sub add { $smsg->{blob} = $self->get_mark(":$blob"); $smsg->set_bytes($raw_email, $n); if (my $oidx = delete $smsg->{-oidx}) { # used by LeiStore - my @docids = $oidx->blob_exists($smsg->{blob}); - my @vivify_xvmd; - for my $id (@docids) { - if (my $cur = $oidx->get_art($id)) { - # already imported if bytes > 0 - return if $cur->{bytes} > 0; - push @vivify_xvmd, $id; - } else { - warn "W: $smsg->{blob} ", - "#$id gone (bug?)\n"; - } - } - $smsg->{-vivify_xvmd} = \@vivify_xvmd; + my $eidx_git = delete $smsg->{-eidx_git}; + + # we need this sharedkv to dedupe blobs added in the + # same fast-import transaction + my $u = $self->{uniq_skv} //= do { + require PublicInbox::SharedKV; + my $x = PublicInbox::SharedKV->new; + $x->dbh; + $x; + }; + return if !$u->set_maybe(pack('H*', $smsg->{blob}), 1); + return if (!$oidx->vivify_xvmd($smsg) && + $eidx_git->check($smsg->{blob})); } } my $ref = $self->{ref}; diff --git a/lib/PublicInbox/LeiBlob.pm b/lib/PublicInbox/LeiBlob.pm index 710430a2..8de86565 100644 --- a/lib/PublicInbox/LeiBlob.pm +++ b/lib/PublicInbox/LeiBlob.pm @@ -87,6 +87,16 @@ sub cat_attach_i { # Eml->each_part callback $lei->out($part->body); } +sub extract_attach ($$$) { + my ($lei, $blob, $bref) = @_; + my $eml = PublicInbox::Eml->new($bref); + $eml->each_part(\&cat_attach_i, $lei, 1); + my $idx = delete $lei->{-attach_idx}; + defined($idx) and return $lei->fail(<<EOM); +E: attachment $idx not found in $blob +EOM +} + sub lei_blob { my ($lei, $blob) = @_; $lei->start_pager if -t $lei->{1}; @@ -106,7 +116,7 @@ sub lei_blob { } my $rdr = {}; if ($opt->{mail}) { - $rdr->{2} = $lei->{2}; + open $rdr->{2}, '+>', undef or die "open: $!"; } else { open $rdr->{2}, '>', '/dev/null' or die "open: $!"; } @@ -115,21 +125,25 @@ sub lei_blob { if (defined $lei->{-attach_idx}) { my $fh = popen_rd($cmd, $lei->{env}, $rdr); require PublicInbox::Eml; - my $str = do { local $/; <$fh> }; - if (close $fh) { - my $eml = PublicInbox::Eml->new(\$str); - $eml->each_part(\&cat_attach_i, $lei, 1); - my $idx = delete $lei->{-attach_idx}; - defined($idx) and return $lei->fail(<<EOM); -E: attachment $idx not found in $blob -EOM - } + my $buf = do { local $/; <$fh> }; + return extract_attach($lei, $blob, \$buf) if close($fh); } else { $rdr->{1} = $lei->{1}; waitpid(spawn($cmd, $lei->{env}, $rdr), 0); } - return if $? == 0; - return $lei->child_error($?) if $opt->{mail}; + my $ce = $?; + return if $ce == 0; + my $sto = $lei->_lei_store; + my $lms = $sto ? $sto->search->lms : undef; + if (my $bref = $lms ? $lms->local_blob($blob, 1) : undef) { + defined($lei->{-attach_idx}) and + return extract_attach($lei, $blob, $bref); + return $lei->out($$bref); + } elsif ($opt->{mail}) { + my $eh = $rdr->{2}; + seek($eh, 0, 0); + return $lei->child_error($ce, do { local $/; <$eh> }); + } # else: fall through to solver below } # maybe it's a non-email (code) blob from a coderepo diff --git a/lib/PublicInbox/LeiImport.pm b/lib/PublicInbox/LeiImport.pm index 6a57df47..55925cc5 100644 --- a/lib/PublicInbox/LeiImport.pm +++ b/lib/PublicInbox/LeiImport.pm @@ -38,21 +38,20 @@ sub input_maildir_cb { # maildir_each_eml cb warn "E: $f was not from a Maildir?\n"; } } - input_eml_cb($self, $eml, $vmd); + $self->input_eml_cb($eml, $vmd); } sub input_net_cb { # imap_each / nntp_each my ($url, $uid, $kw, $eml, $self) = @_; my $vmd = $self->{-import_kw} ? { kw => $kw } : undef; $vmd->{sync_info} = [ $url, $uid ] if $self->{-mail_sync}; - input_eml_cb($self, $eml, $vmd); + $self->input_eml_cb($eml, $vmd); } -sub lei_import { # the main "lei import" method - my ($lei, @inputs) = @_; +sub do_import_index ($$@) { + my ($self, $lei, @inputs) = @_; my $sto = $lei->_lei_store(1); $sto->write_prepare($lei); - my $self = bless {}, __PACKAGE__; $self->{-import_kw} = $lei->{opt}->{kw} // 1; my $vmd_mod = $self->vmd_mod_extract(\@inputs); return $lei->fail(join("\n", @{$vmd_mod->{err}})) if $vmd_mod->{err}; @@ -83,6 +82,12 @@ sub lei_import { # the main "lei import" method $op_c->op_wait_event($ops); } +sub lei_import { # the main "lei import" method + my ($lei, @inputs) = @_; + my $self = bless {}, __PACKAGE__; + do_import_index($self, $lei, @inputs); +} + sub _complete_import { my ($lei, @argv) = @_; my $sto = $lei->_lei_store or return; diff --git a/lib/PublicInbox/LeiIndex.pm b/lib/PublicInbox/LeiIndex.pm new file mode 100644 index 00000000..cc3e83e7 --- /dev/null +++ b/lib/PublicInbox/LeiIndex.pm @@ -0,0 +1,48 @@ +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# front-end for the "lei index" sub-command, this is similar to +# "lei import" but doesn't put a git blob into ~/.local/share/lei/store +package PublicInbox::LeiIndex; +use strict; +use v5.10.1; +use parent qw(PublicInbox::IPC PublicInbox::LeiInput); +use PublicInbox::LeiImport; + +# /^input_/ subs are used by (or override) PublicInbox::LeiInput superclass +sub input_eml_cb { # used by input_maildir_cb and input_net_cb + my ($self, $eml, $vmd) = @_; + my $xoids = $self->{lei}->{ale}->xoids_for($eml); + if (my $all_vmd = $self->{all_vmd}) { + @$vmd{keys %$all_vmd} = values %$all_vmd; + } + $self->{lei}->{sto}->ipc_do('index_eml_only', $eml, $vmd, $xoids); +} + +sub input_fh { # overrides PublicInbox::LeiInput::input_fh + my ($self, $ifmt, $fh, $input, @args) = @_; + $self->{lei}->child_error(1<<8, <<EOM); +$input ($ifmt) not yet supported, try `lei import' +EOM +} + +sub lei_index { + my ($lei, @argv) = @_; + $lei->{opt}->{'mail-sync'} = 1; + my $self = bless {}, __PACKAGE__; + PublicInbox::LeiImport::do_import_index($self, $lei, @argv); +} + +no warnings 'once'; +no strict 'refs'; +for my $m (qw(input_maildir_cb input_net_cb)) { + *$m = PublicInbox::LeiImport->can($m); +} + +*_complete_import = \&PublicInbox::LeiImport::_complete_import; +*ipc_atfork_child = \&PublicInbox::LeiInput::input_only_atfork_child; +*net_merge_all_done = \&PublicInbox::LeiInput::input_only_net_merge_all_done; + +# the following works even when LeiAuth is lazy-loaded +*net_merge_all = \&PublicInbox::LeiAuth::net_merge_all; +1; diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index 85caac35..46eea111 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -1,7 +1,7 @@ # Copyright (C) 2021 all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> -# parent class for LeiImport, LeiConvert +# parent class for LeiImport, LeiConvert, LeiIndex package PublicInbox::LeiInput; use strict; use v5.10.1; @@ -93,11 +93,7 @@ sub handle_http_input ($$@) { my ($fh, $pid) = popen_rd($cmd, undef, $rdr); grep(/\A--compressed\z/, @$curl) or $fh = IO::Uncompress::Gunzip->new($fh, MultiStream => 1); - eval { - PublicInbox::MboxReader->mboxrd($fh, - $self->can('input_mbox_cb'), - $self, @args); - }; + eval { $self->input_fh('mboxrd', $fh, $url, @args) }; my $err = $@; waitpid($pid, 0); $? || $err and @@ -221,14 +217,8 @@ sub prepare_inputs { # returns undef on error require PublicInbox::NetReader; $net //= PublicInbox::NetReader->new; $net->add_url($input); - if ($sync) { - if ($input =~ m!\Aimaps?://!) { - push @{$sync->{ok}}, $input; - } else { - push @{$sync->{no}}, $input; - } - } - } elsif ($input_path =~ m!\Ahttps?://!i) { + push @{$sync->{ok}}, $input if $sync; + } elsif ($input_path =~ m!\Ahttps?://!i) { # mboxrd.gz # TODO: how would we detect r/w JMAP? push @{$sync->{no}}, $input if $sync; prepare_http_input($self, $lei, $input_path) or return; @@ -239,12 +229,10 @@ sub prepare_inputs { # returns undef on error --in-format=$in_fmt and `$ifmt:' conflict } - if ($sync) { - if ($ifmt =~ /\A(?:maildir|mh)\z/i) { - push @{$sync->{ok}}, $input; - } else { - push @{$sync->{no}}, $input; - } + if ($ifmt =~ /\A(?:maildir|mh)\z/i) { + push @{$sync->{ok}}, $input if $sync; + } else { + push @{$sync->{no}}, $input if $sync; } my $devfd = $lei->path_to_fd($input_path) // return; if ($devfd >= 0 || (-f $input_path || -p _)) { @@ -260,7 +248,7 @@ sub prepare_inputs { # returns undef on error } else { return $lei->fail("Unable to handle $input"); } - } elsif ($input =~ /\.(eml|patch)\z/i && -f $input) { + } elsif ($input =~ /\.(?:eml|patch)\z/i && -f $input) { lc($in_fmt//'eml') eq 'eml' or return $lei->fail(<<""); $input is `eml', not --in-format=$in_fmt diff --git a/lib/PublicInbox/LeiMailSync.pm b/lib/PublicInbox/LeiMailSync.pm index 2ce189fa..2e74e433 100644 --- a/lib/PublicInbox/LeiMailSync.pm +++ b/lib/PublicInbox/LeiMailSync.pm @@ -6,6 +6,7 @@ package PublicInbox::LeiMailSync; use strict; use v5.10.1; use DBI; +use PublicInbox::ContentHash qw(git_sha); sub dbh_new { my ($self, $rw) = @_; @@ -208,4 +209,30 @@ sub folders { map { $_->[0] } @{$dbh->selectall_arrayref($sql, undef, @pfx)}; } +sub local_blob { + my ($self, $oidhex, $vrfy) = @_; + my $dbh = $self->{dbh} //= dbh_new($self); + my $b2n = $dbh->prepare(<<''); +SELECT f.loc,b.name FROM blob2name b +LEFT JOIN folders f ON b.fid = f.fid +WHERE b.oidbin = ? + + $b2n->execute(pack('H*', $oidhex)); + while (my ($d, $n) = $b2n->fetchrow_array) { + substr($d, 0, length('maildir:')) = ''; + my $f = "$d/" . ($n =~ /:2,[a-zA-Z]*\z/ ? "cur/$n" : "new/$n"); + open my $fh, '<', $f or next; + if (-s $fh) { + local $/; + my $raw = <$fh>; + if ($vrfy && git_sha(1, \$raw)->hexdigest ne $oidhex) { + warn "$f changed $oidhex\n"; + next; + } + return \$raw; + } + } + undef; +} + 1; diff --git a/lib/PublicInbox/LeiStore.pm b/lib/PublicInbox/LeiStore.pm index 29362b2e..a7a0ebef 100644 --- a/lib/PublicInbox/LeiStore.pm +++ b/lib/PublicInbox/LeiStore.pm @@ -206,10 +206,11 @@ sub set_sync_info { sub add_eml { my ($self, $eml, $vmd, $xoids) = @_; - my $im = $self->importer; # may create new epoch + my $im = $self->{-fake_im} // $self->importer; # may create new epoch my ($eidx, $tl) = eidx_init($self); my $oidx = $eidx->{oidx}; # PublicInbox::Import::add checks this my $smsg = bless { -oidx => $oidx }, 'PublicInbox::Smsg'; + $smsg->{-eidx_git} = $eidx->git if !$self->{-fake_im}; my $im_mark = $im->add($eml, undef, $smsg); if ($vmd && $vmd->{sync_info}) { set_sync_info($self, $smsg->{blob}, @{$vmd->{sync_info}}); @@ -276,6 +277,13 @@ sub set_eml { set_eml_vmd($self, $eml, $vmd); } +sub index_eml_only { + my ($self, $eml, $vmd, $xoids) = @_; + require PublicInbox::FakeImport; + local $self->{-fake_im} = PublicInbox::FakeImport->new; + set_eml($self, $eml, $vmd, $xoids); +} + sub _external_only ($$$) { my ($self, $xoids, $eml) = @_; my $eidx = $self->{priv_eidx}; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 64061788..da3a95d2 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -137,9 +137,15 @@ sub eml2mboxcl2 { sub git_to_mail { # git->cat_async callback my ($bref, $oid, $type, $size, $arg) = @_; + my ($write_cb, $smsg) = @$arg; + if ($type eq 'missing' && $smsg->{-lms_ro}) { + if ($bref = $smsg->{-lms_ro}->local_blob($oid, 1)) { + $type = 'blob'; + $size = length($$bref); + } + } return warn("W: $oid is $type (!= blob)\n") if $type ne 'blob'; return warn("E: $oid is empty\n") unless $size; - my ($write_cb, $smsg) = @$arg; die "BUG: expected=$smsg->{blob} got=$oid" if $smsg->{blob} ne $oid; $write_cb->($bref, $smsg); } @@ -644,6 +650,7 @@ sub ipc_atfork_child { my ($self) = @_; my $lei = $self->{lei}; $lei->_lei_atfork_child; + $self->{-lms_ro} = $lei->{lse}->lms if $lei->{lse}; $lei->{auth}->do_auth_atfork($self) if $lei->{auth}; $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb(); $self->SUPER::ipc_atfork_child; @@ -665,6 +672,7 @@ sub poke_dst { sub write_mail { # via ->wq_io_do my ($self, $smsg, $eml) = @_; return $self->{wcb}->(undef, $smsg, $eml) if $eml; + $smsg->{-lms_ro} = $self->{-lms_ro}; $self->{lei}->{ale}->git->cat_async($smsg->{blob}, \&git_to_mail, [$self->{wcb}, $smsg]); } diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm index 66dec099..5f96a5b0 100644 --- a/lib/PublicInbox/OverIdx.pm +++ b/lib/PublicInbox/OverIdx.pm @@ -670,4 +670,22 @@ DELETE FROM eidxq WHERE docid = ? } +# returns true if we're vivifying a message for lei/store that was +# previously external-metadata only +sub vivify_xvmd { + my ($self, $smsg) = @_; + my @docids = $self->blob_exists($smsg->{blob}); + my @vivify_xvmd; + for my $id (@docids) { + if (my $cur = $self->get_art($id)) { + # already indexed if bytes > 0 + return if $cur->{bytes} > 0; + push @vivify_xvmd, $id; + } else { + warn "W: $smsg->{blob} #$id gone (bug?)\n"; + } + } + $smsg->{-vivify_xvmd} = \@vivify_xvmd; +} + 1; diff --git a/t/lei-index.t b/t/lei-index.t new file mode 100644 index 00000000..3382d42b --- /dev/null +++ b/t/lei-index.t @@ -0,0 +1,58 @@ +#!perl -w +# Copyright (C) 2021 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +use strict; use v5.10.1; use PublicInbox::TestCommon; +use File::Spec; +require_mods(qw(lei -nntpd)); +my ($ro_home, $cfg_path) = setup_public_inboxes; +my ($tmpdir, $for_destroy) = tmpdir; +my $env = { PI_CONFIG => $cfg_path }; + +my $sock = tcp_server; +my $cmd = [ '-nntpd', '-W0', "--stdout=$tmpdir/n1", "--stderr=$tmpdir/n2" ]; +my $nntpd = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-nntpd $?"); +my $nntp_host_port = tcp_host_port($sock); + +$sock = tcp_server; +$cmd = [ '-imapd', '-W0', "--stdout=$tmpdir/i1", "--stderr=$tmpdir/i2" ]; +my $imapd = start_script($cmd, $env, { 3 => $sock }) or BAIL_OUT("-imapd $?"); +my $imap_host_port = tcp_host_port($sock); +undef $sock; +for ('', qw(cur new)) { + mkdir "$tmpdir/md/$_" or xbail "mkdir: $!"; +} +symlink(File::Spec->rel2abs('t/plack-qp.eml'), "$tmpdir/md/cur/x:2,"); +my $expect = do { + open my $fh, '<', 't/plack-qp.eml' or xbail $!; + local $/; + <$fh>; +}; +test_lei({ tmpdir => $tmpdir }, sub { + my $store_path = "$ENV{HOME}/.local/share/lei/store/"; + + lei_ok('index', "$tmpdir/md"); + lei_ok(qw(q mid:qp@example.com)); + my $res_a = json_utf8->decode($lei_out); + my $blob = $res_a->[0]->{'blob'}; + like($blob, qr/\A[0-9a-f]{40,}\z/, 'got blob from qp@example'); + lei_ok('blob', $blob); + is($lei_out, $expect, 'got expected blob via Maildir'); + lei_ok(qw(q mid:qp@example.com -f text)); + like($lei_out, qr/^hi = bye/sm, 'lei2mail fallback'); + + my $all_obj = ['git', "--git-dir=$store_path/ALL.git", + qw(cat-file --batch-check --batch-all-objects)]; + is_deeply([xqx($all_obj)], [], 'no git objects'); + lei_ok('import', 't/plack-qp.eml'); + ok(grep(/\A$blob blob /, my @objs = xqx($all_obj)), + 'imported blob'); + lei_ok(qw(q z:0.. --dedupe=none)); + my $res_b = json_utf8->decode($lei_out); + is_deeply($res_b, $res_a, 'no extra DB entries'); + + lei_ok('index', "nntp://$nntp_host_port/t.v2"); + lei_ok('index', "imap://$imap_host_port/t.v2.0"); + is_deeply([xqx($all_obj)], \@objs, 'no new objects from NNTP+IMAP'); +}); + +done_testing; |