diff options
-rw-r--r-- | lib/PublicInbox/DS.pm | 4 | ||||
-rw-r--r-- | lib/PublicInbox/NNTP.pm | 7 | ||||
-rw-r--r-- | lib/PublicInbox/NNTPdeflate.pm | 59 | ||||
-rw-r--r-- | t/nntpd-validate.t | 8 |
4 files changed, 54 insertions, 24 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 586c47cd..b16b1896 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -553,7 +553,9 @@ sub msg_more ($$) { return 0; } } - $self->write(\($_[1])); + + # don't redispatch into NNTPdeflate::write + PublicInbox::DS::write($self, \($_[1])); } sub epwait ($$) { diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index d6f315ba..895858b7 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -642,6 +642,11 @@ sub long_response ($$) { } elsif ($more) { # $self->{wbuf}: update_idle_time($self); + # COMPRESS users all share the same DEFLATE context. + # Flush it here to ensure clients don't see + # each other's data + $self->zflush; + # no recursion, schedule another call ASAP # but only after all pending writes are done my $wbuf = $self->{wbuf} ||= []; @@ -925,6 +930,8 @@ sub cmd_compress ($$) { undef } +sub zflush {} # overridden by NNTPdeflate + sub cmd_xpath ($$) { my ($self, $mid) = @_; return r501 unless $mid =~ /\A<(.+)>\z/; diff --git a/lib/PublicInbox/NNTPdeflate.pm b/lib/PublicInbox/NNTPdeflate.pm index 66210bfa..78da2a58 100644 --- a/lib/PublicInbox/NNTPdeflate.pm +++ b/lib/PublicInbox/NNTPdeflate.pm @@ -2,13 +2,18 @@ # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> # RFC 8054 NNTP COMPRESS DEFLATE implementation -# Warning, enabling compression for C10K NNTP clients is rather -# expensive in terms of memory use. # # RSS usage for 10K idle-but-did-something NNTP clients on 64-bit: -# TLS + DEFLATE : 1.8 GB (MemLevel=9, 1.2 GB with MemLevel=8) -# TLS only : <200MB -# plain : <50MB +# TLS + DEFLATE[a] : 1.8 GB (MemLevel=9, 1.2 GB with MemLevel=8) +# TLS + DEFLATE[b] : ~300MB +# TLS only : <200MB +# plain : <50MB +# +# [a] - initial implementation using per-client Deflate contexts and buffer +# +# [b] - memory-optimized implementation using a global deflate context. +# It's less efficient in terms of compression, but way more +# efficient in terms of server memory usage. package PublicInbox::NNTPdeflate; use strict; use warnings; @@ -23,11 +28,11 @@ my %IN_OPT = ( -AppendOutput => 1, ); -my %OUT_OPT = ( +# global deflate context and buffer +my $zbuf = \(my $buf = ''); +my $zout = Compress::Raw::Zlib::Deflate->new( # nnrpd (INN) and Compress::Raw::Zlib favor MemLevel=9, - # but the zlib C library and git use MemLevel=8 - # as the default. Using 8 drops our memory use with 10K - # TLS clients from 1.8 GB to 1.2 GB, but... + # but the zlib C library and git use MemLevel=8 as the default. # FIXME: sometimes clients fail with 8, so we use 9 # -MemLevel => 9, @@ -43,7 +48,6 @@ sub enable { unlock_hash(%$self); bless $self, $class; $self->{zin} = [ Compress::Raw::Zlib::Inflate->new(%IN_OPT), '' ]; - $self->{zout} = [ Compress::Raw::Zlib::Deflate->new(%OUT_OPT), '' ]; } # overrides PublicInbox::NNTP::compressed @@ -74,31 +78,42 @@ sub do_read ($$$$) { # override PublicInbox::DS::msg_more sub msg_more ($$) { my $self = $_[0]; - my $zout = $self->{zout}; # $_[1] may be a reference or not for ->deflate - my $err = $zout->[0]->deflate($_[1], $zout->[1]); + my $err = $zout->deflate($_[1], $zbuf); $err == Z_OK or die "->deflate failed $err"; 1; } -# SUPER is PublicInbox::DS::write, so $_[1] may be a reference or not +sub zflush ($) { + my ($self) = @_; + + my $deflated = $zbuf; + $zbuf = \(my $next = ''); + + my $err = $zout->flush($deflated, Z_FULL_FLUSH); + $err == Z_OK or die "->flush failed $err"; + + # We can still let the lower socket layer do buffering: + PublicInbox::DS::msg_more($self, $$deflated); +} + +# compatible with PublicInbox::DS::write, so $_[1] may be a reference or not sub write ($$) { my $self = $_[0]; - return $self->SUPER::write($_[1]) if ref($_[1]) eq 'CODE'; - my $zout = $self->{zout}; - my $deflated = pop @$zout; + return PublicInbox::DS::write($self, $_[1]) if ref($_[1]) eq 'CODE'; + + my $deflated = $zbuf; + $zbuf = \(my $next = ''); # $_[1] may be a reference or not for ->deflate - my $err = $zout->[0]->deflate($_[1], $deflated); + my $err = $zout->deflate($_[1], $deflated); $err == Z_OK or die "->deflate failed $err"; - $err = $zout->[0]->flush($deflated, Z_PARTIAL_FLUSH); + $err = $zout->flush($deflated, Z_FULL_FLUSH); $err == Z_OK or die "->flush failed $err"; - # PublicInbox::DS::write puts partial writes into another buffer, - # so we can prepare the next deflate buffer: - $zout->[1] = ''; - $self->SUPER::write(\$deflated); + # We can still let the socket layer do buffering: + PublicInbox::DS::write($self, $deflated); } 1; diff --git a/t/nntpd-validate.t b/t/nntpd-validate.t index 1a325105..532ef729 100644 --- a/t/nntpd-validate.t +++ b/t/nntpd-validate.t @@ -112,11 +112,17 @@ sub do_get_all { } } } + + # hacky bytes_read thing added to Net::NNTP for testing: + my $bytes_read = ''; + if ($nntp->can('bytes_read')) { + $bytes_read .= ' '.$nntp->bytes_read.'b'; + } my $q = $nntp->quit; print STDERR "# quit failed: ".$nntp->code."\n" if !$q; my $elapsed = sprintf('%0.3f', clock_gettime(CLOCK_MONOTONIC) - $t0); my $res = $dig->hexdigest; - print STDERR "# $desc - $res (${elapsed}s)\n"; + print STDERR "# $desc - $res (${elapsed}s)$bytes_read\n"; $res; } my @tests = ([]); |