# Copyright (C) all contributors # License: AGPL-3.0+ # supports reaping of children tied to a pipe or socket package PublicInbox::IO; use v5.12; use parent qw(IO::Handle Exporter); use PublicInbox::DS qw(awaitpid); our @EXPORT_OK = qw(poll_in read_all try_cat write_file); use Carp qw(croak); use IO::Poll qw(POLLIN); use Errno qw(EINTR EAGAIN); use PublicInbox::OnDestroy; # don't autodie in top-level for Perl 5.16.3 (and maybe newer versions) # we have our own ->close, so we scope autodie into each sub sub waitcb { # awaitpid callback my ($pid, $errref, $cb, @args) = @_; $$errref = $?; # sets .cerr for _close $cb->($pid, @args) if $cb; # may clobber $? } sub attach_pid { my ($io, $pid, @cb_arg) = @_; bless $io, __PACKAGE__; # we share $err (and not $self) with awaitpid to avoid a ref cycle my $e = \(my $err); ${*$io}{pi_io_reap} = [ $PublicInbox::OnDestroy::fork_gen, $pid, $e ]; awaitpid($pid, \&waitcb, $e, @cb_arg); $io; } sub attached_pid { my ($io) = @_; ${${*$io}{pi_io_reap} // []}[1]; } sub can_reap { my ($io) = @_; ${${*$io}{pi_io_reap} // [-1]}[0] == $PublicInbox::OnDestroy::fork_gen; } # caller cares about error result if they call close explicitly # reap->[2] may be set before this is called via waitcb sub close { my ($io) = @_; my $ret = $io->SUPER::close; my $reap = delete ${*$io}{pi_io_reap}; return $ret if ($reap->[0] // -1) != $PublicInbox::OnDestroy::fork_gen; if (defined ${$reap->[2]}) { # reap_pids already reaped asynchronously $? = ${$reap->[2]}; } else { # wait synchronously my $w = awaitpid($reap->[1]); } $? ? '' : $ret; } sub DESTROY { my ($io) = @_; my $reap = delete ${*$io}{pi_io_reap}; if (($reap->[0] // -1) == $PublicInbox::OnDestroy::fork_gen) { $io->SUPER::close; ${$reap->[2]} // awaitpid($reap->[1]); } $io->SUPER::DESTROY; } sub write_file ($$@) { # mode, filename, LIST (for print) use autodie qw(open close); open(my $fh, shift, shift); print $fh @_; defined(wantarray) && !wantarray ? $fh : close $fh; } sub poll_in ($;$) { IO::Poll::_poll($_[1] // -1, fileno($_[0]), my $ev = POLLIN); } sub read_all ($;$$$) { # pass $len=0 to read until EOF for :utf8 handles use autodie qw(read); my ($io, $len, $bref, $off) = @_; $bref //= \(my $buf); $off //= 0; my $r = 0; if (my $left = $len //= -s $io) { # known size (binmode :raw/:unix) do { # retry for binmode :unix $r = read($io, $$bref, $left, $off += $r) or croak( "read($io) premature EOF ($left/$len remain)"); } while ($left -= $r); } else { # read until EOF while (($r = read($io, $$bref, 65536, $off += $r))) {} } wantarray ? split(/^/sm, $$bref) : $$bref } sub try_cat ($) { my ($path) = @_; open(my $fh, '<', $path) or return ''; read_all $fh; } # TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here sub my_bufread { my ($io, $len) = @_; my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = ''); my $left = $len - length($$rbuf); my $r; while ($left > 0) { $r = sysread($io, $$rbuf, $left, length($$rbuf)); if ($r) { $left -= $r; } elsif (defined($r)) { # EOF return 0; } else { next if ($! == EAGAIN and poll_in($io)); next if $! == EINTR; # may be set by sysread or poll_in return; # unrecoverable error } } my $no_pad = substr($$rbuf, 0, $len, ''); delete(${*$io}{pi_io_rbuf}) if $$rbuf eq ''; \$no_pad; } # always uses "\n" sub my_readline { my ($io) = @_; my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = ''); while (1) { if ((my $n = index($$rbuf, "\n")) >= 0) { my $ret = substr($$rbuf, 0, $n + 1, ''); delete(${*$io}{pi_io_rbuf}) if $$rbuf eq ''; return $ret; } my $r = sysread($io, $$rbuf, 65536, length($$rbuf)); if (!defined($r)) { next if ($! == EAGAIN and poll_in($io)); next if $! == EINTR; # may be set by sysread or poll_in return; # unrecoverable error } elsif ($r == 0) { # return whatever's left on EOF delete(${*$io}{pi_io_rbuf}); return $$rbuf; } # else { continue } } sub has_rbuf { my ($io) = @_; defined(${*$io}{pi_io_rbuf}); } 1;