From 218bfc40d6873dfe45fe177455491b3090b49f22 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 30 Jan 2024 06:31:09 +0000 Subject: watch: support incremental updates from MH The good news (compared to lei) is we only have to worry about imports and don't care about the filename nor keywords, so it's immune to .mh_sequences writing inconsistencies across MH implementations and sequence number packing. We still assume the writer will write the mail file with one of: * rename(2) to create the final sequence number filename * a single write(2) if not relying on rename(2) mlmmj and mutt satisfy these requirements. Python's Lib/mailbox.py may, I'm not sure... --- Documentation/public-inbox-watch.pod | 16 +++-- MANIFEST | 1 + lib/PublicInbox/Watch.pm | 92 +++++++++++++++++++-------- t/watch_maildir.t | 2 +- t/watch_mh.t | 120 +++++++++++++++++++++++++++++++++++ 5 files changed, 198 insertions(+), 33 deletions(-) create mode 100644 t/watch_mh.t diff --git a/Documentation/public-inbox-watch.pod b/Documentation/public-inbox-watch.pod index 6f812966..6e2142fe 100644 --- a/Documentation/public-inbox-watch.pod +++ b/Documentation/public-inbox-watch.pod @@ -48,9 +48,11 @@ of large Maildirs. Upon startup, it scans the mailbox for new messages to be imported while it was not running. -As of public-inbox 1.6.0, Maildirs, IMAP folders, and NNTP -newsgroups are supported. Previous versions of public-inbox -only supported Maildirs. +All versions of public-inbox-watch support Maildirs. public-inbox +1.6.0 added support for IMAP folders and NNTP newsgroups. +public-inbox 2.0 adds support for MH directories. There are no +plans to support the mbox family since new messages are expensive +to detect in large mboxes. public-inbox-watch should be run inside a L session or as a L service. Errors are emitted to stderr. 
@@ -84,12 +86,16 @@ C and C URLs: watch = nntp://news.example.com/inbox.test.group watch = imaps://user@mail.example.com/INBOX.test +2.0+ supports MH: + + watch = mh:/path/to/MH/inbox.test + This may be specified multiple times to combine several mailboxes into a single public-inbox. URLs requiring authentication will require L and/or L (preferred) to fill in the username and password. -public-inbox 2.0+ supports boolean C to prevent the global +public-inbox 2.0+ also supports boolean C to prevent the global L directive from writing to the inbox. Default: none @@ -127,7 +133,7 @@ Messages without the (S)een flag are not considered for hiding. This hiding affects all configured public-inboxes in PI_CONFIG. As with C, C and C URLs -are supported in public-inbox 1.6.0+. +are supported in public-inbox 1.6.0+, and C in 2.0+. Default: none; only for L users diff --git a/MANIFEST b/MANIFEST index 051cd6f9..2223cfb4 100644 --- a/MANIFEST +++ b/MANIFEST @@ -627,6 +627,7 @@ t/watch_filter_rubylang.t t/watch_imap.t t/watch_maildir.t t/watch_maildir_v2.t +t/watch_mh.t t/watch_multiple_headers.t t/www_altid.t t/www_listing.t diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index b83a77eb..1ec574ea 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -16,6 +16,7 @@ use PublicInbox::DS qw(now add_timer awaitpid); use PublicInbox::MID qw(mids); use PublicInbox::ContentHash qw(content_hash); use POSIX qw(_exit WNOHANG); +use constant { D_MAILDIR => 1, D_MH => 2 }; sub compile_watchheaders ($) { my ($ibx) = @_; @@ -40,9 +41,22 @@ sub compile_watchheaders ($) { $ibx->{-watchheaders} = $watch_hdrs if scalar @$watch_hdrs; } +sub d_type_set ($$$) { + my ($d_type, $dir, $is) = @_; + my $isnt = D_MAILDIR; + if ($is == D_MAILDIR) { + $isnt = D_MH; + $d_type->{"$dir/cur"} |= $is; + $d_type->{"$dir/new"} |= $is; + } + warn <<EOM if ($d_type->{$dir} |= $is) & $isnt; +W: `$dir' is both Maildir and MH (non-fatal) +EOM +} + sub new { my ($class, $cfg) = @_; - my (%mdmap); + my
(%d_map, %d_type); my (%imap, %nntp); # url => [inbox objects] or 'watchspam' my (@imap, @nntp); PublicInbox::Import::load_config($cfg); @@ -57,7 +71,11 @@ sub new { my $uri; if (is_maildir($dir)) { # skip "new", no MUA has seen it, yet. - $mdmap{"$dir/cur"} = 'watchspam'; + $d_map{"$dir/cur"} = 'watchspam'; + d_type_set \%d_type, $dir, D_MAILDIR; + } elsif (is_mh($dir)) { + $d_map{$dir} = 'watchspam'; + d_type_set \%d_type, $dir, D_MH; } elsif ($uri = imap_uri($dir)) { $imap{$$uri} = 'watchspam'; push @imap, $uri; @@ -69,7 +87,6 @@ sub new { } } } - my $k = 'publicinboxwatch.spamcheck'; my $default = undef; my $spamcheck = PublicInbox::Spamcheck::get($cfg, $k, $default); @@ -91,10 +108,17 @@ sub new { } elsif (is_maildir($watch)) { compile_watchheaders($ibx); my ($new, $cur) = ("$watch/new", "$watch/cur"); - my $cur_dst = $mdmap{$cur} //= []; + my $cur_dst = $d_map{$cur} //= []; return if is_watchspam($cur, $cur_dst, $ibx); - push @{$mdmap{$new} //= []}, $ibx; + push @{$d_map{$new} //= []}, $ibx; push @$cur_dst, $ibx; + d_type_set \%d_type, $watch, D_MAILDIR; + } elsif (is_mh($watch)) { + my $cur_dst = $d_map{$watch} //= []; + return if is_watchspam($watch, $cur_dst, $ibx); + compile_watchheaders($ibx); + push(@$cur_dst, $ibx); + d_type_set \%d_type, $watch, D_MH; } elsif ($uri = imap_uri($watch)) { my $cur_dst = $imap{$$uri} //= []; return if is_watchspam($uri, $cur_dst, $ibx); @@ -111,18 +135,19 @@ sub new { } }); - my $mdre; - if (scalar keys %mdmap) { - $mdre = join('|', map { quotemeta($_) } keys %mdmap); - $mdre = qr!\A($mdre)/!; + my $d_re; + if (scalar keys %d_map) { + $d_re = join('|', map quotemeta, keys %d_map); + $d_re = qr!\A($d_re)/!; } - return unless $mdre || scalar(keys %imap) || scalar(keys %nntp); + return unless $d_re || scalar(keys %imap) || scalar(keys %nntp); bless { max_batch => 10, # avoid hogging locks for too long spamcheck => $spamcheck, - mdmap => \%mdmap, - mdre => $mdre, + d_map => \%d_map, + d_re => $d_re, + d_type => \%d_type, 
pi_cfg => $cfg, imap => scalar keys %imap ? \%imap : undef, nntp => scalar keys %nntp? \%nntp : undef, @@ -220,17 +245,23 @@ sub import_eml ($$$) { sub _try_path { my ($self, $path) = @_; - my $fl = PublicInbox::MdirReader::maildir_path_flags($path) // return; - return if $fl =~ /[DT]/; # no Drafts or Trash - if ($path !~ $self->{mdre}) { - warn "unrecognized path: $path\n"; - return; - } - my $inboxes = $self->{mdmap}->{$1}; - unless ($inboxes) { - warn "unmappable dir: $1\n"; - return; - } + $path =~ $self->{d_re} or + return warn("BUG? unrecognized path: $path\n"); + my $dir = $1; + my $inboxes = $self->{d_map}->{$dir} // + return warn("W: unmappable dir: $dir\n"); + my ($md_fl, $mh_seq); + if ($self->{d_type}->{$dir} & D_MH) { + $path =~ m!/([0-9]+)\z! ? ($mh_seq = $1) : return; + } + $self->{d_type}->{$dir} & D_MAILDIR and + $md_fl = PublicInbox::MdirReader::maildir_path_flags($path); + $md_fl // $mh_seq // return; + return if ($md_fl // '') =~ /[DT]/; # no Drafts or Trash + # n.b. none of the MH keywords are relevant for public mail, + # mh_seq is only used to validate we're reading an email + # and not treating .mh_sequences as an email + my $warn_cb = $SIG{__WARN__} || \&CORE::warn; local $SIG{__WARN__} = sub { my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? 
$1 : ''; @@ -288,7 +319,7 @@ sub watch_fs_init ($) { require PublicInbox::DirIdle; # inotify_create + EPOLL_CTL_ADD my $dir_idle = $self->{dir_idle} = PublicInbox::DirIdle->new($cb); - $dir_idle->add_watches([keys %{$self->{mdmap}}]); + $dir_idle->add_watches([keys %{$self->{d_map}}]); } sub net_cb { # NetReader::(nntp|imap)_each callback @@ -437,7 +468,7 @@ sub event_step { }; die $@ if $@; } - fs_scan_step($self) if $self->{mdre}; + fs_scan_step($self) if $self->{d_re}; } sub watch_imap_fetch_all ($$) { @@ -541,7 +572,7 @@ sub watch { # main entry point # poll all URIs for a given interval sequentially add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris); } - watch_fs_init($self) if $self->{mdre}; + watch_fs_init($self) if $self->{d_re}; local @PublicInbox::DS::post_loop_do = (\&quit_inprogress, $self); PublicInbox::DS::event_loop($first_sig); # calls ->event_step _done_for_now($self); @@ -572,7 +603,7 @@ sub fs_scan_step { $opendirs->{$dir} = $dh if $n < 0; } if ($op && $op eq 'full') { - foreach my $dir (keys %{$self->{mdmap}}) { + foreach my $dir (keys %{$self->{d_map}}) { next if $opendirs->{$dir}; # already in progress my $ok = opendir(my $dh, $dir); unless ($ok) { @@ -647,6 +678,13 @@ sub is_maildir { $_[0]; } +sub is_mh { + $_[0] =~ s!\Amh:!!i or return; + $_[0] =~ tr!/!/!s; + $_[0] =~ s!/\z!!; + $_[0]; +} + sub is_watchspam { my ($cur, $ws, $ibx) = @_; if ($ws && !ref($ws) && $ws eq 'watchspam') { diff --git a/t/watch_maildir.t b/t/watch_maildir.t index d7f01b1a..a12ceefd 100644 --- a/t/watch_maildir.t +++ b/t/watch_maildir.t @@ -46,7 +46,7 @@ my $sem = PublicInbox::Emergency->new($spamdir); # create dirs EOF my $wm = PublicInbox::Watch->new($cfg); is(scalar grep(/is a spam folder/, @w), 1, 'got warning about spam'); - is_deeply($wm->{mdmap}, { "$spamdir/cur" => 'watchspam' }, + is_deeply($wm->{d_map}, { "$spamdir/cur" => 'watchspam' }, 'only got the spam folder to watch'); } diff --git a/t/watch_mh.t b/t/watch_mh.t new file mode 100644 index 
00000000..04793750 --- /dev/null +++ b/t/watch_mh.t @@ -0,0 +1,120 @@ +#!perl -w +# Copyright (C) all contributors +# License: AGPL-3.0+ +use v5.12; +use PublicInbox::Eml; +use PublicInbox::TestCommon; +use PublicInbox::Import; +use PublicInbox::IO qw(write_file); +use POSIX qw(mkfifo); +use File::Copy qw(cp); +use autodie qw(rename mkdir); + +my $tmpdir = tmpdir; +my $git_dir = "$tmpdir/test.git"; +my $mh = "$tmpdir/mh"; +my $spamdir = "$tmpdir/mh-spam"; +mkdir $_ for ($mh, $spamdir); +use_ok 'PublicInbox::Watch'; +my $addr = 'test-public@example.com'; +my $default_branch = PublicInbox::Import::default_branch; +PublicInbox::Import::init_bare($git_dir); +my $msg = < +Date: Sat, 18 Jun 2016 00:00:00 +0000 + +something +EOF + +cp 't/plack-qp.eml', "$mh/1"; +mkfifo("$mh/5", 0777) or xbail "mkfifo: $!"; # FIFO to ensure no stuckage +my $cfg = cfg_new $tmpdir, <new($cfg)->scan('full'); +my $git = PublicInbox::Git->new($git_dir); +{ + my @list = $git->qx('rev-list', $default_branch); + is(scalar @list, 1, 'one revision in rev-list'); + $git->cleanup; +} + +# end-to-end test which actually uses inotify/kevent +{ + my $env = { PI_CONFIG => $cfg->{-f} }; + # n.b. 
--no-scan is only intended for testing atm + my $wm = start_script([qw(-watch --no-scan)], $env); + no_pollerfd($wm->{pid}); + + my $eml = eml_load 't/data/binary.patch'; + $eml->header_set('Cc', $addr); + write_file '>', "$mh/2.tmp", $eml->as_string; + + use_ok 'PublicInbox::InboxIdle'; + use_ok 'PublicInbox::DS'; + my $delivered = 0; + my $cb = sub { + my ($ibx) = @_; + diag "message delivered to `$ibx->{name}'"; + $delivered++; + }; + PublicInbox::DS->Reset; + my $ii = PublicInbox::InboxIdle->new($cfg); + my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup'; + $cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) }); + local @PublicInbox::DS::post_loop_do = (sub { $delivered == 0 }); + + # wait for -watch to setup inotify watches + my $sleep = 1; + if (eval { require PublicInbox::Inotify } && -d "/proc/$wm->{pid}/fd") { + my $end = time + 2; + my (@ino, @ino_info); + do { + @ino = grep { + (readlink($_)//'') =~ /\binotify\b/ + } glob("/proc/$wm->{pid}/fd/*"); + } until (@ino || time > $end || !tick); + if (scalar(@ino) == 1) { + my $ino_fd = (split(m'/', $ino[0]))[-1]; + my $ino_fdinfo = "/proc/$wm->{pid}/fdinfo/$ino_fd"; + while (time < $end && open(my $fh, '<', $ino_fdinfo)) { + @ino_info = grep(/^inotify wd:/, <$fh>); + last if @ino_info >= 2; + tick; + } + $sleep = undef if @ino_info >= 2; + } + } + if ($sleep) { + diag "waiting ${sleep}s for -watch to start up"; + sleep $sleep; + } + rename "$mh/2.tmp", "$mh/2"; + diag 'waiting for -watch to import new message'; + PublicInbox::DS::event_loop(); + + my $subj = $eml->header_raw('Subject'); + my $head = $git->qx(qw(cat-file commit HEAD)); + like $head, qr/^\Q$subj\E/sm, 'new commit made'; + + $wm->kill; + $wm->join; + $ii->close; + PublicInbox::DS->Reset; +} + +my $is_mh = sub { PublicInbox::Watch::is_mh(my $val = shift) }; + +is $is_mh->('mh:/hello//world'), '/hello/world', 'extra slash gone'; +is $is_mh->('MH:/hello/world/'), '/hello/world', 'trailing slash gone'; +is 
$is_mh->('maildir:/hello/world/'), undef, 'non-MH rejected'; + +done_testing; -- cgit v1.2.3-24-ge0c7