diff options
Diffstat (limited to 'lib/PublicInbox/IO.pm')
-rw-r--r-- | lib/PublicInbox/IO.pm | 152 |
1 files changed, 152 insertions, 0 deletions
diff --git a/lib/PublicInbox/IO.pm b/lib/PublicInbox/IO.pm new file mode 100644 index 00000000..8640f112 --- /dev/null +++ b/lib/PublicInbox/IO.pm @@ -0,0 +1,152 @@ +# Copyright (C) all contributors <meta@public-inbox.org> +# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt> + +# supports reaping of children tied to a pipe or socket +package PublicInbox::IO; +use v5.12; +use parent qw(IO::Handle Exporter); +use PublicInbox::DS qw(awaitpid); +our @EXPORT_OK = qw(poll_in read_all try_cat write_file); +use Carp qw(croak); +use IO::Poll qw(POLLIN); +use Errno qw(EINTR EAGAIN); +use PublicInbox::OnDestroy; +# don't autodie in top-level for Perl 5.16.3 (and maybe newer versions) +# we have our own ->close, so we scope autodie into each sub + +sub waitcb { # awaitpid callback + my ($pid, $errref, $cb, @args) = @_; + $$errref = $?; # sets .cerr for _close + $cb->($pid, @args) if $cb; # may clobber $? +} + +sub attach_pid { + my ($io, $pid, @cb_arg) = @_; + bless $io, __PACKAGE__; + # we share $err (and not $self) with awaitpid to avoid a ref cycle + my $e = \(my $err); + ${*$io}{pi_io_reap} = [ $PublicInbox::OnDestroy::fork_gen, $pid, $e ]; + awaitpid($pid, \&waitcb, $e, @cb_arg); + $io; +} + +sub attached_pid { + my ($io) = @_; + ${${*$io}{pi_io_reap} // []}[1]; +} + +sub can_reap { + my ($io) = @_; + ${${*$io}{pi_io_reap} // [-1]}[0] == $PublicInbox::OnDestroy::fork_gen; +} + +# caller cares about error result if they call close explicitly +# reap->[2] may be set before this is called via waitcb +sub close { + my ($io) = @_; + my $ret = $io->SUPER::close; + my $reap = delete ${*$io}{pi_io_reap}; + return $ret if ($reap->[0] // -1) != $PublicInbox::OnDestroy::fork_gen; + if (defined ${$reap->[2]}) { # reap_pids already reaped asynchronously + $? = ${$reap->[2]}; + } else { # wait synchronously + my $w = awaitpid($reap->[1]); + } + $? ? '' : $ret; +} + +sub DESTROY { + my ($io) = @_; + my $reap = delete ${*$io}{pi_io_reap}; + if (($reap->[0] // -1) == $PublicInbox::OnDestroy::fork_gen) { + $io->SUPER::close; + ${$reap->[2]} // awaitpid($reap->[1]); + } + $io->SUPER::DESTROY; +} + +sub write_file ($$@) { # mode, filename, LIST (for print) + use autodie qw(open close); + open(my $fh, shift, shift); + print $fh @_; + defined(wantarray) && !wantarray ? $fh : close $fh; +} + +sub poll_in ($;$) { + IO::Poll::_poll($_[1] // -1, fileno($_[0]), my $ev = POLLIN); +} + +sub read_all ($;$$$) { # pass $len=0 to read until EOF for :utf8 handles + use autodie qw(read); + my ($io, $len, $bref, $off) = @_; + $bref //= \(my $buf); + $off //= 0; + my $r = 0; + if (my $left = $len //= -s $io) { # known size (binmode :raw/:unix) + do { # retry for binmode :unix + $r = read($io, $$bref, $left, $off += $r) or croak( + "read($io) premature EOF ($left/$len remain)"); + } while ($left -= $r); + } else { # read until EOF + while (($r = read($io, $$bref, 65536, $off += $r))) {} + } + wantarray ? split(/^/sm, $$bref) : $$bref +} + +sub try_cat ($) { + my ($path) = @_; + open(my $fh, '<', $path) or return ''; + read_all $fh; +} + +# TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here +sub my_bufread { + my ($io, $len) = @_; + my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = ''); + my $left = $len - length($$rbuf); + my $r; + while ($left > 0) { + $r = sysread($io, $$rbuf, $left, length($$rbuf)); + if ($r) { + $left -= $r; + } elsif (defined($r)) { # EOF + return 0; + } else { + next if ($! == EAGAIN and poll_in($io)); + next if $! == EINTR; # may be set by sysread or poll_in + return; # unrecoverable error + } + } + my $no_pad = substr($$rbuf, 0, $len, ''); + delete(${*$io}{pi_io_rbuf}) if $$rbuf eq ''; + \$no_pad; +} + +# always uses "\n" +sub my_readline { + my ($io) = @_; + my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = ''); + while (1) { + if ((my $n = index($$rbuf, "\n")) >= 0) { + my $ret = substr($$rbuf, 0, $n + 1, ''); + delete(${*$io}{pi_io_rbuf}) if $$rbuf eq ''; + return $ret; + } + my $r = sysread($io, $$rbuf, 65536, length($$rbuf)); + if (!defined($r)) { + next if ($! == EAGAIN and poll_in($io)); + next if $! == EINTR; # may be set by sysread or poll_in + return; # unrecoverable error + } elsif ($r == 0) { # return whatever's left on EOF + delete(${*$io}{pi_io_rbuf}); + return $$rbuf; + } # else { continue + } +} + +sub has_rbuf { + my ($io) = @_; + defined(${*$io}{pi_io_rbuf}); +} + +1; |