diff options
-rw-r--r-- | lib/PublicInbox/DS.pm | 41 | ||||
-rw-r--r-- | lib/PublicInbox/EvCleanup.pm | 80 | ||||
-rw-r--r-- | lib/PublicInbox/HTTP.pm | 6 | ||||
-rw-r--r-- | lib/PublicInbox/HTTPD/Async.pm | 6 | ||||
-rw-r--r-- | lib/PublicInbox/NNTP.pm | 14 |
5 files changed, 36 insertions, 111 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 8f1494f6..6cd527e2 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -37,7 +37,6 @@ use Errno qw(EAGAIN EINVAL EEXIST); use Carp qw(croak confess carp); require File::Spec; -my $nextt; # timer for next_tick my $nextq = []; # queue for next_tick our ( %DescriptorMap, # fd (num) -> PublicInbox::DS object @@ -101,12 +100,6 @@ Returns a timer object which you can call C<< $timer->cancel >> on if you need t sub AddTimer { my ($class, $secs, $coderef) = @_; - if (!$secs) { - my $timer = bless([0, $coderef], 'PublicInbox::DS::Timer'); - unshift(@Timers, $timer); - return $timer; - } - my $fire_time = now() + $secs; my $timer = bless [$fire_time, $coderef], "PublicInbox::DS::Timer"; @@ -176,9 +169,23 @@ sub FirstTimeEventLoop { sub now () { clock_gettime(CLOCK_MONOTONIC) } +sub next_tick () { + my $q = $nextq; + $nextq = []; + for (@$q) { + if (ref($_) eq 'CODE') { + $_->(); + } else { + $_->event_step; + } + } +} + # runs timers and returns milliseconds for next one, or next event loop sub RunTimers { - return $LoopTimeout unless @Timers; + next_tick(); + + return ((@$nextq || @ToClose) ? 0 : $LoopTimeout) unless @Timers; my $now = now(); @@ -188,6 +195,9 @@ sub RunTimers { $to_run->[1]->($now) if $to_run->[1]; } + # timers may enqueue into nextq: + return 0 if (@$nextq || @ToClose); + return $LoopTimeout unless @Timers; # convert time to an even number of milliseconds, adding 1 @@ -320,6 +330,8 @@ sub new { ### I N S T A N C E M E T H O D S ##################################################################### +sub requeue ($) { push @$nextq, $_[0] } + =head2 C<< $obj->close >> Close the socket. @@ -593,19 +605,6 @@ sub shutdn ($) { $self->close; } } - -sub next_tick () { - $nextt = undef; - my $q = $nextq; - $nextq = []; - $_->event_step for @$q; -} - -sub requeue ($) { - push @$nextq, $_[0]; - $nextt ||= PublicInbox::EvCleanup::asap(*next_tick); -} - package PublicInbox::DS::Timer; # [$abs_float_firetime, $coderef]; sub cancel { diff --git a/lib/PublicInbox/EvCleanup.pm b/lib/PublicInbox/EvCleanup.pm index 33b54ebc..be6672ed 100644 --- a/lib/PublicInbox/EvCleanup.pm +++ b/lib/PublicInbox/EvCleanup.pm @@ -1,80 +1,23 @@ -# Copyright (C) 2016-2018 all contributors <meta@public-inbox.org> +# Copyright (C) 2016-2019 all contributors <meta@public-inbox.org> # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> -# event cleanups (currently for PublicInbox::DS) +# event cleanups (for PublicInbox::DS) package PublicInbox::EvCleanup; use strict; use warnings; -use base qw(PublicInbox::DS); -use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); +require PublicInbox::DS; +# this only runs under public-inbox-{httpd/nntpd}, not generic PSGI servers my $ENABLED; sub enabled { $ENABLED } sub enable { $ENABLED = 1 } -my $singleton; -my $asapq = [ [], undef ]; -my $nextq = [ [], undef ]; my $laterq = [ [], undef ]; -sub once_init () { - my $self = fields::new('PublicInbox::EvCleanup'); - my ($r, $w); - - # This is a dummy pipe which is always writable so it can always - # fires in the next event loop iteration. - pipe($r, $w) or die "pipe: $!"; - fcntl($w, 1031, 4096) if $^O eq 'linux'; # 1031: F_SETPIPE_SZ - $self->SUPER::new($w, 0); - - # always writable, since PublicInbox::EvCleanup::event_step - # never drains wbuf. We can avoid wasting a hash slot by - # stuffing the read-end of the pipe into the never-to-be-touched - # wbuf - $self->{wbuf} = $r; - $self; -} - -sub _run_all ($) { - my ($q) = @_; - - my $run = $q->[0]; - $q->[0] = []; - $q->[1] = undef; - $_->() foreach @$run; -} - -# ensure PublicInbox::DS::ToClose processing after timers fire -sub _asap_close () { $asapq->[1] ||= _asap_timer() } - -# Called by PublicInbox::DS -sub event_step { _run_all($asapq) } - -sub _run_next () { - _run_all($nextq); - _asap_close(); -} - sub _run_later () { - _run_all($laterq); - _asap_close(); -} - -sub _asap_timer () { - $singleton ||= once_init(); - $singleton->watch(EPOLLOUT|EPOLLONESHOT); - 1; -} - -sub asap ($) { - my ($cb) = @_; - push @{$asapq->[0]}, $cb; - $asapq->[1] ||= _asap_timer(); -} - -sub next_tick ($) { - my ($cb) = @_; - push @{$nextq->[0]}, $cb; - $nextq->[1] ||= PublicInbox::DS->AddTimer(0, *_run_next); + my $run = $laterq->[0]; + $laterq->[0] = []; + $laterq->[1] = undef; + $_->() foreach @$run; } sub later ($) { @@ -83,10 +26,5 @@ sub later ($) { $laterq->[1] ||= PublicInbox::DS->AddTimer(60, *_run_later); } -END { - event_step(); - _run_all($nextq); - _run_all($laterq); -} - +END { _run_later() } 1; diff --git a/lib/PublicInbox/HTTP.pm b/lib/PublicInbox/HTTP.pm index 856b8959..b8912950 100644 --- a/lib/PublicInbox/HTTP.pm +++ b/lib/PublicInbox/HTTP.pm @@ -30,10 +30,8 @@ use constant { use Errno qw(EAGAIN); my $pipelineq = []; -my $pipet; sub process_pipelineq () { my $q = $pipelineq; - $pipet = undef; $pipelineq = []; foreach (@$q) { next unless $_->{sock}; @@ -238,8 +236,8 @@ sub next_request ($) { my ($self) = @_; if ($self->{rbuf}) { # avoid recursion for pipelined requests + PublicInbox::DS::requeue(\&process_pipelineq) if !@$pipelineq; push @$pipelineq, $self; - $pipet ||= PublicInbox::EvCleanup::asap(*process_pipelineq); } else { # wait for next request $self->requeue; } @@ -269,7 +267,7 @@ sub getline_cb ($$$) { if ($self->{wbuf}) { $self->write($next); } else { - PublicInbox::EvCleanup::asap($next); + PublicInbox::DS::requeue($next); } return; } diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index b46baeb2..35d17150 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -19,8 +19,8 @@ sub new { # no $io? call $cb at the top of the next event loop to # avoid recursion: unless (defined($io)) { - PublicInbox::EvCleanup::asap($cb) if $cb; - PublicInbox::EvCleanup::next_tick($cleanup) if $cleanup; + PublicInbox::DS::requeue($cb); + die 'cleanup unsupported w/o $io' if $cleanup; return; } @@ -87,7 +87,7 @@ sub close { # we defer this to the next timer loop since close is deferred if (my $cleanup = delete $self->{cleanup}) { - PublicInbox::EvCleanup::next_tick($cleanup); + PublicInbox::DS::requeue($cleanup); } } diff --git a/lib/PublicInbox/NNTP.pm b/lib/PublicInbox/NNTP.pm index 83970309..9973fcaf 100644 --- a/lib/PublicInbox/NNTP.pm +++ b/lib/PublicInbox/NNTP.pm @@ -50,14 +50,11 @@ sub expire_old () { my $exp = $EXPTIME; my $old = $now - $exp; my $nr = 0; - my $closed = 0; my %new; while (my ($fd, $v) = each %$EXPMAP) { my ($idle_time, $nntp) = @$v; if ($idle_time < $old) { - if ($nntp->shutdn) { - $closed++; - } else { + if (!$nntp->shutdn) { ++$nr; $new{$fd} = $v; } @@ -67,14 +64,7 @@ sub expire_old () { } } $EXPMAP = \%new; - if ($nr) { - $expt = PublicInbox::EvCleanup::later(*expire_old); - } else { - $expt = undef; - # noop to kick outselves out of the loop ASAP so descriptors - # really get closed - PublicInbox::EvCleanup::asap(sub {}) if $closed; - } + $expt = PublicInbox::EvCleanup::later(*expire_old) if $nr; } sub greet ($) { $_[0]->write($_[0]->{nntpd}->{greet}) }; |