diff options
Diffstat (limited to 'lib/PublicInbox/WQBlocked.pm')
-rw-r--r-- | lib/PublicInbox/WQBlocked.pm | 48 |
1 files changed, 48 insertions, 0 deletions
diff --git a/lib/PublicInbox/WQBlocked.pm b/lib/PublicInbox/WQBlocked.pm new file mode 100644 index 00000000..8d931fa9 --- /dev/null +++ b/lib/PublicInbox/WQBlocked.pm @@ -0,0 +1,48 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# non-blocking workqueues, currently used by LeiNoteEvent to track renames +package PublicInbox::WQBlocked; +use v5.12; +use parent qw(PublicInbox::DS); +use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT); +use PublicInbox::IPC; +use Carp (); + +sub new { + my ($cls, $wq, $buf) = @_; + my $self = bless { msgq => [$buf], }, $cls; + $wq->{wqb} = $self->SUPER::new($wq->{-wq_s1}, EPOLLOUT|EPOLLONESHOT); +} + +sub flush_send { + my ($self) = @_; + push(@{$self->{msgq}}, $_[1]) if defined($_[1]); + while (defined(my $buf = shift @{$self->{msgq}})) { + if (ref($buf) eq 'CODE') { + $buf->($self); # could be \&PublicInbox::DS::close + } else { + my $wq_s1 = $self->{sock}; + my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf, + 0); + next if defined($n); + Carp::croak("sendmsg: $!") unless $!{EAGAIN}; + PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT); + unshift @{$self->{msgq}}, $buf; + last; # wait for ->event_step + } + } +} + +sub enq_close { flush_send($_[0], $_[0]->can('close')) } + +sub event_step { # called on EPOLLOUT wakeup + my ($self) = @_; + eval { flush_send($self) } if $self->{sock}; + if ($@) { + warn $@; + $self->close; + } +} + +1; |