# Copyright (C) all contributors
# License: AGPL-3.0+
# supports reaping of children tied to a pipe or socket
package PublicInbox::IO;
use v5.12;
use parent qw(IO::Handle Exporter);
use PublicInbox::DS qw(awaitpid);
our @EXPORT_OK = qw(poll_in read_all try_cat write_file);
use Carp qw(croak);
use IO::Poll qw(POLLIN);
use Errno qw(EINTR EAGAIN);
use PublicInbox::OnDestroy;
# don't autodie in top-level for Perl 5.16.3 (and maybe newer versions)
# we have our own ->close, so we scope autodie into each sub
# awaitpid callback: stores the current wait status and then chains to an
# optional user callback.
#   $pid    - first argument, forwarded to $cb
#   $errref - scalar ref which receives the current value of $?
#   $cb     - optional code ref, invoked as $cb->($pid, @args)
sub waitcb {
	my ($pid, $errref, $cb, @args) = @_;
	# snapshot $? before anything else; the user callback may clobber it
	$$errref = $?;
	$cb and $cb->($pid, @args);
}
# Reblesses $io into this package and associates child $pid with it.
# @cb_arg (optional callback + its arguments) is forwarded to awaitpid
# through waitcb.  Returns $io.
sub attach_pid {
	my ($io, $pid, @cb_arg) = @_;
	bless $io, __PACKAGE__;
	# Only the status slot is shared with awaitpid (never $io itself),
	# so no reference cycle is created between the handle and the reaper.
	my $status;
	my $status_ref = \$status;
	# slot layout: [ fork generation at attach time, PID, ref to wait status ]
	my @reap = ($PublicInbox::OnDestroy::fork_gen, $pid, $status_ref);
	${*$io}{pi_io_reap} = \@reap;
	# deliberately called in void context, as a plain statement
	awaitpid($pid, \&waitcb, $status_ref, @cb_arg);
	$io;
}
# Returns the PID stored by attach_pid, or undef when nothing is attached.
sub attached_pid {
	my $io = shift;
	my $reap = ${*$io}{pi_io_reap} // [];
	$reap->[1];
}
# True if a PID is attached and it was attached under the current
# $PublicInbox::OnDestroy::fork_gen value.
sub can_reap {
	my $io = shift;
	# -1 stands in for "nothing attached"
	my $reap = ${*$io}{pi_io_reap} // [ -1 ];
	$reap->[0] == $PublicInbox::OnDestroy::fork_gen;
}
# caller cares about error result if they call close explicitly
# reap->[2] may be set before this is called via waitcb
sub close {
	my ($io) = @_;
	# Closes the handle and, if a child attached under the current fork
	# generation exists, makes its wait status available in $?.
	# Returns the SUPER::close result, or '' when $? ends up non-zero.
	my $ret = $io->SUPER::close;
	my $reap = delete ${*$io}{pi_io_reap};
	my ($gen, $pid, $status_ref) = @{$reap // []};
	# nothing attached, or attached under a different fork generation
	return $ret unless ($gen // -1) == $PublicInbox::OnDestroy::fork_gen;
	if (defined $$status_ref) {
		# waitcb already ran and stored the status
		$? = $$status_ref;
	} else {
		# NOTE(review): the assignment keeps awaitpid in scalar (non-void)
		# context; presumably awaitpid inspects wantarray to decide on a
		# synchronous wait -- confirm before dropping $w
		my $w = awaitpid($pid);
	}
	$? ? '' : $ret;
}
# Destructor: if a child attached under the current fork generation has
# not been reaped via ->close, close the handle and reap it here.
# NOTE(review): reaping here may overwrite the caller's $?; confirm
# whether callers rely on that before localizing it.
sub DESTROY {
	my ($io) = @_;
	my $reap = delete ${*$io}{pi_io_reap};
	my ($gen, $pid, $status_ref) = @{$reap // []};
	if (($gen // -1) == $PublicInbox::OnDestroy::fork_gen) {
		$io->SUPER::close;
		# awaitpid is only needed when waitcb has not stored a status yet;
		# called in void context, unlike in ->close
		awaitpid($pid) unless defined $$status_ref;
	}
	$io->SUPER::DESTROY;
}
# Opens $filename using $mode and prints LIST to it.
# Scalar context: returns the still-open handle so the caller may keep
# writing.  List or void context: closes the handle and returns the
# result of close.  open and close failures die via autodie.
sub write_file ($$@) { # mode, filename, LIST (for print)
	use autodie qw(open close);
	my $mode = shift;
	my $path = shift;
	open(my $fh, $mode, $path);
	print $fh @_; # the remaining arguments are the LIST
	my $want = wantarray;
	return $fh if defined($want) && !$want;
	close $fh;
}
# Polls the file descriptor of $io for POLLIN.
#   $_[1] - timeout handed to IO::Poll::_poll, -1 when omitted
# Returns whatever IO::Poll::_poll returns.
sub poll_in ($;$) {
	my ($io, $timeout) = @_;
	# _poll needs a real variable for the event mask
	my $events = POLLIN;
	IO::Poll::_poll($timeout // -1, fileno($io), $events);
}
# Reads from $io into a buffer.
#   $len  - number of units to read; defaults to `-s $io'.  A false value
#           (e.g. 0) means "read until EOF".
#   $bref - optional scalar ref to read into (a private buffer otherwise)
#   $off  - optional offset into $$bref at which to start storing (default 0)
# Croaks on EOF before $len has been satisfied; read errors die via autodie.
# Returns the buffer in scalar context, or the buffer split into lines in
# list context.
sub read_all ($;$$$) { # pass $len=0 to read until EOF for :utf8 handles
	use autodie qw(read);
	my ($io, $len, $bref, $off) = @_;
	$bref //= \(my $buf);
	$off //= 0;
	$len //= -s $io;
	if ($len) { # known size (binmode :raw/:unix)
		my $left = $len;
		while (1) { # a single read may come up short (binmode :unix)
			my $n = read($io, $$bref, $left, $off) or croak(
				"read($io) premature EOF ($left/$len remain)");
			$left -= $n;
			last unless $left;
			$off += $n;
		}
	} else { # unknown size: keep appending until read reports EOF
		while (my $n = read($io, $$bref, 65536, $off)) {
			$off += $n;
		}
	}
	wantarray ? split(/^/sm, $$bref) : $$bref
}
# Best-effort slurp of $path: yields '' when the file cannot be opened,
# otherwise whatever read_all returns for the opened handle in the
# caller's context.
sub try_cat ($) {
	my ($path) = @_;
	my $fh;
	open($fh, '<', $path) ? read_all($fh) : '';
}
# TODO: move existing HTTP/IMAP/NNTP/POP3 uses of rbuf here
sub my_bufread {
	# Reads exactly $len bytes from $io using sysread, going through the
	# per-handle buffer stored in ${*$io}{pi_io_rbuf}.
	# Returns:
	#   \$bytes - scalar ref to the first $len buffered bytes on success
	#   0       - EOF before $len bytes were available; any partial data
	#             stays buffered
	#   (empty) - unrecoverable sysread error, $! is left for the caller
	# pi_io_rbuf is dropped whenever it ends up empty, on every return
	# path, so has_rbuf never reports a buffer which holds nothing.
	my ($io, $len) = @_;
	my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
	my $left = $len - length($$rbuf);
	while ($left > 0) {
		my $r = sysread($io, $$rbuf, $left, length($$rbuf));
		if ($r) {
			$left -= $r;
		} elsif (defined($r)) { # EOF
			# don't leave the empty placeholder created above behind
			delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
			return 0;
		} else {
			next if ($! == EAGAIN and poll_in($io));
			next if $! == EINTR; # may be set by sysread or poll_in
			delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
			return; # unrecoverable error
		}
	}
	# hand out exactly $len bytes, anything beyond that stays buffered
	my $no_pad = substr($$rbuf, 0, $len, '');
	delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
	\$no_pad;
}
# always uses "\n"
sub my_readline {
	# Returns the next "\n"-terminated line from $io, reading via sysread
	# into the per-handle buffer stored in ${*$io}{pi_io_rbuf}.
	# Returns:
	#   the line, including its "\n"
	#   whatever is buffered (possibly '') once sysread reports EOF
	#   (empty) on an unrecoverable sysread error, $! is left for the caller
	# pi_io_rbuf is dropped whenever it ends up empty, including on the
	# error return, so has_rbuf never reports a buffer which holds nothing.
	my ($io) = @_;
	my $rbuf = ${*$io}{pi_io_rbuf} //= \(my $new = '');
	while (1) {
		if ((my $n = index($$rbuf, "\n")) >= 0) {
			my $ret = substr($$rbuf, 0, $n + 1, '');
			delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
			return $ret;
		}
		# no complete line buffered yet, append more data
		my $r = sysread($io, $$rbuf, 65536, length($$rbuf));
		if (!defined($r)) {
			next if ($! == EAGAIN and poll_in($io));
			next if $! == EINTR; # may be set by sysread or poll_in
			# don't leave the empty placeholder created above behind
			delete(${*$io}{pi_io_rbuf}) if $$rbuf eq '';
			return; # unrecoverable error
		} elsif ($r == 0) { # return whatever's left on EOF
			delete(${*$io}{pi_io_rbuf});
			return $$rbuf;
		} # else { continue
	}
}
# True if a read buffer (pi_io_rbuf) is currently stored on the handle.
sub has_rbuf {
	my $io = shift;
	my $pending = ${*$io}{pi_io_rbuf};
	defined $pending;
}
1;