about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--Documentation/public-inbox-daemon.pod17
-rw-r--r--lib/PublicInbox/Daemon.pm8
-rw-r--r--lib/PublicInbox/Listener.pm45
3 files changed, 45 insertions, 25 deletions
diff --git a/Documentation/public-inbox-daemon.pod b/Documentation/public-inbox-daemon.pod
index 81a79a10..71216833 100644
--- a/Documentation/public-inbox-daemon.pod
+++ b/Documentation/public-inbox-daemon.pod
@@ -115,6 +115,23 @@ per-listener C<cert=> option.  The private key may be
 concatenated into the path used by the cert, in which case this
 option is not needed.
 
+=item --multi-accept INTEGER
+
+By default, each worker accepts one connection at-a-time to maximize
+fairness and minimize contention across multiple processes on a
+shared listen socket.  Accepting multiple connections at once may be
+useful in constrained deployments with few, heavily-loaded workers.
+Negative values enables a worker to accept all available clients at
+once, possibly starving others in the process.  C<-1> behaves like
+C<multi_accept yes> in nginx; while C<0> (the default) is
+C<multi_accept no> in nginx.  Positive values allow
+fine-tuning without the runaway behavior of C<-1>.
+
+This may be specified on a per-listener basis via the C<multi-accept=>
+per-listener directive (e.g. C<-l http://127.0.0.1?multi-accept=1>).
+
+Default: 0
+
 =back
 
 =head1 SIGNALS
diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm
index 57435421..30442227 100644
--- a/lib/PublicInbox/Daemon.pm
+++ b/lib/PublicInbox/Daemon.pm
@@ -136,6 +136,8 @@ sub load_mod ($;$$) {
         }
         my $err = $tlsd->{err};
         $tlsd->{warn_cb} = sub { print $err @_ }; # for local $SIG{__WARN__}
+        $opt->{'multi-accept'} and
+                $xn{'multi-accept'} = $opt->{'multi-accept'}->[-1];
         \%xn;
 }
 
@@ -167,6 +169,7 @@ EOF
                 'u|user=s' => \$user,
                 'g|group=s' => \$group,
                 'D|daemonize' => \$daemonize,
+                'multi-accept=i' => \$PublicInbox::Listener::MULTI_ACCEPT,
                 'cert=s' => \$default_cert,
                 'key=s' => \$default_key,
                 'help|h' => \(my $show_help),
@@ -251,7 +254,7 @@ EOF
                 $s->blocking(0);
                 my $sockname = sockname($s);
                 warn "# bound $scheme://$sockname\n";
-                $xnetd->{$sockname} //= load_mod($scheme);
+                $xnetd->{$sockname} //= load_mod($scheme, $opt);
                 $listener_names->{$sockname} = $s;
                 push @listeners, $s;
         }
@@ -712,7 +715,8 @@ sub daemon_loop ($) {
                 defer_accept($_, $tls_cb ? 'dataready' : $xn->{af_default});
 
                 # this calls epoll_create:
-                PublicInbox::Listener->new($_, $tls_cb || $xn->{post_accept})
+                PublicInbox::Listener->new($_, $tls_cb || $xn->{post_accept},
+                                                $xn->{'multi-accept'})
         } @listeners;
         PublicInbox::DS::event_loop($sig, $oldset);
 }
diff --git a/lib/PublicInbox/Listener.pm b/lib/PublicInbox/Listener.pm
index 7cedc349..4669cf04 100644
--- a/lib/PublicInbox/Listener.pm
+++ b/lib/PublicInbox/Listener.pm
@@ -1,14 +1,15 @@
-# Copyright (C) 2015-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 #
 # Used by -nntpd for listen sockets
 package PublicInbox::Listener;
-use strict;
+use v5.12;
 use parent 'PublicInbox::DS';
 use Socket qw(SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);
 use IO::Handle;
 use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE);
 use Errno qw(EAGAIN ECONNABORTED);
+our $MULTI_ACCEPT = 0;
 
 # Warn on transient errors, mostly resource limitations.
 # EINTR would indicate the failure to set NonBlocking in systemd or similar
@@ -16,37 +17,35 @@ my %ERR_WARN = map {;
         eval("Errno::$_()") => $_
 } qw(EMFILE ENFILE ENOBUFS ENOMEM EINTR);
 
-sub new ($$$) {
-        my ($class, $s, $cb) = @_;
+sub new {
+        my ($class, $s, $cb, $multi_accept) = @_;
         setsockopt($s, SOL_SOCKET, SO_KEEPALIVE, 1);
         setsockopt($s, IPPROTO_TCP, TCP_NODELAY, 1); # ignore errors on non-TCP
         listen($s, 2**31 - 1); # kernel will clamp
         my $self = bless { post_accept => $cb }, $class;
+        $self->{multi_accept} = $multi_accept //= $MULTI_ACCEPT;
         $self->SUPER::new($s, EPOLLIN|EPOLLEXCLUSIVE);
 }
 
 sub event_step {
         my ($self) = @_;
         my $sock = $self->{sock} or return;
-
-        # no loop here, we want to fairly distribute clients
-        # between multiple processes sharing the same socket
-        # XXX our event loop needs better granularity for
-        # a single accept() here to be, umm..., acceptable
-        # on high-traffic sites.
-        if (my $addr = accept(my $c, $sock)) {
-                IO::Handle::blocking($c, 0); # no accept4 :<
-                eval { $self->{post_accept}->($c, $addr, $sock) };
-                warn "E: $@\n" if $@;
-        } elsif ($! == EAGAIN || $! == ECONNABORTED) {
-                # EAGAIN is common and likely
-                # ECONNABORTED is common with bad connections
-                return;
-        } elsif (my $sym = $ERR_WARN{int($!)}) {
-                warn "W: accept(): $! ($sym)\n";
-        } else {
-                warn "BUG?: accept(): $!\n";
-        }
+        my $n = $self->{multi_accept};
+        do {
+                if (my $addr = accept(my $c, $sock)) {
+                        IO::Handle::blocking($c, 0); # no accept4 :<
+                        eval { $self->{post_accept}->($c, $addr, $sock) };
+                        warn "E: $@\n" if $@;
+                } elsif ($! == EAGAIN || $! == ECONNABORTED) {
+                        # EAGAIN is common and likely
+                        # ECONNABORTED is common with bad connections
+                        return;
+                } elsif (my $sym = $ERR_WARN{int($!)}) {
+                        warn "W: accept(): $! ($sym)\n";
+                } else {
+                        warn "BUG?: accept(): $!\n";
+                }
+        } while ($n--);
 }
 
 1;