about summary refs log tree commit homepage
path: root/lib/PublicInbox/IO.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/PublicInbox/IO.pm')
-rw-r--r--lib/PublicInbox/IO.pm152
1 files changed, 152 insertions, 0 deletions
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm
new file mode 100644
index 00000000..8640f112
--- /dev/null
+++ b/lib/PublicInbox/IO.pm
@@ -0,0 +1,152 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# supports reaping of children tied to a pipe or socket
+package PublicInbox::IO;
+use v5.12;
+use parent qw(IO::Handle Exporter);
+use PublicInbox::DS qw(awaitpid);
+our @EXPORT_OK = qw(poll_in read_all try_cat write_file);
+use Carp qw(croak);
+use IO::Poll qw(POLLIN);
+use Errno qw(EINTR EAGAIN);
+use PublicInbox::OnDestroy;
+# don't autodie in top-level for Perl 5.16.3 (and maybe newer versions)
+# we have our own ->close, so we scope autodie into each sub
+
+sub waitcb { # awaitpid callback
+        my ($pid, $errref, $cb, @args) = @_;
+        $$errref = $?; # sets .cerr for _close
+        $cb->($pid, @args) if $cb; # may clobber $?
+}
+
+sub attach_pid {
+        my ($io, $pid, @cb_arg) = @_;
+        bless $io, __PACKAGE__;
+        # we share $err (and not $self) with awaitpid to avoid a ref cycle
+        my $e = \(my $err);
+        ${*$io}{pi_io_reap} = [ $PublicInbox::OnDestroy::fork_gen, $pid, $e ];
+        awaitpid($pid, \&waitcb, $e, @cb_arg);
+        $io;
+}
+
+sub attached_pid {
+        my ($io) = @_;
+        ${${*$io}{pi_io_reap} // []}[1];
+}
+
+sub can_reap {
+        my ($io) = @_;
+        ${${*$io}{pi_io_reap} // [-1]}[0] == $PublicInbox::OnDestroy::fork_gen;
+}
+
+# caller cares about error result if they call close explicitly
+# reap->[2] may be set before this is called via waitcb
+sub close {
+        my ($io) = @_;
+        my $ret = $io->SUPER::close;
+        my $reap = delete ${*$io}{pi_io_reap};
+        return $ret if ($reap->[0] // -1) != $PublicInbox::OnDestroy::fork_gen;
+        if (defined ${$reap->[2]}) { # reap_pids already reaped asynchronously
+                $? = ${$reap->[2]};
+        } else { # wait synchronously
+                my $w = awaitpid($reap->[1]);
+        }
+        $? ? '' : $ret;
+}
+
+sub DESTROY {
+        my ($io) = @_;
+        my $reap = delete ${*$io}{pi_io_reap};
+        if (($reap->[0] // -1) == $PublicInbox::OnDestroy::fork_gen) {
+                $io->SUPER::close;
+                ${$reap->[2]} // awaitpid($reap->[1]);
+        }
+        $io->SUPER::DESTROY;
+}
+
+sub write_file ($$@) { # mode, filename, LIST (for print)
+        use autodie qw(open close);
+        open(my $fh, shift, shift);
+        print $fh @_;
+        defined(wantarray) && !wantarray ? $fh : close $fh;
+}
+
+sub poll_in ($;$) {
+        IO::Poll::_poll($_[1] // -1, fileno($_[0]), my $ev = POLLIN);
+}
+
+sub read_all ($;$$$) { # pass $len=0 to read until EOF for :utf8 handles
+        use autodie qw(read);
+        my ($io, $len, $bref, $off) = @_;
+        $bref //= \(my $buf);
+        $off //= 0;
+        my $r = 0;
+        if (my $left = $len //= -s $io) { # known size (binmode :raw/:unix)
+                do { # retry for binmode :unix
+                        $r = read($io, $$bref, $left, $off += $r) or croak(
+                                "read($io) premature EOF ($left/$len remain)");
+                } while ($left -= $r);
+        } else { # read until EOF
+                while (($r = read($io, $$bref, 65536, $off += $r))) {}
+        }
+        wantarray ? split(/^/sm, $$bref) : $$bref
+}
+
+sub try_cat ($) {
+        my ($path) = @_;
+        open(my $fh, '<', $path) or return '';
+        read_all $fh;
+}
+
+# TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here
+sub my_bufread {
+        my ($io, $len) = @_;
+        my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
+        my $left = $len - length($$rbuf);
+        my $r;
+        while ($left > 0) {
+                $r = sysread($io, $$rbuf, $left, length($$rbuf));
+                if ($r) {
+                        $left -= $r;
+                } elsif (defined($r)) { # EOF
+                        return 0;
+                } else {
+                        next if ($! == EAGAIN and poll_in($io));
+                        next if $! == EINTR; # may be set by sysread or poll_in
+                        return; # unrecoverable error
+                }
+        }
+        my $no_pad = substr($$rbuf, 0, $len, '');
+        delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
+        \$no_pad;
+}
+
+# always uses "\n"
+sub my_readline {
+        my ($io) = @_;
+        my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
+        while (1) {
+                if ((my $n = index($$rbuf, "\n")) >= 0) {
+                        my $ret = substr($$rbuf, 0, $n + 1, '');
+                        delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
+                        return $ret;
+                }
+                my $r = sysread($io, $$rbuf, 65536, length($$rbuf));
+                if (!defined($r)) {
+                        next if ($! == EAGAIN and poll_in($io));
+                        next if $! == EINTR; # may be set by sysread or poll_in
+                        return; # unrecoverable error
+                } elsif ($r == 0) { # return whatever's left on EOF
+                        delete(${*$io}{pi_io_rbuf});
+                        return $$rbuf;
+                } # else { continue
+        }
+}
+
+sub has_rbuf {
+        my ($io) = @_;
+        defined(${*$io}{pi_io_rbuf});
+}
+
+1;