From 740d274818d7af9c50c8609a05860817e6aa9680 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:02 +0000 Subject: ds: get rid of {closed} field Merely checking the presence of the {sock} field is enough, and having multiple sources of truth increases confusion and the likelihood of bugs. --- lib/PublicInbox/DS.pm | 52 +++++++++++++++--------------------------- lib/PublicInbox/HTTP.pm | 8 +++---- lib/PublicInbox/HTTPD/Async.pm | 2 +- lib/PublicInbox/NNTP.pm | 30 ++++++++++++------------ 4 files changed, 37 insertions(+), 55 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 2b04886a..f4fe8793 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -28,7 +28,6 @@ use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write 'wbuf_off', # offset into first element of wbuf to start writing at - 'closed', # bool: socket is closed 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) ); @@ -366,7 +365,7 @@ sub PostEventLoop { $sock->close; # and now we can finally remove the fd from the map. see - # comment above in _cleanup. + # comment above in ->close. delete $DescriptorMap{$fd}; } @@ -411,7 +410,6 @@ sub new { $self->{wbuf} = []; $self->{wbuf_off} = 0; - $self->{closed} = 0; my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; @@ -457,28 +455,8 @@ Close the socket. =cut sub close { - my PublicInbox::DS $self = $_[0]; - return if $self->{closed}; - - # this does most of the work of closing us - $self->_cleanup(); - - # defer closing the actual socket until the event loop is done - # processing this round of events. (otherwise we might reuse fds) - if (my $sock = delete $self->{sock}) { - push @ToClose, $sock; - } - - return 0; -} - -### METHOD: _cleanup() -### Called by our closers so we can clean internal data structures. 
-sub _cleanup { - my PublicInbox::DS $self = $_[0]; - - # we're effectively closed; we have no fd and sock when we leave here - $self->{closed} = 1; + my ($self) = @_; + my $sock = delete $self->{sock} or return; # we need to flush our write buffer, as there may # be self-referential closures (sub { $client->close }) @@ -487,8 +465,8 @@ sub _cleanup { # if we're using epoll, we have to remove this from our epoll fd so we stop getting # notifications about it - if ($HaveEpoll && $self->{sock}) { - my $fd = fileno($self->{sock}); + if ($HaveEpoll) { + my $fd = fileno($sock); epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and confess("EPOLL_CTL_DEL: $!"); } @@ -498,9 +476,15 @@ sub _cleanup { # processing an epoll_wait/etc that returned hundreds of fds, one # of which is not yet processed and is what we're closing. if we # keep it in DescriptorMap, then the event harnesses can just - # looked at $pob->{closed} and ignore it. but if it's an + # looked at $pob->{sock} == undef and ignore it. but if it's an # un-accounted for fd, then it (understandably) freak out a bit # and emit warnings, thinking their state got off. + + # defer closing the actual socket until the event loop is done + # processing this round of events. (otherwise we might reuse fds) + push @ToClose, $sock; + + return 0; } =head2 C<< $obj->sock() >> @@ -533,7 +517,7 @@ sub write { # now-dead object does its second write. that is this case. we # just lie and say it worked. it'll be dead soon and won't be # hurt by this lie. - return 1 if $self->{closed}; + return 1 unless $self->{sock}; my $bref; @@ -634,7 +618,7 @@ Turn 'readable' event notification on or off. =cut sub watch_read { my PublicInbox::DS $self = shift; - return if $self->{closed} || !$self->{sock}; + my $sock = $self->{sock} or return; my $val = shift; my $event = $self->{event_watch}; @@ -642,7 +626,7 @@ sub watch_read { $event &= ~POLLIN if ! 
$val; $event |= POLLIN if $val; - my $fd = fileno($self->{sock}); + my $fd = fileno($sock); # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { @@ -664,14 +648,14 @@ Turn 'writable' event notification on or off. =cut sub watch_write { my PublicInbox::DS $self = shift; - return if $self->{closed} || !$self->{sock}; + my $sock = $self->{sock} or return; my $val = shift; my $event = $self->{event_watch}; $event &= ~POLLOUT if ! $val; $event |= POLLOUT if $val; - my $fd = fileno($self->{sock}); + my $fd = fileno($sock); # If it changed, set it if ($event != $self->{event_watch}) { @@ -728,7 +712,7 @@ sub as_string { my PublicInbox::DS $self = shift; my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') . ($self->{event_watch} & POLLOUT ? 'W' : '') . ")"; - my $ret = ref($self) . "$rw: " . ($self->{closed} ? "closed" : "open"); + my $ret = ref($self) . "$rw: " . ($self->{sock} ? 'open' : 'closed'); return $ret; } diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 45bf23ec..dff59286 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -33,7 +33,7 @@ sub process_pipelineq () { $pipet = undef; $pipelineq = []; foreach (@$q) { - next if $_->{closed}; + next unless $_->{sock}; rbuf_process($_); } } @@ -70,7 +70,7 @@ sub event_step { # called by PublicInbox::DS my $wbuf = $self->{wbuf}; if (@$wbuf) { $self->write(undef); - return if $self->{closed} || scalar(@$wbuf); + return if !$self->{sock} || scalar(@$wbuf); } # only read more requests if we've drained the write buffer, # otherwise we can be buffering infinitely w/o backpressure @@ -266,7 +266,7 @@ sub getline_cb ($$$) { my $buf = eval { $forward->getline }; if (defined $buf) { $write->($buf); # may close in PublicInbox::DS::write - unless ($self->{closed}) { + if ($self->{sock}) { my $next = $self->{pull}; if (scalar @{$self->{wbuf}}) { $self->write($next); @@ -322,7 +322,7 @@ sub response_write { use constant MSG_MORE => ($^O eq 'linux') ? 
0x8000 : 0; sub more ($$) { my $self = $_[0]; - return if $self->{closed}; + return unless $self->{sock}; if (MSG_MORE && !scalar(@{$self->{wbuf}})) { my $n = send($self->{sock}, $_[1], MSG_MORE); if (defined $n) { diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 604627ab..261a01e0 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -45,7 +45,7 @@ sub main_cb ($$$) { my $r = sysread($self->{sock}, $$bref, 8192); if ($r) { $fh->write($$bref); - unless ($http->{closed}) { # PublicInbox::DS sets this + if ($http->{sock}) { # !closed if (scalar @{$http->{wbuf}}) { $self->watch_read(0); $http->write(restart_read_cb($self)); diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 796ac74d..107cbe31 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -57,7 +57,7 @@ sub next_tick () { # maybe there's more pipelined data, or we'll have # to register it for socket-readiness notifications - if (!$nntp->{long_res} && !$nntp->{closed}) { + if (!$nntp->{long_res} && $nntp->{sock}) { check_read($nntp); } } @@ -66,9 +66,8 @@ sub next_tick () { sub update_idle_time ($) { my ($self) = @_; - my $sock = $self->{sock} or return; - my $fd = fileno($sock); - defined $fd and $EXPMAP->{$fd} = [ now(), $self ]; + my $sock = $self->{sock} or return; + $EXPMAP->{fileno($sock)} = [ now(), $self ]; } sub expire_old () { @@ -134,7 +133,7 @@ sub process_line ($$) { my $res = eval { $req->($self, @args) }; my $err = $@; - if ($err && !$self->{closed}) { + if ($err && $self->{sock}) { local $/ = "\n"; chomp($l); err($self, 'error from: %s (%s)', $l, $err); @@ -632,7 +631,7 @@ sub long_response ($$) { my $t0 = now(); $self->{long_res} = sub { my $more = eval { $cb->() }; - if ($@ || $self->{closed}) { + if ($@ || !$self->{sock}) { $self->{long_res} = undef; if ($@) { @@ -640,12 +639,12 @@ sub long_response ($$) { "%s during long response[$fd] - %0.6f", $@, now() - $t0); } - if ($self->{closed}) { - 
out($self, " deferred[$fd] aborted - %0.6f", - now() - $t0); - } else { + if ($self->{sock}) { update_idle_time($self); check_read($self); + } else { + out($self, " deferred[$fd] aborted - %0.6f", + now() - $t0); } } elsif ($more) { # scalar @{$self->{wbuf}}: # no recursion, schedule another call ASAP @@ -930,7 +929,7 @@ sub more ($$) { sub do_write ($$) { my ($self, $data) = @_; my $done = $self->write($data); - return 0 if $self->{closed}; + return 0 unless $self->{sock}; # Do not watch for readability if we have data in the queue, # instead re-enable watching for readability when we can @@ -966,13 +965,13 @@ sub do_more ($$) { sub event_step { my ($self) = @_; - return if $self->{closed}; + return unless $self->{sock}; my $wbuf = $self->{wbuf}; if (@$wbuf) { update_idle_time($self); $self->write(undef); - return if $self->{closed} || scalar(@$wbuf); + return if !$self->{sock} || scalar(@$wbuf); } return if $self->{long_res}; # only read more requests if we've drained the write buffer, @@ -1028,9 +1027,8 @@ sub check_read { sub not_idle_long ($$) { my ($self, $now) = @_; - my $sock = $self->{sock} or return; - defined(my $fd = fileno($sock)) or return; - my $ary = $EXPMAP->{$fd} or return; + my $sock = $self->{sock} or return; + my $ary = $EXPMAP->{fileno($sock)} or return; my $exp_at = $ary->[0] + $EXPTIME; $exp_at > $now; } -- cgit v1.2.3-24-ge0c7 From 8c619bae62a3d468505716d58d3559278883eee7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:03 +0000 Subject: ds: get rid of more unused debug instance methods Over a decade of using Danga::Socket and I never found the built-in debug functionality useful. 
--- lib/PublicInbox/DS.pm | 69 --------------------------------------------------- 1 file changed, 69 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index f4fe8793..9e24ed78 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -34,8 +34,6 @@ use fields ('sock', # underlying socket use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess); -use constant DebugLevel => 0; - use constant POLLIN => 1; use constant POLLOUT => 4; use constant POLLERR => 8; @@ -105,18 +103,6 @@ sub SetLoopTimeout { return $LoopTimeout = $_[1] + 0; } -=head2 C<< CLASS->DebugMsg( $format, @args ) >> - -Print the debugging message specified by the C<sprintf>-style I<format> and -I<args> - -=cut -sub DebugMsg { - my ( $class, $fmt, @args ) = @_; - chomp $fmt; - printf STDERR ">>> $fmt\n", @args; -} - =head2 C<< CLASS->AddTimer( $seconds, $coderef ) >> Add a timer to occur $seconds from now. $seconds may be fractional, but timers @@ -487,16 +473,6 @@ sub close { return 0; } -=head2 C<< $obj->sock() >> - -Returns the underlying IO::Handle for the object. - -=cut -sub sock { - my PublicInbox::DS $self = shift; - return $self->{sock}; -} - =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, @@ -671,51 +647,6 @@ sub watch_write { } } -=head2 C<< $obj->dump_error( $message ) >> - -Prints to STDERR a backtrace with information about this socket and what lead -up to the dump_error call. - -=cut -sub dump_error { - my $i = 0; - my @list; - while (my ($file, $line, $sub) = (caller($i++))[1..3]) { - push @list, "\t$file:$line called $sub\n"; - } - - warn "ERROR: $_[1]\n" . - "\t$_[0] = " . $_[0]->as_string . "\n" . - join('', @list); -} - -=head2 C<< $obj->debugmsg( $format, @args ) >> - -Print the debugging message specified by the C<sprintf>-style I<format> and -I<args>. 
- -=cut -sub debugmsg { - my ( $self, $fmt, @args ) = @_; - confess "Not an object" unless ref $self; - - chomp $fmt; - printf STDERR ">>> $fmt\n", @args; -} - -=head2 C<< $obj->as_string() >> - -Returns a string describing this socket. - -=cut -sub as_string { - my PublicInbox::DS $self = shift; - my $rw = "(" . ($self->{event_watch} & POLLIN ? 'R' : '') . - ($self->{event_watch} & POLLOUT ? 'W' : '') . ")"; - my $ret = ref($self) . "$rw: " . ($self->{sock} ? 'open' : 'closed'); - return $ret; -} - package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; sub cancel { -- cgit v1.2.3-24-ge0c7 From 860c7e13d31390dc16c2cce813e4887f12e76731 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:04 +0000 Subject: ds: use and export monotonic now() All of our internal timing code should use monotonic clocks for consistency against system clock adjustments. This can be shared by our Daemon and NNTP packages. --- lib/PublicInbox/DS.pm | 11 +++++++---- lib/PublicInbox/Daemon.pm | 9 ++++----- lib/PublicInbox/NNTP.pm | 4 +--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 9e24ed78..e7db2034 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -17,10 +17,11 @@ package PublicInbox::DS; use strict; use bytes; use POSIX (); -use Time::HiRes (); use IO::Handle qw(); use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); - +use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); +use parent qw(Exporter); +our @EXPORT_OK = qw(now); use warnings; use PublicInbox::Syscall qw(:epoll); @@ -115,7 +116,7 @@ sub AddTimer { my $class = shift; my ($secs, $coderef) = @_; - my $fire_time = Time::HiRes::time() + $secs; + my $fire_time = now() + $secs; my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer"; @@ -195,11 +196,13 @@ sub FirstTimeEventLoop { } } +sub now () { clock_gettime(CLOCK_MONOTONIC) } + # runs timers and returns milliseconds for next one, or next event loop sub RunTimers { 
return $LoopTimeout unless @Timers; - my $now = Time::HiRes::time(); + my $now = now(); # Run expired timers while (@Timers && $Timers[0][0] <= $now) { diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 227ba5f9..b8d6b572 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -9,10 +9,9 @@ use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/; use IO::Handle; use IO::Socket; use Cwd qw/abs_path/; -use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); STDOUT->autoflush(1); STDERR->autoflush(1); -require PublicInbox::DS; +use PublicInbox::DS qw(now); require PublicInbox::EvCleanup; require POSIX; require PublicInbox::Listener; @@ -183,7 +182,7 @@ sub worker_quit { PublicInbox::DS->SetPostLoopCallback(sub { my ($dmap, undef) = @_; my $n = 0; - my $now = clock_gettime(CLOCK_MONOTONIC); + my $now = now(); foreach my $s (values %$dmap) { $s->can('busy') or next; @@ -195,9 +194,9 @@ sub worker_quit { } } if ($n) { - if (($warn + 5) < time) { + if (($warn + 5) < now()) { warn "$$ quitting, $n client(s) left\n"; - $warn = time; + $warn = now(); } unless (defined $proc_name) { $proc_name = (split(/\s+/, $0))[0]; diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 107cbe31..0a473e42 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -14,7 +14,7 @@ use PublicInbox::Git; require PublicInbox::EvCleanup; use Email::Simple; use POSIX qw(strftime); -use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); +PublicInbox::DS->import('now'); use Digest::SHA qw(sha1_hex); use Time::Local qw(timegm timelocal); use constant { @@ -25,8 +25,6 @@ use constant { r430 => '430 No article with that message-id', }; -sub now () { clock_gettime(CLOCK_MONOTONIC) }; - my @OVERVIEW = qw(Subject From Date Message-ID References Xref); my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . 
":\r\n"; my $LIST_HEADERS = join("\r\n", @OVERVIEW, -- cgit v1.2.3-24-ge0c7 From 2f9aea4ecb6208955144e8a2290d747a4ff9c966 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:05 +0000 Subject: AddTimer: avoid clock_gettime for the '0' case We rely on immediate timers often, so we can avoid the overhead of an extra subroutine call to retrieve the monotonic time (and a sometimes-system call on some platforms). --- lib/PublicInbox/DS.pm | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index e7db2034..ed04feb5 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -113,8 +113,13 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t =cut sub AddTimer { - my $class = shift; - my ($secs, $coderef) = @_; + my ($class, $secs, $coderef) = @_; + + if (!$secs) { + my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer'); + unshift(@Timers, $timer); + return $timer; + } my $fire_time = now() + $secs; -- cgit v1.2.3-24-ge0c7 From 8c8b5c28b20b4d8ceee89312b1cc9e4602a7beb3 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:06 +0000 Subject: ds: get rid of on_incomplete_write wrapper Wrong place to be wrapping this method. --- lib/PublicInbox/DS.pm | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index ed04feb5..26c5251b 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -571,7 +571,7 @@ sub write { # since connection has stuff to write, it should now be # interested in pending writes: $self->{wbuf_off} += $written; - $self->on_incomplete_write; + $self->watch_write(1); return 0; } elsif ($written == $to_write) { $self->{wbuf_off} = 0; @@ -590,11 +590,6 @@ sub write { } } -sub on_incomplete_write { - my PublicInbox::DS $self = shift; - $self->watch_write(1); -} - =head2 C<< $obj->watch_read( $boolean ) >> Turn 'readable' event notification on or off. 
-- cgit v1.2.3-24-ge0c7 From ddba176a763dd7f36e3aa53b87907c6226207efa Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:07 +0000 Subject: ds: lazy initialize wbuf_off Since Perl 5.10+, "fields" makes a restricted hash; not a compile-time-defined array (struct) with fixed offsets as it did in Perl <= 5.8. Thus in-use fields cost memory, and since the write buffer offset is rarely needed; stop relying on it. --- lib/PublicInbox/DS.pm | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 26c5251b..154fd4dd 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -403,7 +403,6 @@ sub new { unless $sock && $fd; $self->{wbuf} = []; - $self->{wbuf_off} = 0; my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; @@ -501,7 +500,7 @@ sub write { # now-dead object does its second write. that is this case. we # just lie and say it worked. it'll be dead soon and won't be # hurt by this lie. - return 1 unless $self->{sock}; + my $sock = $self->{sock} or return 1; my $bref; @@ -548,9 +547,9 @@ sub write { die "Write error: $@ <$bref>"; } - my $to_write = $len - $self->{wbuf_off}; - my $written = syswrite($self->{sock}, $$bref, $to_write, - $self->{wbuf_off}); + my $off = $self->{wbuf_off} // 0; + my $to_write = $len - $off; + my $written = syswrite($sock, $$bref, $to_write, $off); if (! defined $written) { if ($! 
== EAGAIN) { @@ -570,11 +569,11 @@ sub write { } # since connection has stuff to write, it should now be # interested in pending writes: - $self->{wbuf_off} += $written; + $self->{wbuf_off} = $off + $written; $self->watch_write(1); return 0; } elsif ($written == $to_write) { - $self->{wbuf_off} = 0; + delete $self->{wbuf_off}; $self->watch_write(0); # this was our only write, so we can return immediately -- cgit v1.2.3-24-ge0c7 From fdf67396c179a64154eaa6c10ac255d61ed39c01 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:08 +0000 Subject: ds: split out from ->flush_write and ->write Get rid of the confusing $need_queue variable and all the associated documentation for it. Instead, make it obvious that we're either skipping the write buffer or flushing the write buffer by splitting the sub in two. --- lib/PublicInbox/DS.pm | 141 +++++++++++++++++++++----------------------------- 1 file changed, 58 insertions(+), 83 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 154fd4dd..f1b7bab7 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -480,6 +480,42 @@ sub close { return 0; } +# returns 1 if done, 0 if incomplete +sub flush_write ($) { + my ($self) = @_; + my $sock = $self->{sock} or return 1; + my $wbuf = $self->{wbuf}; + + while (my $bref = $wbuf->[0]) { + my $ref = ref($bref); + if ($ref eq 'SCALAR') { + my $len = bytes::length($$bref); + my $off = $self->{wbuf_off} || 0; + my $to_write = $len - $off; + my $written = syswrite($sock, $$bref, $to_write, $off); + if (defined $written) { + if ($written == $to_write) { + shift @$wbuf; + } else { + $self->{wbuf_off} = $off + $written; + } + next; # keep going until EAGAIN + } elsif ($! 
== EAGAIN) { + $self->watch_write(1); + } else { + $self->close; + } + return 0; + } else { #($ref eq 'CODE') { + shift @$wbuf; + $bref->(); + } + } # while @$wbuf + + $self->watch_write(0); + 1; # all done +} + =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, @@ -489,9 +525,8 @@ it returns 1, caller should stop waiting for 'writable' events) =cut sub write { - my PublicInbox::DS $self; - my $data; - ($self, $data) = @_; + my ($self, $data) = @_; + return flush_write($self) unless defined $data; # nobody should be writing to closed sockets, but caller code can # do two writes within an event, have the first fail and @@ -501,91 +536,31 @@ sub write { # just lie and say it worked. it'll be dead soon and won't be # hurt by this lie. my $sock = $self->{sock} or return 1; - - my $bref; - - # just queue data if there's already a wait - my $need_queue; + my $ref = ref $data; + my $bref = $ref ? $data : \$data; my $wbuf = $self->{wbuf}; + if (@$wbuf) { # already buffering, can't write more... + push @$wbuf, $bref; + return 0; + } elsif ($ref eq 'CODE') { + $bref->(); + return 1; + } else { + my $to_write = bytes::length($$bref); + my $written = syswrite($sock, $$bref, $to_write); - if (defined $data) { - $bref = ref $data ? $data : \$data; - if (scalar @$wbuf) { + if (defined $written) { + return 1 if $written == $to_write; + $self->{wbuf_off} = $written; + push @$wbuf, $bref; + return flush_write($self); # try until EAGAIN + } elsif ($! == EAGAIN) { push @$wbuf, $bref; - return 0; - } - - # this flag says we're bypassing the queue system, knowing we're the - # only outstanding write, and hoping we don't ever need to use it. 
- # if so later, though, we'll need to queue - $need_queue = 1; - } - - WRITE: - while (1) { - return 1 unless $bref ||= $wbuf->[0]; - - my $len; - eval { - $len = length($$bref); # this will die if $bref is a code ref, caught below - }; - if ($@) { - if (UNIVERSAL::isa($bref, "CODE")) { - unless ($need_queue) { - shift @$wbuf; - } - $bref->(); - - # code refs are just run and never get reenqueued - # (they're one-shot), so turn off the flag indicating the - # outstanding data needs queueing. - $need_queue = 0; - - undef $bref; - next WRITE; - } - die "Write error: $@ <$bref>"; - } - - my $off = $self->{wbuf_off} // 0; - my $to_write = $len - $off; - my $written = syswrite($sock, $$bref, $to_write, $off); - - if (! defined $written) { - if ($! == EAGAIN) { - # since connection has stuff to write, it should now be - # interested in pending writes: - if ($need_queue) { - push @$wbuf, $bref; - } - $self->watch_write(1); - return 0; - } - - return $self->close; - } elsif ($written != $to_write) { - if ($need_queue) { - push @$wbuf, $bref; - } - # since connection has stuff to write, it should now be - # interested in pending writes: - $self->{wbuf_off} = $off + $written; $self->watch_write(1); - return 0; - } elsif ($written == $to_write) { - delete $self->{wbuf_off}; - $self->watch_write(0); - - # this was our only write, so we can return immediately - # since we avoided incrementing the buffer size or - # putting it in the buffer. we also know there - # can't be anything else to write. - return 1 if $need_queue; - - shift @$wbuf; - undef $bref; - next WRITE; + } else { + $self->close; } + return 0; } } -- cgit v1.2.3-24-ge0c7 From 0e1c3fe09a06faf24f7bca159020f69730f1275a Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:09 +0000 Subject: ds: lazy-initialize wbuf We don't need write buffering unless we encounter slow clients requesting large responses. So don't waste a hash slot or (empty) arrayref for it. 
--- lib/PublicInbox/DS.pm | 14 ++++++-------- lib/PublicInbox/EvCleanup.pm | 2 +- lib/PublicInbox/HTTP.pm | 13 +++++-------- lib/PublicInbox/HTTPD/Async.pm | 2 +- lib/PublicInbox/NNTP.pm | 16 ++++++---------- 5 files changed, 19 insertions(+), 28 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index f1b7bab7..d07620a8 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -402,8 +402,6 @@ sub new { Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || "")) unless $sock && $fd; - $self->{wbuf} = []; - my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; _InitPoller(); @@ -454,7 +452,7 @@ sub close { # we need to flush our write buffer, as there may # be self-referential closures (sub { $client->close }) # preventing the object from being destroyed - @{$self->{wbuf}} = (); + delete $self->{wbuf}; # if we're using epoll, we have to remove this from our epoll fd so we stop getting # notifications about it @@ -483,8 +481,8 @@ sub close { # returns 1 if done, 0 if incomplete sub flush_write ($) { my ($self) = @_; + my $wbuf = $self->{wbuf} or return 1; my $sock = $self->{sock} or return 1; - my $wbuf = $self->{wbuf}; while (my $bref = $wbuf->[0]) { my $ref = ref($bref); @@ -512,6 +510,7 @@ sub flush_write ($) { } } # while @$wbuf + delete $self->{wbuf}; $self->watch_write(0); 1; # all done } @@ -538,8 +537,7 @@ sub write { my $sock = $self->{sock} or return 1; my $ref = ref $data; my $bref = $ref ? $data : \$data; - my $wbuf = $self->{wbuf}; - if (@$wbuf) { # already buffering, can't write more... + if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more... push @$wbuf, $bref; return 0; } elsif ($ref eq 'CODE') { @@ -552,10 +550,10 @@ sub write { if (defined $written) { return 1 if $written == $to_write; $self->{wbuf_off} = $written; - push @$wbuf, $bref; + $self->{wbuf} = [ $bref ]; return flush_write($self); # try until EAGAIN } elsif ($! 
== EAGAIN) { - push @$wbuf, $bref; + $self->{wbuf} = [ $bref ]; $self->watch_write(1); } else { $self->close; diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index c64e2388..bd4dda11 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -29,7 +29,7 @@ sub once_init () { # never drains wbuf. We can avoid wasting a hash slot by # stuffing the read-end of the pipe into the never-to-be-touched # wbuf - push @{$self->{wbuf}}, $r; + $self->{wbuf} = $r; $self; } diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index dff59286..9a43069f 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -67,11 +67,8 @@ sub new ($$$) { sub event_step { # called by PublicInbox::DS my ($self) = @_; - my $wbuf = $self->{wbuf}; - if (@$wbuf) { - $self->write(undef); - return if !$self->{sock} || scalar(@$wbuf); - } + return unless $self->flush_write && $self->{sock}; + # only read more requests if we've drained the write buffer, # otherwise we can be buffering infinitely w/o backpressure @@ -268,7 +265,7 @@ sub getline_cb ($$$) { $write->($buf); # may close in PublicInbox::DS::write if ($self->{sock}) { my $next = $self->{pull}; - if (scalar @{$self->{wbuf}}) { + if ($self->{wbuf}) { $self->write($next); } else { PublicInbox::EvCleanup::asap($next); @@ -323,7 +320,7 @@ use constant MSG_MORE => ($^O eq 'linux') ? 
0x8000 : 0; sub more ($$) { my $self = $_[0]; return unless $self->{sock}; - if (MSG_MORE && !scalar(@{$self->{wbuf}})) { + if (MSG_MORE && !$self->{wbuf}) { my $n = send($self->{sock}, $_[1], MSG_MORE); if (defined $n) { my $nlen = length($_[1]) - $n; @@ -490,7 +487,7 @@ sub close { # for graceful shutdown in PublicInbox::Daemon: sub busy () { my ($self) = @_; - ($self->{rbuf} ne '' || $self->{env} || scalar(@{$self->{wbuf}})); + ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf}); } 1; diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 261a01e0..46ea188c 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -46,7 +46,7 @@ sub main_cb ($$$) { if ($r) { $fh->write($$bref); if ($http->{sock}) { # !closed - if (scalar @{$http->{wbuf}}) { + if ($http->{wbuf}) { $self->watch_read(0); $http->write(restart_read_cb($self)); } diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 0a473e42..d9097cc7 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -644,7 +644,7 @@ sub long_response ($$) { out($self, " deferred[$fd] aborted - %0.6f", now() - $t0); } - } elsif ($more) { # scalar @{$self->{wbuf}}: + } elsif ($more) { # $self->{wbuf}: # no recursion, schedule another call ASAP # but only after all pending writes are done update_idle_time($self); @@ -950,7 +950,7 @@ use constant MSG_MORE => ($^O eq 'linux') ? 
0x8000 : 0; sub do_more ($$) { my ($self, $data) = @_; - if (MSG_MORE && !scalar(@{$self->{wbuf}})) { + if (MSG_MORE && !$self->{wbuf}) { my $n = send($self->{sock}, $data, MSG_MORE); if (defined $n) { my $dlen = length($data); @@ -963,15 +963,11 @@ sub do_more ($$) { sub event_step { my ($self) = @_; - return unless $self->{sock}; - my $wbuf = $self->{wbuf}; - if (@$wbuf) { - update_idle_time($self); - $self->write(undef); - return if !$self->{sock} || scalar(@$wbuf); - } + return unless $self->flush_write && $self->{sock}; return if $self->{long_res}; + + update_idle_time($self); # only read more requests if we've drained the write buffer, # otherwise we can be buffering infinitely w/o backpressure @@ -1035,7 +1031,7 @@ sub not_idle_long ($$) { sub busy { my ($self, $now) = @_; ($self->{rbuf} ne '' || $self->{long_res} || - scalar(@{$self->{wbuf}}) || not_idle_long($self, $now)); + $self->{wbuf} || not_idle_long($self, $now)); } 1; -- cgit v1.2.3-24-ge0c7 From b4aae3e011e24a9aacbd6d84c0e0aa610144bb76 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:10 +0000 Subject: ds: don't pass `events' arg to EPOLL_CTL_DEL There's no point in passing a mask of interesting events when removing an item from the epoll watch set. --- lib/PublicInbox/DS.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index d07620a8..8fc49eee 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -458,7 +458,7 @@ sub close { # notifications about it if ($HaveEpoll) { my $fd = fileno($sock); - epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, $self->{event_watch}) and + epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and confess("EPOLL_CTL_DEL: $!"); } -- cgit v1.2.3-24-ge0c7 From f6c9b3da9cf87cfbde7b95772de6b337ba46ef68 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:11 +0000 Subject: ds: remove support for DS->write(undef) We call ->flush_write directly, now; so we can eliminate a needless check. 
--- lib/PublicInbox/DS.pm | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 8fc49eee..ba8bd95f 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -518,14 +518,13 @@ sub flush_write ($) { =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, -scalar ref, code ref (to run when there), or undef just to kick-start. +scalar ref, code ref (to run when there). Returns 1 if writes all went through, or 0 if there are writes in queue. If it returns 1, caller should stop waiting for 'writable' events) =cut sub write { my ($self, $data) = @_; - return flush_write($self) unless defined $data; # nobody should be writing to closed sockets, but caller code can # do two writes within an event, have the first fail and -- cgit v1.2.3-24-ge0c7 From 676f13ac53bf96eab869514fe9fafcc0169874ab Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:12 +0000 Subject: http: favor DS->write(strref) when reasonable This can avoid large memory copies when strings can't be copy-on-write and saves us the trouble of creating new refs in the code. --- lib/PublicInbox/HTTP.pm | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 9a43069f..4f1f88fe 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -209,7 +209,7 @@ sub response_header_write { if (($len || $chunked) && $env->{REQUEST_METHOD} ne 'HEAD') { more($self, $h); } else { - $self->write($h); + $self->write(\$h); } $alive; } @@ -222,7 +222,7 @@ sub chunked_wcb ($) { more($self, sprintf("%x\r\n", bytes::length($_[0]))); more($self, $_[0]); - # use $self->write("\n\n") if you care about real-time + # use $self->write(\"\n\n") if you care about real-time # streaming responses, public-inbox WWW does not. 
more($self, "\r\n"); } @@ -248,7 +248,7 @@ sub response_done_cb ($$) { sub { my $env = $self->{env}; $self->{env} = undef; - $self->write("0\r\n\r\n") if $alive == 2; + $self->write(\"0\r\n\r\n") if $alive == 2; $self->write(sub{$alive ? next_request($self) : $self->close}); } } @@ -330,7 +330,7 @@ sub more ($$) { return $self->write(substr($_[1], $n, $nlen)); } } - $self->write($_[1]); + $self->write(\($_[1])); } sub input_prepare { @@ -467,7 +467,7 @@ sub read_input_chunked { # unlikely... sub quit { my ($self, $status) = @_; my $h = "HTTP/1.1 $status " . status_message($status) . "\r\n\r\n"; - $self->write($h); + $self->write(\$h); $self->close; } -- cgit v1.2.3-24-ge0c7 From e1b203f218b3fedea3068d6265130c47f0af9f4c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:13 +0000 Subject: ds: share send(..., MSG_MORE) logic No sense in having similar Linux-specific functionality in both our NNTP.pm and HTTP.pm --- lib/PublicInbox/DS.pm | 21 ++++++++++++++++++++- lib/PublicInbox/HTTP.pm | 26 +++++--------------------- lib/PublicInbox/NNTP.pm | 47 +++++++++++++---------------------------------- 3 files changed, 38 insertions(+), 56 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index ba8bd95f..3e8b0b1a 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -21,7 +21,7 @@ use IO::Handle qw(); use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); -our @EXPORT_OK = qw(now); +our @EXPORT_OK = qw(now msg_more); use warnings; use PublicInbox::Syscall qw(:epoll); @@ -561,6 +561,25 @@ sub write { } } +use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; + +sub msg_more ($$) { + my $self = $_[0]; + my $sock = $self->{sock} or return 1; + + if (MSG_MORE && !$self->{wbuf}) { + my $n = send($sock, $_[1], MSG_MORE); + if (defined $n) { + my $nlen = bytes::length($_[1]) - $n; + return 1 if $nlen == 0; # all done! 
+ + # PublicInbox::DS::write queues the unwritten substring: + return $self->write(substr($_[1], $n, $nlen)); + } + } + $self->write(\($_[1])); +} + =head2 C<< $obj->watch_read( $boolean ) >> Turn 'readable' event notification on or off. diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 4f1f88fe..a669eb6e 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -19,6 +19,7 @@ use HTTP::Status qw(status_message); use HTTP::Date qw(time2str); use IO::Handle; require PublicInbox::EvCleanup; +PublicInbox::DS->import('msg_more'); use constant { CHUNK_START => -1, # [a-f0-9]+\r\n CHUNK_END => -2, # \r\n @@ -207,7 +208,7 @@ sub response_header_write { $h .= 'Date: ' . http_date() . "\r\n\r\n"; if (($len || $chunked) && $env->{REQUEST_METHOD} ne 'HEAD') { - more($self, $h); + msg_more($self, $h); } else { $self->write(\$h); } @@ -219,12 +220,12 @@ sub chunked_wcb ($) { my ($self) = @_; sub { return if $_[0] eq ''; - more($self, sprintf("%x\r\n", bytes::length($_[0]))); - more($self, $_[0]); + msg_more($self, sprintf("%x\r\n", bytes::length($_[0]))); + msg_more($self, $_[0]); # use $self->write(\"\n\n") if you care about real-time # streaming responses, public-inbox WWW does not. - more($self, "\r\n"); + msg_more($self, "\r\n"); } } @@ -316,23 +317,6 @@ sub response_write { } } -use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; -sub more ($$) { - my $self = $_[0]; - return unless $self->{sock}; - if (MSG_MORE && !$self->{wbuf}) { - my $n = send($self->{sock}, $_[1], MSG_MORE); - if (defined $n) { - my $nlen = length($_[1]) - $n; - return 1 if $nlen == 0; # all done! 
- - # PublicInbox::DS::write queues the unwritten substring: - return $self->write(substr($_[1], $n, $nlen)); - } - } - $self->write(\($_[1])); -} - sub input_prepare { my ($self, $env) = @_; my $input; diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index d9097cc7..fe01627f 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -14,7 +14,7 @@ use PublicInbox::Git; require PublicInbox::EvCleanup; use Email::Simple; use POSIX qw(strftime); -PublicInbox::DS->import('now'); +PublicInbox::DS->import(qw(now msg_more)); use Digest::SHA qw(sha1_hex); use Time::Local qw(timegm timelocal); use constant { @@ -159,12 +159,12 @@ sub cmd_xgtitle ($;$) { sub list_overview_fmt ($) { my ($self) = @_; - do_more($self, $OVERVIEW_FMT); + msg_more($self, $OVERVIEW_FMT); } sub list_headers ($;$) { my ($self) = @_; - do_more($self, $LIST_HEADERS); + msg_more($self, $LIST_HEADERS); } sub list_active ($;$) { @@ -519,8 +519,8 @@ sub simple_body_write ($$) { $s->body_set(''); $body =~ s/^\./../smg; $body =~ s/(? article retrieved - head and body follow"); - do_more($self, _header($s)); - do_more($self, "\r\n"); + msg_more($self, _header($s)); + msg_more($self, "\r\n"); simple_body_write($self, $s); } @@ -562,7 +562,7 @@ sub cmd_head ($;$) { my ($n, $mid, $s) = @$r; set_art($self, $art); more($self, "221 $n <$mid> article retrieved - head follows"); - do_more($self, _header($s)); + msg_more($self, _header($s)); '.' } @@ -762,7 +762,7 @@ sub hdr_searchmsg ($$$$) { $tmp .= $s->{num} . ' ' . $s->$field . "\r\n"; } utf8::encode($tmp); - do_more($self, $tmp); + msg_more($self, $tmp); $cur = $msgs->[-1]->{num} + 1; }); } @@ -914,19 +914,13 @@ sub cmd_xpath ($$) { '223 '.join(' ', @paths); } -sub res ($$) { - my ($self, $line) = @_; - do_write($self, $line . "\r\n"); -} +sub res ($$) { do_write($_[0], $_[1] . "\r\n") } -sub more ($$) { - my ($self, $line) = @_; - do_more($self, $line . "\r\n"); -} +sub more ($$) { msg_more($_[0], $_[1] . 
"\r\n") } sub do_write ($$) { - my ($self, $data) = @_; - my $done = $self->write($data); + my $self = $_[0]; + my $done = $self->write(\($_[1])); return 0 unless $self->{sock}; # Do not watch for readability if we have data in the queue, @@ -946,21 +940,6 @@ sub out ($$;@) { printf { $self->{nntpd}->{out} } $fmt."\n", @args; } -use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0; - -sub do_more ($$) { - my ($self, $data) = @_; - if (MSG_MORE && !$self->{wbuf}) { - my $n = send($self->{sock}, $data, MSG_MORE); - if (defined $n) { - my $dlen = length($data); - return 1 if $n == $dlen; # all done! - $data = substr($data, $n, $dlen - $n); - } - } - do_write($self, $data); -} - sub event_step { my ($self) = @_; -- cgit v1.2.3-24-ge0c7 From 2929e3b3c62925149a9a8cafd872bfdb017453eb Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:14 +0000 Subject: ds: switch write buffering to use a tempfile Data which can't fit into a generously-sized socket buffer, has no business being stored in heap. 
--- lib/PublicInbox/DS.pm | 110 ++++++++++++++++++++++++++++++++++++------------ lib/PublicInbox/HTTP.pm | 20 ++------- 2 files changed, 85 insertions(+), 45 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 3e8b0b1a..eb468f57 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -18,22 +18,23 @@ use strict; use bytes; use POSIX (); use IO::Handle qw(); -use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); +use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); -our @EXPORT_OK = qw(now msg_more); +our @EXPORT_OK = qw(now msg_more write_in_full); use warnings; use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket - 'wbuf', # arrayref of scalars, scalarrefs, or coderefs to write + 'wbuf', # arrayref of coderefs or GLOB refs 'wbuf_off', # offset into first element of wbuf to start writing at 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) 
); use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess); +use File::Temp qw(tempfile); use constant POLLIN => 1; use constant POLLOUT => 4; @@ -478,32 +479,51 @@ sub close { return 0; } +# portable, non-thread-safe sendfile emulation (no pread, yet) +sub psendfile ($$$) { + my ($sock, $fh, $off) = @_; + + sysseek($fh, $$off, SEEK_SET) or return; + defined(my $to_write = sysread($fh, my $buf, 16384)) or return; + my $written = 0; + while ($to_write > 0) { + if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) { + $written += $w; + $to_write -= $w; + } else { + return if $written == 0; + last; + } + } + $$off += $written; + $written; +} + # returns 1 if done, 0 if incomplete sub flush_write ($) { my ($self) = @_; my $wbuf = $self->{wbuf} or return 1; my $sock = $self->{sock} or return 1; +next_buf: while (my $bref = $wbuf->[0]) { - my $ref = ref($bref); - if ($ref eq 'SCALAR') { - my $len = bytes::length($$bref); - my $off = $self->{wbuf_off} || 0; - my $to_write = $len - $off; - my $written = syswrite($sock, $$bref, $to_write, $off); - if (defined $written) { - if ($written == $to_write) { - shift @$wbuf; + if (ref($bref) ne 'CODE') { + my $off = delete($self->{wbuf_off}) // 0; + while (1) { + my $w = psendfile($sock, $bref, \$off); + if (defined $w) { + if ($w == 0) { + shift @$wbuf; + goto next_buf; + } + } elsif ($! == EAGAIN) { + $self->{wbuf_off} = $off; + watch_write($self, 1); + return 0; } else { - $self->{wbuf_off} = $off + $written; + return $self->close; } - next; # keep going until EAGAIN - } elsif ($! == EAGAIN) { - $self->watch_write(1); - } else { - $self->close; } - return 0; } else { #($ref eq 'CODE') { shift @$wbuf; $bref->(); @@ -515,6 +535,31 @@ sub flush_write ($) { 1; # all done } +sub write_in_full ($$$$) { + my ($fh, $bref, $len, $off) = @_; + my $rv = 0; + while ($len > 0) { + my $w = syswrite($fh, $$bref, $len, $off); + return ($rv ? 
$rv : $w) unless $w; # undef or 0 + $rv += $w; + $len -= $w; + $off += $w; + } + $rv +} + +sub tmpbuf ($$) { + my ($bref, $off) = @_; + # open(my $fh, '+>>', undef) doesn't set O_APPEND + my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1); + open $fh, '+>>', $path or die "open: $!"; + unlink $path; + my $to_write = bytes::length($$bref) - $off; + my $w = write_in_full($fh, $bref, $to_write, $off); + die "write_in_full ($to_write): $!" unless defined $w; + $w == $to_write ? $fh : die("short write $w < $to_write"); +} + =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, @@ -537,7 +582,16 @@ sub write { my $ref = ref $data; my $bref = $ref ? $data : \$data; if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more... - push @$wbuf, $bref; + if ($ref eq 'CODE') { + push @$wbuf, $bref; + } else { + my $last = $wbuf->[-1]; + if (ref($last) eq 'GLOB') { # append to tmp file buffer + write_in_full($last, $bref, bytes::length($$bref), 0); + } else { + push @$wbuf, tmpbuf($bref, 0); + } + } return 0; } elsif ($ref eq 'CODE') { $bref->(); @@ -548,15 +602,13 @@ sub write { if (defined $written) { return 1 if $written == $to_write; - $self->{wbuf_off} = $written; - $self->{wbuf} = [ $bref ]; - return flush_write($self); # try until EAGAIN } elsif ($! == EAGAIN) { - $self->{wbuf} = [ $bref ]; - $self->watch_write(1); + $written = 0; } else { - $self->close; + return $self->close; } + $self->{wbuf} = [ tmpbuf($bref, $written) ]; + watch_write($self, 1); return 0; } } @@ -573,8 +625,10 @@ sub msg_more ($$) { my $nlen = bytes::length($_[1]) - $n; return 1 if $nlen == 0; # all done!
- # PublicInbox::DS::write queues the unwritten substring: - return $self->write(substr($_[1], $n, $nlen)); + # queue up the unwritten substring: + $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ]; + watch_write($self, 1); + return 0; } } $self->write(\($_[1])); diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index a669eb6e..fcb5eb6c 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -19,7 +19,7 @@ use HTTP::Status qw(status_message); use HTTP::Date qw(time2str); use IO::Handle; require PublicInbox::EvCleanup; -PublicInbox::DS->import('msg_more'); +PublicInbox::DS->import(qw(msg_more write_in_full)); use constant { CHUNK_START => -1, # [a-f0-9]+\r\n CHUNK_END => -2, # \r\n @@ -125,7 +125,7 @@ sub read_input ($) { while ($len > 0) { if ($$rbuf ne '') { - my $w = write_in_full($input, $rbuf, $len); + my $w = write_in_full($input, $rbuf, $len, 0); return write_err($self, $len) unless $w; $len -= $w; die "BUG: $len < 0 (w=$w)" if $len < 0; @@ -367,20 +367,6 @@ sub recv_err { quit($self, 500); } -sub write_in_full { - my ($fh, $rbuf, $len) = @_; - my $rv = 0; - my $off = 0; - while ($len > 0) { - my $w = syswrite($fh, $$rbuf, $len, $off); - return ($rv ? $rv : $w) unless $w; # undef or 0 - $rv += $w; - $off += $w; - $len -= $w; - } - $rv -} - sub read_input_chunked { # unlikely... my ($self) = @_; my $input = $self->{env}->{'psgi.input'}; @@ -425,7 +411,7 @@ sub read_input_chunked { # unlikely... 
# drain the current chunk until ($len <= 0) { if ($$rbuf ne '') { - my $w = write_in_full($input, $rbuf, $len); + my $w = write_in_full($input, $rbuf, $len, 0); return write_err($self, "$len chunk") if !$w; $len -= $w; if ($len == 0) { -- cgit v1.2.3-24-ge0c7 From 66022a2e311c71ea3d50c7c5d5c92dfcfa8b1d7e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:15 +0000 Subject: ds: get rid of redundant and unnecessary POLL* constants EPOLL* constants already match their POLL* counterparts and there's no way Linux can ever diverge or change the values of those constants. So we'll favor the EPOLL* ones since we use EPOLLEXCLUSIVE, already. For weird stuff like kqueue, we'd need to keep maintaining the mapping, anyways. --- lib/PublicInbox/DS.pm | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index eb468f57..bff12de5 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -29,19 +29,14 @@ use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket 'wbuf', # arrayref of coderefs or GLOB refs 'wbuf_off', # offset into first element of wbuf to start writing at - 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) + 'event_watch', # bitmask of events the client is interested in + # (EPOLLIN,OUT,etc.) ); use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess); use File::Temp qw(tempfile); -use constant POLLIN => 1; -use constant POLLOUT => 4; -use constant POLLERR => 8; -use constant POLLHUP => 16; -use constant POLLNVAL => 32; - our $HAVE_KQUEUE = eval { require IO::KQueue; 1 }; our ( @@ -403,19 +398,19 @@ sub new { Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . 
($fd || "")) unless $sock && $fd; - my $ev = $self->{event_watch} = POLLERR|POLLHUP|POLLNVAL; + my $ev = $self->{event_watch} = 0; _InitPoller(); if ($HaveEpoll) { if ($exclusive) { - $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP|$EPOLLEXCLUSIVE; + $ev = $self->{event_watch} = EPOLLIN|$EPOLLEXCLUSIVE; } retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) { $EPOLLEXCLUSIVE = 0; # old kernel - $ev = $self->{event_watch} = EPOLLIN|EPOLLERR|EPOLLHUP; + $ev = $self->{event_watch} = EPOLLIN; goto retry; } die "couldn't add epoll watch for $fd: $!\n"; @@ -646,8 +641,8 @@ sub watch_read { my $val = shift; my $event = $self->{event_watch}; - $event &= ~POLLIN if ! $val; - $event |= POLLIN if $val; + $event &= ~EPOLLIN if ! $val; + $event |= EPOLLIN if $val; my $fd = fileno($sock); # If it changed, set it @@ -676,8 +671,8 @@ sub watch_write { my $val = shift; my $event = $self->{event_watch}; - $event &= ~POLLOUT if ! $val; - $event |= POLLOUT if $val; + $event &= ~EPOLLOUT if ! $val; + $event |= EPOLLOUT if $val; my $fd = fileno($sock); # If it changed, set it -- cgit v1.2.3-24-ge0c7 From 7a5794fa02e07507564e8c3d7f2fc646956b66cb Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:16 +0000 Subject: syscall: get rid of unused EPOLL* constants EPOLLRDBAND is used for DECnet; and I'm pretty sure I won't be updating any of our code to work with DECnet. I've never found use for EPOLLHUP or EPOLLERR, either; so disable those for now and add comments for things I might actually use: EPOLLET and EPOLLONESHOT. 
--- lib/PublicInbox/Syscall.pm | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index 4ef64cc3..98110eaf 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -22,11 +22,11 @@ use vars qw(@ISA @EXPORT_OK %EXPORT_TAGS $VERSION); $VERSION = "0.25"; @ISA = qw(Exporter); @EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait - EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND + EPOLLIN EPOLLOUT EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD EPOLLEXCLUSIVE); %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait - EPOLLIN EPOLLOUT EPOLLERR EPOLLHUP EPOLLRDBAND + EPOLLIN EPOLLOUT EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD EPOLLEXCLUSIVE)], sendfile => [qw(sendfile)], @@ -34,10 +34,12 @@ $VERSION = "0.25"; use constant EPOLLIN => 1; use constant EPOLLOUT => 4; -use constant EPOLLERR => 8; -use constant EPOLLHUP => 16; -use constant EPOLLRDBAND => 128; +# use constant EPOLLERR => 8; +# use constant EPOLLHUP => 16; +# use constant EPOLLRDBAND => 128; use constant EPOLLEXCLUSIVE => (1 << 28); +# use constant EPOLLONESHOT => (1 << 30); +# use constant EPOLLET => (1 << 31); use constant EPOLL_CTL_ADD => 1; use constant EPOLL_CTL_DEL => 2; use constant EPOLL_CTL_MOD => 3; -- cgit v1.2.3-24-ge0c7 From e8dce524749fccb5b0044a92e221b03282f5024e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:17 +0000 Subject: syscall: get rid of unnecessary uname local vars We don't need to keep information from uname(2) around outside of startup. 
--- lib/PublicInbox/Syscall.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index 98110eaf..17fd1398 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -59,7 +59,6 @@ sub _load_syscall { return $rv; } -our ($sysname, $nodename, $release, $version, $machine) = POSIX::uname(); our ( $SYS_epoll_create, @@ -71,6 +70,7 @@ our ( our $no_deprecated = 0; if ($^O eq "linux") { + my $machine = (POSIX::uname())[-1]; # whether the machine requires 64-bit numbers to be on 8-byte # boundaries. my $u64_mod_8 = 0; -- cgit v1.2.3-24-ge0c7 From f41dc46f6213661ba51443d6cb0d6a9ba4d41472 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:18 +0000 Subject: ds: set event flags directly at initialization We can avoid the EPOLL_CTL_ADD && EPOLL_CTL_MOD sequence with a single EPOLL_CTL_ADD. --- lib/PublicInbox/DS.pm | 23 ++++++++++------------- lib/PublicInbox/EvCleanup.pm | 2 +- lib/PublicInbox/HTTP.pm | 3 +-- lib/PublicInbox/HTTPD/Async.pm | 3 +-- lib/PublicInbox/Listener.pm | 4 ++-- lib/PublicInbox/NNTP.pm | 3 +-- lib/PublicInbox/ParentPipe.pm | 3 +-- 7 files changed, 17 insertions(+), 24 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index bff12de5..2e0aa1e0 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -55,8 +55,6 @@ our ( @Timers, # timers ); -# this may be set to zero with old kernels -our $EPOLLEXCLUSIVE = EPOLLEXCLUSIVE; Reset(); ##################################################################### @@ -389,7 +387,7 @@ This is normally (always?) called from your subclass via: =cut sub new { - my ($self, $sock, $exclusive) = @_; + my ($self, $sock, $ev) = @_; $self = fields::new($self) unless ref $self; $self->{sock} = $sock; @@ -398,30 +396,29 @@ sub new { Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . 
($fd || "")) unless $sock && $fd; - my $ev = $self->{event_watch} = 0; + $self->{event_watch} = $ev; _InitPoller(); if ($HaveEpoll) { - if ($exclusive) { - $ev = $self->{event_watch} = EPOLLIN|$EPOLLEXCLUSIVE; - } retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { - if ($! == EINVAL && ($ev & $EPOLLEXCLUSIVE)) { - $EPOLLEXCLUSIVE = 0; # old kernel - $ev = $self->{event_watch} = EPOLLIN; + if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { + $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE); goto retry; } die "couldn't add epoll watch for $fd: $!\n"; } } elsif ($HaveKQueue) { - # Add them to the queue but disabled for now + my $f = $ev & EPOLLIN ? IO::KQueue::EV_ENABLE() + : IO::KQueue::EV_DISABLE(); $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), - IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); + IO::KQueue::EV_ADD() | $f); + $f = $ev & EPOLLOUT ? IO::KQueue::EV_ENABLE() + : IO::KQueue::EV_DISABLE(); $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), - IO::KQueue::EV_ADD() | IO::KQueue::EV_DISABLE()); + IO::KQueue::EV_ADD() | $f); } Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index bd4dda11..d60ac2cc 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -23,7 +23,7 @@ sub once_init () { # fires in the next event loop iteration. pipe($r, $w) or die "pipe: $!"; fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ - $self->SUPER::new($w); + $self->SUPER::new($w, 0); # always writable, since PublicInbox::EvCleanup::event_step # never drains wbuf. 
We can avoid wasting a hash slot by diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index fcb5eb6c..afa71ea5 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -56,12 +56,11 @@ sub http_date () { sub new ($$$) { my ($class, $sock, $addr, $httpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock); + $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN()); $self->{httpd} = $httpd; $self->{rbuf} = ''; ($self->{remote_addr}, $self->{remote_port}) = PublicInbox::Daemon::host_with_port($addr); - $self->watch_read(1); $self; } diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 46ea188c..dae62e55 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -25,10 +25,9 @@ sub new { my $self = fields::new($class); IO::Handle::blocking($io, 0); - $self->SUPER::new($io); + $self->SUPER::new($io, PublicInbox::DS::EPOLLIN()); $self->{cb} = $cb; $self->{cleanup} = $cleanup; - $self->watch_read(1); $self; } diff --git a/lib/PublicInbox/Listener.pm b/lib/PublicInbox/Listener.pm index 6ee3abb1..94b2aed4 100644 --- a/lib/PublicInbox/Listener.pm +++ b/lib/PublicInbox/Listener.pm @@ -17,8 +17,8 @@ sub new ($$$) { listen($s, 1024); IO::Handle::blocking($s, 0); my $self = fields::new($class); - $self->SUPER::new($s, 1); # calls epoll_create for the first socket - $self->watch_read(1); + $self->SUPER::new($s, PublicInbox::DS::EPOLLIN()| + PublicInbox::DS::EPOLLEXCLUSIVE()); $self->{post_accept} = $cb; $self } diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index fe01627f..eb1679a7 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -97,11 +97,10 @@ sub expire_old () { sub new ($$$) { my ($class, $sock, $nntpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock); + $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN()); $self->{nntpd} = $nntpd; res($self, '201 ' . $nntpd->{servername} . 
' ready - post via email'); $self->{rbuf} = ''; - $self->watch_read(1); update_idle_time($self); $expt ||= PublicInbox::EvCleanup::later(*expire_old); $self; diff --git a/lib/PublicInbox/ParentPipe.pm b/lib/PublicInbox/ParentPipe.pm index a9f05fc1..ccc0815e 100644 --- a/lib/PublicInbox/ParentPipe.pm +++ b/lib/PublicInbox/ParentPipe.pm @@ -10,9 +10,8 @@ use fields qw(cb); sub new ($$$) { my ($class, $pipe, $cb) = @_; my $self = fields::new($class); - $self->SUPER::new($pipe); + $self->SUPER::new($pipe, PublicInbox::DS::EPOLLIN()); $self->{cb} = $cb; - $self->watch_read(1); $self; } -- cgit v1.2.3-24-ge0c7 From 78380ca176e62503407b76df20fe3e9a8a33aa35 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:19 +0000 Subject: ds: import IO::KQueue namespace Make the rest of our IO::KQueue-using code less verbose and closer to the C equivalent. --- lib/PublicInbox/DS.pm | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 2e0aa1e0..00e2e5c6 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -37,7 +37,7 @@ use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess); use File::Temp qw(tempfile); -our $HAVE_KQUEUE = eval { require IO::KQueue; 1 }; +our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 }; our ( $HaveEpoll, # Flag -- is epoll available? initially undefined. @@ -411,14 +411,10 @@ retry: } } elsif ($HaveKQueue) { - my $f = $ev & EPOLLIN ? IO::KQueue::EV_ENABLE() - : IO::KQueue::EV_DISABLE(); - $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), - IO::KQueue::EV_ADD() | $f); - $f = $ev & EPOLLOUT ? IO::KQueue::EV_ENABLE() - : IO::KQueue::EV_DISABLE(); - $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), - IO::KQueue::EV_ADD() | $f); + my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE(); + $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f); + $f = $ev & EPOLLOUT ? 
EV_ENABLE() : EV_DISABLE(); + $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f); } Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") @@ -645,8 +641,8 @@ sub watch_read { # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { - $KQueue->EV_SET($fd, IO::KQueue::EVFILT_READ(), - $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); + $KQueue->EV_SET($fd, EVFILT_READ(), + $val ? EV_ENABLE() : EV_DISABLE()); } elsif ($HaveEpoll) { epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and @@ -675,8 +671,8 @@ sub watch_write { # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { - $KQueue->EV_SET($fd, IO::KQueue::EVFILT_WRITE(), - $val ? IO::KQueue::EV_ENABLE() : IO::KQueue::EV_DISABLE()); + $KQueue->EV_SET($fd, EVFILT_WRITE(), + $val ? EV_ENABLE() : EV_DISABLE()); } elsif ($HaveEpoll) { epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and -- cgit v1.2.3-24-ge0c7 From 393f8e88f31def03f4e951ccca4d3f59347abb64 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:20 +0000 Subject: ds: share watch_chg between watch_read/watch_write There was much duplicate logic between watch_read and watch_write. Share that logic, and give us room to enable edge-triggered or one-shot notifications in the future. --- lib/PublicInbox/DS.pm | 73 ++++++++++++++++++--------------------------------- 1 file changed, 25 insertions(+), 48 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 00e2e5c6..943e30b5 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -622,65 +622,42 @@ sub msg_more ($$) { $self->write(\($_[1])); } -=head2 C<< $obj->watch_read( $boolean ) >> - -Turn 'readable' event notification on or off. 
- -=cut -sub watch_read { - my PublicInbox::DS $self = shift; +sub watch_chg ($$$) { + my ($self, $bits, $set) = @_; my $sock = $self->{sock} or return; - - my $val = shift; - my $event = $self->{event_watch}; - - $event &= ~EPOLLIN if ! $val; - $event |= EPOLLIN if $val; - + my $cur = $self->{event_watch}; + my $changes = $cur; + if ($set) { + $changes |= $bits; + } else { + $changes &= ~$bits; + } + return if $changes == $cur; my $fd = fileno($sock); - # If it changed, set it - if ($event != $self->{event_watch}) { - if ($HaveKQueue) { - $KQueue->EV_SET($fd, EVFILT_READ(), - $val ? EV_ENABLE() : EV_DISABLE()); - } - elsif ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and - confess("EPOLL_CTL_MOD: $!"); - } - $self->{event_watch} = $event; + if ($HaveEpoll) { + epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and + confess("EPOLL_CTL_MOD $!"); + } elsif ($HaveKQueue) { + my $flag = $set ? EV_ENABLE() : EV_DISABLE(); + $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN; + $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT; } + $self->{event_watch} = $changes; } -=head2 C<< $obj->watch_write( $boolean ) >> +=head2 C<< $obj->watch_read( $boolean ) >> -Turn 'writable' event notification on or off. +Turn 'readable' event notification on or off. =cut -sub watch_write { - my PublicInbox::DS $self = shift; - my $sock = $self->{sock} or return; +sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) }; - my $val = shift; - my $event = $self->{event_watch}; +=head2 C<< $obj->watch_write( $boolean ) >> - $event &= ~EPOLLOUT if ! $val; - $event |= EPOLLOUT if $val; - my $fd = fileno($sock); +Turn 'writable' event notification on or off. - # If it changed, set it - if ($event != $self->{event_watch}) { - if ($HaveKQueue) { - $KQueue->EV_SET($fd, EVFILT_WRITE(), - $val ? 
EV_ENABLE() : EV_DISABLE()); - } - elsif ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $event) and - confess "EPOLL_CTL_MOD: $!"; - } - $self->{event_watch} = $event; - } -} +=cut +sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) }; package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; -- cgit v1.2.3-24-ge0c7 From dbcdabe601cfb29c8b7d5f169be9bf560d656a42 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:21 +0000 Subject: ds: remove IO::Poll support (for now) It may be reinstated at a later time if there's interest; but I want to be able to use one-shot notifications for certain events while retaining level-triggered notifications others. OTOH, I intend to fully support kqueue; via IO::KQueue for now, but via syscall() eventually to take advantage of the syscall reduction kevent(2) can provide over (current) epoll APIs. --- lib/PublicInbox/DS.pm | 52 --------------------------------------------------- 1 file changed, 52 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 943e30b5..9c801214 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -168,11 +168,6 @@ sub _InitPoller *EventLoop = *EpollEventLoop; } } - - if (!$HaveEpoll && !$HaveKQueue) { - require IO::Poll; - *EventLoop = *PollEventLoop; - } } =head2 C<< CLASS->EventLoop() >> @@ -190,8 +185,6 @@ sub FirstTimeEventLoop { EpollEventLoop($class); } elsif ($HaveKQueue) { KQueueEventLoop($class); - } else { - PollEventLoop($class); } } @@ -250,51 +243,6 @@ sub EpollEventLoop { exit 0; } -### The fallback IO::Poll-based event loop. Gets installed as EventLoop if -### IO::Epoll fails to load. 
-sub PollEventLoop { - my $class = shift; - - my PublicInbox::DS $pob; - - while (1) { - my $timeout = RunTimers(); - - # the following sets up @poll as a series of ($poll,$event_mask) - # items, then uses IO::Poll::_poll, implemented in XS, which - # modifies the array in place with the even elements being - # replaced with the event masks that occured. - my @poll; - while ( my ($fd, $sock) = each %DescriptorMap ) { - push @poll, $fd, $sock->{event_watch}; - } - - # if nothing to poll, either end immediately (if no timeout) - # or just keep calling the callback - unless (@poll) { - select undef, undef, undef, ($timeout / 1000); - return unless PostEventLoop(); - next; - } - - my $count = IO::Poll::_poll($timeout, @poll); - unless ($count >= 0) { - return unless PostEventLoop(); - next; - } - - # Fetch handles with read events - while (@poll) { - my ($fd, $state) = splice(@poll, 0, 2); - $DescriptorMap{$fd}->event_step if $state; - } - - return unless PostEventLoop(); - } - - exit 0; -} - ### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works ### okay. sub KQueueEventLoop { -- cgit v1.2.3-24-ge0c7 From ca05b93cddfa2cc495451410222af3753bfe1b4e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:22 +0000 Subject: ds: get rid of event_watch field We don't need to keep track of that field since we always know what events we're interested in when using one-shot wakeups. 
--- lib/PublicInbox/DS.pm | 64 +++++++++++++++--------------------------- lib/PublicInbox/EvCleanup.pm | 4 +-- lib/PublicInbox/HTTP.pm | 13 +++++---- lib/PublicInbox/HTTPD/Async.pm | 10 ++++--- lib/PublicInbox/NNTP.pm | 25 ++++++----------- lib/PublicInbox/Syscall.pm | 6 ++-- 6 files changed, 50 insertions(+), 72 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 9c801214..f5986e55 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -29,8 +29,6 @@ use PublicInbox::Syscall qw(:epoll); use fields ('sock', # underlying socket 'wbuf', # arrayref of coderefs or GLOB refs 'wbuf_off', # offset into first element of wbuf to start writing at - 'event_watch', # bitmask of events the client is interested in - # (EPOLLIN,OUT,etc.) ); use Errno qw(EAGAIN EINVAL); @@ -318,6 +316,17 @@ sub PostEventLoop { return $keep_running; } +# map EPOLL* bits to kqueue EV_* flags for EV_SET +sub kq_flag ($$) { + my ($bit, $ev) = @_; + if ($ev & $bit) { + my $fl = EV_ADD() | EV_ENABLE(); + ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl; + } else { + EV_DISABLE(); + } +} + ##################################################################### ### PublicInbox::DS-the-object code ##################################################################### @@ -344,25 +353,21 @@ sub new { Carp::cluck("undef sock and/or fd in PublicInbox::DS->new. sock=" . ($sock || "") . ", fd=" . ($fd || "")) unless $sock && $fd; - $self->{event_watch} = $ev; - _InitPoller(); if ($HaveEpoll) { retry: if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { - $self->{event_watch} = ($ev &= ~EPOLLEXCLUSIVE); + $ev &= ~EPOLLEXCLUSIVE; goto retry; } die "couldn't add epoll watch for $fd: $!\n"; } } elsif ($HaveKQueue) { - my $f = $ev & EPOLLIN ? EV_ENABLE() : EV_DISABLE(); - $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | $f); - $f = $ev & EPOLLOUT ? 
EV_ENABLE() : EV_DISABLE(); - $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | $f); + $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev)); + $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev)); } Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") @@ -454,7 +459,7 @@ next_buf: } } elsif ($! == EAGAIN) { $self->{wbuf_off} = $off; - watch_write($self, 1); + watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } else { return $self->close; @@ -467,7 +472,6 @@ next_buf: } # while @$wbuf delete $self->{wbuf}; - $self->watch_write(0); 1; # all done } @@ -544,7 +548,7 @@ sub write { return $self->close; } $self->{wbuf} = [ tmpbuf($bref, $written) ]; - watch_write($self, 1); + watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } } @@ -563,49 +567,27 @@ sub msg_more ($$) { # queue up the unwritten substring: $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ]; - watch_write($self, 1); + watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } } $self->write(\($_[1])); } -sub watch_chg ($$$) { - my ($self, $bits, $set) = @_; +sub watch ($$) { + my ($self, $ev) = @_; my $sock = $self->{sock} or return; - my $cur = $self->{event_watch}; - my $changes = $cur; - if ($set) { - $changes |= $bits; - } else { - $changes &= ~$bits; - } - return if $changes == $cur; my $fd = fileno($sock); if ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $changes) and + epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and confess("EPOLL_CTL_MOD $!"); } elsif ($HaveKQueue) { - my $flag = $set ? EV_ENABLE() : EV_DISABLE(); - $KQueue->EV_SET($fd, EVFILT_READ(), $flag) if $bits & EPOLLIN; - $KQueue->EV_SET($fd, EVFILT_WRITE(), $flag) if $bits & EPOLLOUT; + $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev)); + $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev)); } - $self->{event_watch} = $changes; } -=head2 C<< $obj->watch_read( $boolean ) >> - -Turn 'readable' event notification on or off. 
- -=cut -sub watch_read ($$) { watch_chg($_[0], EPOLLIN, $_[1]) }; - -=head2 C<< $obj->watch_write( $boolean ) >> - -Turn 'writable' event notification on or off. - -=cut -sub watch_write ($$) { watch_chg($_[0], EPOLLOUT, $_[1]) }; +sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) } package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index d60ac2cc..a9f6167d 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -6,6 +6,7 @@ package PublicInbox::EvCleanup; use strict; use warnings; use base qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); my $ENABLED; sub enabled { $ENABLED } @@ -59,13 +60,12 @@ sub _run_later () { # Called by PublicInbox::DS sub event_step { my ($self) = @_; - $self->watch_write(0); _run_asap(); } sub _asap_timer () { $singleton ||= once_init(); - $singleton->watch_write(1); + $singleton->watch(EPOLLOUT|EPOLLONESHOT); 1; } diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index afa71ea5..773d77ba 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -20,6 +20,7 @@ use HTTP::Date qw(time2str); use IO::Handle; require PublicInbox::EvCleanup; PublicInbox::DS->import(qw(msg_more write_in_full)); +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); use constant { CHUNK_START => -1, # [a-f0-9]+\r\n CHUNK_END => -2, # \r\n @@ -56,7 +57,7 @@ sub http_date () { sub new ($$$) { my ($class, $sock, $addr, $httpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN()); + $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT); $self->{httpd} = $httpd; $self->{rbuf} = ''; ($self->{remote_addr}, $self->{remote_port}) = @@ -80,7 +81,8 @@ sub event_step { # called by PublicInbox::DS return $self->close if $r == 0; return rbuf_process($self); } - return if $!{EAGAIN}; # no need to call watch_read(1) again + + return $self->watch_in1 if $!{EAGAIN}; # common for 
clients to break connections without warning, # would be too noisy to log here: @@ -100,7 +102,7 @@ sub rbuf_process { ($r == -2 && length($self->{rbuf}) > 0x4000)) { return quit($self, 400); } - return $self->watch_read(1) if $r < 0; # incomplete + return $self->watch_in1 if $r < 0; # incomplete $self->{rbuf} = substr($self->{rbuf}, $r); my $len = input_prepare($self, \%env); @@ -143,7 +145,6 @@ sub read_input ($) { sub app_dispatch { my ($self, $input) = @_; - $self->watch_read(0); my $env = $self->{env}; $env->{REMOTE_ADDR} = $self->{remote_addr}; $env->{REMOTE_PORT} = $self->{remote_port}; @@ -236,7 +237,7 @@ sub identity_wcb ($) { sub next_request ($) { my ($self) = @_; if ($self->{rbuf} eq '') { # wait for next request - $self->watch_read(1); + $self->watch_in1; } else { # avoid recursion for pipelined requests push @$pipelineq, $self; $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq); @@ -360,7 +361,7 @@ sub recv_err { return $self->close if (defined $r && $r == 0); if ($!{EAGAIN}) { $self->{input_left} = $len; - return; + return $self->watch_in1; } err($self, "error reading for input: $! ($len bytes remaining)"); quit($self, 500); diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index dae62e55..f32ef009 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -31,10 +31,12 @@ sub new { $self; } +sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) } + # fires after pending writes are complete: sub restart_read_cb ($) { my ($self) = @_; - sub { $self->watch_read(1) } + sub { restart_read($self) } } sub main_cb ($$$) { @@ -46,16 +48,16 @@ sub main_cb ($$$) { $fh->write($$bref); if ($http->{sock}) { # !closed if ($http->{wbuf}) { - $self->watch_read(0); + $self->watch(0); $http->write(restart_read_cb($self)); } - # stay in watch_read, but let other clients + # stay in EPOLLIN, but let other clients # get some work done, too. return; } # fall through to close below... 
} elsif (!defined $r) { - return if $!{EAGAIN} || $!{EINTR}; + return restart_read($self) if $!{EAGAIN} || $!{EINTR}; } # Done! Error handling will happen in $fh->close diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index eb1679a7..98f88410 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -24,6 +24,7 @@ use constant { r225 => '225 Headers follow (multi-line)', r430 => '430 No article with that message-id', }; +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); my @OVERVIEW = qw(Subject From Date Message-ID References Xref); my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n"; @@ -52,12 +53,6 @@ sub next_tick () { # pipelined request, we bypassed socket-readiness # checks to get here: event_step($nntp); - - # maybe there's more pipelined data, or we'll have - # to register it for socket-readiness notifications - if (!$nntp->{long_res} && $nntp->{sock}) { - check_read($nntp); - } } } } @@ -97,7 +92,7 @@ sub expire_old () { sub new ($$$) { my ($class, $sock, $nntpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock, PublicInbox::DS::EPOLLIN()); + $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT); $self->{nntpd} = $nntpd; res($self, '201 ' . $nntpd->{servername} . ' ready - post via email'); $self->{rbuf} = ''; @@ -624,11 +619,10 @@ sub long_response ($$) { # make sure we disable reading during a long response, # clients should not be sending us stuff and making us do more # work while we are stream a response to them - $self->watch_read(0); my $t0 = now(); $self->{long_res} = sub { my $more = eval { $cb->() }; - if ($@ || !$self->{sock}) { + if ($@ || !$self->{sock}) { # something bad happened... 
$self->{long_res} = undef; if ($@) { @@ -922,10 +916,6 @@ sub do_write ($$) { my $done = $self->write(\($_[1])); return 0 unless $self->{sock}; - # Do not watch for readability if we have data in the queue, - # instead re-enable watching for readability when we can - $self->watch_read(0) if (!$done || $self->{long_res}); - $done; } @@ -943,7 +933,6 @@ sub event_step { my ($self) = @_; return unless $self->flush_write && $self->{sock}; - return if $self->{long_res}; update_idle_time($self); # only read more requests if we've drained the write buffer, @@ -957,7 +946,7 @@ sub event_step { my $off = length($$rbuf); $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off); unless (defined $r) { - return if $!{EAGAIN}; + return $self->watch_in1 if $!{EAGAIN}; return $self->close; } return $self->close if $r == 0; @@ -978,6 +967,10 @@ sub event_step { my $len = length($$rbuf); return $self->close if ($len >= LINE_MAX); update_idle_time($self); + + # maybe there's more pipelined data, or we'll have + # to register it for socket-readiness notifications + check_read($self) unless ($self->{long_res} || $self->{wbuf}); } sub check_read { @@ -993,7 +986,7 @@ sub check_read { } else { # no pipelined requests available, let the kernel know # to wake us up if there's more - $self->watch_read(1); # PublicInbox::DS::watch_read + $self->watch_in1; # PublicInbox::DS::watch_in1 } } diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index 17fd1398..f1988e61 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -24,11 +24,11 @@ $VERSION = "0.25"; @EXPORT_OK = qw(sendfile epoll_ctl epoll_create epoll_wait EPOLLIN EPOLLOUT EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD - EPOLLEXCLUSIVE); + EPOLLONESHOT EPOLLEXCLUSIVE); %EXPORT_TAGS = (epoll => [qw(epoll_ctl epoll_create epoll_wait EPOLLIN EPOLLOUT EPOLL_CTL_ADD EPOLL_CTL_DEL EPOLL_CTL_MOD - EPOLLEXCLUSIVE)], + EPOLLONESHOT EPOLLEXCLUSIVE)], sendfile => [qw(sendfile)], ); @@ -38,7 +38,7 @@ use constant EPOLLOUT 
=> 4; # use constant EPOLLHUP => 16; # use constant EPOLLRDBAND => 128; use constant EPOLLEXCLUSIVE => (1 << 28); -# use constant EPOLLONESHOT => (1 << 30); +use constant EPOLLONESHOT => (1 << 30); # use constant EPOLLET => (1 << 31); use constant EPOLL_CTL_ADD => 1; use constant EPOLL_CTL_DEL => 2; -- cgit v1.2.3-24-ge0c7 From fd533491c3dd1ca032deaddc66a42423354e828f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:23 +0000 Subject: httpd/async: remove EINTR check This pipe is always non-blocking when run under public-inbox-httpd and it won't fail with EINTR in that case --- lib/PublicInbox/HTTPD/Async.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index f32ef009..3eb7f75a 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -57,7 +57,7 @@ sub main_cb ($$$) { } # fall through to close below... } elsif (!defined $r) { - return restart_read($self) if $!{EAGAIN} || $!{EINTR}; + return restart_read($self) if $!{EAGAIN}; } # Done! Error handling will happen in $fh->close -- cgit v1.2.3-24-ge0c7 From bd61cb1b18c7f38588e0c3b166dd265b738242cc Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:24 +0000 Subject: spawn: remove `Blocking' flag handling Instead, the O_NONBLOCK flag is set by PublicInbox::HTTPD::Async; and we won't be setting it elsewhere. 
--- lib/PublicInbox/Spawn.pm | 2 -- t/spawn.t | 11 ----------- 2 files changed, 13 deletions(-) diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 66b916df..9161bb5b 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -229,8 +229,6 @@ sub popen_rd { my ($cmd, $env, $opts) = @_; pipe(my ($r, $w)) or die "pipe: $!\n"; $opts ||= {}; - my $blocking = $opts->{Blocking}; - IO::Handle::blocking($r, $blocking) if defined $blocking; $opts->{1} = fileno($w); my $pid = spawn($cmd, $env, $opts); return unless defined $pid; diff --git a/t/spawn.t b/t/spawn.t index 88404282..1d71b26d 100644 --- a/t/spawn.t +++ b/t/spawn.t @@ -81,17 +81,6 @@ use PublicInbox::Spawn qw(which spawn popen_rd); isnt($?, 0, '$? set properly: '.$?); } -{ - my ($fh, $pid) = popen_rd([qw(sleep 60)], undef, { Blocking => 0 }); - ok(defined $pid && $pid > 0, 'returned pid when array requested'); - is(kill(0, $pid), 1, 'child process is running'); - ok(!defined(sysread($fh, my $buf, 1)) && $!{EAGAIN}, - 'sysread returned quickly with EAGAIN'); - is(kill(9, $pid), 1, 'child process killed early'); - is(waitpid($pid, 0), $pid, 'child process reapable'); - isnt($?, 0, '$? set properly: '.$?); -} - SKIP: { eval { require BSD::Resource; -- cgit v1.2.3-24-ge0c7 From cee1b928497c002ba03c325cbc6de7022673e2cb Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:25 +0000 Subject: qspawn: describe where `$rpipe' come from It wasn't immediately obvious to me after several months of not looking at this code. 
--- lib/PublicInbox/Qspawn.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 9aede103..943ee801 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -122,7 +122,7 @@ sub psgi_qx { eval { $qx_cb->($qx) }; $qx = undef; }; - my $rpipe; + my $rpipe; # comes from popen_rd my $async = $env->{'pi-httpd.async'}; my $cb = sub { my $r = sysread($rpipe, my $buf, 8192); @@ -137,7 +137,7 @@ sub psgi_qx { }; $limiter ||= $def_limiter ||= PublicInbox::Qspawn::Limiter->new(32); $self->start($limiter, sub { # may run later, much later... - ($rpipe) = @_; + ($rpipe) = @_; # popen_rd result if ($async) { # PublicInbox::HTTPD::Async->new($rpipe, $cb, $end) $async = $async->($rpipe, $cb, $end); -- cgit v1.2.3-24-ge0c7 From 7c89df780b7b160fe85b8a355455d06ec8499205 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:26 +0000 Subject: http|nntp: favor "$! == EFOO" over $!{EFOO} checks Integer comparisons of "$!" are faster than hash lookups. See commit 6fa2b29fcd0477d126ebb7db7f97b334f74bbcbc ("ds: cleanup Errno imports and favor constant comparisons") for benchmarks. --- lib/PublicInbox/HTTP.pm | 7 +++---- lib/PublicInbox/HTTPD/Async.pm | 3 ++- lib/PublicInbox/NNTP.pm | 4 ++-- lib/PublicInbox/Qspawn.pm | 7 +++++-- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 773d77ba..4738e156 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -27,6 +27,7 @@ use constant { CHUNK_ZEND => -3, # \r\n CHUNK_MAX_HDR => 256, }; +use Errno qw(EAGAIN); my $pipelineq = []; my $pipet; @@ -82,11 +83,9 @@ sub event_step { # called by PublicInbox::DS return rbuf_process($self); } - return $self->watch_in1 if $!{EAGAIN}; - # common for clients to break connections without warning, # would be too noisy to log here: - return $self->close; + $! == EAGAIN ?
$self->watch_in1 : $self->close; } sub rbuf_process { @@ -359,7 +358,7 @@ sub write_err { sub recv_err { my ($self, $r, $len) = @_; return $self->close if (defined $r && $r == 0); - if ($!{EAGAIN}) { + if ($! == EAGAIN) { $self->{input_left} = $len; return $self->watch_in1; } diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 3eb7f75a..9cc41f17 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -11,6 +11,7 @@ use warnings; use base qw(PublicInbox::DS); use fields qw(cb cleanup); require PublicInbox::EvCleanup; +use Errno qw(EAGAIN); sub new { my ($class, $io, $cb, $cleanup) = @_; @@ -57,7 +58,7 @@ sub main_cb ($$$) { } # fall through to close below... } elsif (!defined $r) { - return restart_read($self) if $!{EAGAIN}; + return restart_read($self) if $! == EAGAIN; } # Done! Error handling will happen in $fh->close diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 98f88410..fbdf1364 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -25,6 +25,7 @@ use constant { r430 => '430 No article with that message-id', }; use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); +use Errno qw(EAGAIN); my @OVERVIEW = qw(Subject From Date Message-ID References Xref); my $OVERVIEW_FMT = join(":\r\n", @OVERVIEW, qw(Bytes Lines)) . ":\r\n"; @@ -946,8 +947,7 @@ sub event_step { my $off = length($$rbuf); $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off); unless (defined $r) { - return $self->watch_in1 if $!{EAGAIN}; - return $self->close; + return $! == EAGAIN ? 
$self->watch_in1 : $self->close; } return $self->close if $r == 0; } diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 943ee801..f2630a0f 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -29,6 +29,9 @@ use warnings; use PublicInbox::Spawn qw(popen_rd); require Plack::Util; +# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers +use Errno qw(EAGAIN EINTR); + my $def_limiter; # declares a command to spawn (but does not spawn it). @@ -131,7 +134,7 @@ sub psgi_qx { } elsif (defined $r) { $r ? $qx->write($buf) : $end->(); } else { - return if $!{EAGAIN} || $!{EINTR}; # loop again + return if $! == EAGAIN || $! == EINTR; # loop again $end->(); } }; @@ -193,7 +196,7 @@ sub psgi_return { my $buf = ''; my $rd_hdr = sub { my $r = sysread($rpipe, $buf, 1024, length($buf)); - return if !defined($r) && ($!{EINTR} || $!{EAGAIN}); + return if !defined($r) && $! == EAGAIN || $! == EINTR; $parse_hdr->($r, \$buf); }; -- cgit v1.2.3-24-ge0c7 From 62e3722f13164696a7af66cfa6253f69f0f5892b Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:27 +0000 Subject: ds: favor `delete' over assigning fields to `undef' This is cleaner in most cases and may allow Perl to reuse memory from unused fields. We can do this now that we no longer support Perl 5.8; since Danga::Socket was written with struct-like pseudo-hash support in mind, and Perl 5.9+ dropped support for pseudo-hashes over a decade ago. 
--- lib/PublicInbox/DS.pm | 1 + lib/PublicInbox/HTTP.pm | 21 +++++++++------------ lib/PublicInbox/HTTPD/Async.pm | 9 +++++---- lib/PublicInbox/NNTP.pm | 4 ++-- 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index f5986e55..482710f7 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -23,6 +23,7 @@ use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); our @EXPORT_OK = qw(now msg_more write_in_full); use warnings; +use 5.010_001; use PublicInbox::Syscall qw(:epoll); diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 4738e156..94972054 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -118,8 +118,7 @@ sub read_input ($) { # env->{CONTENT_LENGTH} (identity) my $sock = $self->{sock}; - my $len = $self->{input_left}; - $self->{input_left} = undef; + my $len = delete $self->{input_left}; my $rbuf = \($self->{rbuf}); my $input = $env->{'psgi.input'}; @@ -246,8 +245,7 @@ sub next_request ($) { sub response_done_cb ($$) { my ($self, $alive) = @_; sub { - my $env = $self->{env}; - $self->{env} = undef; + my $env = delete $self->{env}; $self->write(\"0\r\n\r\n") if $alive == 2; $self->write(sub{$alive ? next_request($self) : $self->close}); } @@ -279,7 +277,7 @@ sub getline_cb ($$$) { } } - $self->{forward} = $self->{pull} = undef; + delete @$self{qw(forward pull)}; # avoid recursion if ($forward) { eval { $forward->close }; @@ -370,8 +368,7 @@ sub read_input_chunked { # unlikely... 
my ($self) = @_; my $input = $self->{env}->{'psgi.input'}; my $sock = $self->{sock}; - my $len = $self->{input_left}; - $self->{input_left} = undef; + my $len = delete $self->{input_left}; my $rbuf = \($self->{rbuf}); while (1) { # chunk start @@ -442,11 +439,11 @@ sub quit { sub close { my $self = shift; - my $forward = $self->{forward}; - my $env = $self->{env}; - delete $env->{'psgix.io'} if $env; # prevent circular references - $self->{pull} = $self->{forward} = $self->{env} = undef; - if ($forward) { + if (my $env = delete $self->{env}) { + delete $env->{'psgix.io'}; # prevent circular references + } + delete $self->{pull}; + if (my $forward = delete $self->{forward}) { eval { $forward->close }; err($self, "forward ->close error: $@") if $@; } diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index 9cc41f17..bec49337 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -63,7 +63,7 @@ sub main_cb ($$$) { # Done! Error handling will happen in $fh->close # called by the {cleanup} handler - $http->{forward} = undef; + delete $http->{forward}; $self->close; } } @@ -81,12 +81,13 @@ sub event_step { $_[0]->{cb}->(@_) } sub close { my $self = shift; - my $cleanup = $self->{cleanup}; - $self->{cleanup} = $self->{cb} = undef; + delete $self->{cb}; $self->SUPER::close(@_); # we defer this to the next timer loop since close is deferred - PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup; + if (my $cleanup = delete $self->{cleanup}) { + PublicInbox::EvCleanup::next_tick($cleanup); + } } 1; diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index fbdf1364..6a582ea4 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -624,7 +624,7 @@ sub long_response ($$) { $self->{long_res} = sub { my $more = eval { $cb->() }; if ($@ || !$self->{sock}) { # something bad happened... 
- $self->{long_res} = undef; + delete $self->{long_res}; if ($@) { err($self, @@ -646,7 +646,7 @@ sub long_response ($$) { push @$nextq, $self; $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); } else { # all done! - $self->{long_res} = undef; + delete $self->{long_res}; check_read($self); res($self, '.'); out($self, " deferred[$fd] done - %0.6f", now() - $t0); -- cgit v1.2.3-24-ge0c7 From 2600289573c569fea65a1da817497414175bae55 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:28 +0000 Subject: http: don't pass extra args to PublicInbox::DS::close YAGNI Followup-to: commit 30ab5cf82b9d47242640f748a0f9a088ca783e32 ("ds: reduce Errno imports and drop ->close reason") --- lib/PublicInbox/HTTP.pm | 4 ++-- lib/PublicInbox/HTTPD/Async.pm | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 94972054..c81aeacd 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -438,7 +438,7 @@ sub quit { } sub close { - my $self = shift; + my $self = $_[0]; if (my $env = delete $self->{env}) { delete $env->{'psgix.io'}; # prevent circular references } @@ -447,7 +447,7 @@ sub close { eval { $forward->close }; err($self, "forward ->close error: $@") if $@; } - $self->SUPER::close(@_); + $self->SUPER::close; # PublicInbox::DS::close } # for graceful shutdown in PublicInbox::Daemon: diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index bec49337..e6df58eb 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -80,9 +80,9 @@ sub async_pass { sub event_step { $_[0]->{cb}->(@_) } sub close { - my $self = shift; + my $self = $_[0]; delete $self->{cb}; - $self->SUPER::close(@_); + $self->SUPER::close; # we defer this to the next timer loop since close is deferred if (my $cleanup = delete $self->{cleanup}) { -- cgit v1.2.3-24-ge0c7 From 8e1c3155da4edc082e8e3d8b30351f0c861757a7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 
24 Jun 2019 02:52:29 +0000 Subject: ds: pass $self to code references We can reduce the amount of short-lived anonymous subs we create by passing $self to code references. --- lib/PublicInbox/DS.pm | 4 ++-- lib/PublicInbox/HTTP.pm | 9 ++++++++- lib/PublicInbox/HTTPD/Async.pm | 17 +++++++++-------- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 482710f7..7b87cd56 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -468,7 +468,7 @@ next_buf: } } else { #($ref eq 'CODE') { shift @$wbuf; - $bref->(); + $bref->($self); } } # while @$wbuf @@ -535,7 +535,7 @@ sub write { } return 0; } elsif ($ref eq 'CODE') { - $bref->(); + $bref->($self); return 1; } else { my $to_write = bytes::length($$bref); diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index c81aeacd..e132c610 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -247,7 +247,7 @@ sub response_done_cb ($$) { sub { my $env = delete $self->{env}; $self->write(\"0\r\n\r\n") if $alive == 2; - $self->write(sub{$alive ? next_request($self) : $self->close}); + $self->write($alive ? 
\&next_request : \&close); } } @@ -456,4 +456,11 @@ sub busy () { ($self->{rbuf} ne '' || $self->{env} || $self->{wbuf}); } +# fires after pending writes are complete: +sub restart_pass ($) { + $_[0]->{forward}->restart_read; # see PublicInbox::HTTPD::Async +} + +sub enqueue_restart_pass ($) { $_[0]->write(\&restart_pass) } + 1; diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index e6df58eb..b46baeb2 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -34,23 +34,24 @@ sub new { sub restart_read ($) { $_[0]->watch(PublicInbox::DS::EPOLLIN()) } -# fires after pending writes are complete: -sub restart_read_cb ($) { - my ($self) = @_; - sub { restart_read($self) } -} - sub main_cb ($$$) { my ($http, $fh, $bref) = @_; sub { my ($self) = @_; my $r = sysread($self->{sock}, $$bref, 8192); if ($r) { - $fh->write($$bref); + $fh->write($$bref); # may call $http->close + if ($http->{sock}) { # !closed if ($http->{wbuf}) { + # HTTP client could not keep up, so + # stop reading and buffering. $self->watch(0); - $http->write(restart_read_cb($self)); + + # Tell the HTTP socket to restart us + # when HTTP client is done draining + # $http->{wbuf}: + $http->enqueue_restart_pass; } # stay in EPOLLIN, but let other clients # get some work done, too. -- cgit v1.2.3-24-ge0c7 From 4efa374fc040c3a4f09160323a2fb92e18304fae Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:30 +0000 Subject: evcleanup: replace _run_asap with `event_step' callback No point in keeping a one-line wrapper sub around. 
--- lib/PublicInbox/EvCleanup.pm | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index a9f6167d..33b54ebc 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -46,7 +46,9 @@ sub _run_all ($) { # ensure PublicInbox::DS::ToClose processing after timers fire sub _asap_close () { $asapq->[1] ||= _asap_timer() } -sub _run_asap () { _run_all($asapq) } +# Called by PublicInbox::DS +sub event_step { _run_all($asapq) } + sub _run_next () { _run_all($nextq); _asap_close(); @@ -57,12 +59,6 @@ sub _run_later () { _asap_close(); } -# Called by PublicInbox::DS -sub event_step { - my ($self) = @_; - _run_asap(); -} - sub _asap_timer () { $singleton ||= once_init(); $singleton->watch(EPOLLOUT|EPOLLONESHOT); @@ -88,7 +84,7 @@ sub later ($) { } END { - _run_asap(); + event_step(); _run_all($nextq); _run_all($laterq); } -- cgit v1.2.3-24-ge0c7 From a3cb58141cb7ef78fc4b3b85f6132804972c20e7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:31 +0000 Subject: ds: remove pointless exit calls They're never called; the only way to break out of that loop is the PostEventLoop callback. --- lib/PublicInbox/DS.pm | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 7b87cd56..9811405b 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -239,7 +239,6 @@ sub EpollEventLoop { } return unless PostEventLoop(); } - exit 0; } ### The kqueue-based event loop. 
Gets installed as EventLoop if IO::KQueue works @@ -264,8 +263,6 @@ sub KQueueEventLoop { } return unless PostEventLoop(); } - - exit(0); } =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> -- cgit v1.2.3-24-ge0c7 From c100879166cbbd6c2481ce68a549dab7d018d826 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:32 +0000 Subject: http|nntp: be explicit about bytes::length on rbuf It should not matter because our rbuf is always from a socket without encoding layers, but this makes things easier to follow. --- lib/PublicInbox/HTTP.pm | 12 ++++++------ lib/PublicInbox/NNTP.pm | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index e132c610..fbca9a54 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -76,7 +76,7 @@ sub event_step { # called by PublicInbox::DS return read_input($self) if defined $self->{env}; - my $off = length($self->{rbuf}); + my $off = bytes::length($self->{rbuf}); my $r = sysread($self->{sock}, $self->{rbuf}, 8192, $off); if (defined $r) { return $self->close if $r == 0; @@ -98,7 +98,7 @@ sub rbuf_process { # (they are rarely-used and git (as of 2.7.2) does not use them) if ($r == -1 || $env{HTTP_TRAILER} || # this length-check is necessary for PURE_PERL=1: - ($r == -2 && length($self->{rbuf}) > 0x4000)) { + ($r == -2 && bytes::length($self->{rbuf}) > 0x4000)) { return quit($self, 400); } return $self->watch_in1 if $r < 0; # incomplete @@ -375,12 +375,12 @@ sub read_input_chunked { # unlikely... if ($len == CHUNK_ZEND) { $$rbuf =~ s/\A\r\n//s and return app_dispatch($self, $input); - return quit($self, 400) if length($$rbuf) > 2; + return quit($self, 400) if bytes::length($$rbuf) > 2; } if ($len == CHUNK_END) { if ($$rbuf =~ s/\A\r\n//s) { $len = CHUNK_START; - } elsif (length($$rbuf) > 2) { + } elsif (bytes::length($$rbuf) > 2) { return quit($self, 400); } } @@ -390,14 +390,14 @@ sub read_input_chunked { # unlikely... 
if (($len + -s $input) > $MAX_REQUEST_BUFFER) { return quit($self, 413); } - } elsif (length($$rbuf) > CHUNK_MAX_HDR) { + } elsif (bytes::length($$rbuf) > CHUNK_MAX_HDR) { return quit($self, 400); } # will break from loop since $len >= 0 } if ($len < 0) { # chunk header is trickled, read more - my $off = length($$rbuf); + my $off = bytes::length($$rbuf); my $r = sysread($sock, $$rbuf, 8192, $off); return recv_err($self, $r, $len) unless $r; # (implicit) goto chunk_start if $r > 0; diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 6a582ea4..a9e54a68 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -243,7 +243,7 @@ sub parse_time ($$;$) { } my @now = $gmt ? gmtime : localtime; my ($YYYY, $MM, $DD); - if (length($date) == 8) { # RFC 3977 allows YYYYMMDD + if (bytes::length($date) == 8) { # RFC 3977 allows YYYYMMDD ($YYYY, $MM, $DD) = unpack('A4A2A2', $date); } else { # legacy clients send YYMMDD ($YYYY, $MM, $DD) = unpack('A2A2A2', $date); @@ -944,7 +944,7 @@ sub event_step { my $r; if (index($$rbuf, "\n") < 0) { - my $off = length($$rbuf); + my $off = bytes::length($$rbuf); $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off); unless (defined $r) { return $! == EAGAIN ? $self->watch_in1 : $self->close; @@ -964,7 +964,7 @@ sub event_step { } return $self->close if $r < 0; - my $len = length($$rbuf); + my $len = bytes::length($$rbuf); return $self->close if ($len >= LINE_MAX); update_idle_time($self); -- cgit v1.2.3-24-ge0c7 From 2101e27b937893aa427d8693161966e3673b887e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:33 +0000 Subject: ds: hoist out do_read from NNTP and HTTP Both NNTP and HTTP have common needs and we can factor out some common code to make dealing with IO::Socket::SSL easier. 
--- lib/PublicInbox/DS.pm | 10 ++++++++++ lib/PublicInbox/HTTP.pm | 14 +++----------- lib/PublicInbox/NNTP.pm | 9 ++------- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 9811405b..8735e888 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -473,6 +473,15 @@ next_buf: 1; # all done } +sub do_read ($$$$) { + my ($self, $rbuf, $len, $off) = @_; + my $r = sysread($self->{sock}, $$rbuf, $len, $off); + return ($r == 0 ? $self->close : $r) if defined $r; + # common for clients to break connections without warning, + # would be too noisy to log here: + $! == EAGAIN ? $self->watch_in1 : $self->close; +} + sub write_in_full ($$$$) { my ($fh, $bref, $len, $off) = @_; my $rv = 0; @@ -583,6 +592,7 @@ sub watch ($$) { $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev)); $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev)); } + 0; } sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) } diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index fbca9a54..7697ac5c 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -75,17 +75,9 @@ sub event_step { # called by PublicInbox::DS # otherwise we can be buffering infinitely w/o backpressure return read_input($self) if defined $self->{env}; - - my $off = bytes::length($self->{rbuf}); - my $r = sysread($self->{sock}, $self->{rbuf}, 8192, $off); - if (defined $r) { - return $self->close if $r == 0; - return rbuf_process($self); - } - - # common for clients to break connections without warning, - # would be too noisy to log here: - $! == EAGAIN ? 
$self->watch_in1 : $self->close; + my $rbuf = \($self->{rbuf}); + my $off = bytes::length($$rbuf); + $self->do_read($rbuf, 8192, $off) and rbuf_process($self); } sub rbuf_process { diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index a9e54a68..42fbb255 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -941,17 +941,12 @@ sub event_step { use constant LINE_MAX => 512; # RFC 977 section 2.3 my $rbuf = \($self->{rbuf}); - my $r; + my $r = 1; if (index($$rbuf, "\n") < 0) { my $off = bytes::length($$rbuf); - $r = sysread($self->{sock}, $$rbuf, LINE_MAX, $off); - unless (defined $r) { - return $! == EAGAIN ? $self->watch_in1 : $self->close; - } - return $self->close if $r == 0; + $r = $self->do_read($rbuf, LINE_MAX, $off) or return; } - $r = 1; while ($r > 0 && $$rbuf =~ s/\A[ \t\r\n]*([^\r\n]*)\r?\n//) { my $line = $1; return $self->close if $line =~ /[[:cntrl:]]/s; -- cgit v1.2.3-24-ge0c7 From 6b1ee7ed8032e277a84bbe2a343f2c318a0defb8 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:34 +0000 Subject: nntp: simplify re-arming/requeue logic We can be smarter about requeuing clients to run and avoid excessive epoll_ctl calls since we can trust event_step to do the right thing depending on the state of the client. 
--- lib/PublicInbox/NNTP.pm | 33 ++++++++++----------------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 42fbb255..468a22f5 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -58,6 +58,11 @@ sub next_tick () { } } +sub requeue ($) { + push @$nextq, $_[0]; + $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); +} + sub update_idle_time ($) { my ($self) = @_; my $sock = $self->{sock} or return; @@ -633,7 +638,7 @@ sub long_response ($$) { } if ($self->{sock}) { update_idle_time($self); - check_read($self); + requeue($self); } else { out($self, " deferred[$fd] aborted - %0.6f", now() - $t0); @@ -642,14 +647,12 @@ sub long_response ($$) { # no recursion, schedule another call ASAP # but only after all pending writes are done update_idle_time($self); - - push @$nextq, $self; - $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); + requeue($self); } else { # all done! delete $self->{long_res}; - check_read($self); res($self, '.'); out($self, " deferred[$fd] done - %0.6f", now() - $t0); + requeue($self); } }; $self->{long_res}->(); # kick off! @@ -930,6 +933,7 @@ sub out ($$;@) { printf { $self->{nntpd}->{out} } $fmt."\n", @args; } +# callback used by PublicInbox::DS for any (e)poll (in/out/hup/err) sub event_step { my ($self) = @_; @@ -965,24 +969,7 @@ sub event_step { # maybe there's more pipelined data, or we'll have # to register it for socket-readiness notifications - check_read($self) unless ($self->{long_res} || $self->{wbuf}); -} - -sub check_read { - my ($self) = @_; - if (index($self->{rbuf}, "\n") >= 0) { - # Force another read if there is a pipelined request. - # We don't know if the socket has anything for us to read, - # and we must double-check again by the time the timer fires - # in case we really did dispatch a read event and started - # another long response. 
- push @$nextq, $self; - $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); - } else { - # no pipelined requests available, let the kernel know - # to wake us up if there's more - $self->watch_in1; # PublicInbox::DS::watch_in1 - } + requeue($self) unless ($self->{long_res} || $self->{wbuf}); } sub not_idle_long ($$) { -- cgit v1.2.3-24-ge0c7 From a3c27256d273492e1c9ee464dabda2c7ed4019c2 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:35 +0000 Subject: allow use of PerlIO layers for filesystem writes It may make sense to use PerlIO::mmap or PerlIO::scalar for DS write buffering with IO::Socket::SSL or similar (since we can't use MSG_MORE), so that means we need to go through buffering in userspace for the common case; while still being easily compatible with slow clients. And it also simplifies GitHTTPBackend slightly. Maybe it can make sense for HTTP input buffering, too... --- lib/PublicInbox/DS.pm | 32 ++++++++++++-------------------- lib/PublicInbox/GitHTTPBackend.pm | 18 ++++++++---------- lib/PublicInbox/HTTP.pm | 24 +++++++++++++++++++----- 3 files changed, 39 insertions(+), 35 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 8735e888..486af40e 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -21,7 +21,7 @@ use IO::Handle qw(); use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); -our @EXPORT_OK = qw(now msg_more write_in_full); +our @EXPORT_OK = qw(now msg_more); use warnings; use 5.010_001; @@ -422,8 +422,8 @@ sub close { sub psendfile ($$$) { my ($sock, $fh, $off) = @_; - sysseek($fh, $$off, SEEK_SET) or return; - defined(my $to_write = sysread($fh, my $buf, 16384)) or return; + seek($fh, $$off, SEEK_SET) or return; + defined(my $to_write = read($fh, my $buf, 16384)) or return; my $written = 0; while ($to_write > 0) { if (defined(my $w = syswrite($sock, $buf, $to_write, $written))) { @@ -482,29 +482,18 @@ sub 
do_read ($$$$) { $! == EAGAIN ? $self->watch_in1 : $self->close; } -sub write_in_full ($$$$) { - my ($fh, $bref, $len, $off) = @_; - my $rv = 0; - while ($len > 0) { - my $w = syswrite($fh, $$bref, $len, $off); - return ($rv ? $rv : $w) unless $w; # undef or 0 - $rv += $w; - $len -= $w; - $off += $w; - } - $rv -} - +# n.b.: use ->write/->read for this buffer to allow compatibility with +# PerlIO::mmap or PerlIO::scalar if needed sub tmpbuf ($$) { my ($bref, $off) = @_; # open(my $fh, '+>>', undef) doesn't set O_APPEND my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1); open $fh, '+>>', $path or die "open: $!"; + $fh->autoflush(1); unlink $path; my $to_write = bytes::length($$bref) - $off; - my $w = write_in_full($fh, $bref, $to_write, $off); - die "write_in_full ($to_write): $!" unless defined $w; - $w == $to_write ? $fh : die("short write $w < $to_write"); + $fh->write($$bref, $to_write, $off) or die "write ($to_write): $!"; + $fh; } =head2 C<< $obj->write( $data ) >> @@ -534,7 +523,10 @@ sub write { } else { my $last = $wbuf->[-1]; if (ref($last) eq 'GLOB') { # append to tmp file buffer - write_in_full($last, $bref, bytes::length($$bref), 0); + unless ($last->print($$bref)) { + warn "error buffering: $!"; + return $self->close; + } } else { push @$wbuf, tmpbuf($bref, 0); } diff --git a/lib/PublicInbox/GitHTTPBackend.pm b/lib/PublicInbox/GitHTTPBackend.pm index a2a81f8e..303d5073 100644 --- a/lib/PublicInbox/GitHTTPBackend.pm +++ b/lib/PublicInbox/GitHTTPBackend.pm @@ -231,18 +231,16 @@ sub input_prepare { return; } last if $r == 0; - my $off = 0; - while ($r > 0) { - my $w = syswrite($in, $buf, $r, $off); - if (defined $w) { - $r -= $w; - $off += $w; - } else { - err($env, "error writing temporary file: $!"); - return; - } + unless (print $in $buf) { + err($env, "error writing temporary file: $!"); + return; } } + # ensure it's visible to git-http-backend(1): + unless ($in->flush) { + err($env, "error writing temporary file: $!"); + return; + } unless 
(defined(sysseek($in, 0, SEEK_SET))) { err($env, "error seeking temporary file: $!"); return; diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 7697ac5c..a1cb4aca 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -19,7 +19,7 @@ use HTTP::Status qw(status_message); use HTTP::Date qw(time2str); use IO::Handle; require PublicInbox::EvCleanup; -PublicInbox::DS->import(qw(msg_more write_in_full)); +PublicInbox::DS->import(qw(msg_more)); use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); use constant { CHUNK_START => -1, # [a-f0-9]+\r\n @@ -102,6 +102,15 @@ sub rbuf_process { $len ? read_input($self) : app_dispatch($self); } +# IO::Handle::write returns boolean, this returns bytes written: +sub xwrite ($$$) { + my ($fh, $rbuf, $max) = @_; + my $w = bytes::length($$rbuf); + $w = $max if $w > $max; + $fh->write($$rbuf, $w) or return; + $w; +} + sub read_input ($) { my ($self) = @_; my $env = $self->{env}; @@ -116,7 +125,7 @@ sub read_input ($) { while ($len > 0) { if ($$rbuf ne '') { - my $w = write_in_full($input, $rbuf, $len, 0); + my $w = xwrite($input, $rbuf, $len); return write_err($self, $len) unless $w; $len -= $w; die "BUG: $len < 0 (w=$w)" if $len < 0; @@ -306,6 +315,11 @@ sub response_write { } } +sub input_tmpfile ($) { + open($_[0], '+>', undef); + $_[0]->autoflush(1); +} + sub input_prepare { my ($self, $env) = @_; my $input; @@ -315,10 +329,10 @@ sub input_prepare { quit($self, 413); return; } - open($input, '+>', undef); + input_tmpfile($input); } elsif (env_chunked($env)) { $len = CHUNK_START; - open($input, '+>', undef); + input_tmpfile($input); } else { $input = $null_io; } @@ -399,7 +413,7 @@ sub read_input_chunked { # unlikely... 
# drain the current chunk until ($len <= 0) { if ($$rbuf ne '') { - my $w = write_in_full($input, $rbuf, $len, 0); + my $w = xwrite($input, $rbuf, $len); return write_err($self, "$len chunk") if !$w; $len -= $w; if ($len == 0) { -- cgit v1.2.3-24-ge0c7 From 03b8f86532a29d0cb129ff6e8d3ddf2d51deb2f8 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:36 +0000 Subject: ds: deal better with FS-related errors IO buffers Instead of ENOMEM (or fragmentation/swap storms), using tempfile buffers opens us up to filesystem and storage-related errors (e.g. ENOSPC, EFBIG, EIO, EROFS). Log these errors, drop the particular client, and try to limp by with whatever we have left. --- lib/PublicInbox/DS.pm | 42 +++++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 486af40e..1a1ef7d3 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -33,7 +33,7 @@ use fields ('sock', # underlying socket ); use Errno qw(EAGAIN EINVAL); -use Carp qw(croak confess); +use Carp qw(croak confess carp); use File::Temp qw(tempfile); our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 }; @@ -482,18 +482,27 @@ sub do_read ($$$$) { $! == EAGAIN ? $self->watch_in1 : $self->close; } +# drop the socket if we hit unrecoverable errors on our system which +# require BOFH attention: ENOSPC, EFBIG, EIO, EMFILE, ENFILE...
+sub drop { + my $self = shift; + carp(@_); + $self->close; +} + # n.b.: use ->write/->read for this buffer to allow compatibility with # PerlIO::mmap or PerlIO::scalar if needed -sub tmpbuf ($$) { - my ($bref, $off) = @_; +sub tmpio ($$$) { + my ($self, $bref, $off) = @_; # open(my $fh, '+>>', undef) doesn't set O_APPEND - my ($fh, $path) = tempfile('wbuf-XXXXXXX', TMPDIR => 1); - open $fh, '+>>', $path or die "open: $!"; + my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) }; + $fh or return drop($self, "tempfile: $@"); + open($fh, '+>>', $path) or return drop($self, "open: $!"); $fh->autoflush(1); - unlink $path; - my $to_write = bytes::length($$bref) - $off; - $fh->write($$bref, $to_write, $off) or die "write ($to_write): $!"; - $fh; + unlink($path) or return drop($self, "unlink: $!"); + my $len = bytes::length($$bref) - $off; + $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!"); + $fh } =head2 C<< $obj->write( $data ) >> @@ -523,12 +532,10 @@ sub write { } else { my $last = $wbuf->[-1]; if (ref($last) eq 'GLOB') { # append to tmp file buffer - unless ($last->print($$bref)) { - warn "error buffering: $!"; - return $self->close; - } + $last->print($$bref) or return drop($self, "print: $!"); } else { - push @$wbuf, tmpbuf($bref, 0); + my $tmpio = tmpio($self, $bref, 0) or return 0; + push @$wbuf, $tmpio; } } return 0; @@ -546,7 +553,8 @@ sub write { } else { return $self->close; } - $self->{wbuf} = [ tmpbuf($bref, $written) ]; + my $tmpio = tmpio($self, $bref, $written) or return 0; + $self->{wbuf} = [ $tmpio ]; watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } @@ -563,9 +571,9 @@ sub msg_more ($$) { if (defined $n) { my $nlen = bytes::length($_[1]) - $n; return 1 if $nlen == 0; # all done! 
- # queue up the unwritten substring: - $self->{wbuf} = [ tmpbuf(\($_[1]), $n) ]; + my $tmpio = tmpio($self, \($_[1]), $n) or return 0; + $self->{wbuf} = [ $tmpio ]; watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } -- cgit v1.2.3-24-ge0c7 From 93fc0336d39ba3ef07b479877e64371f07c86eab Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:37 +0000 Subject: nntp: wait for writability before sending greeting This will be needed for NNTPS support, since we need to negotiate the TLS connection before writing the greeting and we can reuse the existing buffer layer to enqueue writes. --- lib/PublicInbox/NNTP.pm | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 468a22f5..a18641d3 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -24,7 +24,7 @@ use constant { r225 => '225 Headers follow (multi-line)', r430 => '430 No article with that message-id', }; -use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); +use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); use Errno qw(EAGAIN); my @OVERVIEW = qw(Subject From Date Message-ID References Xref); @@ -98,9 +98,11 @@ sub expire_old () { sub new ($$$) { my ($class, $sock, $nntpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock, EPOLLIN | EPOLLONESHOT); + $self->SUPER::new($sock, EPOLLOUT | EPOLLONESHOT); $self->{nntpd} = $nntpd; - res($self, '201 ' . $nntpd->{servername} . 
' ready - post via email'); + my $greet = "201 $nntpd->{servername} ready - post via email\r\n"; + open my $fh, '<:scalar', \$greet or die "open :scalar: $!"; + $self->{wbuf} = [ $fh ]; $self->{rbuf} = ''; update_idle_time($self); $expt ||= PublicInbox::EvCleanup::later(*expire_old); -- cgit v1.2.3-24-ge0c7 From b70cf61f0c1f70621b88fe6420083a576d47f19f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:38 +0000 Subject: nntp: NNTPS and NNTP+STARTTLS working It kinda, barely works, and I'm most happy I got it working without any modifications to the main NNTP::event_step callback thanks to the DS->write(CODE) support we inherited from Danga::Socket. --- MANIFEST | 4 ++ certs/.gitignore | 4 ++ certs/create-certs.perl | 132 +++++++++++++++++++++++++++++++++++++++ lib/PublicInbox/DS.pm | 28 ++++++++- lib/PublicInbox/Daemon.pm | 82 +++++++++++++++++++++--- lib/PublicInbox/NNTP.pm | 27 +++++++- lib/PublicInbox/NNTPD.pm | 1 + lib/PublicInbox/TLS.pm | 24 +++++++ script/public-inbox-nntpd | 3 +- t/nntpd-tls.t | 156 ++++++++++++++++++++++++++++++++++++++++++++++ t/nntpd.t | 2 + 11 files changed, 450 insertions(+), 13 deletions(-) create mode 100644 certs/.gitignore create mode 100755 certs/create-certs.perl create mode 100644 lib/PublicInbox/TLS.pm create mode 100644 t/nntpd-tls.t diff --git a/MANIFEST b/MANIFEST index c7693976..26ff0d0d 100644 --- a/MANIFEST +++ b/MANIFEST @@ -31,6 +31,8 @@ MANIFEST Makefile.PL README TODO +certs/.gitignore +certs/create-certs.perl ci/README ci/deps.perl ci/profiles.sh @@ -129,6 +131,7 @@ lib/PublicInbox/Spamcheck/Spamc.pm lib/PublicInbox/Spawn.pm lib/PublicInbox/SpawnPP.pm lib/PublicInbox/Syscall.pm +lib/PublicInbox/TLS.pm lib/PublicInbox/Unsubscribe.pm lib/PublicInbox/UserContent.pm lib/PublicInbox/V2Writable.pm @@ -222,6 +225,7 @@ t/msg_iter.t t/msgmap.t t/msgtime.t t/nntp.t +t/nntpd-tls.t t/nntpd.t t/nulsubject.t t/over.t diff --git a/certs/.gitignore b/certs/.gitignore new file mode 100644 index 00000000..0b3a547b 
--- /dev/null +++ b/certs/.gitignore @@ -0,0 +1,4 @@ +*.pem +*.der +*.enc +*.p12 diff --git a/certs/create-certs.perl b/certs/create-certs.perl new file mode 100755 index 00000000..bfd8e5f1 --- /dev/null +++ b/certs/create-certs.perl @@ -0,0 +1,132 @@ +#!/usr/bin/perl -w +# License: GPL-1.0+ or Artistic-1.0-Perl +# from IO::Socket::SSL 2.063 / https://github.com/noxxi/p5-io-socket-ssl +use strict; +use warnings; +use IO::Socket::SSL::Utils; +use Net::SSLeay; + +my $dir = "./"; +my $now = time(); +my $later = $now + 100*365*86400; + +Net::SSLeay::SSLeay_add_ssl_algorithms(); +my $sha256 = Net::SSLeay::EVP_get_digestbyname('sha256') or die; +my $printfp = sub { + my ($w,$cert) = @_; + print $w.' sha256$'.unpack('H*',Net::SSLeay::X509_digest($cert, $sha256))."\n" +}; + +my %time_valid = (not_before => $now, not_after => $later); + +my @ca = CERT_create( + CA => 1, + subject => { CN => 'IO::Socket::SSL Demo CA' }, + %time_valid, +); +save('test-ca.pem',PEM_cert2string($ca[0])); + +my @server = CERT_create( + CA => 0, + subject => { CN => 'server.local' }, + purpose => 'server', + issuer => \@ca, + %time_valid, +); +save('server-cert.pem',PEM_cert2string($server[0])); +save('server-key.pem',PEM_key2string($server[1])); +$printfp->(server => $server[0]); + +@server = CERT_create( + CA => 0, + subject => { CN => 'server2.local' }, + purpose => 'server', + issuer => \@ca, + %time_valid, +); +save('server2-cert.pem',PEM_cert2string($server[0])); +save('server2-key.pem',PEM_key2string($server[1])); +$printfp->(server2 => $server[0]); + +@server = CERT_create( + CA => 0, + subject => { CN => 'server-ecc.local' }, + purpose => 'server', + issuer => \@ca, + key => KEY_create_ec(), + %time_valid, +); +save('server-ecc-cert.pem',PEM_cert2string($server[0])); +save('server-ecc-key.pem',PEM_key2string($server[1])); +$printfp->('server-ecc' => $server[0]); + + +my @client = CERT_create( + CA => 0, + subject => { CN => 'client.local' }, + purpose => 'client', + issuer => \@ca, + 
%time_valid, +); +save('client-cert.pem',PEM_cert2string($client[0])); +save('client-key.pem',PEM_key2string($client[1])); +$printfp->(client => $client[0]); + +my @swc = CERT_create( + CA => 0, + subject => { CN => 'server.local' }, + purpose => 'server', + issuer => \@ca, + subjectAltNames => [ + [ DNS => '*.server.local' ], + [ IP => '127.0.0.1' ], + [ DNS => 'www*.other.local' ], + [ DNS => 'smtp.mydomain.local' ], + [ DNS => 'xn--lwe-sna.idntest.local' ] + ], + %time_valid, +); +save('server-wildcard.pem',PEM_cert2string($swc[0]),PEM_key2string($swc[1])); + + +my @subca = CERT_create( + CA => 1, + issuer => \@ca, + subject => { CN => 'IO::Socket::SSL Demo Sub CA' }, + %time_valid, +); +save('test-subca.pem',PEM_cert2string($subca[0])); +@server = CERT_create( + CA => 0, + subject => { CN => 'server.local' }, + purpose => 'server', + issuer => \@subca, + %time_valid, +); +save('sub-server.pem',PEM_cert2string($server[0]).PEM_key2string($server[1])); + + + +my @cap = CERT_create( + CA => 1, + subject => { CN => 'IO::Socket::SSL::Intercept' }, + %time_valid, +); +save('proxyca.pem',PEM_cert2string($cap[0]).PEM_key2string($cap[1])); + +sub save { + my $file = shift; + open(my $fd,'>',$dir.$file) or die $!; + print $fd @_; +} + +system(<($self); + + # bref may be enqueueing more CODE to call (see accept_tls_step) + return 0 if (scalar(@$wbuf) > $before); } } # while @$wbuf @@ -479,7 +483,14 @@ sub do_read ($$$$) { return ($r == 0 ? $self->close : $r) if defined $r; # common for clients to break connections without warning, # would be too noisy to log here: - $! == EAGAIN ? $self->watch_in1 : $self->close; + if (ref($self) eq 'IO::Socket::SSL') { + my $ev = PublicInbox::TLS::epollbit() or return $self->close; + watch($self, $ev | EPOLLONESHOT); + } elsif ($! 
== EAGAIN) { + watch($self, EPOLLIN | EPOLLONESHOT); + } else { + $self->close; + } } # drop the socket if we hit unrecoverable errors on our system which @@ -566,7 +577,7 @@ sub msg_more ($$) { my $self = $_[0]; my $sock = $self->{sock} or return 1; - if (MSG_MORE && !$self->{wbuf}) { + if (MSG_MORE && !$self->{wbuf} && ref($sock) ne 'IO::Socket::SSL') { my $n = send($sock, $_[1], MSG_MORE); if (defined $n) { my $nlen = bytes::length($_[1]) - $n; @@ -597,6 +608,19 @@ sub watch ($$) { sub watch_in1 ($) { watch($_[0], EPOLLIN | EPOLLONESHOT) } +# return true if complete, false if incomplete (or failure) +sub accept_tls_step ($) { + my ($self) = @_; + my $sock = $self->{sock} or return; + return 1 if $sock->accept_SSL; + return $self->close if $! != EAGAIN; + if (my $ev = PublicInbox::TLS::epollbit()) { + unshift @{$self->{wbuf} ||= []}, \&accept_tls_step; + return watch($self, $ev | EPOLLONESHOT); + } + drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err()); +} + package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; sub cancel { diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index b8d6b572..24c13ad2 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -22,12 +22,48 @@ my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize); my $worker_processes = 1; my @listeners; my %pids; -my %listener_names; +my %listener_names; # sockname => IO::Handle +my %tls_opt; # scheme://sockname => args for IO::Socket::SSL->start_SSL my $reexec_pid; my $cleanup; my ($uid, $gid); +my ($default_cert, $default_key); END { $cleanup->() if $cleanup }; +sub tls_listen ($$$) { + my ($scheme, $sockname, $opt_str) = @_; + # opt_str: opt1=val1,opt2=val2 (opt may repeat for multi-value) + require PublicInbox::TLS; + my $o = {}; + # allow ',' as delimiter since '&' is shell-unfriendly + foreach (split(/[,&]/, $opt_str)) { + my ($k, $v) = split(/=/, $_, 2); + push @{$o->{$k} ||= []}, $v; + } + + # key may be a part of cert. 
At least + # p5-io-socket-ssl/example/ssl_server.pl has this fallback: + $o->{cert} //= [ $default_cert ]; + $o->{key} //= defined($default_key) ? [ $default_key ] : $o->{cert}; + my %ctx_opt = (SSL_server => 1); + # parse out hostname:/path/to/ mappings: + foreach my $k (qw(cert key)) { + my $x = $ctx_opt{'SSL_'.$k.'_file'} = {}; + foreach my $path (@{$o->{$k}}) { + my $host = ''; + $path =~ s/\A([^:]+):// and $host = $1; + $x->{$host} = $path; + } + } + my $ctx = IO::Socket::SSL::SSL_Context->new(%ctx_opt) or + die 'SSL_Context->new: '.PublicInbox::TLS::err(); + $tls_opt{"$scheme://$sockname"} = { + SSL_server => 1, + SSL_startHandshake => 0, + SSL_reuse_ctx => $ctx + }; +} + sub daemon_prepare ($) { my ($default_listen) = @_; @CMD = ($0, @ARGV); @@ -42,6 +78,8 @@ sub daemon_prepare ($) { 'u|user=s' => \$user, 'g|group=s' => \$group, 'D|daemonize' => \$daemonize, + 'cert=s' => \$default_cert, + 'key=s' => \$default_key, ); GetOptions(%opts) or die "bad command-line args\n"; @@ -55,6 +93,18 @@ sub daemon_prepare ($) { push @cfg_listen, $default_listen unless (@listeners || @cfg_listen); foreach my $l (@cfg_listen) { + my $orig = $l; + my $scheme = ''; + $l =~ s!\A([^:]+)://!! and $scheme = $1; + if ($l =~ s!/?\?(.+)\z!!) 
{ + tls_listen($scheme, $l, $1); + } elsif (defined($default_cert)) { + tls_listen($scheme, $l, ''); + } elsif ($scheme =~ /\A(?:nntps|https)\z/) { + die "$orig specified w/o cert=\n"; + } + # TODO: use scheme to load either NNTP.pm or HTTP.pm + next if $listener_names{$l}; # already inherited my (%o, $sock_pkg); if (index($l, '/') == 0) { @@ -461,9 +511,26 @@ sub master_loop { exit # never gets here, just for documentation } -sub daemon_loop ($$) { - my ($refresh, $post_accept) = @_; +sub tls_start_cb ($$) { + my ($opt, $orig_post_accept) = @_; + sub { + my ($io, $addr, $srv) = @_; + my $ssl = IO::Socket::SSL->start_SSL($io, %$opt); + $orig_post_accept->($ssl, $addr, $srv); + } +} + +sub daemon_loop ($$$) { + my ($refresh, $post_accept, $nntpd) = @_; PublicInbox::EvCleanup::enable(); # early for $refresh + my %post_accept; + while (my ($k, $v) = each %tls_opt) { + if ($k =~ s!\A(?:nntps|https)://!!) { + $post_accept{$k} = tls_start_cb($v, $post_accept); + } elsif ($nntpd) { # STARTTLS, $k eq '' is OK + $nntpd->{accept_tls} = $v; + } + } my $parent_pipe; if ($worker_processes > 0) { $refresh->(); # preload by default @@ -484,18 +551,19 @@ sub daemon_loop ($$) { $SIG{$_} = 'IGNORE' for qw(USR2 TTIN TTOU WINCH); # this calls epoll_create: @listeners = map { - PublicInbox::Listener->new($_, $post_accept) + PublicInbox::Listener->new($_, + $post_accept{sockname($_)} || $post_accept) } @listeners; PublicInbox::DS->EventLoop; $parent_pipe = undef; } -sub run ($$$) { - my ($default, $refresh, $post_accept) = @_; +sub run ($$$;$) { + my ($default, $refresh, $post_accept, $nntpd) = @_; daemon_prepare($default); daemonize(); - daemon_loop($refresh, $post_accept); + daemon_loop($refresh, $post_accept, $nntpd); } sub do_chown ($) { diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index a18641d3..659e44d5 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -1,4 +1,4 @@ -# Copyright (C) 2015-2018 all contributors +# Copyright (C) 2015-2019 all 
contributors # License: AGPL-3.0+ # # Each instance of this represents a NNTP client socket @@ -98,11 +98,19 @@ sub expire_old () { sub new ($$$) { my ($class, $sock, $nntpd) = @_; my $self = fields::new($class); - $self->SUPER::new($sock, EPOLLOUT | EPOLLONESHOT); + my $ev = EPOLLOUT | EPOLLONESHOT; + my $wbuf = []; + if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) { + $ev = PublicInbox::TLS::epollbit() or return $sock->close; + $ev |= EPOLLONESHOT; + $wbuf->[0] = \&PublicInbox::DS::accept_tls_step; + } + $self->SUPER::new($sock, $ev); $self->{nntpd} = $nntpd; my $greet = "201 $nntpd->{servername} ready - post via email\r\n"; open my $fh, '<:scalar', \$greet or die "open :scalar: $!"; - $self->{wbuf} = [ $fh ]; + push @$wbuf, $fh; + $self->{wbuf} = $wbuf; $self->{rbuf} = ''; update_idle_time($self); $expt ||= PublicInbox::EvCleanup::later(*expire_old); @@ -900,6 +908,19 @@ sub cmd_xover ($;$) { }); } +sub cmd_starttls ($) { + my ($self) = @_; + my $sock = $self->{sock} or return; + # RFC 4642 2.2.1 + (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable'; + my $opt = $self->{nntpd}->{accept_tls} or + return '580 can not initiate TLS negotiation'; + res($self, '382 Continue with TLS negotiation'); + $self->{sock} = IO::Socket::SSL->start_SSL($sock, %$opt); + requeue($self) if PublicInbox::DS::accept_tls_step($self); + undef; +} + sub cmd_xpath ($$) { my ($self, $mid) = @_; return r501 unless $mid =~ /\A<(.+)>\z/; diff --git a/lib/PublicInbox/NNTPD.pm b/lib/PublicInbox/NNTPD.pm index 32848d7c..6d9ffd5f 100644 --- a/lib/PublicInbox/NNTPD.pm +++ b/lib/PublicInbox/NNTPD.pm @@ -25,6 +25,7 @@ sub new { out => \*STDOUT, grouplist => [], servername => $name, + # accept_tls => { SSL_server => 1, ..., SSL_reuse_ctx => ... 
} }, $class; } diff --git a/lib/PublicInbox/TLS.pm b/lib/PublicInbox/TLS.pm new file mode 100644 index 00000000..576c11d7 --- /dev/null +++ b/lib/PublicInbox/TLS.pm @@ -0,0 +1,24 @@ +# Copyright (C) 2019 all contributors +# License: AGPL-3.0+ + +# IO::Socket::SSL support code +package PublicInbox::TLS; +use strict; +use IO::Socket::SSL; +require Carp; +use Errno qw(EAGAIN); +use PublicInbox::Syscall qw(EPOLLIN EPOLLOUT); + +sub err () { $SSL_ERROR } + +# returns the EPOLL event bit which matches the existing SSL error +sub epollbit () { + if ($! == EAGAIN) { + return EPOLLIN if $SSL_ERROR == SSL_WANT_READ; + return EPOLLOUT if $SSL_ERROR == SSL_WANT_WRITE; + die "unexpected SSL error: $SSL_ERROR"; + } + 0; +} + +1; diff --git a/script/public-inbox-nntpd b/script/public-inbox-nntpd index 484ce8d6..55bf330e 100755 --- a/script/public-inbox-nntpd +++ b/script/public-inbox-nntpd @@ -11,4 +11,5 @@ require PublicInbox::NNTPD; my $nntpd = PublicInbox::NNTPD->new; PublicInbox::Daemon::run('0.0.0.0:119', sub { $nntpd->refresh_groups }, # refresh - sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }); # post_accept + sub ($$$) { PublicInbox::NNTP->new($_[0], $nntpd) }, # post_accept + $nntpd); diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t new file mode 100644 index 00000000..53890ff2 --- /dev/null +++ b/t/nntpd-tls.t @@ -0,0 +1,156 @@ +# Copyright (C) 2019 all contributors +# License: AGPL-3.0+ +use strict; +use warnings; +use Test::More; +use File::Temp qw(tempdir); +use Socket qw(SOCK_STREAM); +foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP)) { + eval "require $mod"; + plan skip_all => "$mod missing for $0" if $@; +} +my $cert = 'certs/server-cert.pem'; +my $key = 'certs/server-key.pem'; +unless (-r $key && -r $cert) { + plan skip_all => + "certs/ missing for $0, run ./create-certs.perl in certs/"; +} + +use_ok 'PublicInbox::TLS'; +use_ok 'IO::Socket::SSL'; +require './t/common.perl'; +require PublicInbox::InboxWritable; +require PublicInbox::MIME; +require 
PublicInbox::SearchIdx; +my $version = 2; # v2 needs newer git +require_git('2.6') if $version >= 2; +my $tmpdir = tempdir('pi-nntpd-tls-XXXXXX', TMPDIR => 1, CLEANUP => 1); +my $err = "$tmpdir/stderr.log"; +my $out = "$tmpdir/stdout.log"; +my $mainrepo = "$tmpdir"; +my $pi_config = "$tmpdir/pi_config"; +my $group = 'test-nntpd-tls'; +my $addr = $group . '@example.com'; +my $nntpd = 'blib/script/public-inbox-nntpd'; +my %opts = ( + LocalAddr => '127.0.0.1', + ReuseAddr => 1, + Proto => 'tcp', + Type => SOCK_STREAM, + Listen => 1024, +); +my $starttls = IO::Socket::INET->new(%opts); +my $nntps = IO::Socket::INET->new(%opts); +my ($pid, $tail_pid); +END { + foreach ($pid, $tail_pid) { + kill 'TERM', $_ if defined $_; + } +}; + +my $ibx = PublicInbox::Inbox->new({ + mainrepo => $mainrepo, + name => 'nntpd-tls', + version => $version, + -primary_address => $addr, + indexlevel => 'basic', +}); +$ibx = PublicInbox::InboxWritable->new($ibx, {nproc=>1}); +$ibx->init_inbox(0); +{ + open my $fh, '>', $pi_config or die "open: $!\n"; + print $fh <importer(0); + my $mime = PublicInbox::MIME->new(do { + open my $fh, '<', 't/data/0001.patch' or die; + local $/; + <$fh> + }); + ok($im->add($mime), 'message added'); + $im->done; + if ($version == 1) { + my $s = PublicInbox::SearchIdx->new($ibx, 1); + $s->index_sync; + } +} + +my $nntps_addr = $nntps->sockhost . ':' . $nntps->sockport; +my $starttls_addr = $starttls->sockhost . ':' . 
$starttls->sockport; +my $env = { PI_CONFIG => $pi_config }; + +for my $args ( + [ "--cert=$cert", "--key=$key", + "-lnntps://$nntps_addr", + "-lnntp://$starttls_addr" ], +) { + for ($out, $err) { + open my $fh, '>', $_ or die "truncate: $!"; + } + if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail + $tail_pid = fork; + if (defined $tail_pid && $tail_pid == 0) { + exec(split(' ', $tail_cmd), $out, $err); + } + } + my $cmd = [ $nntpd, '-W0', @$args, "--stdout=$out", "--stderr=$err" ]; + $pid = spawn_listener($env, $cmd, [ $starttls, $nntps ]); + my %o = ( + SSL_hostname => 'server.local', + SSL_verifycn_name => 'server.local', + SSL => 1, + SSL_verify_mode => SSL_VERIFY_PEER(), + SSL_ca_file => 'certs/test-ca.pem', + ); + my $expect = { $group => [qw(1 1 n)] }; + + # NNTPS + my $c = Net::NNTP->new($nntps_addr, %o); + my $list = $c->list; + is_deeply($list, $expect, 'NNTPS LIST works'); + + # STARTTLS + delete $o{SSL}; + $c = Net::NNTP->new($starttls_addr, %o); + $list = $c->list; + is_deeply($list, $expect, 'plain LIST works'); + ok($c->starttls, 'STARTTLS succeeds'); + is($c->code, 382, 'got 382 for STARTTLS'); + $list = $c->list; + is_deeply($list, $expect, 'LIST works after STARTTLS'); + + # Net::NNTP won't let us do dumb things, but we need to test + # dumb things, so use Net::Cmd directly: + my $n = $c->command('STARTTLS')->response(); + is($n, Net::Cmd::CMD_ERROR(), 'error attempting STARTTLS again'); + is($c->code, 502, '502 according to RFC 4642 sec#2.2.1'); + + $c = undef; + kill('TERM', $pid); + is($pid, waitpid($pid, 0), 'nntpd exited successfully'); + is($?, 0, 'no error in exited process'); + $pid = undef; + my $eout = eval { + open my $fh, '<', $err or die "open $err failed: $!"; + local $/; + <$fh>; + }; + unlike($eout, qr/wide/i, 'no Wide character warnings'); + if (defined $tail_pid) { + kill 'TERM', $tail_pid; + waitpid($tail_pid, 0); + $tail_pid = undef; + } +} +done_testing(); +1; diff --git a/t/nntpd.t b/t/nntpd.t index c37880bf..6cba2be4 
100644 --- a/t/nntpd.t +++ b/t/nntpd.t @@ -106,6 +106,8 @@ EOF is_deeply($list, { $group => [ qw(1 1 n) ] }, 'LIST works'); is_deeply([$n->group($group)], [ qw(0 1 1), $group ], 'GROUP works'); is_deeply($n->listgroup($group), [1], 'listgroup OK'); + ok(!$n->starttls, 'STARTTLS fails when unconfigured'); + is($n->code, 580, 'got 580 code on server w/o TLS'); %opts = ( PeerAddr => $host_port, -- cgit v1.2.3-24-ge0c7 From 3c7949c4ecd5b772020c8db3878f7b064ebfb9b7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:39 +0000 Subject: certs/create-certs.perl: fix cert validity on 32-bit If I'm still alive, I won't be coding after 2038 :< --- certs/create-certs.perl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/certs/create-certs.perl b/certs/create-certs.perl index bfd8e5f1..476be4d7 100755 --- a/certs/create-certs.perl +++ b/certs/create-certs.perl @@ -8,7 +8,7 @@ use Net::SSLeay; my $dir = "./"; my $now = time(); -my $later = $now + 100*365*86400; +my $later = 0x7fffffff; # 2038 problems on 32-bit :< Net::SSLeay::SSLeay_add_ssl_algorithms(); my $sha256 = Net::SSLeay::EVP_get_digestbyname('sha256') or die; -- cgit v1.2.3-24-ge0c7 From db4169098e955380c47689d26deeb75e6952eff3 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:40 +0000 Subject: daemon: map inherited sockets to well-known schemes I don't want to specify "--listen" in my systemd .service files, so map 563 to NNTPS automatically (and 443 to HTTPS, but HTTPS support doesn't work, yet). 
--- lib/PublicInbox/Daemon.pm | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 24c13ad2..55103f40 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -29,9 +29,11 @@ my $cleanup; my ($uid, $gid); my ($default_cert, $default_key); END { $cleanup->() if $cleanup }; +my %KNOWN_TLS = ( 443 => 'https', 563 => 'nntps' ); +my %KNOWN_STARTTLS = ( 119 => 'nntp' ); -sub tls_listen ($$$) { - my ($scheme, $sockname, $opt_str) = @_; +sub accept_tls_opt ($) { + my ($opt_str) = @_; # opt_str: opt1=val1,opt2=val2 (opt may repeat for multi-value) require PublicInbox::TLS; my $o = {}; @@ -57,11 +59,7 @@ sub tls_listen ($$$) { } my $ctx = IO::Socket::SSL::SSL_Context->new(%ctx_opt) or die 'SSL_Context->new: '.PublicInbox::TLS::err(); - $tls_opt{"$scheme://$sockname"} = { - SSL_server => 1, - SSL_startHandshake => 0, - SSL_reuse_ctx => $ctx - }; + { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx }; } sub daemon_prepare ($) { @@ -87,6 +85,11 @@ sub daemon_prepare ($) { die "--pid-file cannot end with '.oldbin'\n"; } @listeners = inherit(); + + # allow socket-activation users to set certs once and not + # have to configure each socket: + my @inherited_names = keys(%listener_names) if defined($default_cert); + # ignore daemonize when inheriting $daemonize = undef if scalar @listeners; @@ -95,11 +98,16 @@ sub daemon_prepare ($) { foreach my $l (@cfg_listen) { my $orig = $l; my $scheme = ''; - $l =~ s!\A([^:]+)://!! and $scheme = $1; + if ($l =~ s!\A([^:]+)://!!) { + $scheme = $1; + } elsif ($l =~ /\A(?:\[[^\]]+\]|[^:]+):([0-9])+/) { + my $s = $KNOWN_TLS{$1} // $KNOWN_STARTTLS{$1}; + $scheme = $s if defined $s; + } if ($l =~ s!/?\?(.+)\z!!) 
{ - tls_listen($scheme, $l, $1); + $tls_opt{"$scheme://$l"} = accept_tls_opt($1); } elsif (defined($default_cert)) { - tls_listen($scheme, $l, ''); + $tls_opt{"$scheme://$l"} = accept_tls_opt(''); } elsif ($scheme =~ /\A(?:nntps|https)\z/) { die "$orig specified w/o cert=\n"; } @@ -141,6 +149,20 @@ sub daemon_prepare ($) { push @listeners, $s; } } + + # cert/key options in @cfg_listen takes precedence when inheriting, + # but map well-known inherited ports if --listen isn't specified + # at all + for my $sockname (@inherited_names) { + $sockname =~ /:([0-9]+)\z/ or next; + if (my $scheme = $KNOWN_TLS{$1}) { + $tls_opt{"$scheme://$sockname"} ||= accept_tls_opt(''); + } elsif (($scheme = $KNOWN_STARTTLS{$1})) { + next if $tls_opt{"$scheme://$sockname"}; + $tls_opt{''} ||= accept_tls_opt(''); + } + } + die "No listeners bound\n" unless @listeners; } -- cgit v1.2.3-24-ge0c7 From b3e4b3b3c67b9df7868518978e721417b0aa7c9c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:41 +0000 Subject: ds|nntp: use CORE::close on socket IO::Socket::SSL will try to re-bless back to the original class on TLS negotiation failure. Unfortunately, the original class is 'GLOB', and re-blessing to 'GLOB' takes away all the IO::Handle methods, because Filehandle/IO are a special case in Perl5. Anyways, since we already use syswrite() and sysread() as functions on our socket, we might as well use CORE::close(), as well (and it plays nicely with tied classes). --- lib/PublicInbox/DS.pm | 4 ++-- lib/PublicInbox/NNTP.pm | 2 +- t/nntpd-tls.t | 17 +++++++++++++++++ 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 044b991c..2c886b4e 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -293,8 +293,8 @@ sub PostEventLoop { while (my $sock = shift @ToClose) { my $fd = fileno($sock); - # close the socket. (not a PublicInbox::DS close) - $sock->close; + # close the socket. 
(not a PublicInbox::DS close) + CORE::close($sock); # and now we can finally remove the fd from the map. see # comment above in ->close. diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 659e44d5..8840adbb 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -101,7 +101,7 @@ sub new ($$$) { my $ev = EPOLLOUT | EPOLLONESHOT; my $wbuf = []; if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) { - $ev = PublicInbox::TLS::epollbit() or return $sock->close; + $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock); $ev |= EPOLLONESHOT; $wbuf->[0] = \&PublicInbox::DS::accept_tls_step; } diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t index 53890ff2..4727ee5b 100644 --- a/t/nntpd-tls.t +++ b/t/nntpd-tls.t @@ -135,6 +135,23 @@ for my $args ( is($n, Net::Cmd::CMD_ERROR(), 'error attempting STARTTLS again'); is($c->code, 502, '502 according to RFC 4642 sec#2.2.1'); + # STARTTLS with bad hostname + $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.invalid'; + $c = Net::NNTP->new($starttls_addr, %o); + $list = $c->list; + is_deeply($list, $expect, 'plain LIST works again'); + ok(!$c->starttls, 'STARTTLS fails with bad hostname'); + $c = Net::NNTP->new($starttls_addr, %o); + $list = $c->list; + is_deeply($list, $expect, 'not broken after bad negotiation'); + + # NNTPS with bad hostname + $c = Net::NNTP->new($nntps_addr, %o, SSL => 1); + is($c, undef, 'NNTPS fails with bad hostname'); + $o{SSL_hostname} = $o{SSL_verifycn_name} = 'server.local'; + $c = Net::NNTP->new($nntps_addr, %o, SSL => 1); + ok($c, 'NNTPS succeeds again with valid hostname'); + $c = undef; kill('TERM', $pid); is($pid, waitpid($pid, 0), 'nntpd exited successfully'); -- cgit v1.2.3-24-ge0c7 From 595854982a59f369ab605794f05c046c86253468 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:42 +0000 Subject: nntp: call SSL_shutdown in normal cases This is in accordance with TLS standards and will be needed to support session caching/reuse in the 
future. However, we don't issue shutdown(2) since we know not to inadvertently share our sockets with other processes. --- lib/PublicInbox/DS.pm | 24 ++++++++++++++++++++++++ lib/PublicInbox/NNTP.pm | 12 +++++++++--- t/nntpd-tls.t | 2 ++ 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 2c886b4e..2aa9e3d2 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -621,6 +621,30 @@ sub accept_tls_step ($) { drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err()); } +sub shutdn_tls_step ($) { + my ($self) = @_; + my $sock = $self->{sock} or return; + return $self->close if $sock->stop_SSL(SSL_fast_shutdown => 1); + return $self->close if $! != EAGAIN; + if (my $ev = PublicInbox::TLS::epollbit()) { + unshift @{$self->{wbuf} ||= []}, \&shutdn_tls_step; + return watch($self, $ev | EPOLLONESHOT); + } + drop($self, 'BUG? EAGAIN but '.PublicInbox::TLS::err()); +} + +# don't bother with shutdown($sock, 2), we don't fork+exec w/o CLOEXEC +# or fork w/o exec, so no inadvertant socket sharing +sub shutdn ($) { + my ($self) = @_; + my $sock = $self->{sock} or return; + if (ref($sock) eq 'IO::Socket::SSL') { + shutdn_tls_step($self); + } else { + $self->close; + } +} + package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; sub cancel { diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 8840adbb..53de2bca 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -74,11 +74,17 @@ sub expire_old () { my $exp = $EXPTIME; my $old = $now - $exp; my $nr = 0; + my $closed = 0; my %new; while (my ($fd, $v) = each %$EXPMAP) { my ($idle_time, $nntp) = @$v; if ($idle_time < $old) { - $nntp->close; # idempotent + if ($nntp->shutdn) { + $closed++; + } else { + ++$nr; + $new{$fd} = $v; + } } else { ++$nr; $new{$fd} = $v; @@ -91,7 +97,7 @@ sub expire_old () { $expt = undef; # noop to kick outselves out of the loop ASAP so descriptors # really get closed - 
PublicInbox::EvCleanup::asap(sub {}); + PublicInbox::EvCleanup::asap(sub {}) if $closed; } } @@ -410,7 +416,7 @@ sub cmd_post ($) { sub cmd_quit ($) { my ($self) = @_; res($self, '205 closing connection - goodbye!'); - $self->close; + $self->shutdn; undef; } diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t index 4727ee5b..00b03b66 100644 --- a/t/nntpd-tls.t +++ b/t/nntpd-tls.t @@ -118,6 +118,8 @@ for my $args ( my $c = Net::NNTP->new($nntps_addr, %o); my $list = $c->list; is_deeply($list, $expect, 'NNTPS LIST works'); + is($c->command('QUIT')->response(), Net::Cmd::CMD_OK(), 'QUIT works'); + is(0, sysread($c, my $buf, 1), 'got EOF after QUIT'); # STARTTLS delete $o{SSL}; -- cgit v1.2.3-24-ge0c7 From 1dc4d2f75a387c9113fc7646c463e3aac2d3de1f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:43 +0000 Subject: t/nntpd-tls: slow client connection test We need to ensure slowly negotiating TLS clients don't block the event loop. This is why I added the size check of {wbuf} before and after calling the CODE ref in DS::flush_write. --- t/nntpd-tls.t | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t index 00b03b66..e8fb63b4 100644 --- a/t/nntpd-tls.t +++ b/t/nntpd-tls.t @@ -5,7 +5,9 @@ use warnings; use Test::More; use File::Temp qw(tempdir); use Socket qw(SOCK_STREAM); -foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP)) { +# IO::Poll and Net::NNTP are part of the standard library, but +# distros may split them off... 
+foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) { eval "require $mod"; plan skip_all => "$mod missing for $0" if $@; } @@ -108,21 +110,32 @@ for my $args ( my %o = ( SSL_hostname => 'server.local', SSL_verifycn_name => 'server.local', - SSL => 1, SSL_verify_mode => SSL_VERIFY_PEER(), SSL_ca_file => 'certs/test-ca.pem', ); my $expect = { $group => [qw(1 1 n)] }; + # start negotiating a slow TLS connection + my $slow = IO::Socket::INET->new( + Proto => 'tcp', + PeerAddr => $nntps_addr, + Type => SOCK_STREAM, + Blocking => 0, + ); + $slow = IO::Socket::SSL->start_SSL($slow, SSL_startHandshake => 0, %o); + my $slow_done = $slow->connect_SSL; + diag('W: connect_SSL early OK, slow client test invalid') if $slow_done; + my @poll = (fileno($slow), PublicInbox::TLS::epollbit()); + # we should call connect_SSL much later... + # NNTPS - my $c = Net::NNTP->new($nntps_addr, %o); + my $c = Net::NNTP->new($nntps_addr, %o, SSL => 1); my $list = $c->list; is_deeply($list, $expect, 'NNTPS LIST works'); is($c->command('QUIT')->response(), Net::Cmd::CMD_OK(), 'QUIT works'); is(0, sysread($c, my $buf, 1), 'got EOF after QUIT'); # STARTTLS - delete $o{SSL}; $c = Net::NNTP->new($starttls_addr, %o); $list = $c->list; is_deeply($list, $expect, 'plain LIST works'); @@ -154,6 +167,21 @@ for my $args ( $c = Net::NNTP->new($nntps_addr, %o, SSL => 1); ok($c, 'NNTPS succeeds again with valid hostname'); + # slow TLS connection did not block the other fast clients while + # connecting, finish it off: + until ($slow_done) { + IO::Poll::_poll(-1, @poll); + $slow_done = $slow->connect_SSL and last; + @poll = (fileno($slow), PublicInbox::TLS::epollbit()); + } + $slow->blocking(1); + ok(sysread($slow, my $greet, 4096) > 0, 'slow got greeting'); + like($greet, qr/\A201 /, 'got expected greeting'); + is(syswrite($slow, "QUIT\r\n"), 6, 'slow wrote QUIT'); + ok(sysread($slow, my $end, 4096) > 0, 'got EOF'); + is(sysread($slow, my $eof, 4096), 0, 'got EOF'); + $slow = undef; + $c = 
undef; kill('TERM', $pid); is($pid, waitpid($pid, 0), 'nntpd exited successfully'); -- cgit v1.2.3-24-ge0c7 From 87a03babc14247d4eac489beb95abba47cf4f358 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:44 +0000 Subject: daemon: use SSL_MODE_RELEASE_BUFFERS 34K per idle connection adds up to large amounts of memory; especially with the speed of malloc nowadays compared to the cost of cache misses or worse, swapping. --- lib/PublicInbox/Daemon.pm | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 55103f40..c4481555 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -59,6 +59,16 @@ sub accept_tls_opt ($) { } my $ctx = IO::Socket::SSL::SSL_Context->new(%ctx_opt) or die 'SSL_Context->new: '.PublicInbox::TLS::err(); + + # save ~34K per idle connection (cf. SSL_CTX_set_mode(3ssl)) + # RSS goes from 346MB to 171MB with 10K idle NNTPS clients on amd64 + # cf. https://rt.cpan.org/Ticket/Display.html?id=129463 + my $mode = eval { Net::SSLeay::MODE_RELEASE_BUFFERS() }; + if ($mode && $ctx->{context}) { + eval { Net::SSLeay::CTX_set_mode($ctx->{context}, $mode) }; + warn "W: $@ (setting SSL_MODE_RELEASE_BUFFERS)\n" if $@; + } + { SSL_server => 1, SSL_startHandshake => 0, SSL_reuse_ctx => $ctx }; } -- cgit v1.2.3-24-ge0c7 From ca6b59974a0f2e24a1d569d118b55c4fc66ff7a3 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:45 +0000 Subject: ds: allow ->write callbacks to syswrite directly We can bypass buffering when wbuf is empty when it's called from a CODE reference passed to ->write. --- lib/PublicInbox/DS.pm | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 2aa9e3d2..0e48ed07 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -537,7 +537,8 @@ sub write { my $sock = $self->{sock} or return 1; my $ref = ref $data; my $bref = $ref ? 
$data : \$data; - if (my $wbuf = $self->{wbuf}) { # already buffering, can't write more... + my $wbuf = $self->{wbuf}; + if ($wbuf && scalar(@$wbuf)) { # already buffering, can't write more... if ($ref eq 'CODE') { push @$wbuf, $bref; } else { -- cgit v1.2.3-24-ge0c7 From 6ea82fb5d6cd5ae6813f3700fe915ab9110086ea Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:46 +0000 Subject: nntp: reduce allocations for greeting No need to allocate a new PerlIO::scalar filehandle for every client, instead we can now pass the same CODE reference which calls DS->write on a reused string reference. --- lib/PublicInbox/NNTP.pm | 6 +++--- lib/PublicInbox/NNTPD.pm | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 53de2bca..12ce4e68 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -101,6 +101,8 @@ sub expire_old () { } } +sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) }; + sub new ($$$) { my ($class, $sock, $nntpd) = @_; my $self = fields::new($class); @@ -113,9 +115,7 @@ sub new ($$$) { } $self->SUPER::new($sock, $ev); $self->{nntpd} = $nntpd; - my $greet = "201 $nntpd->{servername} ready - post via email\r\n"; - open my $fh, '<:scalar', \$greet or die "open :scalar: $!"; - push @$wbuf, $fh; + push @$wbuf, \&greet; $self->{wbuf} = $wbuf; $self->{rbuf} = ''; update_idle_time($self); diff --git a/lib/PublicInbox/NNTPD.pm b/lib/PublicInbox/NNTPD.pm index 6d9ffd5f..4f30c5d9 100644 --- a/lib/PublicInbox/NNTPD.pm +++ b/lib/PublicInbox/NNTPD.pm @@ -25,6 +25,7 @@ sub new { out => \*STDOUT, grouplist => [], servername => $name, + greet => \"201 $name ready - post via email\r\n", # accept_tls => { SSL_server => 1, ..., SSL_reuse_ctx => ... 
} }, $class; } -- cgit v1.2.3-24-ge0c7 From b86c1790854c6d8b8299e5b71ad067b97cff548b Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:47 +0000 Subject: ds: always use EV_ADD with EV_SET kqueue EV_ONESHOT semantics are different than epoll EPOLLONESHOT. epoll only disables watches for that event while keeping the item in the rbtree for future EPOLL_CTL_MOD. kqueue removes the watch from the filter set entirely, necessitating the use of EV_ADD for future modifications. --- lib/PublicInbox/DS.pm | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 0e48ed07..8f77ce24 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -321,7 +321,7 @@ sub kq_flag ($$) { my $fl = EV_ADD() | EV_ENABLE(); ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl; } else { - EV_DISABLE(); + EV_ADD() | EV_DISABLE(); } } @@ -364,8 +364,8 @@ retry: } } elsif ($HaveKQueue) { - $KQueue->EV_SET($fd, EVFILT_READ(), EV_ADD() | kq_flag(EPOLLIN, $ev)); - $KQueue->EV_SET($fd, EVFILT_WRITE(), EV_ADD() | kq_flag(EPOLLOUT, $ev)); + $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev)); + $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev)); } Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") -- cgit v1.2.3-24-ge0c7 From 3ac6b68138da02cea825f22468d9850c67c15916 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:48 +0000 Subject: nntp: simplify long response logic and fix nesting We can get rid of the {long_res} field and reuse the write buffer ordering logic to prevent nesting of responses from requeue. On FreeBSD, this fixes a problem of callbacks firing twice because kqueue as event_step is now our only callback entry point. There's a slight change in the stdout "logging" format, in that we can no longer distinguish between writes blocked due to slow clients or deferred long responses. 
Not sure if this affects anybody parsing logs or not, but preserving the old format could prove expensive and not worth the effort. --- lib/PublicInbox/NNTP.pm | 61 ++++++++++++++++++++----------------------------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 12ce4e68..6acfcc1b 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -6,7 +6,7 @@ package PublicInbox::NNTP; use strict; use warnings; use base qw(PublicInbox::DS); -use fields qw(nntpd article rbuf ng long_res); +use fields qw(nntpd article rbuf ng); use PublicInbox::Search; use PublicInbox::Msgmap; use PublicInbox::MID qw(mid_escape); @@ -45,17 +45,7 @@ sub next_tick () { $nextt = undef; my $q = $nextq; $nextq = []; - foreach my $nntp (@$q) { - # for request && response protocols, always finish writing - # before finishing reading: - if (my $long_cb = $nntp->{long_res}) { - $nntp->write($long_cb); - } else { - # pipelined request, we bypassed socket-readiness - # checks to get here: - event_step($nntp); - } - } + event_step($_) for @$q; } sub requeue ($) { @@ -633,8 +623,7 @@ sub get_range ($$) { } sub long_response ($$) { - my ($self, $cb) = @_; - die "BUG: nested long response" if $self->{long_res}; + my ($self, $cb) = @_; # cb returns true if more, false if done my $fd = fileno($self->{sock}); defined $fd or return; @@ -642,36 +631,38 @@ sub long_response ($$) { # clients should not be sending us stuff and making us do more # work while we are stream a response to them my $t0 = now(); - $self->{long_res} = sub { + my $long_cb; # DANGER: self-referential + $long_cb = sub { + # wbuf is unset or empty, here; $cb may add to it my $more = eval { $cb->() }; if ($@ || !$self->{sock}) { # something bad happened... 
- delete $self->{long_res}; - + $long_cb = undef; + my $diff = now() - $t0; if ($@) { err($self, "%s during long response[$fd] - %0.6f", - $@, now() - $t0); - } - if ($self->{sock}) { - update_idle_time($self); - requeue($self); - } else { - out($self, " deferred[$fd] aborted - %0.6f", - now() - $t0); + $@, $diff); } + out($self, " deferred[$fd] aborted - %0.6f", $diff); + $self->close; } elsif ($more) { # $self->{wbuf}: + update_idle_time($self); + # no recursion, schedule another call ASAP # but only after all pending writes are done - update_idle_time($self); - requeue($self); + my $wbuf = $self->{wbuf} ||= []; + push @$wbuf, $long_cb; + + # wbuf may be populated by $cb, no need to rearm if so: + requeue($self) if scalar(@$wbuf) == 1; } else { # all done! - delete $self->{long_res}; + $long_cb = undef; res($self, '.'); out($self, " deferred[$fd] done - %0.6f", now() - $t0); - requeue($self); + requeue($self) unless $self->{wbuf}; } }; - $self->{long_res}->(); # kick off! + $self->write($long_cb); # kick off! undef; } @@ -986,9 +977,8 @@ sub event_step { my $t0 = now(); my $fd = fileno($self->{sock}); $r = eval { process_line($self, $line) }; - my $d = $self->{long_res} ? - " deferred[$fd]" : ''; - out($self, "[$fd] %s - %0.6f$d", $line, now() - $t0); + my $pending = $self->{wbuf} ? 
' pending' : ''; + out($self, "[$fd] %s - %0.6f$pending", $line, now() - $t0); } return $self->close if $r < 0; @@ -998,7 +988,7 @@ sub event_step { # maybe there's more pipelined data, or we'll have # to register it for socket-readiness notifications - requeue($self) unless ($self->{long_res} || $self->{wbuf}); + requeue($self) unless $self->{wbuf}; } sub not_idle_long ($$) { @@ -1012,8 +1002,7 @@ sub not_idle_long ($$) { # for graceful shutdown in PublicInbox::Daemon: sub busy { my ($self, $now) = @_; - ($self->{rbuf} ne '' || $self->{long_res} || - $self->{wbuf} || not_idle_long($self, $now)); + ($self->{rbuf} ne '' || $self->{wbuf} || not_idle_long($self, $now)); } 1; -- cgit v1.2.3-24-ge0c7 From 3667c96f84a39c8fbfaf4cef37f58a9db06bb4b3 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:49 +0000 Subject: ds: flush_write runs ->write callbacks even if closed We may need to rely on cleanup code running in enqueued callbacks, so ensure we call it when flush_write happens. --- lib/PublicInbox/DS.pm | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 8f77ce24..d38e2d20 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -442,13 +442,13 @@ sub psendfile ($$$) { sub flush_write ($) { my ($self) = @_; my $wbuf = $self->{wbuf} or return 1; - my $sock = $self->{sock} or return 1; + my $sock = $self->{sock}; next_buf: while (my $bref = $wbuf->[0]) { if (ref($bref) ne 'CODE') { my $off = delete($self->{wbuf_off}) // 0; - while (1) { + while ($sock) { my $w = psendfile($sock, $bref, \$off); if (defined $w) { if ($w == 0) { -- cgit v1.2.3-24-ge0c7 From 6f173864f5acac89769a67739b8c377510711d49 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:50 +0000 Subject: nntp: lazily allocate and stash rbuf Allocating a per-client buffer up front is unnecessary and wastes a hash slot. 
For the majority of (non-malicious) clients, we won't need to store rbuf in a long-lived object associated with a client socket at all. This saves around 10M on 64-bit with 20K connected-but-idle clients. --- lib/PublicInbox/NNTP.pm | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 6acfcc1b..10a2e158 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -107,7 +107,6 @@ sub new ($$$) { $self->{nntpd} = $nntpd; push @$wbuf, \&greet; $self->{wbuf} = $wbuf; - $self->{rbuf} = ''; update_idle_time($self); $expt ||= PublicInbox::EvCleanup::later(*expire_old); $self; @@ -964,7 +963,7 @@ sub event_step { # otherwise we can be buffering infinitely w/o backpressure use constant LINE_MAX => 512; # RFC 977 section 2.3 - my $rbuf = \($self->{rbuf}); + my $rbuf = $self->{rbuf} // (\(my $x = '')); my $r = 1; if (index($$rbuf, "\n") < 0) { @@ -984,6 +983,11 @@ sub event_step { return $self->close if $r < 0; my $len = bytes::length($$rbuf); return $self->close if ($len >= LINE_MAX); + if ($len) { + $self->{rbuf} = $rbuf; + } else { + delete $self->{rbuf}; + } update_idle_time($self); # maybe there's more pipelined data, or we'll have @@ -1002,7 +1006,7 @@ sub not_idle_long ($$) { # for graceful shutdown in PublicInbox::Daemon: sub busy { my ($self, $now) = @_; - ($self->{rbuf} ne '' || $self->{wbuf} || not_idle_long($self, $now)); + ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now)); } 1; -- cgit v1.2.3-24-ge0c7 From 2b18e497086cb92d86d41e842c75feb1a28e9053 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:51 +0000 Subject: ci: require IO::KQueue on FreeBSD, for now We'll likely replace IO::KQueue (at least on FreeBSD) using a pure-Perl syscall()-based version since syscall numbers are consistent across architectures on FreeBSD and easy to maintain. IO::KQueue->EV_SET is also shockingly inefficient in that it calls kqueue() as much as epoll_ctl. 
--- ci/deps.perl | 5 +---- ci/profiles.sh | 3 +-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/ci/deps.perl b/ci/deps.perl index 62870c1f..ad2c11d2 100755 --- a/ci/deps.perl +++ b/ci/deps.perl @@ -60,7 +60,7 @@ my $profiles = { # account for granularity differences between package systems and OSes my @precious; if ($^O eq 'freebsd') { - @precious = qw(perl curl Socket6 IO::Compress::Gzip); + @precious = qw(perl curl Socket6 IO::Compress::Gzip IO::KQueue); } elsif ($pkg_fmt eq 'rpm') { @precious = qw(perl curl); } @@ -149,9 +149,6 @@ my (@pkg_install, @pkg_remove, %all); for my $ary (values %$profiles) { $all{$_} = \@pkg_remove for @$ary; } -if ($^O eq 'freebsd') { - $all{'IO::KQueue'} = \@pkg_remove; -} $profiles->{all} = [ keys %all ]; # pseudo-profile for all packages # parse the profile list from the command-line diff --git a/ci/profiles.sh b/ci/profiles.sh index d559ec5f..1ddf7891 100755 --- a/ci/profiles.sh +++ b/ci/profiles.sh @@ -54,8 +54,7 @@ esac case $ID-$VERSION_ID in freebsd-11|freebsd-12) sed "s/^/$PKG_FMT /" < Date: Mon, 24 Jun 2019 02:52:52 +0000 Subject: nntp: send greeting immediately for plain sockets A tiny write() for the greeting on a just accept()-ed TCP socket won't fail with EAGAIN, so we can avoid the extra epoll syscall traffic with plain sockets. 
--- lib/PublicInbox/NNTP.pm | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 10a2e158..53e18281 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -24,7 +24,7 @@ use constant { r225 => '225 Headers follow (multi-line)', r430 => '430 No article with that message-id', }; -use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); +use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT); use Errno qw(EAGAIN); my @OVERVIEW = qw(Subject From Date Message-ID References Xref); @@ -96,17 +96,19 @@ sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) }; sub new ($$$) { my ($class, $sock, $nntpd) = @_; my $self = fields::new($class); - my $ev = EPOLLOUT | EPOLLONESHOT; - my $wbuf = []; + my $ev = EPOLLIN; + my $wbuf; if (ref($sock) eq 'IO::Socket::SSL' && !$sock->accept_SSL) { $ev = PublicInbox::TLS::epollbit() or return CORE::close($sock); - $ev |= EPOLLONESHOT; - $wbuf->[0] = \&PublicInbox::DS::accept_tls_step; + $wbuf = [ \&PublicInbox::DS::accept_tls_step, \&greet ]; } - $self->SUPER::new($sock, $ev); + $self->SUPER::new($sock, $ev | EPOLLONESHOT); $self->{nntpd} = $nntpd; - push @$wbuf, \&greet; - $self->{wbuf} = $wbuf; + if ($wbuf) { + $self->{wbuf} = $wbuf; + } else { + greet($self); + } update_idle_time($self); $expt ||= PublicInbox::EvCleanup::later(*expire_old); $self; -- cgit v1.2.3-24-ge0c7 From 4e1a84c2a97c319862c960a34e3a7a8bf31d5274 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:53 +0000 Subject: daemon: set TCP_DEFER_ACCEPT on everything but NNTP This Linux-specific option can save us some wakeups during the TLS negotiation phase, and it can help with ordinary HTTP, too. Plain NNTP (and in the future, POP3) are the only things which require the server send messages, first. 
--- lib/PublicInbox/Daemon.pm | 26 ++++++++++++++++++++++---- t/httpd-corner.t | 19 +++++++++++++++++++ t/httpd.t | 8 ++++++++ t/nntpd-tls.t | 11 ++++++++++- 4 files changed, 59 insertions(+), 5 deletions(-) diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index c4481555..8b59b65f 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -8,6 +8,7 @@ use warnings; use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/; use IO::Handle; use IO::Socket; +use Socket qw(IPPROTO_TCP); use Cwd qw/abs_path/; STDOUT->autoflush(1); STDERR->autoflush(1); @@ -552,6 +553,18 @@ sub tls_start_cb ($$) { } } +sub defer_accept ($) { + if ($^O eq 'linux') { + my ($s) = @_; + my $x = getsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT()); + return unless defined $x; # may be Unix socket + my $sec = unpack('i', $x); + return if $sec > 0; # systemd users may set a higher value + setsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 1); + } + # TODO FreeBSD accf_http / accf_data +} + sub daemon_loop ($$$) { my ($refresh, $post_accept, $nntpd) = @_; PublicInbox::EvCleanup::enable(); # early for $refresh @@ -581,10 +594,15 @@ sub daemon_loop ($$$) { $SIG{HUP} = $refresh; $SIG{CHLD} = 'DEFAULT'; $SIG{$_} = 'IGNORE' for qw(USR2 TTIN TTOU WINCH); - # this calls epoll_create: - @listeners = map { - PublicInbox::Listener->new($_, - $post_accept{sockname($_)} || $post_accept) + @listeners = map {; + my $tls_cb = $post_accept{sockname($_)}; + + # NNTPS, HTTPS, HTTP, and POP3S are client-first traffic + # NNTP and POP3 are server-first + defer_accept($_) if $tls_cb || !$nntpd; + + # this calls epoll_create: + PublicInbox::Listener->new($_, $tls_cb || $post_accept) } @listeners; PublicInbox::DS->EventLoop; $parent_pipe = undef; diff --git a/t/httpd-corner.t b/t/httpd-corner.t index c1dc77db..13befcf1 100644 --- a/t/httpd-corner.t +++ b/t/httpd-corner.t @@ -36,6 +36,17 @@ my %opts = ( Listen => 1024, ); my $sock = IO::Socket::INET->new(%opts); +my 
$defer_accept_val; +if ($^O eq 'linux') { + setsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 5) or die; + my $x = getsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT()); + defined $x or die "getsockopt: $!"; + $defer_accept_val = unpack('i', $x); + if ($defer_accept_val <= 0) { + die "unexpected TCP_DEFER_ACCEPT value: $defer_accept_val"; + } +} + my $upath = "$tmpdir/s"; my $unix = IO::Socket::UNIX->new( Listen => 1024, @@ -497,6 +508,14 @@ SKIP: { is($body, sha1_hex(''), 'read expected body #2'); } +SKIP: { + skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux'; + my $var = Socket::TCP_DEFER_ACCEPT(); + defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die; + is(unpack('i', $x), $defer_accept_val, + 'TCP_DEFER_ACCEPT unchanged if previously set'); +}; + done_testing(); sub capture { diff --git a/t/httpd.t b/t/httpd.t index c061031c..8c2a3173 100644 --- a/t/httpd.t +++ b/t/httpd.t @@ -10,6 +10,7 @@ foreach my $mod (qw(Plack::Util Plack::Builder HTTP::Date HTTP::Status)) { } use File::Temp qw/tempdir/; use IO::Socket::INET; +use Socket qw(IPPROTO_TCP); require './t/common.perl'; # FIXME: too much setup @@ -99,6 +100,13 @@ EOF 'fsck on cloned directory successful'); } +SKIP: { + skip 'TCP_DEFER_ACCEPT is Linux-only', 1 if $^O ne 'linux'; + my $var = Socket::TCP_DEFER_ACCEPT(); + defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die; + ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set'); +}; + done_testing(); 1; diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t index e8fb63b4..ef683cab 100644 --- a/t/nntpd-tls.t +++ b/t/nntpd-tls.t @@ -4,7 +4,7 @@ use strict; use warnings; use Test::More; use File::Temp qw(tempdir); -use Socket qw(SOCK_STREAM); +use Socket qw(SOCK_STREAM IPPROTO_TCP); # IO::Poll and Net::NNTP are part of the standard library, but # distros may split them off... 
foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) { @@ -182,6 +182,15 @@ for my $args ( is(sysread($slow, my $eof, 4096), 0, 'got EOF'); $slow = undef; + SKIP: { + skip 'TCP_DEFER_ACCEPT is Linux-only', 2 if $^O ne 'linux'; + my $var = Socket::TCP_DEFER_ACCEPT(); + defined(my $x = getsockopt($nntps, IPPROTO_TCP, $var)) or die; + ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set on NNTPS'); + defined($x = getsockopt($starttls, IPPROTO_TCP, $var)) or die; + is(unpack('i', $x), 0, 'TCP_DEFER_ACCEPT is 0 on plain NNTP'); + }; + $c = undef; kill('TERM', $pid); is($pid, waitpid($pid, 0), 'nntpd exited successfully'); -- cgit v1.2.3-24-ge0c7 From fbcd2b5eb401a8e1811d803cef9b1c156acb50f6 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:54 +0000 Subject: daemon: use FreeBSD accept filters on non-NNTP Similar to TCP_DEFER_ACCEPT on Linux, FreeBSD has a 'dataready' accept filter which we can use to reduce wakeups when doing TLS negotiation or plain HTTP. There's also a 'httpready' which we can use for plain HTTP connections. 
--- lib/PublicInbox/Daemon.pm | 23 +++++++++++++++-------- t/httpd-corner.t | 21 ++++++++++++++++++--- t/httpd.t | 10 ++++++++++ t/nntpd-tls.t | 14 +++++++++++++- 4 files changed, 56 insertions(+), 12 deletions(-) diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 8b59b65f..cf011a20 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -8,7 +8,8 @@ use warnings; use Getopt::Long qw/:config gnu_getopt no_ignore_case auto_abbrev/; use IO::Handle; use IO::Socket; -use Socket qw(IPPROTO_TCP); +use Socket qw(IPPROTO_TCP SOL_SOCKET); +sub SO_ACCEPTFILTER () { 0x1000 } use Cwd qw/abs_path/; STDOUT->autoflush(1); STDERR->autoflush(1); @@ -553,20 +554,25 @@ sub tls_start_cb ($$) { } } -sub defer_accept ($) { +sub defer_accept ($$) { + my ($s, $af_name) = @_; + return unless defined $af_name; if ($^O eq 'linux') { - my ($s) = @_; my $x = getsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT()); return unless defined $x; # may be Unix socket my $sec = unpack('i', $x); return if $sec > 0; # systemd users may set a higher value setsockopt($s, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 1); + } elsif ($^O eq 'freebsd') { + my $x = getsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER); + return if defined $x; # don't change if set + my $accf_arg = pack('a16a240', $af_name, ''); + setsockopt($s, SOL_SOCKET, SO_ACCEPTFILTER, $accf_arg); } - # TODO FreeBSD accf_http / accf_data } -sub daemon_loop ($$$) { - my ($refresh, $post_accept, $nntpd) = @_; +sub daemon_loop ($$$$) { + my ($refresh, $post_accept, $nntpd, $af_default) = @_; PublicInbox::EvCleanup::enable(); # early for $refresh my %post_accept; while (my ($k, $v) = each %tls_opt) { @@ -599,7 +605,7 @@ sub daemon_loop ($$$) { # NNTPS, HTTPS, HTTP, and POP3S are client-first traffic # NNTP and POP3 are server-first - defer_accept($_) if $tls_cb || !$nntpd; + defer_accept($_, $tls_cb ? 
'dataready' : $af_default); # this calls epoll_create: PublicInbox::Listener->new($_, $tls_cb || $post_accept) @@ -612,8 +618,9 @@ sub daemon_loop ($$$) { sub run ($$$;$) { my ($default, $refresh, $post_accept, $nntpd) = @_; daemon_prepare($default); + my $af_default = $default =~ /:8080\z/ ? 'httpready' : undef; daemonize(); - daemon_loop($refresh, $post_accept, $nntpd); + daemon_loop($refresh, $post_accept, $nntpd, $af_default); } sub do_chown ($) { diff --git a/t/httpd-corner.t b/t/httpd-corner.t index 13befcf1..1cfc2565 100644 --- a/t/httpd-corner.t +++ b/t/httpd-corner.t @@ -18,7 +18,7 @@ use File::Temp qw/tempdir/; use IO::Socket; use IO::Socket::UNIX; use Fcntl qw(:seek); -use Socket qw(IPPROTO_TCP TCP_NODELAY); +use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET); use POSIX qw(mkfifo); require './t/common.perl'; my $tmpdir = tempdir('httpd-corner-XXXXXX', TMPDIR => 1, CLEANUP => 1); @@ -36,7 +36,10 @@ my %opts = ( Listen => 1024, ); my $sock = IO::Socket::INET->new(%opts); -my $defer_accept_val; + +# Make sure we don't clobber socket options set by systemd or similar +# using socket activation: +my ($defer_accept_val, $accf_arg); if ($^O eq 'linux') { setsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT(), 5) or die; my $x = getsockopt($sock, IPPROTO_TCP, Socket::TCP_DEFER_ACCEPT()); @@ -45,6 +48,11 @@ if ($^O eq 'linux') { if ($defer_accept_val <= 0) { die "unexpected TCP_DEFER_ACCEPT value: $defer_accept_val"; } +} elsif ($^O eq 'freebsd' && system('kldstat -m accf_data >/dev/null') == 0) { + require PublicInbox::Daemon; + my $var = PublicInbox::Daemon::SO_ACCEPTFILTER(); + $accf_arg = pack('a16a240', 'dataready', ''); + setsockopt($sock, SOL_SOCKET, $var, $accf_arg) or die "setsockopt: $!"; } my $upath = "$tmpdir/s"; @@ -100,7 +108,7 @@ my $spawn_httpd = sub { is(scalar(grep(/CLOSE FAIL/, @$after)), 1, 'body->close not called'); } -{ +SKIP: { my $conn = conn_for($sock, 'excessive header'); $SIG{PIPE} = 'IGNORE'; $conn->write("GET /callback 
HTTP/1.0\r\n"); @@ -515,6 +523,13 @@ SKIP: { is(unpack('i', $x), $defer_accept_val, 'TCP_DEFER_ACCEPT unchanged if previously set'); }; +SKIP: { + skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd'; + skip 'accf_data not loaded: kldload accf_data' if !defined $accf_arg; + my $var = PublicInbox::Daemon::SO_ACCEPTFILTER(); + defined(my $x = getsockopt($sock, SOL_SOCKET, $var)) or die; + is($x, $accf_arg, 'SO_ACCEPTFILTER unchanged if previously set'); +}; done_testing(); diff --git a/t/httpd.t b/t/httpd.t index 8c2a3173..e085c4b9 100644 --- a/t/httpd.t +++ b/t/httpd.t @@ -106,6 +106,16 @@ SKIP: { defined(my $x = getsockopt($sock, IPPROTO_TCP, $var)) or die; ok(unpack('i', $x) > 0, 'TCP_DEFER_ACCEPT set'); }; +SKIP: { + skip 'SO_ACCEPTFILTER is FreeBSD-only', 1 if $^O ne 'freebsd'; + if (system('kldstat -m accf_http >/dev/null') != 0) { + skip 'accf_http not loaded: kldload accf_http', 1; + } + require PublicInbox::Daemon; + my $var = PublicInbox::Daemon::SO_ACCEPTFILTER(); + my $x = getsockopt($sock, SOL_SOCKET, $var); + like($x, qr/\Ahttpready\0+\z/, 'got httpready accf for HTTP'); +}; done_testing(); diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t index ef683cab..427d370f 100644 --- a/t/nntpd-tls.t +++ b/t/nntpd-tls.t @@ -4,7 +4,7 @@ use strict; use warnings; use Test::More; use File::Temp qw(tempdir); -use Socket qw(SOCK_STREAM IPPROTO_TCP); +use Socket qw(SOCK_STREAM IPPROTO_TCP SOL_SOCKET); # IO::Poll and Net::NNTP are part of the standard library, but # distros may split them off... foreach my $mod (qw(DBD::SQLite IO::Socket::SSL Net::NNTP IO::Poll)) { @@ -190,6 +190,18 @@ for my $args ( defined($x = getsockopt($starttls, IPPROTO_TCP, $var)) or die; is(unpack('i', $x), 0, 'TCP_DEFER_ACCEPT is 0 on plain NNTP'); }; + SKIP: { + skip 'SO_ACCEPTFILTER is FreeBSD-only', 2 if $^O ne 'freebsd'; + if (system('kldstat -m accf_data >/dev/null')) { + skip 'accf_data not loaded? 
kldload accf_data', 2; + } + require PublicInbox::Daemon; + my $var = PublicInbox::Daemon::SO_ACCEPTFILTER(); + my $x = getsockopt($nntps, SOL_SOCKET, $var); + like($x, qr/\Adataready\0+\z/, 'got dataready accf for NNTPS'); + $x = getsockopt($starttls, IPPROTO_TCP, $var); + is($x, undef, 'no BSD accept filter for plain NNTP'); + }; $c = undef; kill('TERM', $pid); -- cgit v1.2.3-24-ge0c7 From df5755b40b4ba1d6048042e18d8ea501755b9a02 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:55 +0000 Subject: ds: split out IO::KQueue-specific code We don't need to code multiple event loops or have branches in watch() if we can easily make the IO::KQueue-based interface look like our lower-level epoll_* API. --- MANIFEST | 1 + lib/PublicInbox/DS.pm | 121 +++++++++------------------------------------ lib/PublicInbox/DSKQXS.pm | 73 +++++++++++++++++++++++++++ lib/PublicInbox/Syscall.pm | 7 +-- 4 files changed, 99 insertions(+), 103 deletions(-) create mode 100644 lib/PublicInbox/DSKQXS.pm diff --git a/MANIFEST b/MANIFEST index 26ff0d0d..52c4790e 100644 --- a/MANIFEST +++ b/MANIFEST @@ -77,6 +77,7 @@ lib/PublicInbox/Cgit.pm lib/PublicInbox/Config.pm lib/PublicInbox/ContentId.pm lib/PublicInbox/DS.pm +lib/PublicInbox/DSKQXS.pm lib/PublicInbox/Daemon.pm lib/PublicInbox/Emergency.pm lib/PublicInbox/EvCleanup.pm diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index d38e2d20..d6ef0b8d 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -36,14 +36,9 @@ use Errno qw(EAGAIN EINVAL); use Carp qw(croak confess carp); use File::Temp qw(tempfile); -our $HAVE_KQUEUE = eval { require IO::KQueue; IO::KQueue->import; 1 }; - our ( - $HaveEpoll, # Flag -- is epoll available? initially undefined. 
- $HaveKQueue, %DescriptorMap, # fd (num) -> PublicInbox::DS object - $Epoll, # Global epoll fd (for epoll mode only) - $KQueue, # Global kqueue fd ref (for kqueue mode only) + $Epoll, # Global epoll fd (or DSKQXS ref) $_io, # IO::Handle for Epoll @ToClose, # sockets to close when event loop is done @@ -74,13 +69,8 @@ sub Reset { $PostLoopCallback = undef; $DoneInit = 0; - # NOTE kqueue is close-on-fork, and we don't account for it, yet - # OTOH, we (public-inbox) don't need this sub outside of tests... - POSIX::close($$KQueue) if !$_io && $KQueue && $$KQueue >= 0; - $KQueue = undef; - - $_io = undef; # close $Epoll - $Epoll = undef; + $_io = undef; # closes real $Epoll FD + $Epoll = undef; # may call DSKQXS::DESTROY *EventLoop = *FirstTimeEventLoop; } @@ -152,21 +142,17 @@ sub _InitPoller return if $DoneInit; $DoneInit = 1; - if ($HAVE_KQUEUE) { - $KQueue = IO::KQueue->new(); - $HaveKQueue = defined $KQueue; - if ($HaveKQueue) { - *EventLoop = *KQueueEventLoop; - } - } - elsif (PublicInbox::Syscall::epoll_defined()) { - $Epoll = eval { epoll_create(1024); }; - $HaveEpoll = defined $Epoll && $Epoll >= 0; - if ($HaveEpoll) { - set_cloexec($Epoll); - *EventLoop = *EpollEventLoop; - } + if (!PublicInbox::Syscall::epoll_defined()) { + $Epoll = eval { + require PublicInbox::DSKQXS; + PublicInbox::DSKQXS->import; + PublicInbox::DSKQXS->new; + }; + } else { + $Epoll = epoll_create(); + set_cloexec($Epoll) if (defined($Epoll) && $Epoll >= 0); } + *EventLoop = *EpollEventLoop; } =head2 C<< CLASS->EventLoop() >> @@ -180,11 +166,7 @@ sub FirstTimeEventLoop { _InitPoller(); - if ($HaveEpoll) { - EpollEventLoop($class); - } elsif ($HaveKQueue) { - KQueueEventLoop($class); - } + EventLoop($class); } sub now () { clock_gettime(CLOCK_MONOTONIC) } @@ -218,11 +200,7 @@ sub RunTimers { return $timeout; } -### The epoll-based event loop. Gets installed as EventLoop if IO::Epoll loads -### okay. 
sub EpollEventLoop { - my $class = shift; - while (1) { my @events; my $i; @@ -241,30 +219,6 @@ sub EpollEventLoop { } } -### The kqueue-based event loop. Gets installed as EventLoop if IO::KQueue works -### okay. -sub KQueueEventLoop { - my $class = shift; - - while (1) { - my $timeout = RunTimers(); - my @ret = eval { $KQueue->kevent($timeout) }; - if (my $err = $@) { - # workaround https://rt.cpan.org/Ticket/Display.html?id=116615 - if ($err =~ /Interrupted system call/) { - @ret = (); - } else { - die $err; - } - } - - foreach my $kev (@ret) { - $DescriptorMap{$kev->[0]}->event_step; - } - return unless PostEventLoop(); - } -} - =head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >> Sets post loop callback function. Pass a subref and it will be @@ -314,17 +268,6 @@ sub PostEventLoop { return $keep_running; } -# map EPOLL* bits to kqueue EV_* flags for EV_SET -sub kq_flag ($$) { - my ($bit, $ev) = @_; - if ($ev & $bit) { - my $fl = EV_ADD() | EV_ENABLE(); - ($ev & EPOLLONESHOT) ? ($fl|EV_ONESHOT()) : $fl; - } else { - EV_ADD() | EV_DISABLE(); - } -} - ##################################################################### ### PublicInbox::DS-the-object code ##################################################################### @@ -353,21 +296,13 @@ sub new { _InitPoller(); - if ($HaveEpoll) { -retry: - if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { - if ($! == EINVAL && ($ev & EPOLLEXCLUSIVE)) { - $ev &= ~EPOLLEXCLUSIVE; - goto retry; - } - die "couldn't add epoll watch for $fd: $!\n"; + if (epoll_ctl($Epoll, EPOLL_CTL_ADD, $fd, $ev)) { + if ($! 
== EINVAL && ($ev & EPOLLEXCLUSIVE)) { + $ev &= ~EPOLLEXCLUSIVE; + goto retry; } + die "couldn't add epoll watch for $fd: $!\n"; } - elsif ($HaveKQueue) { - $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev)); - $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev)); - } - Carp::cluck("PublicInbox::DS::new blowing away existing descriptor map for fd=$fd ($DescriptorMap{$fd})") if $DescriptorMap{$fd}; @@ -396,11 +331,9 @@ sub close { # if we're using epoll, we have to remove this from our epoll fd so we stop getting # notifications about it - if ($HaveEpoll) { - my $fd = fileno($sock); - epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and - confess("EPOLL_CTL_DEL: $!"); - } + my $fd = fileno($sock); + epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0) and + confess("EPOLL_CTL_DEL: $!"); # we explicitly don't delete from DescriptorMap here until we # actually close the socket, as we might be in the middle of @@ -596,14 +529,8 @@ sub msg_more ($$) { sub watch ($$) { my ($self, $ev) = @_; my $sock = $self->{sock} or return; - my $fd = fileno($sock); - if ($HaveEpoll) { - epoll_ctl($Epoll, EPOLL_CTL_MOD, $fd, $ev) and - confess("EPOLL_CTL_MOD $!"); - } elsif ($HaveKQueue) { - $KQueue->EV_SET($fd, EVFILT_READ(), kq_flag(EPOLLIN, $ev)); - $KQueue->EV_SET($fd, EVFILT_WRITE(), kq_flag(EPOLLOUT, $ev)); - } + epoll_ctl($Epoll, EPOLL_CTL_MOD, fileno($sock), $ev) and + confess("EPOLL_CTL_MOD $!"); 0; } diff --git a/lib/PublicInbox/DSKQXS.pm b/lib/PublicInbox/DSKQXS.pm new file mode 100644 index 00000000..38e13446 --- /dev/null +++ b/lib/PublicInbox/DSKQXS.pm @@ -0,0 +1,73 @@ +# Copyright (C) 2019 all contributors +# Licensed the same as Danga::Socket (and Perl5) +# License: GPL-1.0+ or Artistic-1.0-Perl +# +# +# +# kqueue support via IO::KQueue XS module. This makes kqueue look +# like epoll to simplify the code in DS.pm. This is NOT meant to be +# an all encompassing emulation of epoll via IO::KQueue, but just to +# support cases public-inbox-nntpd/httpd care about. 
+# A pure-Perl version using syscall() is planned, and it should be +# faster due to the lack of syscall overhead. +package PublicInbox::DSKQXS; +use strict; +use warnings; +use parent qw(IO::KQueue); +use parent qw(Exporter); +use IO::KQueue; +use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL); +our @EXPORT = qw(epoll_ctl epoll_wait); +my $owner_pid = -1; # kqueue is close-on-fork (yes, fork, not exec) + +# map EPOLL* bits to kqueue EV_* flags for EV_SET +sub kq_flag ($$) { + my ($bit, $ev) = @_; + if ($ev & $bit) { + my $fl = EV_ADD | EV_ENABLE; + ($ev & EPOLLONESHOT) ? ($fl | EV_ONESHOT) : $fl; + } else { + EV_ADD | EV_DISABLE; + } +} + +sub new { + my ($class) = @_; + die 'non-singleton use not supported' if $owner_pid == $$; + $owner_pid = $$; + $class->SUPER::new; +} + +sub epoll_ctl { + my ($self, $op, $fd, $ev) = @_; + if ($op != EPOLL_CTL_DEL) { + $self->EV_SET($fd, EVFILT_READ, kq_flag(EPOLLIN, $ev)); + $self->EV_SET($fd, EVFILT_WRITE, kq_flag(EPOLLOUT, $ev)); + } + 0; +} + +sub epoll_wait { + my ($self, $maxevents, $timeout_msec, $events) = @_; + @$events = eval { $self->kevent($timeout_msec) }; + if (my $err = $@) { + # workaround https://rt.cpan.org/Ticket/Display.html?id=116615 + if ($err =~ /Interrupted system call/) { + @$events = (); + } else { + die $err; + } + } + # caller only cares for $events[$i]->[0] + scalar(@$events); +} + +sub DESTROY { + my ($self) = @_; + if ($owner_pid == $$) { + POSIX::close($$self); + $owner_pid = -1; + } +} + +1; diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index f1988e61..500efa67 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -248,13 +248,8 @@ sub sendfile_freebsd { sub epoll_defined { return $SYS_epoll_create ? 
1 : 0; } -# ARGS: (size) -- but in modern Linux 2.6, the -# size doesn't even matter (radix tree now, not hash) sub epoll_create { - return -1 unless defined $SYS_epoll_create; - my $epfd = eval { syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0) }; - return -1 if $@; - return $epfd; + syscall($SYS_epoll_create, $no_deprecated ? 0 : ($_[0]||100)+0); } # epoll_ctl wrapper -- cgit v1.2.3-24-ge0c7 From 4f868db3675eeee5994edc4fe79a9a2583623747 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:56 +0000 Subject: ds: reimplement IO::Poll support to look like epoll At least the subset of epoll we use. EPOLLET might be difficult to emulate if we end up using it. --- MANIFEST | 2 ++ lib/PublicInbox/DS.pm | 16 +++++++------ lib/PublicInbox/DSPoll.pm | 58 +++++++++++++++++++++++++++++++++++++++++++++++ t/ds-poll.t | 58 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 7 deletions(-) create mode 100644 lib/PublicInbox/DSPoll.pm create mode 100644 t/ds-poll.t diff --git a/MANIFEST b/MANIFEST index 52c4790e..29920953 100644 --- a/MANIFEST +++ b/MANIFEST @@ -78,6 +78,7 @@ lib/PublicInbox/Config.pm lib/PublicInbox/ContentId.pm lib/PublicInbox/DS.pm lib/PublicInbox/DSKQXS.pm +lib/PublicInbox/DSPoll.pm lib/PublicInbox/Daemon.pm lib/PublicInbox/Emergency.pm lib/PublicInbox/EvCleanup.pm @@ -191,6 +192,7 @@ t/content_id.t t/convert-compact.t t/data/0001.patch t/ds-leak.t +t/ds-poll.t t/edit.t t/emergency.t t/fail-bin/spamc diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index d6ef0b8d..e3479e66 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -142,15 +142,17 @@ sub _InitPoller return if $DoneInit; $DoneInit = 1; - if (!PublicInbox::Syscall::epoll_defined()) { - $Epoll = eval { - require PublicInbox::DSKQXS; - PublicInbox::DSKQXS->import; - PublicInbox::DSKQXS->new; - }; - } else { + if (PublicInbox::Syscall::epoll_defined()) { $Epoll = epoll_create(); set_cloexec($Epoll) if (defined($Epoll) 
&& $Epoll >= 0); + } else { + my $cls; + for (qw(DSKQXS DSPoll)) { + $cls = "PublicInbox::$_"; + last if eval "require $cls"; + } + $cls->import; + $Epoll = $cls->new; } *EventLoop = *EpollEventLoop; } diff --git a/lib/PublicInbox/DSPoll.pm b/lib/PublicInbox/DSPoll.pm new file mode 100644 index 00000000..e65640a8 --- /dev/null +++ b/lib/PublicInbox/DSPoll.pm @@ -0,0 +1,58 @@ +# Copyright (C) 2019 all contributors +# Licensed the same as Danga::Socket (and Perl5) +# License: GPL-1.0+ or Artistic-1.0-Perl +# +# +# +# poll(2) via IO::Poll core module. This makes poll look +# like epoll to simplify the code in DS.pm. This is NOT meant to be +# an all encompassing emulation of epoll via IO::Poll, but just to +# support cases public-inbox-nntpd/httpd care about. +package PublicInbox::DSPoll; +use strict; +use warnings; +use parent qw(Exporter); +use IO::Poll; +use PublicInbox::Syscall qw(EPOLLONESHOT EPOLLIN EPOLLOUT EPOLL_CTL_DEL); +our @EXPORT = qw(epoll_ctl epoll_wait); + +sub new { bless {}, $_[0] } # fd => events + +sub epoll_ctl { + my ($self, $op, $fd, $ev) = @_; + + # not wasting time on error checking + if ($op != EPOLL_CTL_DEL) { + $self->{$fd} = $ev; + } else { + delete $self->{$fd}; + } + 0; +} + +sub epoll_wait { + my ($self, $maxevents, $timeout_msec, $events) = @_; + my @pset; + while (my ($fd, $events) = each %$self) { + my $pevents = $events & EPOLLIN ? POLLIN : 0; + $pevents |= $events & EPOLLOUT ? POLLOUT : 0; + push(@pset, $fd, $pevents); + } + @$events = (); + my $n = IO::Poll::_poll($timeout_msec, @pset); + if ($n >= 0) { + for (my $i = 0; $i < @pset; ) { + my $fd = $pset[$i++]; + my $revents = $pset[$i++] or next; + delete($self->{$fd}) if $self->{$fd} & EPOLLONESHOT; + push @$events, [ $fd ]; + } + my $nevents = scalar @$events; + if ($n != $nevents) { + warn "BUG? 
poll() returned $n, but got $nevents"; + } + } + $n; +} + +1; diff --git a/t/ds-poll.t b/t/ds-poll.t new file mode 100644 index 00000000..a397ee06 --- /dev/null +++ b/t/ds-poll.t @@ -0,0 +1,58 @@ +# Copyright (C) 2019 all contributors +# Licensed the same as Danga::Socket (and Perl5) +# License: GPL-1.0+ or Artistic-1.0-Perl +# +# +# +use strict; +use warnings; +use Test::More; +use PublicInbox::Syscall qw(:epoll); +my $cls = 'PublicInbox::DSPoll'; +use_ok $cls; +my $p = $cls->new; + +my ($r, $w, $x, $y); +pipe($r, $w) or die; +pipe($x, $y) or die; +is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($r), EPOLLIN), 0, 'add EPOLLIN'); +my $events = []; +my $n = epoll_wait($p, 9, 0, $events); +is_deeply($events, [], 'no events set'); +is($n, 0, 'nothing ready, yet'); +is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($w), EPOLLOUT|EPOLLONESHOT), 0, + 'add EPOLLOUT|EPOLLONESHOT'); +$n = epoll_wait($p, 9, -1, $events); +is($n, 1, 'got POLLOUT event'); +is($events->[0]->[0], fileno($w), '$w ready'); + +$n = epoll_wait($p, 9, 0, $events); +is($n, 0, 'nothing ready after oneshot'); +is_deeply($events, [], 'no events set after oneshot'); + +syswrite($w, '1') == 1 or die; +for my $t (0..1) { + $n = epoll_wait($p, 9, $t, $events); + is($events->[0]->[0], fileno($r), "level-trigger POLLIN ready #$t"); + is($n, 1, "only event ready #$t"); +} +syswrite($y, '1') == 1 or die; +is(epoll_ctl($p, EPOLL_CTL_ADD, fileno($x), EPOLLIN|EPOLLONESHOT), 0, + 'EPOLLIN|EPOLLONESHOT add'); +is(epoll_wait($p, 9, -1, $events), 2, 'epoll_wait has 2 ready'); +my @fds = sort(map { $_->[0] } @$events); +my @exp = sort((fileno($r), fileno($x))); +is_deeply(\@fds, \@exp, 'got both ready FDs'); + +# EPOLL_CTL_DEL doesn't matter for kqueue, we do it in native epoll +# to avoid a kernel-wide lock; but it's not needed for native kqueue +# paths so DSKQXS makes it a noop (as did Danga::Socket::close).
+SKIP: { + if ($cls ne 'PublicInbox::DSPoll') { + skip "$cls doesn't handle EPOLL_CTL_DEL", 2; + } + is(epoll_ctl($p, EPOLL_CTL_DEL, fileno($r), 0), 0, 'EPOLL_CTL_DEL OK'); + $n = epoll_wait($p, 9, 0, $events); + is($n, 0, 'nothing ready after EPOLL_CTL_DEL'); +}; + +done_testing; -- cgit v1.2.3-24-ge0c7 From 19382e401c5fe158200ba43c204875e368158374 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 02:52:57 +0000 Subject: Revert "ci: require IO::KQueue on FreeBSD, for now" Now that we support IO::Poll once again, we can remove the IO::KQueue requirement. --- ci/deps.perl | 5 ++++- ci/profiles.sh | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/ci/deps.perl b/ci/deps.perl index ad2c11d2..62870c1f 100755 --- a/ci/deps.perl +++ b/ci/deps.perl @@ -60,7 +60,7 @@ my $profiles = { # account for granularity differences between package systems and OSes my @precious; if ($^O eq 'freebsd') { - @precious = qw(perl curl Socket6 IO::Compress::Gzip IO::KQueue); + @precious = qw(perl curl Socket6 IO::Compress::Gzip); } elsif ($pkg_fmt eq 'rpm') { @precious = qw(perl curl); } @@ -149,6 +149,9 @@ my (@pkg_install, @pkg_remove, %all); for my $ary (values %$profiles) { $all{$_} = \@pkg_remove for @$ary; } +if ($^O eq 'freebsd') { + $all{'IO::KQueue'} = \@pkg_remove; +} $profiles->{all} = [ keys %all ]; # pseudo-profile for all packages # parse the profile list from the command-line diff --git a/ci/profiles.sh b/ci/profiles.sh index 1ddf7891..d559ec5f 100755 --- a/ci/profiles.sh +++ b/ci/profiles.sh @@ -54,7 +54,8 @@ esac case $ID-$VERSION_ID in freebsd-11|freebsd-12) sed "s/^/$PKG_FMT /" < Date: Mon, 24 Jun 2019 02:52:58 +0000 Subject: ds: reduce overhead of tempfile creation We end up buffering giant things to the FS sometimes, and open() is not a cheap syscall; so being forced to do it twice to get a file description with O_APPEND is gross when we can just use O_EXCL ourselves and loop on EEXIST. 
--- lib/PublicInbox/DS.pm | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index e3479e66..4947192f 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -18,7 +18,7 @@ use strict; use bytes; use POSIX (); use IO::Handle qw(); -use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD SEEK_SET); +use Fcntl qw(SEEK_SET :DEFAULT); use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC); use parent qw(Exporter); our @EXPORT_OK = qw(now msg_more); @@ -32,9 +32,9 @@ use fields ('sock', # underlying socket 'wbuf_off', # offset into first element of wbuf to start writing at ); -use Errno qw(EAGAIN EINVAL); +use Errno qw(EAGAIN EINVAL EEXIST); use Carp qw(croak confess carp); -use File::Temp qw(tempfile); +require File::Spec; our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object @@ -440,12 +440,16 @@ sub drop { # PerlIO::mmap or PerlIO::scalar if needed sub tmpio ($$$) { my ($self, $bref, $off) = @_; - # open(my $fh, '+>>', undef) doesn't set O_APPEND - my ($fh, $path) = eval { tempfile('wbuf-XXXXXXX', TMPDIR => 1) }; - $fh or return drop($self, "tempfile: $@"); - open($fh, '+>>', $path) or return drop($self, "open: $!"); + my $fh; # open(my $fh, '+>>', undef) doesn't set O_APPEND + do { + my $fn = File::Spec->tmpdir . '/wbuf-' . rand; + if (sysopen($fh, $fn, O_RDWR|O_CREAT|O_EXCL|O_APPEND, 0600)) { # likely + unlink($fn) or return drop($self, "unlink($fn) $!"); + } elsif ($! 
!= EEXIST) { # EMFILE/ENFILE/ENOSPC/ENOMEM + return drop($self, "open: $!"); + } + } until (defined $fh); $fh->autoflush(1); - unlink($path) or return drop($self, "unlink: $!"); my $len = bytes::length($$bref) - $off; $fh->write($$bref, $len, $off) or return drop($self, "write ($len): $!"); $fh -- cgit v1.2.3-24-ge0c7 From ac2173434be2be4baba51db6dac7351833cdcdeb Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 04:06:42 +0000 Subject: Makefile: skip DSKQXS in global syntax check IO::KQueue isn't easily installable on Linux systems. --- Makefile.PL | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile.PL b/Makefile.PL index 23822072..adcf91e5 100644 --- a/Makefile.PL +++ b/Makefile.PL @@ -77,7 +77,7 @@ changed = \$(shell git ls-files -m) %.syntax :: @\$(PERL) -w -I lib -c \$(subst .syntax,,\$@) -syntax:: \$(my_syntax) +syntax:: \$(filter-out lib/PublicInbox/DSKQXS.pm.syntax,\$(my_syntax)) dsyn :: \$(addsuffix .syntax, \$(filter \$(changed), \$(syn_files))) -- cgit v1.2.3-24-ge0c7 From c30b4427b340aeb242273a7b890fbd7e50132f51 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 24 Jun 2019 18:18:18 +0000 Subject: ds: ->write must not clobber empty wbuf array We need to account for ->write(CODE) calls doing ->write(SCALARREF), otherwise flush_write may see the wrong ->{wbuf} field. --- lib/PublicInbox/DS.pm | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 4947192f..08f4e9e8 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -505,7 +505,10 @@ sub write { return $self->close; } my $tmpio = tmpio($self, $bref, $written) or return 0; - $self->{wbuf} = [ $tmpio ]; + + # wbuf may be an empty array if we're being called inside + # ->flush_write via CODE bref: + push @{$self->{wbuf} ||= []}, $tmpio; watch($self, EPOLLOUT|EPOLLONESHOT); return 0; } -- cgit v1.2.3-24-ge0c7