diff options
author | Eric Wong <e@80x24.org> | 2020-12-31 13:51:28 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2021-01-01 05:00:39 +0000 |
commit | d2a7dcb58ffb9604b2023159431fcdc4871f368f (patch) | |
tree | 1895ef3c9f091dc14ef5654b179d66fe24f65a7e /lib | |
parent | 7f17df5c6f1892ef53f149a0ab24a5d917cce7d9 (diff) | |
download | public-inbox-d2a7dcb58ffb9604b2023159431fcdc4871f368f.tar.gz |
For writing mboxes and Maildirs, users may wish to use stricter or looser deduplication strategies. This gives them more control.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/PublicInbox/LEI.pm | 2 | ||||
-rw-r--r-- | lib/PublicInbox/LeiDedupe.pm | 96 | ||||
-rw-r--r-- | lib/PublicInbox/LeiToMail.pm | 26 |
3 files changed, 112 insertions, 12 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 7002a1f7..9aa4d95a 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -172,7 +172,7 @@ my %OPTDESC = ( 'type=s' => [ 'any|mid|git', 'disambiguate type' ], -'dedupe|d=s' => ['STRAT|content|oid|mid', +'dedupe|d=s' => ['STRAT|content|oid|mid|none', 'deduplication strategy'], 'show thread|t' => 'display entire thread a message belongs to', 'q thread|t' => diff --git a/lib/PublicInbox/LeiDedupe.pm b/lib/PublicInbox/LeiDedupe.pm new file mode 100644 index 00000000..c6eb7196 --- /dev/null +++ b/lib/PublicInbox/LeiDedupe.pm @@ -0,0 +1,96 @@ +# Copyright (C) 2020 all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> +package PublicInbox::LeiDedupe; +use strict; +use v5.10.1; +use PublicInbox::SharedKV; +use PublicInbox::ContentHash qw(content_hash); + +# n.b. mutt sets most of these headers not sure about Bytes +our @OID_IGNORE = qw(Status X-Status Content-Length Lines Bytes); + +# best-effort regeneration of OID when augmenting existing results +sub _regen_oid ($) { + my ($eml) = @_; + my @stash; # stash away headers we shouldn't have in git + for my $k (@OID_IGNORE) { + my @v = $eml->header_raw($k) or next; + push @stash, [ $k, \@v ]; + $eml->header_set($k); # restore below + } + my $dig = Digest::SHA->new(1); # XXX SHA256 later + my $buf = $eml->as_string; + $dig->add('blob '.length($buf)."\0"); + $dig->add($buf); + undef $buf; + + for my $kv (@stash) { # restore stashed headers + my ($k, @v) = @$kv; + $eml->header_set($k, @v); + } + $dig->digest; +} + +sub _oidbin ($) { defined($_[0]) ? pack('H*', $_[0]) : undef } + +# the paranoid option +sub dedupe_oid () { + my $skv = PublicInbox::SharedKV->new; + ($skv, sub { # may be called in a child process + my ($eml, $oid) = @_; + $skv->set_maybe(_oidbin($oid) // _regen_oid($eml), ''); + }); +} + +# dangerous if there's duplicate messages with different Message-IDs +sub dedupe_mid () { + my $skv = PublicInbox::SharedKV->new; + ($skv, sub { # may be called in a child process + my ($eml, $oid) = @_; + # TODO: lei will support non-public messages w/o Message-ID + my $mid = $eml->header_raw('Message-ID') // _oidbin($oid) // + content_hash($eml); + $skv->set_maybe($mid, ''); + }); +} + +# our default deduplication strategy (used by v2, also) +sub dedupe_content () { + my $skv = PublicInbox::SharedKV->new; + ($skv, sub { # may be called in a child process + my ($eml) = @_; # oid = $_[1], ignored + $skv->set_maybe(content_hash($eml), ''); + }); +} + +# no deduplication at all +sub dedupe_none () { (undef, sub { 1 }) } + +sub new { + my ($cls, $lei) = @_; + my $dd = $lei->{opt}->{dedupe} // 'content'; + my $dd_new = $cls->can("dedupe_$dd") // + die "unsupported dedupe strategy: $dd\n"; + bless [ $dd_new->() ], $cls; # [ $skv, $cb ] +} + +# returns true on unseen messages according to the deduplication strategy, +# returns false if seen +sub is_dup { + my ($self, $eml, $oid) = @_; + !$self->[1]->($eml, $oid); +} + +sub prepare_dedupe { + my ($self) = @_; + my $skv = $self->[0]; + $skv ? $skv->dbh : undef; +} + +sub pause_dedupe { + my ($self) = @_; + my $skv = $self->[0]; + delete($skv->{dbh}) if $skv; +} + +1; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index 294291b2..ead00d1a 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -8,9 +8,8 @@ use v5.10.1; use PublicInbox::Eml; use PublicInbox::Lock; use PublicInbox::ProcessPipe; -use PublicInbox::SharedKV; use PublicInbox::Spawn qw(which spawn popen_rd); -use PublicInbox::ContentHash qw(content_hash); +use PublicInbox::LeiDedupe; use Symbol qw(gensym); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET); @@ -226,10 +225,11 @@ sub dup_src ($) { $dup; } -# --augment existing output destination, without duplicating anything +# --augment existing output destination, with deduplication sub _augment { # MboxReader eml_cb my ($eml, $lei) = @_; - $lei->{skv}->set_maybe(content_hash($eml), ''); + # ignore return value, just populate the skv + $lei->{dedupe_cb}->is_dup($eml); } sub _mbox_write_cb ($$$$) { @@ -240,23 +240,27 @@ sub _mbox_write_cb ($$$$) { open $out, '+>>', $dst or die "open $dst: $!"; # Perl does SEEK_END even with O_APPEND :< seek($out, 0, SEEK_SET) or die "seek $dst: $!"; - my $atomic = !!(($lei->{opt}->{jobs} // 0) > 1); - $lei->{skv} = PublicInbox::SharedKV->new; - $lei->{skv}->dbh; + my $jobs = $lei->{opt}->{jobs} // 0; + my $atomic = $jobs > 1; + my $dedupe = $lei->{dedupe} = PublicInbox::LeiDedupe->new($lei); state $zsfx_allow = join('|', keys %zsfx2cmd); my ($zsfx) = ($dst =~ /\.($zsfx_allow)\z/); if ($lei->{opt}->{augment}) { - my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : - dup_src($out); - PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); + if (-s $out && $dedupe->prepare_dedupe) { + my $rd = $zsfx ? decompress_src($out, $zsfx, $lei) : + dup_src($out); + PublicInbox::MboxReader->$mbox($rd, \&_augment, $lei); + } + $dedupe->pause_dedupe if $jobs; # are we forking? } else { truncate($out, 0) or die "truncate $dst: $!"; + $dedupe->prepare_dedupe if !$jobs; } ($out, $pipe_lk) = compress_dst($out, $zsfx, $lei) if $zsfx; sub { my ($buf, $oid, $kw) = @_; my $eml = PublicInbox::Eml->new($buf); - if ($lei->{skv}->set_maybe(content_hash($eml), '')) { + if (!$lei->{dedupe}->is_dup($eml, $oid)) { $buf = $eml2mbox->($eml, $kw); my $lock = $pipe_lk->lock_for_scope if $pipe_lk; write_in_full($out, $buf, $atomic); |