diff options
Diffstat (limited to 'lib/PublicInbox/Watch.pm')
-rw-r--r-- | lib/PublicInbox/Watch.pm | 246 |
1 files changed, 131 insertions, 115 deletions
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm index 3f6fe21b..eb90d353 100644 --- a/lib/PublicInbox/Watch.pm +++ b/lib/PublicInbox/Watch.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org> +# Copyright (C) all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # # ref: https://cr.yp.to/proto/maildir.html @@ -12,11 +12,11 @@ use PublicInbox::MdirReader; use PublicInbox::NetReader; use PublicInbox::Filter::Base qw(REJECT); use PublicInbox::Spamcheck; -use PublicInbox::DS qw(now add_timer); +use PublicInbox::DS qw(now add_timer awaitpid); use PublicInbox::MID qw(mids); use PublicInbox::ContentHash qw(content_hash); -use PublicInbox::EOFpipe; use POSIX qw(_exit WNOHANG); +use constant { D_MAILDIR => 1, D_MH => 2 }; sub compile_watchheaders ($) { my ($ibx) = @_; @@ -41,11 +41,25 @@ sub compile_watchheaders ($) { $ibx->{-watchheaders} = $watch_hdrs if scalar @$watch_hdrs; } +sub d_type_set ($$$) { + my ($d_type, $dir, $is) = @_; + my $isnt = D_MAILDIR; + if ($is == D_MAILDIR) { + $isnt = D_MH; + $d_type->{"$dir/cur"} |= $is; + $d_type->{"$dir/new"} |= $is; + } + warn <<EOM if ($d_type->{$dir} |= $is) & $isnt; +W: `$dir' is both Maildir and MH (non-fatal) +EOM +} + sub new { my ($class, $cfg) = @_; - my (%mdmap, $spamc); + my (%d_map, %d_type); my (%imap, %nntp); # url => [inbox objects] or 'watchspam' my (@imap, @nntp); + PublicInbox::Import::load_config($cfg); # "publicinboxwatch" is the documented namespace # "publicinboxlearn" is legacy but may be supported @@ -57,7 +71,11 @@ sub new { my $uri; if (is_maildir($dir)) { # skip "new", no MUA has seen it, yet. - $mdmap{"$dir/cur"} = 'watchspam'; + $d_map{"$dir/cur"} = 'watchspam'; + d_type_set \%d_type, $dir, D_MAILDIR; + } elsif (is_mh($dir)) { + $d_map{$dir} = 'watchspam'; + d_type_set \%d_type, $dir, D_MH; } elsif ($uri = imap_uri($dir)) { $imap{$$uri} = 'watchspam'; push @imap, $uri; @@ -69,7 +87,6 @@ sub new { } } } - my $k = 'publicinboxwatch.spamcheck'; my $default = undef; my $spamcheck = PublicInbox::Spamcheck::get($cfg, $k, $default); @@ -80,16 +97,28 @@ sub new { my $ibx = $_[0] = PublicInbox::InboxWritable->new($_[0]); my $watches = $ibx->{watch} or return; + + $ibx->{indexlevel} //= $ibx->detect_indexlevel; $watches = PublicInbox::Config::_array($watches); for my $watch (@$watches) { my $uri; - if (is_maildir($watch)) { + my $bool = $cfg->git_bool($watch); + if (defined $bool && !$bool) { + $ibx->{-watch_disabled} = 1; + } elsif (is_maildir($watch)) { compile_watchheaders($ibx); my ($new, $cur) = ("$watch/new", "$watch/cur"); - my $cur_dst = $mdmap{$cur} //= []; + my $cur_dst = $d_map{$cur} //= []; return if is_watchspam($cur, $cur_dst, $ibx); - push @{$mdmap{$new} //= []}, $ibx; + push @{$d_map{$new} //= []}, $ibx; push @$cur_dst, $ibx; + d_type_set \%d_type, $watch, D_MAILDIR; + } elsif (is_mh($watch)) { + my $cur_dst = $d_map{$watch} //= []; + return if is_watchspam($watch, $cur_dst, $ibx); + compile_watchheaders($ibx); + push(@$cur_dst, $ibx); + d_type_set \%d_type, $watch, D_MH; } elsif ($uri = imap_uri($watch)) { my $cur_dst = $imap{$$uri} //= []; return if is_watchspam($uri, $cur_dst, $ibx); @@ -106,18 +135,19 @@ sub new { } }); - my $mdre; - if (scalar keys %mdmap) { - $mdre = join('|', map { quotemeta($_) } keys %mdmap); - $mdre = qr!\A($mdre)/!; + my $d_re; + if (scalar keys %d_map) { + $d_re = join('|', map quotemeta, keys %d_map); + $d_re = qr!\A($d_re)/!; } - return unless $mdre || scalar(keys %imap) || scalar(keys %nntp); + return unless $d_re || scalar(keys %imap) || scalar(keys %nntp); bless { max_batch => 10, # avoid hogging locks for too long spamcheck => $spamcheck, - mdmap => \%mdmap, - mdre => $mdre, + d_map => \%d_map, + d_re => $d_re, + d_type => \%d_type, pi_cfg => $cfg, imap => scalar keys %imap ? \%imap : undef, nntp => scalar keys %nntp? \%nntp : undef, @@ -141,6 +171,7 @@ sub _done_for_now { sub remove_eml_i { # each_inbox callback my ($ibx, $self, $eml, $loc) = @_; + return if $ibx->{-watch_disabled}; eval { # try to avoid taking a lock or unnecessary spawning @@ -214,17 +245,23 @@ sub import_eml ($$$) { sub _try_path { my ($self, $path) = @_; - my $fl = PublicInbox::MdirReader::maildir_path_flags($path) // return; - return if $fl =~ /[DT]/; # no Drafts or Trash - if ($path !~ $self->{mdre}) { - warn "unrecognized path: $path\n"; - return; - } - my $inboxes = $self->{mdmap}->{$1}; - unless ($inboxes) { - warn "unmappable dir: $1\n"; - return; - } + $path =~ $self->{d_re} or + return warn("BUG? unrecognized path: $path\n"); + my $dir = $1; + my $inboxes = $self->{d_map}->{$dir} // + return warn("W: unmappable dir: $dir\n"); + my ($md_fl, $mh_seq); + if ($self->{d_type}->{$dir} & D_MH) { + $path =~ m!/([0-9]+)\z! ? ($mh_seq = $1) : return; + } + $self->{d_type}->{$dir} & D_MAILDIR and + $md_fl = PublicInbox::MdirReader::maildir_path_flags($path); + $md_fl // $mh_seq // return; + return if ($md_fl // '') =~ /[DT]/; # no Drafts or Trash + # n.b. none of the MH keywords are relevant for public mail, + # mh_seq is only used to validate we're reading an email + # and not treating .mh_sequences as an email + my $warn_cb = $SIG{__WARN__} || \&CORE::warn; local $SIG{__WARN__} = sub { my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : ''; @@ -244,25 +281,24 @@ sub quit_done ($) { return unless $self->{quit}; # don't have reliable wakeups, keep signalling - my $done = 1; - for (qw(idle_pids poll_pids)) { - my $pids = $self->{$_} or next; - for (keys %$pids) { - $done = undef if kill('QUIT', $_); - } - } - $done; + my $live = grep { kill('QUIT', $_) } keys %{$self->{pids}}; + add_timer(0.01, \&quit_done, $self) if $live; + $live == 0; } -sub quit { +sub quit { # may be called in IMAP/NNTP children my ($self) = @_; $self->{quit} = 1; %{$self->{opendirs}} = (); _done_for_now($self); quit_done($self); - if (my $idle_mic = $self->{idle_mic}) { + if (my $dir_idle = delete $self->{dir_idle}) { + $dir_idle->close if $dir_idle; + } + if (my $idle_mic = delete $self->{idle_mic}) { # IMAP child + return unless $idle_mic->IsConnected && $idle_mic->Socket; eval { $idle_mic->done }; - if ($@) { + if ($@ && $idle_mic->IsConnected && $idle_mic->Socket) { warn "IDLE DONE error: $@\n"; eval { $idle_mic->disconnect }; warn "IDLE LOGOUT error: $@\n" if $@; @@ -282,8 +318,8 @@ sub watch_fs_init ($) { }; require PublicInbox::DirIdle; # inotify_create + EPOLL_CTL_ADD - my $dir_idle = PublicInbox::DirIdle->new($cb); - $dir_idle->add_watches([keys %{$self->{mdmap}}]); + my $dir_idle = $self->{dir_idle} = PublicInbox::DirIdle->new($cb); + $dir_idle->add_watches([keys %{$self->{d_map}}]); } sub net_cb { # NetReader::(nntp|imap)_each callback @@ -318,7 +354,7 @@ sub imap_fetch_all ($$) { local $SIG{__WARN__} = sub { my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : ''; my $uid = $self->{cur_uid}; - $warn_cb->("$pfx$uri", $uid ? ("UID:$uid") : (), "\n", @_); + $warn_cb->("$pfx$uri", $uid ? (" UID:$uid") : (), "\n", @_); }; PublicInbox::NetReader::imap_each($self, $uri, \&net_cb, $self, $self->{imap}->{$$uri}); @@ -328,7 +364,7 @@ sub imap_idle_once ($$$$) { my ($self, $mic, $intvl, $uri) = @_; my $i = $intvl //= (29 * 60); my $end = now() + $intvl; - warn "I: $uri idling for ${intvl}s\n"; + warn "# $uri idling for ${intvl}s\n"; local $0 = "IDLE $0"; return if $self->{quit}; unless ($mic->idle) { @@ -381,63 +417,42 @@ sub watch_imap_idle_1 ($$$) { sub watch_atfork_child ($) { my ($self) = @_; - delete $self->{idle_pids}; - delete $self->{poll_pids}; - delete $self->{opendirs}; - PublicInbox::DS->Reset; + delete @$self{qw(dir_idle pids opendirs)}; my $sig = delete $self->{sig}; - $sig->{CHLD} = 'DEFAULT'; + $sig->{CHLD} = $sig->{HUP} = $sig->{USR1} = 'DEFAULT'; + # TERM/QUIT/INT call ->quit, which works in both parent+child @SIG{keys %$sig} = values %$sig; - PublicInbox::DS::sig_setmask($self->{oldset}); + PublicInbox::DS::sig_setmask(PublicInbox::DS::allowset($sig)); } sub watch_atfork_parent ($) { _done_for_now($_[0]) } sub imap_idle_requeue { # DS::add_timer callback - my ($self, $uri_intvl) = @_; + my ($self, $uri, $intvl) = @_; return if $self->{quit}; - push @{$self->{idle_todo}}, $uri_intvl; + push @{$self->{idle_todo}}, $uri, $intvl; event_step($self); } -sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback - my ($self, $pid) = @_; - my $uri_intvl = delete $self->{idle_pids}->{$pid} or - die "BUG: PID=$pid (unknown) reaped: \$?=$?\n"; - - my ($uri, $intvl) = @$uri_intvl; +sub imap_idle_reap { # awaitpid callback + my ($pid, $self, $uri, $intvl) = @_; + delete $self->{pids}->{$pid}; return if $self->{quit}; warn "W: PID=$pid on $uri died: \$?=$?\n" if $?; - add_timer(60, \&imap_idle_requeue, $self, $uri_intvl); + add_timer(60, \&imap_idle_requeue, $self, $uri, $intvl); } -sub reap { # callback for EOFpipe - my ($pid, $cb, $self) = @{$_[0]}; - my $ret = waitpid($pid, 0); - if ($ret == $pid) { - $cb->($self, $pid); # poll_fetch_reap || imap_idle_reap - } else { - warn "W: waitpid($pid) => ", $ret // "($!)", "\n"; - } -} - -sub imap_idle_fork ($$) { - my ($self, $uri_intvl) = @_; - my ($uri, $intvl) = @$uri_intvl; - pipe(my ($r, $w)) or die "pipe: $!"; - my $seed = rand(0xffffffff); - my $pid = fork // die "fork: $!"; +sub imap_idle_fork { + my ($self, $uri, $intvl) = @_; + return if $self->{quit}; + my $pid = PublicInbox::DS::fork_persist; if ($pid == 0) { - srand($seed); - eval { Net::SSLeay::randomize() }; - close $r; watch_atfork_child($self); watch_imap_idle_1($self, $uri, $intvl); - close $w; _exit(0); } - $self->{idle_pids}->{$pid} = $uri_intvl; - PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]); + $self->{pids}->{$pid} = undef; + awaitpid($pid, \&imap_idle_reap, $self, $uri, $intvl); } sub event_step { @@ -447,13 +462,13 @@ sub event_step { if ($idle_todo && @$idle_todo) { watch_atfork_parent($self); eval { - while (my $uri_intvl = shift(@$idle_todo)) { - imap_idle_fork($self, $uri_intvl); + while (my ($uri, $intvl) = splice(@$idle_todo, 0, 2)) { + imap_idle_fork($self, $uri, $intvl); } }; die $@ if $@; } - fs_scan_step($self) if $self->{mdre}; + fs_scan_step($self) if $self->{d_re}; } sub watch_imap_fetch_all ($$) { @@ -474,7 +489,7 @@ sub watch_nntp_fetch_all ($$) { local $SIG{__WARN__} = sub { my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : ''; my $art = $self->{cur_uid}; - $warn_cb->("$pfx$uri", $art ? ("ARTICLE $art") : (), "\n", @_); + $warn_cb->("$pfx$uri", $art ? (" ARTICLE $art") : (), "\n", @_); }; for $uri (@$uris) { PublicInbox::NetReader::nntp_each($self, $uri, \&net_cb, $self, @@ -486,52 +501,44 @@ sub watch_nntp_fetch_all ($$) { sub poll_fetch_fork { # DS::add_timer callback my ($self, $intvl, $uris) = @_; return if $self->{quit}; - pipe(my ($r, $w)) or die "pipe: $!"; watch_atfork_parent($self); - my $seed = rand(0xffffffff); - my $pid = fork; - if (defined($pid) && $pid == 0) { - srand($seed); - eval { Net::SSLeay::randomize() }; - close $r; + my @nntp; + my @imap = grep { # push() always returns > 0 + $_->scheme =~ m!\Aimaps?!i ? 1 : (push(@nntp, $_) < 0) + } @$uris; + my $pid = PublicInbox::DS::fork_persist; + if ($pid == 0) { watch_atfork_child($self); - if ($uris->[0]->scheme =~ m!\Aimaps?!i) { - watch_imap_fetch_all($self, $uris); - } else { - watch_nntp_fetch_all($self, $uris); - } - close $w; + watch_imap_fetch_all($self, \@imap) if @imap; + watch_nntp_fetch_all($self, \@nntp) if @nntp; _exit(0); } - die "fork: $!" unless defined $pid; - $self->{poll_pids}->{$pid} = [ $intvl, $uris ]; - PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]); + $self->{pids}->{$pid} = undef; + awaitpid($pid, \&poll_fetch_reap, $self, $intvl, $uris); } -sub poll_fetch_reap { - my ($self, $pid) = @_; - my $intvl_uris = delete $self->{poll_pids}->{$pid} or - die "BUG: PID=$pid (unknown) reaped: \$?=$?\n"; +sub poll_fetch_reap { # awaitpid callback + my ($pid, $self, $intvl, $uris) = @_; + delete $self->{pids}->{$pid}; return if $self->{quit}; - my ($intvl, $uris) = @$intvl_uris; if ($?) { warn "W: PID=$pid died: \$?=$?\n", map { "$_\n" } @$uris; } - warn("I: will check $_ in ${intvl}s\n") for @$uris; + warn("# will check $_ in ${intvl}s\n") for @$uris; add_timer($intvl, \&poll_fetch_fork, $self, $intvl, $uris); } sub watch_imap_init ($$) { my ($self, $poll) = @_; - my $mics = PublicInbox::NetReader::imap_common_init($self); - my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ] + my $mics = PublicInbox::NetReader::imap_common_init($self) or return; + my $idle = []; # [ uri1, intvl1, uri2, intvl2 ] for my $uri (@{$self->{imap_order}}) { my $sec = uri_section($uri); my $mic = $mics->{$sec}; my $intvl = $self->{cfg_opt}->{$sec}->{pollInterval}; if ($mic->has_capability('IDLE') && !$intvl) { $intvl = $self->{cfg_opt}->{$sec}->{idleInterval}; - push @$idle, [ $uri, $intvl // () ]; + push @$idle, $uri, $intvl; } else { push @{$poll->{$intvl || 120}}, $uri; } @@ -552,10 +559,12 @@ sub watch_nntp_init ($$) { } } +sub quit_inprogress { !$_[0]->quit_done } # post_loop_do CB + sub watch { # main entry point - my ($self, $sig, $oldset) = @_; - $self->{oldset} = $oldset; - $self->{sig} = $sig; + my ($self, $sig) = @_; + my $first_sig; + $self->{sig} //= ($first_sig = $sig); my $poll = {}; # intvl_seconds => [ uri1, uri2 ] watch_imap_init($self, $poll) if $self->{imap}; watch_nntp_init($self, $poll) if $self->{nntp}; @@ -563,9 +572,9 @@ sub watch { # main entry point # poll all URIs for a given interval sequentially add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris); } - watch_fs_init($self) if $self->{mdre}; - PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done }); - PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step + watch_fs_init($self) if $self->{d_re}; + local @PublicInbox::DS::post_loop_do = (\&quit_inprogress, $self); + PublicInbox::DS::event_loop($first_sig); # calls ->event_step _done_for_now($self); } @@ -594,7 +603,7 @@ sub fs_scan_step { $opendirs->{$dir} = $dh if $n < 0; } if ($op && $op eq 'full') { - foreach my $dir (keys %{$self->{mdmap}}) { + foreach my $dir (keys %{$self->{d_map}}) { next if $opendirs->{$dir}; # already in progress my $ok = opendir(my $dh, $dir); unless ($ok) { @@ -669,6 +678,13 @@ sub is_maildir { $_[0]; } +sub is_mh { + $_[0] =~ s!\Amh:!!i or return; + $_[0] =~ tr!/!/!s; + $_[0] =~ s!/\z!!; + $_[0]; +} + sub is_watchspam { my ($cur, $ws, $ibx) = @_; if ($ws && !ref($ws) && $ws eq 'watchspam') { |