From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.6 (2021-04-09) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.2 required=3.0 tests=ALL_TRUSTED,BAYES_00, DKIM_SIGNED,DKIM_VALID,DKIM_VALID_AU,DKIM_VALID_EF shortcircuit=no autolearn=ham autolearn_force=no version=3.4.6 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 4E8651F47A for ; Sun, 8 Oct 2023 21:57:13 +0000 (UTC) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=80x24.org; s=selector1; t=1696802233; bh=EFVs9VDx1dT0Y2+wngPfod4Vp1+Hl/vorlW4+IVqqkU=; h=From:To:Subject:Date:From; b=g06n3hiYZyHL7HG4yZf0jSpE+HxCVCjN+K1/z+GGb+oRVXrrqGuPSye0eXDkL7WQh 1LWMQIgwtYGSmEmkl3y/JAkpud/aZiJ9bsrQYJlRxNBPVlTKhD6E2q0cBVREYRkmc0 UgEELUxiopNT1fP6GqwKDPMnFxjbwzPkFZ4Q6DSI= From: Eric Wong To: spew@80x24.org Subject: [PATCH] introduce ProcessIONBF for multiplexed non-blocking IO Date: Sun, 8 Oct 2023 21:57:13 +0000 Message-ID: <20231008215713.2057121-1-e@80x24.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit List-Id: This is required for reliable epoll/kevent/poll/select wakeup notifications, since we have no visibility into the buffer states used internally by Perl. --- MANIFEST | 1 + lib/PublicInbox/Git.pm | 9 ++++----- lib/PublicInbox/HTTPD/Async.pm | 4 ++-- lib/PublicInbox/ProcessIONBF.pm | 25 +++++++++++++++++++++++++ 4 files changed, 32 insertions(+), 7 deletions(-) create mode 100644 lib/PublicInbox/ProcessIONBF.pm diff --git a/MANIFEST b/MANIFEST index c972818f..791d91a7 100644 --- a/MANIFEST +++ b/MANIFEST @@ -319,6 +319,7 @@ lib/PublicInbox/POP3.pm lib/PublicInbox/POP3D.pm lib/PublicInbox/PktOp.pm lib/PublicInbox/ProcessIO.pm +lib/PublicInbox/ProcessIONBF.pm lib/PublicInbox/Qspawn.pm lib/PublicInbox/Reply.pm lib/PublicInbox/RepoAtom.pm diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 94d5dcee..448cfaf7 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -12,7 +12,6 @@ use v5.10.1; use parent qw(Exporter PublicInbox::DS); use autodie qw(socketpair); use POSIX (); -use IO::Handle; # ->blocking use Socket qw(AF_UNIX SOCK_STREAM); use PublicInbox::Syscall qw(EPOLLIN EPOLLET); use Errno qw(EINTR EAGAIN); @@ -20,6 +19,7 @@ use File::Glob qw(bsd_glob GLOB_NOSORT); use File::Spec (); use Time::HiRes qw(stat); use PublicInbox::Spawn qw(spawn popen_rd which); +use PublicInbox::ProcessIONBF; use PublicInbox::Tmpfile; use IO::Poll qw(POLLIN); use Carp qw(croak carp); @@ -146,7 +146,6 @@ sub _sock_cmd { my ($self, $batch, $err_c) = @_; $self->{sock} and Carp::confess('BUG: {sock} exists'); socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0); - $s1->blocking(0); my $opt = { pgid => 0, 0 => $s2, 1 => $s2 }; my $gd = $self->{git_dir}; if ($gd =~ s!/([^/]+/[^/]+)\z!/!) { @@ -165,7 +164,7 @@ sub _sock_cmd { $self->fail("tmpfile($id): $!"); } my $pid = spawn(\@cmd, undef, $opt); - $self->{sock} = PublicInbox::ProcessIO->maybe_new($pid, $s1); + $self->{sock} = PublicInbox::ProcessIONBF->new($pid, $s1); } sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) } @@ -626,8 +625,8 @@ sub cleanup_if_unlinked { my $ret = 0; for my $obj ($self, ($self->{ck} // ())) { my $sock = $obj->{sock} // next; - my PublicInbox::ProcessIO $pp = tied *$sock; # ProcessIO - my $pid = $pp->{pid} // next; + my PublicInbox::ProcessIONBF $p = tied *$sock; # ProcessIONBF + my $pid = $p->{pid} // next; open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1); while (<$fh>) { # n.b. we do not restart for unlinked multi-pack-index diff --git a/lib/PublicInbox/HTTPD/Async.pm b/lib/PublicInbox/HTTPD/Async.pm index b9d2159c..b73d0c4b 100644 --- a/lib/PublicInbox/HTTPD/Async.pm +++ b/lib/PublicInbox/HTTPD/Async.pm @@ -18,6 +18,7 @@ use v5.12; use parent qw(PublicInbox::DS); use Errno qw(EAGAIN); use PublicInbox::Syscall qw(EPOLLIN); +use PublicInbox::ProcessIONBF; # This is called via: $env->{'pi-httpd.async'}->() # $io is a read-only pipe ($rpipe) for now, but may be a @@ -37,8 +38,7 @@ sub new { arg => $arg, # arg for $cb end_obj => $end_obj, # like END{}, can ->event_step }, $class; - my $pp = tied *$io; # ProcessIO - $pp->{fh}->blocking(0) // die "$io->blocking(0): $!"; + PublicInbox::ProcessIONBF->replace($io); $self->SUPER::new($io, EPOLLIN); } diff --git a/lib/PublicInbox/ProcessIONBF.pm b/lib/PublicInbox/ProcessIONBF.pm new file mode 100644 index 00000000..490e200a --- /dev/null +++ b/lib/PublicInbox/ProcessIONBF.pm @@ -0,0 +1,25 @@ +# Copyright (C) all contributors +# License: AGPL-3.0+ + +# used to support unbuffered partial reads +package PublicInbox::ProcessIONBF; +use v5.12; +use parent qw(PublicInbox::ProcessIO); +use IO::Handle; # ->blocking + +sub new { + my ($cls, $pid, $fh, @cb_arg) = @_; + $fh->blocking(0) // die "$fh->blocking(0): $!"; + my $io = $cls->SUPER::maybe_new($pid, $fh, @cb_arg); +} + +sub replace { + my ($cls, $orig) = @_; + my $pio = tied *$orig; # ProcessIO + $pio->{fh}->blocking(0) // die "$pio->{fh}->blocking(0): $!"; + bless $pio, $cls; +} + +sub READ { sysread($_[0]->{fh}, $_[1], $_[2], $_[3] // 0) } + +1;