about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/PublicInbox/DS.pm103
-rw-r--r--lib/PublicInbox/DSKQXS.pm22
-rw-r--r--lib/PublicInbox/Daemon.pm4
-rw-r--r--lib/PublicInbox/EvCleanup.pm80
-rw-r--r--lib/PublicInbox/HTTP.pm102
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm55
-rw-r--r--lib/PublicInbox/Listener.pm7
-rw-r--r--lib/PublicInbox/NNTP.pm47
-rw-r--r--lib/PublicInbox/ParentPipe.pm17
-rw-r--r--lib/PublicInbox/Qspawn.pm2
-rw-r--r--lib/PublicInbox/Syscall.pm4
-rw-r--r--lib/PublicInbox/TLS.pm9
-rw-r--r--lib/PublicInbox/WatchMaildir.pm6
13 files changed, 198 insertions, 260 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm
index a8700bc5..586c47cd 100644
--- a/lib/PublicInbox/DS.pm
+++ b/lib/PublicInbox/DS.pm
@@ -28,6 +28,7 @@ use 5.010_001;
 use PublicInbox::Syscall qw(:epoll);
 
 use fields ('sock',              # underlying socket
+            'rbuf',              # scalarref, usually undef
             'wbuf',              # arrayref of coderefs or GLOB refs
             'wbuf_off',  # offset into first element of wbuf to start writing at
             );
@@ -36,6 +37,7 @@ use Errno  qw(EAGAIN EINVAL EEXIST);
 use Carp   qw(croak confess carp);
 require File::Spec;
 
+my $nextq = []; # queue for next_tick
 our (
      %DescriptorMap,             # fd (num) -> PublicInbox::DS object
      $Epoll,                     # Global epoll fd (or DSKQXS ref)
@@ -98,12 +100,6 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t
 sub AddTimer {
     my ($class, $secs, $coderef) = @_;
 
-    if (!$secs) {
-        my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer');
-        unshift(@Timers, $timer);
-        return $timer;
-    }
-
     my $fire_time = now() + $secs;
 
     my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer";
@@ -173,9 +169,23 @@ sub FirstTimeEventLoop {
 
 sub now () { clock_gettime(CLOCK_MONOTONIC) }
 
+sub next_tick () {
+    my $q = $nextq;
+    $nextq = [];
+    for (@$q) {
+        if (ref($_) eq 'CODE') {
+            $_->();
+        } else {
+            $_->event_step;
+        }
+    }
+}
+
 # runs timers and returns milliseconds for next one, or next event loop
 sub RunTimers {
-    return $LoopTimeout unless @Timers;
+    next_tick();
+
+    return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers;
 
     my $now = now();
 
@@ -185,6 +195,9 @@ sub RunTimers {
         $to_run->[1]->($now) if $to_run->[1];
     }
 
+    # timers may enqueue into nextq:
+    return 0 if (@$nextq || @ToClose);
+
     return $LoopTimeout unless @Timers;
 
     # convert time to an even number of milliseconds, adding 1
@@ -246,17 +259,8 @@ sub PostEventLoop {
     # now we can close sockets that wanted to close during our event processing.
     # (we didn't want to close them during the loop, as we didn't want fd numbers
     #  being reused and confused during the event loop)
-    while (my $sock = shift @ToClose) {
-        my $fd = fileno($sock);
-
-        # close the socket. (not a PublicInbox::DS close)
-        CORE::close($sock);
-
-        # and now we can finally remove the fd from the map.  see
-        # comment above in ->close.
-        delete $DescriptorMap{$fd};
-    }
-
+    delete($DescriptorMap{fileno($_)}) for @ToClose;
+    @ToClose = (); # let refcounting drop everything all at once
 
     # by default we keep running, unless a postloop callback (either per-object
     # or global) cancels it
@@ -317,6 +321,8 @@ sub new {
 ### I N S T A N C E   M E T H O D S
 #####################################################################
 
+sub requeue ($) { push @$nextq, $_[0] }
+
 =head2 C<< $obj->close >>
 
 Close the socket.
@@ -373,6 +379,10 @@ sub psendfile ($$$) {
     $written;
 }
 
+sub epbit ($$) { # (sock, default)
+    ref($_[0]) eq 'IO::Socket::SSL' ? PublicInbox::TLS::epollbit() : $_[1];
+}
+
 # returns 1 if done, 0 if incomplete
 sub flush_write ($) {
     my ($self) = @_;
@@ -391,8 +401,8 @@ next_buf:
                         goto next_buf;
                     }
                 } elsif ($! == EAGAIN) {
+                    epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
                     $self->{wbuf_off} = $off;
-                    watch($self, EPOLLOUT|EPOLLONESHOT);
                     return 0;
                 } else {
                     return $self->close;
@@ -412,17 +422,25 @@ next_buf:
     1; # all done
 }
 
-sub do_read ($$$$) {
+sub rbuf_idle ($$) {
+    my ($self, $rbuf) = @_;
+    if ($$rbuf eq '') { # who knows how long till we can read again
+        delete $self->{rbuf};
+    } else {
+        $self->{rbuf} = $rbuf;
+    }
+}
+
+sub do_read ($$$;$) {
     my ($self, $rbuf, $len, $off) = @_;
-    my $r = sysread($self->{sock}, $$rbuf, $len, $off);
+    my $r = sysread(my $sock = $self->{sock}, $$rbuf, $len, $off // 0);
     return ($r == 0 ? $self->close : $r) if defined $r;
     # common for clients to break connections without warning,
     # would be too noisy to log here:
-    if (ref($self) eq 'IO::Socket::SSL') {
-        my $ev = PublicInbox::TLS::epollbit() or return $self->close;
-        watch($self, $ev | EPOLLONESHOT);
-    } elsif ($! == EAGAIN) {
-        watch($self, EPOLLIN | EPOLLONESHOT);
+    if ($! == EAGAIN) {
+        epwait($sock, epbit($sock, EPOLLIN) | EPOLLONESHOT);
+        rbuf_idle($self, $rbuf);
+        0;
     } else {
         $self->close;
     }
@@ -499,17 +517,20 @@ sub write {
 
         if (defined $written) {
             return 1 if $written == $to_write;
+            requeue($self); # runs: event_step -> flush_write
         } elsif ($! == EAGAIN) {
+            epwait($sock, epbit($sock, EPOLLOUT) | EPOLLONESHOT);
             $written = 0;
         } else {
             return $self->close;
         }
+
+        # deal with EAGAIN or partial write:
         my $tmpio = tmpio($self, $bref, $written) or return 0;
 
         # wbuf may be an empty array if we're being called inside
         # ->flush_write via CODE bref:
         push @{$self->{wbuf} ||= []}, $tmpio;
-        watch($self, EPOLLOUT|EPOLLONESHOT);
         return 0;
     }
 }
@@ -528,46 +549,39 @@ sub msg_more ($$) {
             # queue up the unwritten substring:
             my $tmpio = tmpio($self, \($_[1]), $n) or return 0;
             $self->{wbuf} = [ $tmpio ];
-            watch($self, EPOLLOUT|EPOLLONESHOT);
+            epwait($sock, EPOLLOUT|EPOLLONESHOT);
             return 0;
         }
     }
     $self->write(\($_[1]));
 }
 
-sub watch ($$) {
-    my ($self, $ev) = @_;
-    my $sock = $self->{sock} or return;
+sub epwait ($$) {
+    my ($sock, $ev) = @_;
     epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and
         confess("EPOLL_CTL_MOD $!");
-    0;
 }
 
-sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) }
-
 # return true if complete, false if incomplete (or failure)
 sub accept_tls_step ($) {
     my ($self) = @_;
     my $sock = $self->{sock} or return;
     return 1 if $sock->accept_SSL;
     return $self->close if $! != EAGAIN;
-    if (my $ev = PublicInbox::TLS::epollbit()) {
-        unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
-        return watch($self, $ev | EPOLLONESHOT);
-    }
-    drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+    epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+    unshift @{$self->{wbuf} ||= []}, \&accept_tls_step;
+    0;
 }
 
+# return true if complete, false if incomplete (or failure)
 sub shutdn_tls_step ($) {
     my ($self) = @_;
     my $sock = $self->{sock} or return;
     return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1);
     return $self->close if $! != EAGAIN;
-    if (my $ev = PublicInbox::TLS::epollbit()) {
-        unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
-        return watch($self, $ev | EPOLLONESHOT);
-    }
-    drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err());
+    epwait($sock, PublicInbox::TLS::epollbit() | EPOLLONESHOT);
+    unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step;
+    0;
 }
 
 # don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC
@@ -581,7 +595,6 @@ sub shutdn ($) {
         $self->close;
     }
 }
-
 package PublicInbox::DS::Timer;
 # [$abs_float_firetime, $coderef];
 sub cancel {
diff --git a/lib/PublicInbox/DSKQXS.pm b/lib/PublicInbox/DSKQXS.pm
index 364df3d6..1c3b970b 100644
--- a/lib/PublicInbox/DSKQXS.pm
+++ b/lib/PublicInbox/DSKQXS.pm
@@ -16,7 +16,8 @@ use warnings;
 use parent qw(IO::KQueue);
 use parent qw(Exporter);
 use IO::KQueue;
-use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL);
+use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLLET
+        EPOLL_CTL_ADD EPOLL_CTL_MOD EPOLL_CTL_DEL);
 our @EXPORT_OK = qw(epoll_ctl epoll_wait);
 my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec)
 
@@ -24,10 +25,15 @@ my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec)
 sub kq_flag ($$) {
         my ($bit, $ev) = @_;
         if ($ev & $bit) {
-                my $fl = EV_ADD | EV_ENABLE;
-                ($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl;
+                my $fl = EV_ENABLE;
+                $fl |= EV_CLEAR if $fl & EPOLLET;
+
+                # EV_DISPATCH matches EPOLLONESHOT semantics more closely
+                # than EV_ONESHOT, in that EV_ADD is not required to
+                # re-enable a disabled watch.
+                ($ev & EPOLLONESHOT) ? ($fl | EV_DISPATCH) : $fl;
         } else {
-                EV_ADD | EV_DISABLE;
+                EV_DISABLE;
         }
 }
 
@@ -40,9 +46,15 @@ sub new {
 
 sub epoll_ctl {
         my ($self, $op, $fd, $ev) = @_;
-        if ($op != EPOLL_CTL_DEL) {
+        if ($op == EPOLL_CTL_MOD) {
                 $self->EV_SET($fd, EVFILT_READ, kq_flag(EPOLLIN, $ev));
                 $self->EV_SET($fd, EVFILT_WRITE, kq_flag(EPOLLOUT, $ev));
+        } elsif ($op == EPOLL_CTL_DEL) {
+                $self->EV_SET($fd, EVFILT_READ, EV_DISABLE);
+                $self->EV_SET($fd, EVFILT_WRITE, EV_DISABLE);
+        } else {
+                $self->EV_SET($fd, EVFILT_READ, EV_ADD|kq_flag(EPOLLIN, $ev));
+                $self->EV_SET($fd, EVFILT_WRITE, EV_ADD|kq_flag(EPOLLOUT, $ev));
         }
         0;
 }
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index cf011a20..2b7ac266 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -252,13 +252,11 @@ sub daemonize () {
 
 
 sub worker_quit {
-        my ($reason) = @_;
         # killing again terminates immediately:
         exit unless @listeners;
 
         $_->close foreach @listeners; # call PublicInbox::DS::close
         @listeners = ();
-        $reason->close if ref($reason) eq 'PublicInbox::ParentPipe';
 
         my $proc_name;
         my $warn = 0;
@@ -590,7 +588,7 @@ sub daemon_loop ($$$$) {
         } else {
                 reopen_logs();
                 $set_user->() if $set_user;
-                $SIG{USR2} = sub { worker_quit('USR2') if upgrade() };
+                $SIG{USR2} = sub { worker_quit() if upgrade() };
                 $refresh->();
         }
         $uid = $gid = undef;
diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm
index 33b54ebc..be6672ed 100644
--- a/lib/PublicInbox/EvCleanup.pm
+++ b/lib/PublicInbox/EvCleanup.pm
@@ -1,80 +1,23 @@
-# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
+# Copyright (C) 2016-2019 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# event cleanups (currently for PublicInbox::DS)
+# event cleanups (for PublicInbox::DS)
 package PublicInbox::EvCleanup;
 use strict;
 use warnings;
-use base qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
+require PublicInbox::DS;
 
+# this only runs under public-inbox-{httpd/nntpd}, not generic PSGI servers
 my $ENABLED;
 sub enabled { $ENABLED }
 sub enable { $ENABLED = 1 }
-my $singleton;
-my $asapq = [ [], undef ];
-my $nextq = [ [], undef ];
 my $laterq = [ [], undef ];
 
-sub once_init () {
-        my $self = fields::new('PublicInbox::EvCleanup');
-        my ($r, $w);
-
-        # This is a dummy pipe which is always writable so it can always
-        # fires in the next event loop iteration.
-        pipe($r, $w) or die "pipe: $!";
-        fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ
-        $self->SUPER::new($w, 0);
-
-        # always writable, since PublicInbox::EvCleanup::event_step
-        # never drains wbuf.  We can avoid wasting a hash slot by
-        # stuffing the read-end of the pipe into the never-to-be-touched
-        # wbuf
-        $self->{wbuf} = $r;
-        $self;
-}
-
-sub _run_all ($) {
-        my ($q) = @_;
-
-        my $run = $q->[0];
-        $q->[0] = [];
-        $q->[1] = undef;
-        $_->() foreach @$run;
-}
-
-# ensure PublicInbox::DS::ToClose processing after timers fire
-sub _asap_close () { $asapq->[1] ||= _asap_timer() }
-
-# Called by PublicInbox::DS
-sub event_step { _run_all($asapq) }
-
-sub _run_next () {
-        _run_all($nextq);
-        _asap_close();
-}
-
 sub _run_later () {
-        _run_all($laterq);
-        _asap_close();
-}
-
-sub _asap_timer () {
-        $singleton ||= once_init();
-        $singleton->watch(EPOLLOUT|EPOLLONESHOT);
-        1;
-}
-
-sub asap ($) {
-        my ($cb) = @_;
-        push @{$asapq->[0]}, $cb;
-        $asapq->[1] ||= _asap_timer();
-}
-
-sub next_tick ($) {
-        my ($cb) = @_;
-        push @{$nextq->[0]}, $cb;
-        $nextq->[1] ||= PublicInbox::DS->AddTimer(0, *_run_next);
+        my $run = $laterq->[0];
+        $laterq->[0] = [];
+        $laterq->[1] = undef;
+        $_->() foreach @$run;
 }
 
 sub later ($) {
@@ -83,10 +26,5 @@ sub later ($) {
         $laterq->[1] ||= PublicInbox::DS->AddTimer(60, *_run_later);
 }
 
-END {
-        event_step();
-        _run_all($nextq);
-        _run_all($laterq);
-}
-
+END { _run_later() }
 1;
diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm
index a1cb4aca..5546ac46 100644
--- a/lib/PublicInbox/HTTP.pm
+++ b/lib/PublicInbox/HTTP.pm
@@ -11,7 +11,7 @@ package PublicInbox::HTTP;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(httpd env rbuf input_left remote_addr remote_port forward pull);
+use fields qw(httpd env input_left remote_addr remote_port forward pull);
 use bytes (); # only for bytes::length
 use Fcntl qw(:seek);
 use Plack::HTTPParser qw(parse_http_request); # XS or pure Perl
@@ -30,10 +30,8 @@ use constant {
 use Errno qw(EAGAIN);
 
 my $pipelineq = [];
-my $pipet;
 sub process_pipelineq () {
         my $q = $pipelineq;
-        $pipet = undef;
         $pipelineq = [];
         foreach (@$q) {
                 next unless $_->{sock};
@@ -58,9 +56,16 @@ sub http_date () {
 sub new ($$$) {
         my ($class, $sock, $addr, $httpd) = @_;
         my $self = fields::new($class);
-        $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT);
+        my $ev = EPOLLIN;
+        my $wbuf;
+        if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
+                return CORE::close($sock) if $! != EAGAIN;
+                $ev = PublicInbox::TLS::epollbit();
+                $wbuf = [ \&PublicInbox::DS::accept_tls_step ];
+        }
+        $self->SUPER::new($sock, $ev | EPOLLONESHOT);
         $self->{httpd} = $httpd;
-        $self->{rbuf} = '';
+        $self->{wbuf} = $wbuf if $wbuf;
         ($self->{remote_addr}, $self->{remote_port}) =
                 PublicInbox::Daemon::host_with_port($addr);
         $self;
@@ -75,31 +80,34 @@ sub event_step { # called by PublicInbox::DS
         # otherwise we can be buffering infinitely w/o backpressure
 
         return read_input($self) if defined $self->{env};
-        my $rbuf = \($self->{rbuf});
-        my $off = bytes::length($$rbuf);
-        $self->do_read($rbuf, 8192, $off) and rbuf_process($self);
+        my $rbuf = $self->{rbuf} // (\(my $x = ''));
+        $self->do_read($rbuf, 8192, bytes::length($$rbuf)) or return;
+        rbuf_process($self, $rbuf);
 }
 
 sub rbuf_process {
-        my ($self) = @_;
+        my ($self, $rbuf) = @_;
+        $rbuf //= $self->{rbuf} // (\(my $x = ''));
 
         my %env = %{$self->{httpd}->{env}}; # full hash copy
-        my $r = parse_http_request($self->{rbuf}, \%env);
+        my $r = parse_http_request($$rbuf, \%env);
 
         # We do not support Trailers in chunked requests, for now
         # (they are rarely-used and git (as of 2.7.2) does not use them)
         if ($r == -1 || $env{HTTP_TRAILER} ||
                         # this length-check is necessary for PURE_PERL=1:
-                        ($r == -2 && bytes::length($self->{rbuf}) > 0x4000)) {
+                        ($r == -2 && bytes::length($$rbuf) > 0x4000)) {
                 return quit($self, 400);
         }
-        return $self->watch_in1 if $r < 0; # incomplete
-        $self->{rbuf} = substr($self->{rbuf}, $r);
-
+        if ($r < 0) { # incomplete
+                $self->rbuf_idle($rbuf);
+                return $self->requeue;
+        }
+        $$rbuf = substr($$rbuf, $r);
         my $len = input_prepare($self, \%env);
         defined $len or return write_err($self, undef); # EMFILE/ENFILE
 
-        $len ? read_input($self) : app_dispatch($self);
+        $len ? read_input($self, $rbuf) : app_dispatch($self, undef, $rbuf);
 }
 
 # IO::Handle::write returns boolean, this returns bytes written:
@@ -111,16 +119,15 @@ sub xwrite ($$$) {
         $w;
 }
 
-sub read_input ($) {
-        my ($self) = @_;
+sub read_input ($;$) {
+        my ($self, $rbuf) = @_;
+        $rbuf //= $self->{rbuf} // (\(my $x = ''));
         my $env = $self->{env};
         return if $env->{REMOTE_ADDR}; # in app dispatch
-        return read_input_chunked($self) if env_chunked($env);
+        return read_input_chunked($self, $rbuf) if env_chunked($env);
 
         # env->{CONTENT_LENGTH} (identity)
-        my $sock = $self->{sock};
         my $len = delete $self->{input_left};
-        my $rbuf = \($self->{rbuf});
         my $input = $env->{'psgi.input'};
 
         while ($len > 0) {
@@ -135,15 +142,15 @@ sub read_input ($) {
                         }
                         $$rbuf = '';
                 }
-                my $r = sysread($sock, $$rbuf, 8192);
-                return recv_err($self, $r, $len) unless $r;
+                $self->do_read($rbuf, 8192) or return recv_err($self, $len);
                 # continue looping if $r > 0;
         }
-        app_dispatch($self, $input);
+        app_dispatch($self, $input, $rbuf);
 }
 
 sub app_dispatch {
-        my ($self, $input) = @_;
+        my ($self, $input, $rbuf) = @_;
+        $self->rbuf_idle($rbuf);
         my $env = $self->{env};
         $env->{REMOTE_ADDR} = $self->{remote_addr};
         $env->{REMOTE_PORT} = $self->{remote_port};
@@ -235,11 +242,12 @@ sub identity_wcb ($) {
 
 sub next_request ($) {
         my ($self) = @_;
-        if ($self->{rbuf} eq '') { # wait for next request
-                $self->watch_in1;
-        } else { # avoid recursion for pipelined requests
+        if ($self->{rbuf}) {
+                # avoid recursion for pipelined requests
+                PublicInbox::DS::requeue(\&process_pipelineq) if !@$pipelineq;
                 push @$pipelineq, $self;
-                $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq);
+        } else { # wait for next request
+                $self->requeue;
         }
 }
 
@@ -267,7 +275,7 @@ sub getline_cb ($$$) {
                                 if ($self->{wbuf}) {
                                         $self->write($next);
                                 } else {
-                                        PublicInbox::EvCleanup::asap($next);
+                                        PublicInbox::DS::requeue($next);
                                 }
                                 return;
                         }
@@ -360,27 +368,25 @@ sub write_err {
 }
 
 sub recv_err {
-        my ($self, $r, $len) = @_;
-        return $self->close if (defined $r && $r == 0);
-        if ($! == EAGAIN) {
+        my ($self, $len) = @_;
+        if ($! == EAGAIN) { # epoll/kevent watch already set by do_read
                 $self->{input_left} = $len;
-                return $self->watch_in1;
+        } else {
+                err($self, "error reading input: $! ($len bytes remaining)");
         }
-        err($self, "error reading for input: $! ($len bytes remaining)");
-        quit($self, 500);
 }
 
 sub read_input_chunked { # unlikely...
-        my ($self) = @_;
+        my ($self, $rbuf) = @_;
+        $rbuf //= $self->{rbuf} // (\(my $x = ''));
         my $input = $self->{env}->{'psgi.input'};
-        my $sock = $self->{sock};
         my $len = delete $self->{input_left};
-        my $rbuf = \($self->{rbuf});
 
         while (1) { # chunk start
                 if ($len == CHUNK_ZEND) {
                         $$rbuf =~ s/\A\r\n//s and
-                                return app_dispatch($self, $input);
+                                return app_dispatch($self, $input, $rbuf);
+
                         return quit($self, 400) if bytes::length($$rbuf) > 2;
                 }
                 if ($len == CHUNK_END) {
@@ -403,9 +409,8 @@ sub read_input_chunked { # unlikely...
                 }
 
                 if ($len < 0) { # chunk header is trickled, read more
-                        my $off = bytes::length($$rbuf);
-                        my $r = sysread($sock, $$rbuf, 8192, $off);
-                        return recv_err($self, $r, $len) unless $r;
+                        $self->do_read($rbuf, 8192, bytes::length($$rbuf)) or
+                                return recv_err($self, $len);
                         # (implicit) goto chunk_start if $r > 0;
                 }
                 $len = CHUNK_ZEND if $len == 0;
@@ -429,8 +434,8 @@ sub read_input_chunked { # unlikely...
                         }
                         if ($$rbuf eq '') {
                                 # read more of current chunk
-                                my $r = sysread($sock, $$rbuf, 8192);
-                                return recv_err($self, $r, $len) unless $r;
+                                $self->do_read($rbuf, 8192) or
+                                        return recv_err($self, $len);
                         }
                 }
         }
@@ -459,14 +464,7 @@ sub close {
 # for graceful shutdown in PublicInbox::Daemon:
 sub busy () {
         my ($self) = @_;
-        ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf});
+        ($self->{rbuf} || $self->{env} || $self->{wbuf});
 }
 
-# fires after pending writes are complete:
-sub restart_pass ($) {
-        $_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async
-}
-
-sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) }
-
 1;
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b46baeb2..584db8d4 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -4,14 +4,15 @@
 # XXX This is a totally unstable API for public-inbox internal use only
 # This is exposed via the 'pi-httpd.async' key in the PSGI env hash.
 # The name of this key is not even stable!
-# Currently is is intended for use with read-only pipes.
+# Currently intended for use with read-only pipes with expensive
+# processes such as git-http-backend(1), cgit(1)
 package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
-require PublicInbox::EvCleanup;
 use Errno qw(EAGAIN);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 
 sub new {
         my ($class, $io, $cb, $cleanup) = @_;
@@ -19,47 +20,35 @@ sub new {
         # no $io? call $cb at the top of the next event loop to
         # avoid recursion:
         unless (defined($io)) {
-                PublicInbox::EvCleanup::asap($cb) if $cb;
-                PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+                PublicInbox::DS::requeue($cb);
+                die 'cleanup unsupported w/o $io' if $cleanup;
                 return;
         }
 
         my $self = fields::new($class);
         IO::Handle::blocking($io, 0);
-        $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
+        $self->SUPER::new($io, EPOLLIN | EPOLLET);
         $self->{cb} = $cb;
         $self->{cleanup} = $cleanup;
         $self;
 }
 
-sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
-
-sub main_cb ($$$) {
-        my ($http, $fh, $bref) = @_;
+sub main_cb ($$) {
+        my ($http, $fh) = @_;
         sub {
                 my ($self) = @_;
-                my $r = sysread($self->{sock}, $$bref, 8192);
+                my $r = sysread($self->{sock}, my $buf, 65536);
                 if ($r) {
-                        $fh->write($$bref); # may call $http->close
-
+                        $fh->write($buf); # may call $http->close
                         if ($http->{sock}) { # !closed
-                                if ($http->{wbuf}) {
-                                        # HTTP client could not keep up, so
-                                        # stop reading and buffering.
-                                        $self->watch(0);
-
-                                        # Tell the HTTP socket to restart us
-                                        # when HTTP client is done draining
-                                        # $http->{wbuf}:
-                                        $http->enqueue_restart_pass;
-                                }
-                                # stay in EPOLLIN, but let other clients
-                                # get some work done, too.
+                                $self->requeue;
+                                # let other clients get some work done, too
                                 return;
                         }
-                        # fall through to close below...
-                } elsif (!defined $r) {
-                        return restart_read($self) if $! == EAGAIN;
+
+                        # else: fall through to close below...
+                } elsif (!defined $r && $! == EAGAIN) {
+                        return; # EPOLLET means we'll be notified
                 }
 
                 # Done! Error handling will happen in $fh->close
@@ -75,10 +64,16 @@ sub async_pass {
         # will automatically close this ($self) object.
         $http->{forward} = $self;
         $fh->write($$bref); # PublicInbox:HTTP::{chunked,identity}_wcb
-        $self->{cb} = main_cb($http, $fh, $bref);
+        $$bref = undef; # we're done with this
+        my $cb = $self->{cb} = main_cb($http, $fh);
+        $cb->($self); # either hit EAGAIN or ->requeue to keep EPOLLET happy
 }
 
-sub event_step { $_[0]->{cb}->(@_) }
+sub event_step {
+        # {cb} may be undef after ->requeue due to $http->close happening
+        my $cb = $_[0]->{cb} or return;
+        $cb->(@_);
+}
 
 sub close {
         my $self = $_[0];
@@ -87,7 +82,7 @@ sub close {
 
         # we defer this to the next timer loop since close is deferred
         if (my $cleanup = delete $self->{cleanup}) {
-                PublicInbox::EvCleanup::next_tick($cleanup);
+                PublicInbox::DS::requeue($cleanup);
         }
 }
 
diff --git a/lib/PublicInbox/Listener.pm b/lib/PublicInbox/Listener.pm
index 94b2aed4..594dabb8 100644
--- a/lib/PublicInbox/Listener.pm
+++ b/lib/PublicInbox/Listener.pm
@@ -9,6 +9,7 @@ use base 'PublicInbox::DS';
 use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
 use fields qw(post_accept);
 require IO::Handle;
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE EPOLLET);
 
 sub new ($$$) {
         my ($class, $s, $cb) = @_;
@@ -17,15 +18,14 @@ sub new ($$$) {
         listen($s, 1024);
         IO::Handle::blocking($s, 0);
         my $self = fields::new($class);
-        $self->SUPER::new($s, PublicInbox::DS::EPOLLIN()|
-                              PublicInbox::DS::EPOLLEXCLUSIVE());
+        $self->SUPER::new($s, EPOLLIN|EPOLLET|EPOLLEXCLUSIVE);
         $self->{post_accept} = $cb;
         $self
 }
 
 sub event_step {
         my ($self) = @_;
-        my $sock = $self->{sock};
+        my $sock = $self->{sock} or return;
 
         # no loop here, we want to fairly distribute clients
         # between multiple processes sharing the same socket
@@ -35,6 +35,7 @@ sub event_step {
         if (my $addr = accept(my $c, $sock)) {
                 IO::Handle::blocking($c, 0); # no accept4 :<
                 $self->{post_accept}->($c, $addr, $sock);
+                $self->requeue;
         }
 }
 
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 5a886a3c..26bc679f 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -6,7 +6,7 @@ package PublicInbox::NNTP;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
-use fields qw(nntpd article rbuf ng);
+use fields qw(nntpd article ng);
 use PublicInbox::Search;
 use PublicInbox::Msgmap;
 use PublicInbox::MID qw(mid_escape);
@@ -38,20 +38,6 @@ my %DISABLED; # = map { $_ => 1 } qw(xover list_overview_fmt newnews xhdr);
 my $EXPMAP; # fd -> [ idle_time, $self ]
 my $expt;
 our $EXPTIME = 180; # 3 minutes
-my $nextt;
-
-my $nextq = [];
-sub next_tick () {
-        $nextt = undef;
-        my $q = $nextq;
-        $nextq = [];
-        event_step($_) for @$q;
-}
-
-sub requeue ($) {
-        push @$nextq, $_[0];
-        $nextt ||= PublicInbox::EvCleanup::asap(*next_tick);
-}
 
 sub update_idle_time ($) {
         my ($self) = @_;
@@ -64,14 +50,11 @@ sub expire_old () {
         my $exp = $EXPTIME;
         my $old = $now - $exp;
         my $nr = 0;
-        my $closed = 0;
         my %new;
         while (my ($fd, $v) = each %$EXPMAP) {
                 my ($idle_time, $nntp) = @$v;
                 if ($idle_time < $old) {
-                        if ($nntp->shutdn) {
-                                $closed++;
-                        } else {
+                        if (!$nntp->shutdn) {
                                 ++$nr;
                                 $new{$fd} = $v;
                         }
@@ -81,14 +64,7 @@ sub expire_old () {
                 }
         }
         $EXPMAP = \%new;
-        if ($nr) {
-                $expt = PublicInbox::EvCleanup::later(*expire_old);
-        } else {
-                $expt = undef;
-                # noop to kick outselves out of the loop ASAP so descriptors
-                # really get closed
-                PublicInbox::EvCleanup::asap(sub {}) if $closed;
-        }
+        $expt = PublicInbox::EvCleanup::later(*expire_old) if $nr;
 }
 
 sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) };
@@ -99,7 +75,8 @@ sub new ($$$) {
         my $ev = EPOLLIN;
         my $wbuf;
         if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) {
-                $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock);
+                return CORE::close($sock) if $! != EAGAIN;
+                $ev = PublicInbox::TLS::epollbit();
                 $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ];
         }
         $self->SUPER::new($sock, $ev | EPOLLONESHOT);
@@ -654,12 +631,12 @@ sub long_response ($$) {
                         push @$wbuf, $long_cb;
 
                         # wbuf may be populated by $cb, no need to rearm if so:
-                        requeue($self) if scalar(@$wbuf) == 1;
+                        $self->requeue if scalar(@$wbuf) == 1;
                 } else { # all done!
                         $long_cb = undef;
                         res($self, '.');
                         out($self, " deferred[$fd] done - %0.6f", now() - $t0);
-                        requeue($self) unless $self->{wbuf};
+                        $self->requeue unless $self->{wbuf};
                 }
         };
         $self->write($long_cb); # kick off!
@@ -914,7 +891,7 @@ sub cmd_starttls ($) {
                 return '580 can not initiate TLS negotiation';
         res($self, '382 Continue with TLS negotiation');
         $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt);
-        requeue($self) if PublicInbox::DS::accept_tls_step($self);
+        $self->requeue if PublicInbox::DS::accept_tls_step($self);
         undef;
 }
 
@@ -984,16 +961,12 @@ sub event_step {
         return $self->close if $r < 0;
         my $len = bytes::length($$rbuf);
         return $self->close if ($len >= LINE_MAX);
-        if ($len) {
-                $self->{rbuf} = $rbuf;
-        } else {
-                delete $self->{rbuf};
-        }
+        $self->rbuf_idle($rbuf);
         update_idle_time($self);
 
         # maybe there's more pipelined data, or we'll have
         # to register it for socket-readiness notifications
-        requeue($self) unless $self->{wbuf};
+        $self->requeue unless $self->{wbuf};
 }
 
 sub not_idle_long ($$) {
diff --git a/lib/PublicInbox/ParentPipe.pm b/lib/PublicInbox/ParentPipe.pm
index ccc0815e..2e2abb5f 100644
--- a/lib/PublicInbox/ParentPipe.pm
+++ b/lib/PublicInbox/ParentPipe.pm
@@ -1,20 +1,27 @@
 # Copyright (C) 2016-2018 all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# only for PublicInbox::Daemon
+
+# only for PublicInbox::Daemon, allows worker processes to be
+# notified if the master process dies.
 package PublicInbox::ParentPipe;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
 
 sub new ($$$) {
-        my ($class, $pipe, $cb) = @_;
+        my ($class, $pipe, $worker_quit) = @_;
         my $self = fields::new($class);
-        $self->SUPER::new($pipe, PublicInbox::DS::EPOLLIN());
-        $self->{cb} = $cb;
+        $self->SUPER::new($pipe, EPOLLIN|EPOLLONESHOT);
+        $self->{cb} = $worker_quit;
         $self;
 }
 
-sub event_step { $_[0]->{cb}->($_[0]) }
+# master process died, time to call worker_quit ourselves
+sub event_step {
+        $_[0]->close; # PublicInbox::DS::close
+        $_[0]->{cb}->();
+}
 
 1;
diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm
index f2630a0f..8f0b9fe2 100644
--- a/lib/PublicInbox/Qspawn.pm
+++ b/lib/PublicInbox/Qspawn.pm
@@ -128,7 +128,7 @@ sub psgi_qx {
         my $rpipe; # comes from popen_rd
         my $async = $env->{'pi-httpd.async'};
         my $cb = sub {
-                my $r = sysread($rpipe, my $buf, 8192);
+                my $r = sysread($rpipe, my $buf, 65536);
                 if ($async) {
                         $async->async_pass($env->{'psgix.io'}, $qx, \$buf);
                 } elsif (defined $r) {
diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm
index 500efa67..d7e15c72 100644
--- a/lib/PublicInbox/Syscall.pm
+++ b/lib/PublicInbox/Syscall.pm
@@ -22,7 +22,7 @@ use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS $VERSION);
 $VERSION     = "0.25";
 @ISA         = qw(Exporter);
 @EXPORT_OK   = qw(sendfile epoll_ctl epoll_create epoll_wait
-                  EPOLLIN EPOLLOUT
+                  EPOLLIN EPOLLOUT EPOLLET
                   EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD
                   EPOLLONESHOT EPOLLEXCLUSIVE);
 %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait
@@ -39,7 +39,7 @@ use constant EPOLLOUT      => 4;
 # use constant EPOLLRDBAND   => 128;
 use constant EPOLLEXCLUSIVE => (1 << 28);
 use constant EPOLLONESHOT => (1 << 30);
-# use constant EPOLLET => (1 << 31);
+use constant EPOLLET => (1 << 31);
 use constant EPOLL_CTL_ADD => 1;
 use constant EPOLL_CTL_DEL => 2;
 use constant EPOLL_CTL_MOD => 3;
diff --git a/lib/PublicInbox/TLS.pm b/lib/PublicInbox/TLS.pm
index 576c11d7..0b9a55df 100644
--- a/lib/PublicInbox/TLS.pm
+++ b/lib/PublicInbox/TLS.pm
@@ -13,12 +13,9 @@ sub err () { $SSL_ERROR }
 
 # returns the EPOLL event bit which matches the existing SSL error
 sub epollbit () {
-        if ($! == EAGAIN) {
-                return EPOLLIN if $SSL_ERROR == SSL_WANT_READ;
-                return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE;
-                die "unexpected SSL error: $SSL_ERROR";
-        }
-        0;
+        return EPOLLIN if $SSL_ERROR == SSL_WANT_READ;
+        return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE;
+        die "unexpected SSL error: $SSL_ERROR";
 }
 
 1;
diff --git a/lib/PublicInbox/WatchMaildir.pm b/lib/PublicInbox/WatchMaildir.pm
index 2d4c6f43..a76bf06e 100644
--- a/lib/PublicInbox/WatchMaildir.pm
+++ b/lib/PublicInbox/WatchMaildir.pm
@@ -150,6 +150,12 @@ sub _try_path {
         if (!ref($inboxes) && $inboxes eq 'watchspam') {
                 return _remove_spam($self, $path);
         }
+
+        my $warn_cb = $SIG{__WARN__} || sub { print STDERR @_ };
+        local $SIG{__WARN__} = sub {
+                $warn_cb->("path: $path\n");
+                $warn_cb->(@_);
+        };
         foreach my $ibx (@$inboxes) {
                 my $mime = _path_to_mime($path) or next;
                 my $im = _importer_for($self, $ibx);