about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2019-07-05 22:53:38 +0000
committerEric Wong <e@80x24.org>2019-07-06 04:33:37 +0000
commit77c66b4cdb1d52321ed3cb6352fe0b72312cbb71 (patch)
tree66e6b3a1a0a754abaa02589301b2b99a1af5ea41
parentf60b310cf3ebabbb7aae6a74fb91bf5946983503 (diff)
downloadpublic-inbox-77c66b4cdb1d52321ed3cb6352fe0b72312cbb71.tar.gz
This is only tested so far with my patches to Net::NNTP at:
https://rt.cpan.org/Ticket/Display.html?id=129967

Memory use in C10K situations is disappointing, but that's
the nature of compression.

gzip compression over HTTPS does have the advantage of not
keeping zlib streams open when clients are idle, at the
cost of worse compression.
-rw-r--r--MANIFEST2
-rw-r--r--TODO2
-rw-r--r--lib/PublicInbox/NNTP.pm23
-rw-r--r--lib/PublicInbox/NNTPdeflate.pm104
-rwxr-xr-xscript/public-inbox-nntpd2
-rw-r--r--t/nntpd-tls.t11
-rw-r--r--t/nntpd-validate.t210
-rw-r--r--t/nntpd.t7
8 files changed, 357 insertions, 4 deletions
diff --git a/MANIFEST b/MANIFEST
index 4cb5f38f..24280351 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -113,6 +113,7 @@ lib/PublicInbox/MsgTime.pm
 lib/PublicInbox/Msgmap.pm
 lib/PublicInbox/NNTP.pm
 lib/PublicInbox/NNTPD.pm
+lib/PublicInbox/NNTPdeflate.pm
 lib/PublicInbox/NewsWWW.pm
 lib/PublicInbox/Over.pm
 lib/PublicInbox/OverIdx.pm
@@ -230,6 +231,7 @@ t/msgmap.t
 t/msgtime.t
 t/nntp.t
 t/nntpd-tls.t
+t/nntpd-validate.t
 t/nntpd.t
 t/nulsubject.t
 t/over.t
diff --git a/TODO b/TODO
index 7bd68c7b..2d20bad4 100644
--- a/TODO
+++ b/TODO
@@ -33,8 +33,6 @@ all need to be considered for everything we introduce)
   ensure things continue working as they should (or more better)
   while retaining compatibility with old versions.
 
-* NNTP COMPRESS extension (see innd)
-
 * Support more of RFC 3977 (NNTP)
 
 * Combined "super server" for NNTP/HTTP/POP3 to reduce memory overhead
diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm
index 631fd3c7..d6f315ba 100644
--- a/lib/PublicInbox/NNTP.pm
+++ b/lib/PublicInbox/NNTP.pm
@@ -20,6 +20,7 @@ use Time::Local qw(timegm timelocal);
 use constant {
         LINE_MAX => 512, # RFC 977 section 2.3
         r501 => '501 command syntax error',
+        r502 => '502 Command unavailable',
         r221 => '221 Header follows',
         r224 => '224 Overview information follows (multi-line)',
         r225 =>        '225 Headers follow (multi-line)',
@@ -41,6 +42,7 @@ LIST ACTIVE ACTIVE.TIMES NEWSGROUPS OVERVIEW.FMT\r
 HDR\r
 OVER\r
 
+my $have_deflate;
 my $EXPMAP; # fd -> [ idle_time, $self ]
 my $expt;
 our $EXPTIME = 180; # 3 minutes
@@ -897,11 +899,13 @@ sub cmd_xover ($;$) {
         });
 }
 
+sub compressed { undef }
+
 sub cmd_starttls ($) {
         my ($self) = @_;
         my $sock = $self->{sock} or return;
         # RFC 4642 2.2.1
-        (ref($sock) eq 'IO::Socket::SSL') and return '502 Command unavailable';
+        return r502 if (ref($sock) eq 'IO::Socket::SSL' || $self->compressed);
         my $opt = $self->{nntpd}->{accept_tls} or
                 return '580 can not initiate TLS negotiation';
         res($self, '382 Continue with TLS negotiation');
@@ -910,6 +914,17 @@ sub cmd_starttls ($) {
         undef;
 }
 
+# RFC 8054
+sub cmd_compress ($$) {
+        my ($self, $alg) = @_;
+        return '503 Only the DEFLATE is supported' if uc($alg) ne 'DEFLATE';
+        return r502 if $self->compressed || !$have_deflate;
+        res($self, '206 Compression active');
+        PublicInbox::NNTPdeflate->enable($self);
+        $self->requeue;
+        undef
+}
+
 sub cmd_xpath ($$) {
         my ($self, $mid) = @_;
         return r501 unless $mid =~ /\A<(.+)>\z/;
@@ -997,4 +1012,10 @@ sub busy {
         ($self->{rbuf} || $self->{wbuf} || not_idle_long($self, $now));
 }
 
+# this is an import to prevent "perl -c" from complaining about fields
+sub import {
+        $have_deflate = eval { require PublicInbox::NNTPdeflate } and
+                $CAPABILITIES .= "COMPRESS DEFLATE\r\n";
+}
+
 1;
diff --git a/lib/PublicInbox/NNTPdeflate.pm b/lib/PublicInbox/NNTPdeflate.pm
new file mode 100644
index 00000000..66210bfa
--- /dev/null
+++ b/lib/PublicInbox/NNTPdeflate.pm
@@ -0,0 +1,104 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# RFC 8054 NNTP COMPRESS DEFLATE implementation
+# Warning, enabling compression for C10K NNTP clients is rather
+# expensive in terms of memory use.
+#
+# RSS usage for 10K idle-but-did-something NNTP clients on 64-bit:
+#   TLS + DEFLATE :  1.8 GB  (MemLevel=9, 1.2 GB with MemLevel=8)
+#   TLS only      :  <200MB
+#   plain         :   <50MB
+package PublicInbox::NNTPdeflate;
+use strict;
+use warnings;
+use 5.010_001;
+use base qw(PublicInbox::NNTP);
+use Compress::Raw::Zlib;
+use Hash::Util qw(unlock_hash); # dependency of fields for perl 5.10+, anyways
+
+my %IN_OPT = (
+        -Bufsize => PublicInbox::NNTP::LINE_MAX,
+        -WindowBits => -15, # RFC 1951
+        -AppendOutput => 1,
+);
+
+my %OUT_OPT = (
+        # nnrpd (INN) and Compress::Raw::Zlib favor MemLevel=9,
+        # but the zlib C library and git use MemLevel=8
+        # as the default.  Using 8 drops our memory use with 10K
+        # TLS clients from 1.8 GB to 1.2 GB, but...
+        # FIXME: sometimes clients fail with 8, so we use 9
+        # -MemLevel => 9,
+
+        # needs more testing, nothing obviously different in terms of memory
+        -Bufsize => 65536,
+
+        -WindowBits => -15, # RFC 1951
+        -AppendOutput => 1,
+);
+
+sub enable {
+        my ($class, $self) = @_;
+        unlock_hash(%$self);
+        bless $self, $class;
+        $self->{zin} = [ Compress::Raw::Zlib::Inflate->new(%IN_OPT), '' ];
+        $self->{zout} = [ Compress::Raw::Zlib::Deflate->new(%OUT_OPT), '' ];
+}
+
+# overrides PublicInbox::NNTP::compressed
+sub compressed { 1 }
+
+# SUPER is PublicInbox::DS::do_read, so $_[1] may be a reference or not
+sub do_read ($$$$) {
+        my ($self, $rbuf, $len, $off) = @_;
+
+        my $zin = $self->{zin} or return; # closed
+        my $deflated = \($zin->[1]);
+        my $r = $self->SUPER::do_read($deflated, $len) or return;
+
+        # assert(length($$rbuf) == $off) as far as NNTP.pm is concerned
+        # -ConsumeInput is true, so $deflated is automatically emptied
+        my $err = $zin->[0]->inflate($deflated, $rbuf);
+        if ($err == Z_OK) {
+                $r = length($$rbuf) and return $r;
+                # nothing ready, yet, get more, later
+                $self->requeue;
+        } else {
+                delete $self->{zin};
+                $self->close;
+        }
+        0;
+}
+
+# override PublicInbox::DS::msg_more
+sub msg_more ($$) {
+        my $self = $_[0];
+        my $zout = $self->{zout};
+
+        # $_[1] may be a reference or not for ->deflate
+        my $err = $zout->[0]->deflate($_[1], $zout->[1]);
+        $err == Z_OK or die "->deflate failed $err";
+        1;
+}
+
+# SUPER is PublicInbox::DS::write, so $_[1] may be a reference or not
+sub write ($$) {
+        my $self = $_[0];
+        return $self->SUPER::write($_[1]) if ref($_[1]) eq 'CODE';
+        my $zout = $self->{zout};
+        my $deflated = pop @$zout;
+
+        # $_[1] may be a reference or not for ->deflate
+        my $err = $zout->[0]->deflate($_[1], $deflated);
+        $err == Z_OK or die "->deflate failed $err";
+        $err = $zout->[0]->flush($deflated, Z_PARTIAL_FLUSH);
+        $err == Z_OK or die "->flush failed $err";
+
+        # PublicInbox::DS::write puts partial writes into another buffer,
+        # so we can prepare the next deflate buffer:
+        $zout->[1] = '';
+        $self->SUPER::write(\$deflated);
+}
+
+1;
diff --git a/script/public-inbox-nntpd b/script/public-inbox-nntpd
index 55bf330e..cd273849 100755
--- a/script/public-inbox-nntpd
+++ b/script/public-inbox-nntpd
@@ -6,7 +6,7 @@
 use strict;
 use warnings;
 require PublicInbox::Daemon;
-require PublicInbox::NNTP;
+use PublicInbox::NNTP; # need to call import
 require PublicInbox::NNTPD;
 my $nntpd = PublicInbox::NNTPD->new;
 PublicInbox::Daemon::run('0.0.0.0:119',
diff --git a/t/nntpd-tls.t b/t/nntpd-tls.t
index 49b31221..84d6e3c0 100644
--- a/t/nntpd-tls.t
+++ b/t/nntpd-tls.t
@@ -29,6 +29,9 @@ require './t/common.perl';
 require PublicInbox::InboxWritable;
 require PublicInbox::MIME;
 require PublicInbox::SearchIdx;
+my $need_zlib;
+eval { require Compress::Raw::Zlib } or
+        $need_zlib = 'Compress::Raw::Zlib missing';
 my $version = 2; # v2 needs newer git
 require_git('2.6') if $version >= 2;
 my $tmpdir = tempdir('pi-nntpd-tls-XXXXXX', TMPDIR => 1, CLEANUP => 1);
@@ -234,6 +237,14 @@ sub get_capa {
                 die "unexpected: $!" unless defined($r);
                 die 'unexpected EOF' if $r == 0;
         } until $capa =~ /\.\r\n\z/;
+
+        my $deflate_capa = qr/\r\nCOMPRESS DEFLATE\r\n/;
+        if ($need_zlib) {
+                unlike($capa, $deflate_capa,
+                        'COMPRESS DEFLATE NOT advertised '.$need_zlib);
+        } else {
+                like($capa, $deflate_capa, 'COMPRESS DEFLATE advertised');
+        }
         $capa;
 }
 
diff --git a/t/nntpd-validate.t b/t/nntpd-validate.t
new file mode 100644
index 00000000..1a325105
--- /dev/null
+++ b/t/nntpd-validate.t
@@ -0,0 +1,210 @@
+# Copyright (C) 2019 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Integration test to validate compression.
+use strict;
+use warnings;
+use File::Temp qw(tempdir);
+use Test::More;
+use Symbol qw(gensym);
+use Time::HiRes qw(clock_gettime CLOCK_MONOTONIC);
+my $inbox_dir = $ENV{GIANT_INBOX_DIR};
+plan skip_all => "GIANT_INBOX_DIR not defined for $0" unless $inbox_dir;
+my $mid = $ENV{TEST_MID};
+
+# This test is also an excuse for me to experiment with Perl threads :P
+unless (eval 'use threads; 1') {
+        plan skip_all => "$0 requires a threaded perl" if $@;
+}
+
+# Net::NNTP is part of the standard library, but distros may split it off...
+foreach my $mod (qw(DBD::SQLite Net::NNTP Compress::Raw::Zlib)) {
+        eval "require $mod";
+        plan skip_all => "$mod missing for $0" if $@;
+}
+
+my $test_compress = Net::NNTP->can('compress');
+if (!$test_compress) {
+        diag 'Your Net::NNTP does not yet support compression';
+        diag 'See: https://rt.cpan.org/Ticket/Display.html?id=129967';
+}
+my $test_tls = $ENV{TEST_SKIP_TLS} ? 0 : eval { require IO::Socket::SSL };
+my $cert = 'certs/server-cert.pem';
+my $key = 'certs/server-key.pem';
+if ($test_tls && !-r $key || !-r $cert) {
+        plan skip_all => "certs/ missing for $0, run ./certs/create-certs.perl";
+}
+require './t/common.perl';
+my $keep_tmp = !!$ENV{TEST_KEEP_TMP};
+my $tmpdir = tempdir('nntpd-validate-XXXXXX',TMPDIR => 1,CLEANUP => $keep_tmp);
+my (%OPT, $pid, $tail_pid, $host_port, $group);
+my $batch = 1000;
+END {
+        foreach ($pid, $tail_pid) {
+                kill 'TERM', $_ if defined $_;
+        }
+};
+if (($ENV{NNTP_TEST_URL} // '') =~ m!\Anntp://([^/]+)/([^/]+)\z!) {
+        ($host_port, $group) = ($1, $2);
+        $host_port .= ":119" unless index($host_port, ':') > 0;
+} else {
+        make_local_server();
+}
+my $test_article = $ENV{TEST_ARTICLE} // 0;
+my $test_xover = $ENV{TEST_XOVER} // 1;
+
+if ($test_tls) {
+        my $nntp = Net::NNTP->new($host_port, %OPT);
+        ok($nntp->starttls, 'STARTTLS works');
+        ok($nntp->compress, 'COMPRESS works') if $test_compress;
+        ok($nntp->quit, 'QUIT after starttls OK');
+}
+if ($test_compress) {
+        my $nntp = Net::NNTP->new($host_port, %OPT);
+        ok($nntp->compress, 'COMPRESS works');
+        ok($nntp->quit, 'QUIT after compress OK');
+}
+
+sub do_get_all {
+        my ($methods) = @_;
+        my $desc = join(',', @$methods);
+        my $t0 = clock_gettime(CLOCK_MONOTONIC);
+        my $dig = Digest::SHA->new(1);
+        my $digfh = gensym;
+        my $tmpfh;
+        if ($keep_tmp) {
+                open $tmpfh, '>', "$tmpdir/$desc.raw" or die $!;
+        }
+        my $tmp = { dig => $dig, tmpfh => $tmpfh };
+        tie *$digfh, 'DigestPipe', $tmp;
+        my $nntp = Net::NNTP->new($host_port, %OPT);
+        $nntp->article("<$mid>", $digfh) if $mid;
+        foreach my $m (@$methods) {
+                my $res = $nntp->$m;
+                print STDERR "# $m got $res ($desc)\n" if !$res;
+        }
+        $nntp->article("<$mid>", $digfh) if $mid;
+        my ($num, $first, $last) = $nntp->group($group);
+        unless (defined $num && defined $first && defined $last) {
+                warn "Invalid group\n";
+                return undef;
+        }
+        my $i;
+        for ($i = $first; $i < $last; $i += $batch) {
+                my $j = $i + $batch - 1;
+                $j = $last if $j > $last;
+                if ($test_xover) {
+                        my $xover = $nntp->xover("$i-$j");
+                        for my $n (sort { $a <=> $b } keys %$xover) {
+                                my $line = join("\t", @{$xover->{$n}});
+                                $line =~ tr/\r//d;
+                                $dig->add("$n\t".$line);
+                        }
+                }
+                if ($test_article) {
+                        for my $n ($i..$j) {
+                                $nntp->article($n, $digfh) and next;
+                                next if $nntp->code == 423;
+                                my $res = $nntp->code.' '.  $nntp->message;
+
+                                $res =~ tr/\r\n//d;
+                                print STDERR "# Article $n ($desc): $res\n";
+                        }
+                }
+        }
+        my $q = $nntp->quit;
+        print STDERR "# quit failed: ".$nntp->code."\n" if !$q;
+        my $elapsed = sprintf('%0.3f', clock_gettime(CLOCK_MONOTONIC) - $t0);
+        my $res = $dig->hexdigest;
+        print STDERR "# $desc - $res (${elapsed}s)\n";
+        $res;
+}
+my @tests = ([]);
+push @tests, [ 'compress' ] if $test_compress;
+push @tests, [ 'starttls' ] if $test_tls;
+push @tests, [ 'starttls', 'compress' ] if $test_tls && $test_compress;
+my (@keys, %thr, %res);
+for my $m (@tests) {
+        my $key = join(',', @$m);
+        push @keys, $key;
+        diag "$key start";
+        $thr{$key} = threads->create(\&do_get_all, $m);
+}
+
+$res{$_} = $thr{$_}->join for @keys;
+my $plain = $res{''};
+ok($plain, "plain got $plain");
+is($res{$_}, $plain, "$_ matches '' result") for @keys;
+
+done_testing();
+
+sub make_local_server {
+        require PublicInbox::Inbox;
+        $group = 'inbox.test.perf.nntpd';
+        my $ibx = { mainrepo => $inbox_dir, newsgroup => $group };
+        $ibx = PublicInbox::Inbox->new($ibx);
+        my $nntpd = 'blib/script/public-inbox-nntpd';
+        my $pi_config = "$tmpdir/config";
+        {
+                open my $fh, '>', $pi_config or die "open($pi_config): $!";
+                print $fh <<"" or die "print $pi_config: $!";
+[publicinbox "test"]
+        newsgroup = $group
+        mainrepo = $inbox_dir
+        address = test\@example.com
+
+                close $fh or die "close($pi_config): $!";
+        }
+        my ($out, $err) = ("$tmpdir/out", "$tmpdir/err");
+        for ($out, $err) {
+                open my $fh, '>', $_ or die "truncate: $!";
+        }
+        if (my $tail_cmd = $ENV{TAIL}) { # don't assume GNU tail
+                $tail_pid = fork;
+                if (defined $tail_pid && $tail_pid == 0) {
+                        open STDOUT, '>&STDERR' or die ">&2 failed: $!";
+                        exec(split(' ', $tail_cmd), $out, $err);
+                }
+        }
+        my $sock = tcp_server();
+        ok($sock, 'sock created');
+        $host_port = $sock->sockhost . ':' . $sock->sockport;
+
+        # not using multiple workers, here, since we want to increase
+        # the chance of tripping concurrency bugs within PublicInbox/NNTP*.pm
+        my $cmd = [ $nntpd, "--stdout=$out", "--stderr=$err", '-W0' ];
+        push @$cmd, "-lnntp://$host_port";
+        if ($test_tls) {
+                push @$cmd, "--cert=$cert", "--key=$key";
+                %OPT = (
+                        SSL_hostname => 'server.local',
+                        SSL_verifycn_name => 'server.local',
+                        SSL_verify_mode => IO::Socket::SSL::SSL_VERIFY_PEER(),
+                        SSL_ca_file => 'certs/test-ca.pem',
+                );
+        }
+        print STDERR "# CMD ". join(' ', @$cmd). "\n";
+        $pid = spawn_listener({ PI_CONFIG => $pi_config }, $cmd, [$sock]);
+}
+
+package DigestPipe;
+use strict;
+use warnings;
+
+sub TIEHANDLE {
+        my ($class, $self) = @_;
+        bless $self, $class;
+}
+
+sub PRINT {
+        my $self = shift;
+        my $data = join('', @_);
+        # Net::NNTP emit different line-endings depending on TLS or not...:
+        $data =~ tr/\r//d;
+        $self->{dig}->add($data);
+        if (my $tmpfh = $self->{tmpfh}) {
+                print $tmpfh $data;
+        }
+        1;
+}
+1;
diff --git a/t/nntpd.t b/t/nntpd.t
index fdb4bee4..e264fa6b 100644
--- a/t/nntpd.t
+++ b/t/nntpd.t
@@ -151,6 +151,13 @@ EOF
         $buf = read_til_dot($s);
         like($buf, qr/\r\nVERSION 2\r\n/s, 'CAPABILITIES works');
         unlike($buf, qr/STARTTLS/s, 'STARTTLS not advertised');
+        my $deflate_capa = qr/\r\nCOMPRESS DEFLATE\r\n/;
+        if (eval { require Compress::Raw::Zlib }) {
+                like($buf, $deflate_capa, 'DEFLATE advertised');
+        } else {
+                unlike($buf, $deflate_capa,
+                        'DEFLATE not advertised (Compress::Raw::Zlib missing)');
+        }
 
         syswrite($s, "NEWGROUPS 19990424 000000 GMT\r\n");
         $buf = read_til_dot($s);