diff options
Diffstat (limited to 'lib/PublicInbox/DS.pm')
-rw-r--r-- | lib/PublicInbox/DS.pm | 135 |
1 files changed, 36 insertions, 99 deletions
diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index 737f4c7a..03612ce8 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -5,13 +5,21 @@ # # This is a fork of the (for now) unmaintained Danga::Socket 1.61. # Unused features will be removed, and updates will be made to take -# advantage of newer kernels - +# advantage of newer kernels. +# +# API changes to diverge from Danga::Socket will happen to better +# accomodate new features and improve scalability. Do not expect +# this to be a stable API like Danga::Socket. +# Bugs encountered (and likely fixed) are reported to +# bug-Danga-Socket@rt.cpan.org and visible at: +# https://rt.cpan.org/Public/Dist/Display.html?Name=Danga-Socket package PublicInbox::DS; use strict; use bytes; use POSIX (); use Time::HiRes (); +use IO::Handle qw(); +use Fcntl qw(FD_CLOEXEC F_SETFD F_GETFD); use warnings; @@ -22,10 +30,8 @@ use fields ('sock', # underlying socket 'write_buf', # arrayref of scalars, scalarrefs, or coderefs to write 'write_buf_offset', # offset into first array of write_buf to start writing at 'write_buf_size', # total length of data in all write_buf items - 'write_set_watch', # bool: true if we internally set watch_write rather than by a subclass 'closed', # bool: socket is closed 'event_watch', # bitmask of events the client is interested in (POLLIN,OUT,etc.) - 'writer_func', # subref which does writing. must return bytes written (or undef) and set $! on errors ); use Errno qw(EINPROGRESS EWOULDBLOCK EISCONN ENOTSOCK @@ -47,11 +53,11 @@ our ( $HaveKQueue, %DescriptorMap, # fd (num) -> PublicInbox::DS object $Epoll, # Global epoll fd (for epoll mode only) - $KQueue, # Global kqueue fd (for kqueue mode only) + $KQueue, # Global kqueue fd ref (for kqueue mode only) + $_io, # IO::Handle for Epoll @ToClose, # sockets to close when event loop is done $PostLoopCallback, # subref to call at the end of each loop, if defined (global) - %PLCMap, # fd (num) -> PostLoopCallback (per-object) $LoopTimeout, # timeout of event loop in milliseconds $DoneInit, # if we've done the one-time module init yet @@ -78,33 +84,19 @@ sub Reset { @Timers = (); $PostLoopCallback = undef; - %PLCMap = (); $DoneInit = 0; - POSIX::close($Epoll) if defined $Epoll && $Epoll >= 0; - POSIX::close($KQueue) if defined $KQueue && $KQueue >= 0; - - *EventLoop = *FirstTimeEventLoop; -} - -=head2 C<< CLASS->HaveEpoll() >> + # NOTE kqueue is close-on-fork, and we don't account for it, yet + # OTOH, we (public-inbox) don't need this sub outside of tests... + POSIX::close($$KQueue) if !$_io && $KQueue && $$KQueue >= 0; + $KQueue = undef; -Returns a true value if this class will use IO::Epoll for async IO. + $_io = undef; # close $Epoll + $Epoll = undef; -=cut -sub HaveEpoll { - _InitPoller(); - return $HaveEpoll; + *EventLoop = *FirstTimeEventLoop; } -=head2 C<< CLASS->ToClose() >> - -Return the list of sockets that are awaiting close() at the end of the -current event loop. - -=cut -sub ToClose { return @ToClose; } - =head2 C<< CLASS->SetLoopTimeout( $timeout ) >> Set the loop timeout for the event loop to some value in milliseconds. @@ -164,6 +156,16 @@ sub AddTimer { die "Shouldn't get here."; } +# keeping this around in case we support other FD types for now, +# epoll_create1(EPOLL_CLOEXEC) requires Linux 2.6.27+... +sub set_cloexec ($) { + my ($fd) = @_; + + $_io = IO::Handle->new_from_fd($fd, 'r+') or return; + defined(my $fl = fcntl($_io, F_GETFD, 0)) or return; + fcntl($_io, F_SETFD, $fl | FD_CLOEXEC); +} + sub _InitPoller { return if $DoneInit; @@ -171,7 +173,7 @@ sub _InitPoller if ($HAVE_KQUEUE) { $KQueue = IO::KQueue->new(); - $HaveKQueue = $KQueue >= 0; + $HaveKQueue = defined $KQueue; if ($HaveKQueue) { *EventLoop = *KQueueEventLoop; } @@ -180,6 +182,7 @@ sub _InitPoller $Epoll = eval { epoll_create(1024); }; $HaveEpoll = defined $Epoll && $Epoll >= 0; if ($HaveEpoll) { + set_cloexec($Epoll); *EventLoop = *EpollEventLoop; } } @@ -251,7 +254,6 @@ sub EpollEventLoop { # get up to 1000 events my $evcount = epoll_wait($Epoll, 1000, $timeout, \@events); - EVENT: for ($i=0; $i<$evcount; $i++) { my $ev = $events[$i]; @@ -263,16 +265,6 @@ sub EpollEventLoop { my $code; my $state = $ev->[1]; - # if we didn't find a Perlbal::Socket subclass for that fd, try other - # pseudo-registered (above) fds. - if (! $pob) { - my $fd = $ev->[0]; - warn "epoll() returned fd $fd w/ state $state for which we have no mapping. removing.\n"; - epoll_ctl($Epoll, EPOLL_CTL_DEL, $fd, 0); - POSIX::close($fd); - next; - } - DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), state=%d \@ %s\n", $ev->[0], ref($pob), $ev->[1], time); @@ -329,10 +321,6 @@ sub PollEventLoop { $pob = $DescriptorMap{$fd}; - if (!$pob) { - next; - } - $pob->event_read if $state & POLLIN && ! $pob->{closed}; $pob->event_write if $state & POLLOUT && ! $pob->{closed}; $pob->event_err if $state & POLLERR && ! $pob->{closed}; @@ -365,11 +353,6 @@ sub KQueueEventLoop { foreach my $kev (@ret) { my ($fd, $filter, $flags, $fflags) = @$kev; my PublicInbox::DS $pob = $DescriptorMap{$fd}; - if (!$pob) { - warn "kevent() returned fd $fd for which we have no mapping. removing.\n"; - POSIX::close($fd); # close deletes the kevent entry - next; - } DebugLevel >= 1 && $class->DebugMsg("Event: fd=%d (%s), flags=%d \@ %s\n", $fd, ref($pob), $flags, time); @@ -404,18 +387,8 @@ The callback function will be passed two parameters: \%DescriptorMap sub SetPostLoopCallback { my ($class, $ref) = @_; - if (ref $class) { - # per-object callback - my PublicInbox::DS $self = $class; - if (defined $ref && ref $ref eq 'CODE') { - $PLCMap{$self->{fd}} = $ref; - } else { - delete $PLCMap{$self->{fd}}; - } - } else { - # global callback - $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; - } + # global callback + $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef; } # Internal function: run the post-event callback, send read events @@ -441,11 +414,6 @@ sub PostEventLoop { # or global) cancels it my $keep_running = 1; - # per-object post-loop-callbacks - for my $plc (values %PLCMap) { - $keep_running &&= $plc->(\%DescriptorMap); - } - # now we're at the very end, call callback if defined if (defined $PostLoopCallback) { $keep_running &&= $PostLoopCallback->(\%DescriptorMap); @@ -595,10 +563,6 @@ sub _cleanup { } } - # now delete from mappings. this fd no longer belongs to us, so we don't want - # to get alerts for it if it becomes writable/readable/etc. - delete $PLCMap{$self->{fd}}; - # we explicitly don't delete from DescriptorMap here until we # actually close the socket, as we might be in the middle of # processing an epoll_wait/etc that returned hundreds of fds, one @@ -622,18 +586,6 @@ sub sock { return $self->{sock}; } -=head2 C<< $obj->set_writer_func( CODEREF ) >> - -Sets a function to use instead of C<syswrite()> when writing data to the socket. - -=cut -sub set_writer_func { - my PublicInbox::DS $self = shift; - my $wtr = shift; - Carp::croak("Not a subref") unless !defined $wtr || UNIVERSAL::isa($wtr, "CODE"); - $self->{writer_func} = $wtr; -} - =head2 C<< $obj->write( $data ) >> Write the specified data to the underlying handle. I<data> may be scalar, @@ -703,12 +655,8 @@ sub write { } my $to_write = $len - $self->{write_buf_offset}; - my $written; - if (my $wtr = $self->{writer_func}) { - $written = $wtr->($bref, $to_write, $self->{write_buf_offset}); - } else { - $written = syswrite($self->{sock}, $$bref, $to_write, $self->{write_buf_offset}); - } + my $written = syswrite($self->{sock}, $$bref, $to_write, + $self->{write_buf_offset}); if (! defined $written) { if ($! == EPIPE) { @@ -720,7 +668,6 @@ sub write { push @{$self->{write_buf}}, $bref; $self->{write_buf_size} += $len; } - $self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT; $self->watch_write(1); return 0; } elsif ($! == ECONNRESET) { @@ -747,11 +694,7 @@ sub write { DebugLevel >= 2 && $self->debugmsg("Wrote ALL %d bytes to %d (nq=%d)", $written, $self->{fd}, $need_queue); $self->{write_buf_offset} = 0; - - if ($self->{write_set_watch}) { - $self->watch_write(0); - $self->{write_set_watch} = 0; - } + $self->watch_write(0); # this was our only write, so we can return immediately # since we avoided incrementing the buffer size or @@ -769,7 +712,6 @@ sub write { sub on_incomplete_write { my PublicInbox::DS $self = shift; - $self->{write_set_watch} = 1 unless $self->{event_watch} & POLLOUT; $self->watch_write(1); } @@ -888,11 +830,6 @@ sub watch_write { $event &= ~POLLOUT if ! $val; $event |= POLLOUT if $val; - if ($val && caller ne __PACKAGE__) { - # A subclass registered interest, it's now responsible for this. - $self->{write_set_watch} = 0; - } - # If it changed, set it if ($event != $self->{event_watch}) { if ($HaveKQueue) { |