about summary refs log tree commit homepage
path: root/lib/PublicInbox/Watch.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/Watch.pm')
-rw-r--r--lib/PublicInbox/Watch.pm246
1 files changed, 131 insertions, 115 deletions
diff --git a/lib/PublicInbox/Watch.pm b/lib/PublicInbox/Watch.pm
index 3f6fe21b..eb90d353 100644
--- a/lib/PublicInbox/Watch.pm
+++ b/lib/PublicInbox/Watch.pm
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # ref: https://cr.yp.to/proto/maildir.html
@@ -12,11 +12,11 @@ use PublicInbox::MdirReader;
 use PublicInbox::NetReader;
 use PublicInbox::Filter::Base qw(REJECT);
 use PublicInbox::Spamcheck;
-use PublicInbox::DS qw(now add_timer);
+use PublicInbox::DS qw(now add_timer awaitpid);
 use PublicInbox::MID qw(mids);
 use PublicInbox::ContentHash qw(content_hash);
-use PublicInbox::EOFpipe;
 use POSIX qw(_exit WNOHANG);
+use constant { D_MAILDIR => 1, D_MH => 2 };
 
 sub compile_watchheaders ($) {
         my ($ibx) = @_;
@@ -41,11 +41,25 @@ sub compile_watchheaders ($) {
         $ibx->{-watchheaders} = $watch_hdrs if scalar @$watch_hdrs;
 }
 
+sub d_type_set ($$$) {
+        my ($d_type, $dir, $is) = @_;
+        my $isnt = D_MAILDIR;
+        if ($is == D_MAILDIR) {
+                $isnt = D_MH;
+                $d_type->{"$dir/cur"} |= $is;
+                $d_type->{"$dir/new"} |= $is;
+        }
+        warn <<EOM if ($d_type->{$dir} |= $is) & $isnt;
+W: `$dir' is both Maildir and MH (non-fatal)
+EOM
+}
+
 sub new {
         my ($class, $cfg) = @_;
-        my (%mdmap, $spamc);
+        my (%d_map, %d_type);
         my (%imap, %nntp); # url => [inbox objects] or 'watchspam'
         my (@imap, @nntp);
+        PublicInbox::Import::load_config($cfg);
 
         # "publicinboxwatch" is the documented namespace
         # "publicinboxlearn" is legacy but may be supported
@@ -57,7 +71,11 @@ sub new {
                         my $uri;
                         if (is_maildir($dir)) {
                                 # skip "new", no MUA has seen it, yet.
-                                $mdmap{"$dir/cur"} = 'watchspam';
+                                $d_map{"$dir/cur"} = 'watchspam';
+                                d_type_set \%d_type, $dir, D_MAILDIR;
+                        } elsif (is_mh($dir)) {
+                                $d_map{$dir} = 'watchspam';
+                                d_type_set \%d_type, $dir, D_MH;
                         } elsif ($uri = imap_uri($dir)) {
                                 $imap{$$uri} = 'watchspam';
                                 push @imap, $uri;
@@ -69,7 +87,6 @@ sub new {
                         }
                 }
         }
-
         my $k = 'publicinboxwatch.spamcheck';
         my $default = undef;
         my $spamcheck = PublicInbox::Spamcheck::get($cfg, $k, $default);
@@ -80,16 +97,28 @@ sub new {
                 my $ibx = $_[0] = PublicInbox::InboxWritable->new($_[0]);
 
                 my $watches = $ibx->{watch} or return;
+
+                $ibx->{indexlevel} //= $ibx->detect_indexlevel;
                 $watches = PublicInbox::Config::_array($watches);
                 for my $watch (@$watches) {
                         my $uri;
-                        if (is_maildir($watch)) {
+                        my $bool = $cfg->git_bool($watch);
+                        if (defined $bool && !$bool) {
+                                $ibx->{-watch_disabled} = 1;
+                        } elsif (is_maildir($watch)) {
                                 compile_watchheaders($ibx);
                                 my ($new, $cur) = ("$watch/new", "$watch/cur");
-                                my $cur_dst = $mdmap{$cur} //= [];
+                                my $cur_dst = $d_map{$cur} //= [];
                                 return if is_watchspam($cur, $cur_dst, $ibx);
-                                push @{$mdmap{$new} //= []}, $ibx;
+                                push @{$d_map{$new} //= []}, $ibx;
                                 push @$cur_dst, $ibx;
+                                d_type_set \%d_type, $watch, D_MAILDIR;
+                        } elsif (is_mh($watch)) {
+                                my $cur_dst = $d_map{$watch} //= [];
+                                return if is_watchspam($watch, $cur_dst, $ibx);
+                                compile_watchheaders($ibx);
+                                push(@$cur_dst, $ibx);
+                                d_type_set \%d_type, $watch, D_MH;
                         } elsif ($uri = imap_uri($watch)) {
                                 my $cur_dst = $imap{$$uri} //= [];
                                 return if is_watchspam($uri, $cur_dst, $ibx);
@@ -106,18 +135,19 @@ sub new {
                 }
         });
 
-        my $mdre;
-        if (scalar keys %mdmap) {
-                $mdre = join('|', map { quotemeta($_) } keys %mdmap);
-                $mdre = qr!\A($mdre)/!;
+        my $d_re;
+        if (scalar keys %d_map) {
+                $d_re = join('|', map quotemeta, keys %d_map);
+                $d_re = qr!\A($d_re)/!;
         }
-        return unless $mdre || scalar(keys %imap) || scalar(keys %nntp);
+        return unless $d_re || scalar(keys %imap) || scalar(keys %nntp);
 
         bless {
                 max_batch => 10, # avoid hogging locks for too long
                 spamcheck => $spamcheck,
-                mdmap => \%mdmap,
-                mdre => $mdre,
+                d_map => \%d_map,
+                d_re => $d_re,
+                d_type => \%d_type,
                 pi_cfg => $cfg,
                 imap => scalar keys %imap ? \%imap : undef,
                 nntp => scalar keys %nntp? \%nntp : undef,
@@ -141,6 +171,7 @@ sub _done_for_now {
 
 sub remove_eml_i { # each_inbox callback
         my ($ibx, $self, $eml, $loc) = @_;
+        return if $ibx->{-watch_disabled};
 
         eval {
                 # try to avoid taking a lock or unnecessary spawning
@@ -214,17 +245,23 @@ sub import_eml ($$$) {
 
 sub _try_path {
         my ($self, $path) = @_;
-        my $fl = PublicInbox::MdirReader::maildir_path_flags($path) // return;
-        return if $fl =~ /[DT]/; # no Drafts or Trash
-        if ($path !~ $self->{mdre}) {
-                warn "unrecognized path: $path\n";
-                return;
-        }
-        my $inboxes = $self->{mdmap}->{$1};
-        unless ($inboxes) {
-                warn "unmappable dir: $1\n";
-                return;
-        }
+        $path =~ $self->{d_re} or
+                return warn("BUG? unrecognized path: $path\n");
+        my $dir = $1;
+        my $inboxes = $self->{d_map}->{$dir} //
+                return warn("W: unmappable dir: $dir\n");
+        my ($md_fl, $mh_seq);
+        if ($self->{d_type}->{$dir} & D_MH) {
+                $path =~ m!/([0-9]+)\z! ? ($mh_seq = $1) : return;
+        }
+        $self->{d_type}->{$dir} & D_MAILDIR and
+                $md_fl = PublicInbox::MdirReader::maildir_path_flags($path);
+        $md_fl // $mh_seq // return;
+        return if ($md_fl // '') =~ /[DT]/; # no Drafts or Trash
+        # n.b. none of the MH keywords are relevant for public mail,
+        # mh_seq is only used to validate we're reading an email
+        # and not treating .mh_sequences as an email
+
         my $warn_cb = $SIG{__WARN__} || \&CORE::warn;
         local $SIG{__WARN__} = sub {
                 my $pfx = ($_[0] // '') =~ /^([A-Z]: )/g ? $1 : '';
@@ -244,25 +281,24 @@ sub quit_done ($) {
         return unless $self->{quit};
 
         # don't have reliable wakeups, keep signalling
-        my $done = 1;
-        for (qw(idle_pids poll_pids)) {
-                my $pids = $self->{$_} or next;
-                for (keys %$pids) {
-                        $done = undef if kill('QUIT', $_);
-                }
-        }
-        $done;
+        my $live = grep { kill('QUIT', $_) } keys %{$self->{pids}};
+        add_timer(0.01, \&quit_done, $self) if $live;
+        $live == 0;
 }
 
-sub quit {
+sub quit { # may be called in IMAP/NNTP children
         my ($self) = @_;
         $self->{quit} = 1;
         %{$self->{opendirs}} = ();
         _done_for_now($self);
         quit_done($self);
-        if (my $idle_mic = $self->{idle_mic}) {
+        if (my $dir_idle = delete $self->{dir_idle}) {
+                $dir_idle->close if $dir_idle;
+        }
+        if (my $idle_mic = delete $self->{idle_mic}) { # IMAP child
+                return unless $idle_mic->IsConnected && $idle_mic->Socket;
                 eval { $idle_mic->done };
-                if ($@) {
+                if ($@ && $idle_mic->IsConnected && $idle_mic->Socket) {
                         warn "IDLE DONE error: $@\n";
                         eval { $idle_mic->disconnect };
                         warn "IDLE LOGOUT error: $@\n" if $@;
@@ -282,8 +318,8 @@ sub watch_fs_init ($) {
         };
         require PublicInbox::DirIdle;
         # inotify_create + EPOLL_CTL_ADD
-        my $dir_idle = PublicInbox::DirIdle->new($cb);
-        $dir_idle->add_watches([keys %{$self->{mdmap}}]);
+        my $dir_idle = $self->{dir_idle} = PublicInbox::DirIdle->new($cb);
+        $dir_idle->add_watches([keys %{$self->{d_map}}]);
 }
 
 sub net_cb { # NetReader::(nntp|imap)_each callback
@@ -318,7 +354,7 @@ sub imap_fetch_all ($$) {
         local $SIG{__WARN__} = sub {
                 my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : '';
                 my $uid = $self->{cur_uid};
-                $warn_cb->("$pfx$uri", $uid ? ("UID:$uid") : (), "\n", @_);
+                $warn_cb->("$pfx$uri", $uid ? (" UID:$uid") : (), "\n", @_);
         };
         PublicInbox::NetReader::imap_each($self, $uri, \&net_cb, $self,
                                         $self->{imap}->{$$uri});
@@ -328,7 +364,7 @@ sub imap_idle_once ($$$$) {
         my ($self, $mic, $intvl, $uri) = @_;
         my $i = $intvl //= (29 * 60);
         my $end = now() + $intvl;
-        warn "I: $uri idling for ${intvl}s\n";
+        warn "# $uri idling for ${intvl}s\n";
         local $0 = "IDLE $0";
         return if $self->{quit};
         unless ($mic->idle) {
@@ -381,63 +417,42 @@ sub watch_imap_idle_1 ($$$) {
 
 sub watch_atfork_child ($) {
         my ($self) = @_;
-        delete $self->{idle_pids};
-        delete $self->{poll_pids};
-        delete $self->{opendirs};
-        PublicInbox::DS->Reset;
+        delete @$self{qw(dir_idle pids opendirs)};
         my $sig = delete $self->{sig};
-        $sig->{CHLD} = 'DEFAULT';
+        $sig->{CHLD} = $sig->{HUP} = $sig->{USR1} = 'DEFAULT';
+        # TERM/QUIT/INT call ->quit, which works in both parent+child
         @SIG{keys %$sig} = values %$sig;
-        PublicInbox::DS::sig_setmask($self->{oldset});
+        PublicInbox::DS::sig_setmask(PublicInbox::DS::allowset($sig));
 }
 
 sub watch_atfork_parent ($) { _done_for_now($_[0]) }
 
 sub imap_idle_requeue { # DS::add_timer callback
-        my ($self, $uri_intvl) = @_;
+        my ($self, $uri, $intvl) = @_;
         return if $self->{quit};
-        push @{$self->{idle_todo}}, $uri_intvl;
+        push @{$self->{idle_todo}}, $uri, $intvl;
         event_step($self);
 }
 
-sub imap_idle_reap { # PublicInbox::DS::dwaitpid callback
-        my ($self, $pid) = @_;
-        my $uri_intvl = delete $self->{idle_pids}->{$pid} or
-                die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
-
-        my ($uri, $intvl) = @$uri_intvl;
+sub imap_idle_reap { # awaitpid callback
+        my ($pid, $self, $uri, $intvl) = @_;
+        delete $self->{pids}->{$pid};
         return if $self->{quit};
         warn "W: PID=$pid on $uri died: \$?=$?\n" if $?;
-        add_timer(60, \&imap_idle_requeue, $self, $uri_intvl);
+        add_timer(60, \&imap_idle_requeue, $self, $uri, $intvl);
 }
 
-sub reap { # callback for EOFpipe
-        my ($pid, $cb, $self) = @{$_[0]};
-        my $ret = waitpid($pid, 0);
-        if ($ret == $pid) {
-                $cb->($self, $pid); # poll_fetch_reap || imap_idle_reap
-        } else {
-                warn "W: waitpid($pid) => ", $ret // "($!)", "\n";
-        }
-}
-
-sub imap_idle_fork ($$) {
-        my ($self, $uri_intvl) = @_;
-        my ($uri, $intvl) = @$uri_intvl;
-        pipe(my ($r, $w)) or die "pipe: $!";
-        my $seed = rand(0xffffffff);
-        my $pid = fork // die "fork: $!";
+sub imap_idle_fork {
+        my ($self, $uri, $intvl) = @_;
+        return if $self->{quit};
+        my $pid = PublicInbox::DS::fork_persist;
         if ($pid == 0) {
-                srand($seed);
-                eval { Net::SSLeay::randomize() };
-                close $r;
                 watch_atfork_child($self);
                 watch_imap_idle_1($self, $uri, $intvl);
-                close $w;
                 _exit(0);
         }
-        $self->{idle_pids}->{$pid} = $uri_intvl;
-        PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&imap_idle_reap, $self]);
+        $self->{pids}->{$pid} = undef;
+        awaitpid($pid, \&imap_idle_reap, $self, $uri, $intvl);
 }
 
 sub event_step {
@@ -447,13 +462,13 @@ sub event_step {
         if ($idle_todo && @$idle_todo) {
                 watch_atfork_parent($self);
                 eval {
-                        while (my $uri_intvl = shift(@$idle_todo)) {
-                                imap_idle_fork($self, $uri_intvl);
+                        while (my ($uri, $intvl) = splice(@$idle_todo, 0, 2)) {
+                                imap_idle_fork($self, $uri, $intvl);
                         }
                 };
                 die $@ if $@;
         }
-        fs_scan_step($self) if $self->{mdre};
+        fs_scan_step($self) if $self->{d_re};
 }
 
 sub watch_imap_fetch_all ($$) {
@@ -474,7 +489,7 @@ sub watch_nntp_fetch_all ($$) {
         local $SIG{__WARN__} = sub {
                 my $pfx = ($_[0] // '') =~ /^([A-Z]: |# )/g ? $1 : '';
                 my $art = $self->{cur_uid};
-                $warn_cb->("$pfx$uri", $art ? ("ARTICLE $art") : (), "\n", @_);
+                $warn_cb->("$pfx$uri", $art ? (" ARTICLE $art") : (), "\n", @_);
         };
         for $uri (@$uris) {
                 PublicInbox::NetReader::nntp_each($self, $uri, \&net_cb, $self,
@@ -486,52 +501,44 @@ sub watch_nntp_fetch_all ($$) {
 sub poll_fetch_fork { # DS::add_timer callback
         my ($self, $intvl, $uris) = @_;
         return if $self->{quit};
-        pipe(my ($r, $w)) or die "pipe: $!";
         watch_atfork_parent($self);
-        my $seed = rand(0xffffffff);
-        my $pid = fork;
-        if (defined($pid) && $pid == 0) {
-                srand($seed);
-                eval { Net::SSLeay::randomize() };
-                close $r;
+        my @nntp;
+        my @imap = grep { # push() always returns > 0
+                $_->scheme =~ m!\Aimaps?!i ? 1 : (push(@nntp, $_) < 0)
+        } @$uris;
+        my $pid = PublicInbox::DS::fork_persist;
+        if ($pid == 0) {
                 watch_atfork_child($self);
-                if ($uris->[0]->scheme =~ m!\Aimaps?!i) {
-                        watch_imap_fetch_all($self, $uris);
-                } else {
-                        watch_nntp_fetch_all($self, $uris);
-                }
-                close $w;
+                watch_imap_fetch_all($self, \@imap) if @imap;
+                watch_nntp_fetch_all($self, \@nntp) if @nntp;
                 _exit(0);
         }
-        die "fork: $!"  unless defined $pid;
-        $self->{poll_pids}->{$pid} = [ $intvl, $uris ];
-        PublicInbox::EOFpipe->new($r, \&reap, [$pid, \&poll_fetch_reap, $self]);
+        $self->{pids}->{$pid} = undef;
+        awaitpid($pid, \&poll_fetch_reap, $self, $intvl, $uris);
 }
 
-sub poll_fetch_reap {
-        my ($self, $pid) = @_;
-        my $intvl_uris = delete $self->{poll_pids}->{$pid} or
-                die "BUG: PID=$pid (unknown) reaped: \$?=$?\n";
+sub poll_fetch_reap { # awaitpid callback
+        my ($pid, $self, $intvl, $uris) = @_;
+        delete $self->{pids}->{$pid};
         return if $self->{quit};
-        my ($intvl, $uris) = @$intvl_uris;
         if ($?) {
                 warn "W: PID=$pid died: \$?=$?\n", map { "$_\n" } @$uris;
         }
-        warn("I: will check $_ in ${intvl}s\n") for @$uris;
+        warn("# will check $_ in ${intvl}s\n") for @$uris;
         add_timer($intvl, \&poll_fetch_fork, $self, $intvl, $uris);
 }
 
 sub watch_imap_init ($$) {
         my ($self, $poll) = @_;
-        my $mics = PublicInbox::NetReader::imap_common_init($self);
-        my $idle = []; # [ [ uri1, intvl1 ], [uri2, intvl2] ]
+        my $mics = PublicInbox::NetReader::imap_common_init($self) or return;
+        my $idle = []; # [ uri1, intvl1, uri2, intvl2 ]
         for my $uri (@{$self->{imap_order}}) {
                 my $sec = uri_section($uri);
                 my $mic = $mics->{$sec};
                 my $intvl = $self->{cfg_opt}->{$sec}->{pollInterval};
                 if ($mic->has_capability('IDLE') && !$intvl) {
                         $intvl = $self->{cfg_opt}->{$sec}->{idleInterval};
-                        push @$idle, [ $uri, $intvl // () ];
+                        push @$idle, $uri, $intvl;
                 } else {
                         push @{$poll->{$intvl || 120}}, $uri;
                 }
@@ -552,10 +559,12 @@ sub watch_nntp_init ($$) {
         }
 }
 
+sub quit_inprogress { !$_[0]->quit_done } # post_loop_do CB
+
 sub watch { # main entry point
-        my ($self, $sig, $oldset) = @_;
-        $self->{oldset} = $oldset;
-        $self->{sig} = $sig;
+        my ($self, $sig) = @_;
+        my $first_sig;
+        $self->{sig} //= ($first_sig = $sig);
         my $poll = {}; # intvl_seconds => [ uri1, uri2 ]
         watch_imap_init($self, $poll) if $self->{imap};
         watch_nntp_init($self, $poll) if $self->{nntp};
@@ -563,9 +572,9 @@ sub watch { # main entry point
                 # poll all URIs for a given interval sequentially
                 add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris);
         }
-        watch_fs_init($self) if $self->{mdre};
-        PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
-        PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step
+        watch_fs_init($self) if $self->{d_re};
+        local @PublicInbox::DS::post_loop_do = (\&quit_inprogress, $self);
+        PublicInbox::DS::event_loop($first_sig); # calls ->event_step
         _done_for_now($self);
 }
 
@@ -594,7 +603,7 @@ sub fs_scan_step {
                 $opendirs->{$dir} = $dh if $n < 0;
         }
         if ($op && $op eq 'full') {
-                foreach my $dir (keys %{$self->{mdmap}}) {
+                foreach my $dir (keys %{$self->{d_map}}) {
                         next if $opendirs->{$dir}; # already in progress
                         my $ok = opendir(my $dh, $dir);
                         unless ($ok) {
@@ -669,6 +678,13 @@ sub is_maildir {
         $_[0];
 }
 
+sub is_mh {
+        $_[0] =~ s!\Amh:!!i or return;
+        $_[0] =~ tr!/!/!s;
+        $_[0] =~ s!/\z!!;
+        $_[0];
+}
+
 sub is_watchspam {
         my ($cur, $ws, $ibx) = @_;
         if ($ws && !ref($ws) && $ws eq 'watchspam') {