about summary refs log tree commit homepage
path: root/lib/PublicInbox/InputPipe.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/InputPipe.pm')
-rw-r--r--lib/PublicInbox/InputPipe.pm54
1 files changed, 35 insertions, 19 deletions
diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputPipe.pm
index e1e26e20..ee5bda59 100644
--- a/lib/PublicInbox/InputPipe.pm
+++ b/lib/PublicInbox/InputPipe.pm
@@ -1,36 +1,52 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
-# for reading pipes and sockets off the DS event loop
+# for reading pipes, sockets, and TTYs off the DS event loop
 package PublicInbox::InputPipe;
-use strict;
-use v5.10.1;
+use v5.12;
 use parent qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+use PublicInbox::Syscall qw(EPOLLIN);
 
 sub consume {
         my ($in, $cb, @args) = @_;
         my $self = bless { cb => $cb, args => \@args }, __PACKAGE__;
-        eval { $self->SUPER::new($in, EPOLLIN|EPOLLET) };
-        return $self->requeue if $@; # regular file
-        $in->blocking(0); # pipe or socket
+        eval { $self->SUPER::new($in, EPOLLIN) };
+        if ($@) { # regular file (but not w/ select|IO::Poll backends)
+                $self->{-need_rq} = 1;
+                $self->requeue;
+        } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
+                $in->blocking(0);
+        }
+        $self;
+}
+
+sub close { # idempotent
+        my ($self) = @_;
+        $self->{-need_rq} ? delete($self->{sock}) : $self->SUPER::close
 }
 
 sub event_step {
         my ($self) = @_;
         my $r = sysread($self->{sock} // return, my $rbuf, 65536);
-        if ($r) {
-                $self->{cb}->(@{$self->{args} // []}, $rbuf);
-                return $self->requeue; # may be regular file or pipe
-        }
-        if (defined($r)) { # EOF
-                $self->{cb}->(@{$self->{args} // []}, '');
-        } elsif ($!{EAGAIN}) {
-                return;
-        } else { # another error
-                $self->{cb}->(@{$self->{args} // []}, undef)
+        eval {
+                if ($r) {
+                        $self->{cb}->($self, @{$self->{args}}, $rbuf);
+                        $self->requeue if $self->{-need_rq};
+                } elsif (defined($r)) { # EOF
+                        $self->{cb}->($self, @{$self->{args}}, '');
+                        $self->close
+                } elsif ($!{EAGAIN}) { # rely on EPOLLIN
+                } elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes
+                        $self->requeue if $self->{-need_rq};
+                } else { # another error
+                        $self->{cb}->($self, @{$self->{args}}, undef);
+                        $self->close;
+                }
+        };
+        if ($@) {
+                warn "E: $@";
+                $self->close;
         }
-        $self->{sock}->blocking ? delete($self->{sock}) : $self->close
 }
 
 1;