about summary refs log tree commit homepage
path: root/lib/PublicInbox/LEI.pm
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2021-07-19 08:59:35 +0000
committerEric Wong <e@80x24.org>2021-07-22 02:29:00 +0000
commit5b4fde37adefa37508d131dbe013353ef3345051 (patch)
treebfd68b4f1fedf94464776a5760a4dc531f1d097e /lib/PublicInbox/LEI.pm
parent7e1c18af5468c7708e28de759911ec5542d23c4b (diff)
downloadpublic-inbox-5b4fde37adefa37508d131dbe013353ef3345051.tar.gz
This allows lei to automatically note keyword (message flag)
changes made to a Maildir and propagate it into lei/store:

	lei add-watch --state=tag-ro /path/to/Maildir

This doesn't persist across restarts, yet.  In the future,
it will be applied automatically to "lei q" output Maildirs
by default (with an option to disable it).

State values of tag-rw, index-<ro|rw>, import-<ro|rw> will all
be supported for Maildir.

This represents a fairly major internal change that's fairly
intrusive, but the whole daemon-oriented design was to
facilitate being able to automatically monitor (and propagate)
Maildir/IMAP flag changes.
Diffstat (limited to 'lib/PublicInbox/LEI.pm')
-rw-r--r--lib/PublicInbox/LEI.pm116
1 files changed, 104 insertions, 12 deletions
diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm
index a9f5edae..b92d7512 100644
--- a/lib/PublicInbox/LEI.pm
+++ b/lib/PublicInbox/LEI.pm
@@ -28,14 +28,15 @@ use Time::HiRes qw(stat); # ctime comparisons for config cache
 use File::Path qw(mkpath);
 use File::Spec;
 our $quit = \&CORE::exit;
-our ($current_lei, $errors_log, $listener, $oldset, $dir_idle);
-my ($recv_cmd, $send_cmd);
+our ($current_lei, $errors_log, $listener, $oldset, $dir_idle,
+        $recv_cmd, $send_cmd);
 my $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(gnu_getopt no_ignore_case auto_abbrev));
 my $GLP_PASS = Getopt::Long::Parser->new;
 $GLP_PASS->configure(qw(gnu_getopt no_ignore_case auto_abbrev pass_through));
 
 our %PATH2CFG; # persistent for socket daemon
+our $MDIR2CFGPATH; # /path/to/maildir => { /path/to/config => undef }
 
 # TBD: this is a documentation mechanism to show a subcommand
 # (may) pass options through to another command:
@@ -232,10 +233,9 @@ our %CMD = ( # sorted in order of importance/use:
         qw(exact! all jobs:i indexed), @c_opt ],
 
 'add-watch' => [ 'LOCATION', 'watch for new messages and flag changes',
-        qw(import! kw! interval=s recursive|r
-        exclude=s include=s), @c_opt ],
+        qw(poll-interval=s state=s recursive|r), @c_opt ],
 'ls-watch' => [ '[FILTER...]', 'list active watches with numbers and status',
-                qw(format|f=s z), @c_opt ],
+                qw(l z|0), @c_opt ],
 'pause-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote), @c_opt ],
 'resume-watch' => [ '[WATCH_NUMBER_OR_FILTER]', qw(all local remote), @c_opt ],
 'forget-watch' => [ '{WATCH_NUMBER|--prune}', 'stop and forget a watch',
@@ -391,6 +391,7 @@ my %OPTDESC = (
 'format|f=s        ls-search' => ['OUT|json|jsonl|concatjson',
                         'listing output format' ],
 'l        ls-search' => 'long listing format',
+'l        ls-watch' => 'long listing format',
 'l        ls-mail-source' => 'long listing format',
 'url        ls-mail-source' => 'show full URL of newsgroup or IMAP folder',
 'format|f=s        ls-external' => $ls_format,
@@ -435,7 +436,7 @@ my %CONFIG_KEYS = (
         'leistore.dir' => 'top-level storage location',
 );
 
-my @WQ_KEYS = qw(lxs l2m ikw pmd wq1); # internal workers
+my @WQ_KEYS = qw(lxs l2m ikw pmd wq1 lne); # internal workers
 
 sub _drop_wq {
         my ($self) = @_;
@@ -538,7 +539,7 @@ sub _lei_atfork_child {
                 chdir '/' or die "chdir(/): $!";
                 close($_) for (grep(defined, delete @$self{qw(0 1 2 sock)}));
                 if (my $cfg = $self->{cfg}) {
-                        delete $cfg->{-lei_store};
+                        delete @$cfg{qw(-lei_store -watches -lei_note_event)};
                 }
         } else { # worker, Net::NNTP (Net::Cmd) uses STDERR directly
                 open STDERR, '+>&='.fileno($self->{2}) or warn "open $!";
@@ -555,6 +556,8 @@ sub _lei_atfork_child {
         undef $listener;
         undef $dir_idle;
         %PATH2CFG = ();
+        $MDIR2CFGPATH = {};
+        eval 'no warnings; undef $PublicInbox::LeiNoteEvent::to_flush';
         undef $errors_log;
         $quit = \&CORE::exit;
         $self->{-eml_noisy} or # only "lei import" sets this atm
@@ -781,10 +784,12 @@ sub _lei_cfg ($;$) {
         my $f = _config_path($self);
         my @st = stat($f);
         my $cur_st = @st ? pack('dd', $st[10], $st[7]) : ''; # 10:ctime, 7:size
-        my ($sto, $sto_dir);
+        my ($sto, $sto_dir, $watches, $lne);
         if (my $cfg = $PATH2CFG{$f}) { # reuse existing object in common case
                 return ($self->{cfg} = $cfg) if $cur_st eq $cfg->{-st};
-                ($sto, $sto_dir) = @$cfg{qw(-lei_store leistore.dir)};
+                ($sto, $sto_dir, $watches, $lne) =
+                                @$cfg{qw(-lei_store leistore.dir -watches
+                                        -lei_note_event)};
         }
         if (!@st) {
                 unless ($creat) {
@@ -805,6 +810,8 @@ sub _lei_cfg ($;$) {
                         eq File::Spec->canonpath($cfg->{'leistore.dir'} //
                                                 store_path($self))) {
                 $cfg->{-lei_store} = $sto;
+                $cfg->{-lei_note_event} = $lne;
+                $cfg->{-watches} = $watches if $watches;
         }
         if (scalar(keys %PATH2CFG) > 5) {
                 # FIXME: use inotify/EVFILT_VNODE to detect unlinked configs
@@ -817,7 +824,7 @@ sub _lei_cfg ($;$) {
 
 sub _lei_store ($;$) {
         my ($self, $creat) = @_;
-        my $cfg = _lei_cfg($self, $creat);
+        my $cfg = _lei_cfg($self, $creat) // return;
         $cfg->{-lei_store} //= do {
                 require PublicInbox::LeiStore;
                 my $dir = $cfg->{'leistore.dir'} // store_path($self);
@@ -1126,6 +1133,53 @@ sub dump_and_clear_log {
         }
 }
 
+sub cfg2lei ($) {
+        my ($cfg) = @_;
+        my $lei = bless { env => { %{$cfg->{-env}} } }, __PACKAGE__;
+        open($lei->{0}, '<&', \*STDIN) or die "dup 0: $!";
+        open($lei->{1}, '>>&', \*STDOUT) or die "dup 1: $!";
+        open($lei->{2}, '>>&', \*STDERR) or die "dup 2: $!";
+        open($lei->{3}, '/') or die "open /: $!";
+        chdir($lei->{3}) or die "chdir /': $!";
+        my ($x, $y);
+        socketpair($x, $y, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
+        $lei->{sock} = $x;
+        require PublicInbox::LeiSelfSocket;
+        PublicInbox::LeiSelfSocket->new($y); # adds to event loop
+        $lei;
+}
+
+sub dir_idle_handler ($) { # PublicInbox::DirIdle callback
+        my ($ev) = @_; # Linux::Inotify2::Event or duck type
+        my $fn = $ev->fullname;
+        if ($fn =~ m!\A(.+)/(new|cur)/([^/]+)\z!) { # Maildir file
+                my ($mdir, $nc, $bn) = ($1, $2, $3);
+                $nc = '' if $ev->IN_DELETE;
+                for my $f (keys %{$MDIR2CFGPATH->{$mdir} // {}}) {
+                        my $cfg = $PATH2CFG{$f} // next;
+                        eval {
+                                local %ENV = %{$cfg->{-env}};
+                                my $lei = cfg2lei($cfg);
+                                $lei->dispatch('note-event',
+                                                "maildir:$mdir", $nc, $bn, $fn);
+                        };
+                        warn "E note-event $f: $@\n" if $@;
+                }
+        }
+        if ($ev->can('cancel') && ($ev->IN_IGNORE || $ev->IN_UNMOUNT)) {
+                $ev->cancel;
+        }
+        if ($fn =~ m!\A(.+)/(?:new|cur)\z! && !-e $fn) {
+                delete $MDIR2CFGPATH->{$1};
+        }
+        if (!-e $fn) { # config file or Maildir gone
+                for my $cfgpaths (values %$MDIR2CFGPATH) {
+                        delete $cfgpaths->{$fn};
+                }
+                delete $PATH2CFG{$fn};
+        }
+}
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
         my ($path, $errno, $narg) = @_;
@@ -1175,6 +1229,7 @@ sub lazy_start {
         return if $pid;
         $0 = "lei-daemon $path";
         local %PATH2CFG;
+        local $MDIR2CFGPATH;
         $listener->blocking(0);
         my $exit_code;
         my $pil = PublicInbox::Listener->new($listener, \&accept_dispatch);
@@ -1204,8 +1259,8 @@ sub lazy_start {
         local $SIG{PIPE} = 'IGNORE';
         require PublicInbox::DirIdle;
         local $dir_idle = PublicInbox::DirIdle->new([$sock_dir], sub {
-                # just rely on wakeup ot hit PostLoopCallback set below
-                _dir_idle_handler(@_) if $_[0]->fullname ne $path;
+                # just rely on wakeup to hit PostLoopCallback set below
+                dir_idle_handler($_[0]) if $_[0]->fullname ne $path;
         }, 1);
         if ($sigfd) {
                 undef $sigfd; # unref, already in DS::DescriptorMap
@@ -1293,4 +1348,41 @@ sub wq_eof { # EOF callback for main daemon
         $wq1->wq_wait_old(\&wq_done_wait, $lei);
 }
 
+sub watch_state_ok ($) {
+        my ($state) = $_[-1]; # $_[0] may be $self
+        $state =~ /\Apause|(?:import|index|tag)-(?:ro|rw)\z/;
+}
+
+sub refresh_watches {
+        my ($lei) = @_;
+        my $cfg = _lei_cfg($lei) or return;
+        $cfg->{-env} //= { %{$lei->{env}}, PWD => '/' }; # for cfg2lei
+        my $watches = $cfg->{-watches} //= {};
+        require PublicInbox::LeiWatch;
+        for my $w (grep(/\Awatch\..+\.state\z/, keys %$cfg)) {
+                my $url = substr($w, length('watch.'), -length('.state'));
+                my $lw = $watches->{$w} //= PublicInbox::LeiWatch->new($url);
+                my $state = $cfg->get_1("watch.$url", 'state');
+                if (!watch_state_ok($state)) {
+                        $lei->err("watch.$url.state=$state not supported");
+                        next;
+                }
+                my $f = $cfg->{'-f'};
+                if ($url =~ /\Amaildir:(.+)/i) {
+                        my $d = File::Spec->canonpath($1);
+                        if ($state eq 'pause') {
+                                delete $MDIR2CFGPATH->{$d}->{$f};
+                                scalar(keys %{$MDIR2CFGPATH->{$d}}) or
+                                        delete $MDIR2CFGPATH->{$d};
+                        } elsif (!exists($MDIR2CFGPATH->{$d}->{$f})) {
+                                $dir_idle->add_watches(["$d/cur", "$d/new"], 1);
+                                $MDIR2CFGPATH->{$d}->{$f} = undef;
+                        }
+                } else { # TODO: imap/nntp/jmap
+                        $lei->child_error(1,
+                                "E: watch $url not supported, yet");
+                }
+        }
+}
+
 1;