about summary refs log tree commit homepage
path: root/lib/PublicInbox/HTTPD/Async.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/HTTPD/Async.pm')
-rw-r--r--lib/PublicInbox/HTTPD/Async.pm55
1 files changed, 25 insertions, 30 deletions
diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm
index b46baeb2..584db8d4 100644
--- a/lib/PublicInbox/HTTPD/Async.pm
+++ b/lib/PublicInbox/HTTPD/Async.pm
@@ -4,14 +4,15 @@
 # XXX This is a totally unstable API for public-inbox internal use only
 # This is exposed via the 'pi-httpd.async' key in the PSGI env hash.
 # The name of this key is not even stable!
-# Currently is is intended for use with read-only pipes.
+# Currently intended for use with read-only pipes with expensive
+# processes such as git-http-backend(1), cgit(1)
 package PublicInbox::HTTPD::Async;
 use strict;
 use warnings;
 use base qw(PublicInbox::DS);
 use fields qw(cb cleanup);
-require PublicInbox::EvCleanup;
 use Errno qw(EAGAIN);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 
 sub new {
         my ($class, $io, $cb, $cleanup) = @_;
@@ -19,47 +20,35 @@ sub new {
         # no $io? call $cb at the top of the next event loop to
         # avoid recursion:
         unless (defined($io)) {
-                PublicInbox::EvCleanup::asap($cb) if $cb;
-                PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup;
+                PublicInbox::DS::requeue($cb);
+                die 'cleanup unsupported w/o $io' if $cleanup;
                 return;
         }
 
         my $self = fields::new($class);
         IO::Handle::blocking($io, 0);
-        $self->SUPER::new($io, PublicInbox::DS::EPOLLIN());
+        $self->SUPER::new($io, EPOLLIN | EPOLLET);
         $self->{cb} = $cb;
         $self->{cleanup} = $cleanup;
         $self;
 }
 
-sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) }
-
-sub main_cb ($$$) {
-        my ($http, $fh, $bref) = @_;
+sub main_cb ($$) {
+        my ($http, $fh) = @_;
         sub {
                 my ($self) = @_;
-                my $r = sysread($self->{sock}, $$bref, 8192);
+                my $r = sysread($self->{sock}, my $buf, 65536);
                 if ($r) {
-                        $fh->write($$bref); # may call $http->close
-
+                        $fh->write($buf); # may call $http->close
                         if ($http->{sock}) { # !closed
-                                if ($http->{wbuf}) {
-                                        # HTTP client could not keep up, so
-                                        # stop reading and buffering.
-                                        $self->watch(0);
-
-                                        # Tell the HTTP socket to restart us
-                                        # when HTTP client is done draining
-                                        # $http->{wbuf}:
-                                        $http->enqueue_restart_pass;
-                                }
-                                # stay in EPOLLIN, but let other clients
-                                # get some work done, too.
+                                $self->requeue;
+                                # let other clients get some work done, too
                                 return;
                         }
-                        # fall through to close below...
-                } elsif (!defined $r) {
-                        return restart_read($self) if $! == EAGAIN;
+
+                        # else: fall through to close below...
+                } elsif (!defined $r && $! == EAGAIN) {
+                        return; # EPOLLET means we'll be notified
                 }
 
                 # Done! Error handling will happen in $fh->close
@@ -75,10 +64,16 @@ sub async_pass {
         # will automatically close this ($self) object.
         $http->{forward} = $self;
         $fh->write($$bref); # PublicInbox:HTTP::{chunked,identity}_wcb
-        $self->{cb} = main_cb($http, $fh, $bref);
+        $$bref = undef; # we're done with this
+        my $cb = $self->{cb} = main_cb($http, $fh);
+        $cb->($self); # either hit EAGAIN or ->requeue to keep EPOLLET happy
 }
 
-sub event_step { $_[0]->{cb}->(@_) }
+sub event_step {
+        # {cb} may be undef after ->requeue due to $http->close happening
+        my $cb = $_[0]->{cb} or return;
+        $cb->(@_);
+}
 
 sub close {
         my $self = $_[0];
@@ -87,7 +82,7 @@ sub close {
 
         # we defer this to the next timer loop since close is deferred
         if (my $cleanup = delete $self->{cleanup}) {
-                PublicInbox::EvCleanup::next_tick($cleanup);
+                PublicInbox::DS::requeue($cleanup);
         }
 }