diff options
-rw-r--r-- | lib/PublicInbox/ExtSearchIdx.pm | 96 | ||||
-rw-r--r-- | lib/PublicInbox/OverIdx.pm | 20 | ||||
-rwxr-xr-x | script/public-inbox-extindex | 13 | ||||
-rw-r--r-- | t/extsearch.t | 11 |
4 files changed, 137 insertions, 3 deletions
diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 29414e4a..495579a2 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -844,6 +844,98 @@ sub sync_inbox { warn $err, "\n" if defined($err); } +sub dd_smsg { # git->cat_async callback + my ($bref, $oid, $type, $size, $dd) = @_; + my $smsg = $dd->{smsg} // die 'BUG: dd->{smsg} missing'; + my $self = $dd->{self} // die 'BUG: {self} missing'; + my $per_mid = $dd->{per_mid} // die 'BUG: {per_mid} missing'; + if ($type eq 'missing') { + _blob_missing($dd, $smsg); + } elsif (!is_bad_blob($oid, $type, $size, $smsg->{blob})) { + local $self->{current_info} = "$self->{current_info} $oid"; + my $chash = content_hash(PublicInbox::Eml->new($bref)); + push(@{$per_mid->{dd_chash}->{$chash}}, $smsg); + } + return if $per_mid->{last_smsg} != $smsg; + while (my ($chash, $ary) = each %{$per_mid->{dd_chash}}) { + my $keep = shift @$ary; + next if !scalar(@$ary); + $per_mid->{sync}->{dedupe_cull} += scalar(@$ary); + print STDERR + "# <$keep->{mid}> keeping #$keep->{num}, dropping ", + join(', ', map { "#$_->{num}" } @$ary),"\n"; + next if $per_mid->{sync}->{-opt}->{'dry-run'}; + my $oidx = $self->{oidx}; + for my $smsg (@$ary) { + my $gone = $smsg->{num}; + $oidx->merge_xref3($keep->{num}, $gone, $smsg->{blob}); + $self->idx_shard($gone)->ipc_do('xdb_remove', $gone); + $oidx->delete_by_num($gone); + } + } +} + +sub eidx_dedupe ($$) { + my ($self, $sync) = @_; + $sync->{dedupe_cull} = 0; + my $candidates = 0; + my $nr_mid = 0; + return unless eidxq_lock_acquire($self); + my $iter; + my $min_id = 0; + local $sync->{-regen_fmt} = "dedupe %u/".$self->{oidx}->max."\n"; +dedupe_restart: + $iter = $self->{oidx}->dbh->prepare(<<EOS); +SELECT DISTINCT(mid),id FROM msgid WHERE id IN +(SELECT id FROM id2num WHERE id > ? GROUP BY num HAVING COUNT(num) > 1) +ORDER BY id +EOS + $iter->execute($min_id); + local $SIG{__WARN__} = sub { + return if PublicInbox::Eml::warn_ignore(@_); + warn @_; + }; + while (my ($mid, $id) = $iter->fetchrow_array) { + last if $sync->{quit}; + $self->{current_info} = "dedupe $mid"; + ${$sync->{nr}} = $min_id = $id; + my ($n, $prv, @smsg); + while (my $x = $self->{oidx}->next_by_mid($mid, \$n, \$prv)) { + push @smsg, $x; + } + next if scalar(@smsg) < 2; + my $per_mid = { + dd_chash => {}, # chash => [ary of smsgs] + last_smsg => $smsg[-1], + sync => $sync + }; + $nr_mid++; + $candidates += scalar(@smsg) - 1; + for my $smsg (@smsg) { + my $dd = { + per_mid => $per_mid, + smsg => $smsg, + self => $self, + }; + $self->git->cat_async($smsg->{blob}, \&dd_smsg, $dd); + } + # need to wait on every single one + $self->git->async_wait_all; + + # is checkpoint needed? $iter is a very expensive query to restart + if (0 && checkpoint_due($sync)) { + undef $iter; + reindex_checkpoint($self, $sync); + goto dedupe_restart; + } + } + my $n = delete $sync->{dedupe_cull}; + if (my $pr = $sync->{-opt}->{-progress}) { + $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n"); + } + ${$sync->{nr}} = 0; +} + sub eidx_sync { # main entry point my ($self, $opt) = @_; @@ -873,6 +965,10 @@ sub eidx_sync { # main entry point for my $ibx (@{$self->{ibx_list}}) { $ibx->{-ibx_id} //= $self->{oidx}->ibx_id($ibx->eidx_key); } + if (delete($opt->{dedupe})) { + local $sync->{checkpoint_unlocks} = 1; + eidx_dedupe($self, $sync); + } if (delete($opt->{reindex})) { local $sync->{checkpoint_unlocks} = 1; eidx_reindex($self, $sync); diff --git a/lib/PublicInbox/OverIdx.pm b/lib/PublicInbox/OverIdx.pm index 5f96a5b0..8f7cf2bb 100644 --- a/lib/PublicInbox/OverIdx.pm +++ b/lib/PublicInbox/OverIdx.pm @@ -656,6 +656,26 @@ UPDATE over SET ddd = ? WHERE num = ? $sth->execute; } +sub merge_xref3 { # used for "-extindex --dedupe" + my ($self, $keep_docid, $drop_docid, $oidhex) = @_; + my $oidbin = pack('H*', $oidhex); + my $sth = $self->{dbh}->prepare_cached(<<''); +UPDATE OR IGNORE xref3 SET docid = ? WHERE docid = ? AND oidbin = ? + + $sth->bind_param(1, $keep_docid); + $sth->bind_param(2, $drop_docid); + $sth->bind_param(3, $oidbin, SQL_BLOB); + $sth->execute; + + # drop anything that conflicted + $sth = $self->{dbh}->prepare_cached(<<''); +DELETE FROM xref3 WHERE docid = ? AND oidbin = ? + + $sth->bind_param(1, $drop_docid); + $sth->bind_param(2, $oidbin, SQL_BLOB); + $sth->execute; +} + sub eidxq_add { my ($self, $docid) = @_; $self->dbh->prepare_cached(<<'')->execute($docid); diff --git a/script/public-inbox-extindex b/script/public-inbox-extindex index 771486c4..dcb12e5a 100755 --- a/script/public-inbox-extindex +++ b/script/public-inbox-extindex @@ -17,7 +17,9 @@ usage: public-inbox-extindex [options] [EXTINDEX_DIR] [INBOX_DIR...] --batch-size=BYTES flush changes to OS after a given number of bytes --max-size=BYTES do not index messages larger than the given size --gc perform garbage collection instead of indexing + --dedupe fix prior deduplication errors --verbose | -v increase verbosity (may be repeated) + --dry-run | -n dry-run on --dedupe BYTES may use `k', `m', and `g' suffixes (e.g. `10m' for 10 megabytes) See public-inbox-extindex(1) man page for full documentation. @@ -27,7 +29,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i fsync|sync! indexlevel|index-level|L=s max_size|max-size=s batch_size|batch-size=s - gc commit-interval=i watch scan! + dedupe gc commit-interval=i watch scan! dry-run|n all help|h)) or die $help; if ($opt->{help}) { print $help; exit 0 }; @@ -50,11 +52,16 @@ unless (defined $eidx_dir) { my @ibxs; if ($opt->{gc}) { die "E: inbox paths must not be specified with --gc\n" if @ARGV; - die "E: --all not compatible with --gc\n" if $opt->{all}; - die "E: --watch is not compatible with --gc\n" if $opt->{watch}; + for my $sw (qw(all watch dry-run dedupe)) { + die "E: --$sw is not compatible with --gc\n" if $opt->{$sw}; + } } else { @ibxs = PublicInbox::Admin::resolve_inboxes(\@ARGV, $opt, $cfg); } +if ($opt->{'dry-run'} && !$opt->{dedupe}) { + die "E: --dry-run only affects --dedupe\n"; +} + PublicInbox::Admin::require_or_die(qw(-search)); PublicInbox::Config::json() or die "Cpanel::JSON::XS or similar missing\n"; PublicInbox::Admin::progress_prepare($opt); diff --git a/t/extsearch.t b/t/extsearch.t index ae889ac6..5f0cd866 100644 --- a/t/extsearch.t +++ b/t/extsearch.t @@ -368,4 +368,15 @@ if ('remove v1test and test gc') { is(scalar(@it), 1, 'only one inbox left'); } +if ('dedupe + dry-run') { + my @cmd = ('-extindex', "$home/extindex"); + my $opt = { 2 => \(my $err = '') }; + ok(run_script([@cmd, '--dedupe'], undef, $opt), '--dedupe'); + ok(run_script([@cmd, qw(--dedupe --dry-run)], undef, $opt), + '--dry-run --dedupe'); + is $err, '', 'no errors'; + ok(!run_script([@cmd, qw(--dry-run)], undef, $opt), + '--dry-run alone fails'); +} + done_testing; |