From 26fdf73fb6b25583d8b4bc6a75d79a5ef7d88318 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 13 Jan 2017 22:28:36 +0000 Subject: rename "GitAsyncRd" to "GitAsync" This wrapper class actually does both reading and writing, and a shorter name is nicer. --- MANIFEST | 2 +- lib/PublicInbox/Git.pm | 6 +- lib/PublicInbox/GitAsync.pm | 133 ++++++++++++++++++++++++++++++++++++++++++ lib/PublicInbox/GitAsyncRd.pm | 133 ------------------------------------------ 4 files changed, 137 insertions(+), 137 deletions(-) create mode 100644 lib/PublicInbox/GitAsync.pm delete mode 100644 lib/PublicInbox/GitAsyncRd.pm diff --git a/MANIFEST b/MANIFEST index cc4882a1..789ed68c 100644 --- a/MANIFEST +++ b/MANIFEST @@ -55,7 +55,7 @@ lib/PublicInbox/Filter/Mirror.pm lib/PublicInbox/Filter/Vger.pm lib/PublicInbox/GetlineBody.pm lib/PublicInbox/Git.pm -lib/PublicInbox/GitAsyncRd.pm +lib/PublicInbox/GitAsync.pm lib/PublicInbox/GitAsyncWr.pm lib/PublicInbox/GitHTTPBackend.pm lib/PublicInbox/HTTP.pm diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 0b482c7e..648aaaf0 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -15,7 +15,7 @@ use PublicInbox::Spawn qw(spawn popen_rd); use Fcntl qw(:seek); my $have_async = eval { require PublicInbox::EvCleanup; - require PublicInbox::GitAsyncRd; + require PublicInbox::GitAsync; }; # Documentation/SubmittingPatches recommends 12 (Linux v4.4) @@ -222,7 +222,7 @@ sub check_async_ds ($$$) { my ($self, $obj, $cb) = @_; ($self->{async_c} ||= do { _bidi_pipe($self, qw(--batch-check in_ac out_ac pid_ac)); - PublicInbox::GitAsyncRd->new($self->{in_ac}, $self->{out_ac}, 1) + PublicInbox::GitAsync->new($self->{in_ac}, $self->{out_ac}, 1); })->cat_file_async($obj, $cb); } @@ -230,7 +230,7 @@ sub cat_async_ds ($$$) { my ($self, $obj, $cb) = @_; ($self->{async} ||= do { _bidi_pipe($self, qw(--batch in_a out_a pid_a)); - PublicInbox::GitAsyncRd->new($self->{in_a}, $self->{out_a}); + PublicInbox::GitAsync->new($self->{in_a}, $self->{out_a}); })->cat_file_async($obj, $cb); } diff --git a/lib/PublicInbox/GitAsync.pm b/lib/PublicInbox/GitAsync.pm new file mode 100644 index 00000000..8369978c --- /dev/null +++ b/lib/PublicInbox/GitAsync.pm @@ -0,0 +1,133 @@ +# Copyright (C) 2016 all contributors +# License: AGPL-3.0+ +# +# internal class used by PublicInbox::Git + Danga::Socket +# This parses the output pipe of "git cat-file --batch/--batch-check" +package PublicInbox::GitAsync; +use strict; +use warnings; +use base qw(Danga::Socket); +use fields qw(jobq rbuf wr check); +use PublicInbox::GitAsyncWr; +our $MAX = 65536; # Import may bump this in the future + +sub new { + my ($class, $rd, $wr, $check) = @_; + my $self = fields::new($class); + IO::Handle::blocking($rd, 0); + $self->SUPER::new($rd); + $self->{jobq} = []; # [ [ $obj, $cb, $state ], ... ] + my $buf = ''; + $self->{rbuf} = \$buf; + $self->{wr} = PublicInbox::GitAsyncWr->new($wr); + $self->{check} = $check; + $self->watch_read(1); + $self; +} + +sub cat_file_async { + my ($self, $obj, $cb) = @_; + # order matters + push @{$self->{jobq}}, [ $obj, $cb ]; + $self->{wr}->write(\"$obj\n"); +} + +# Returns: an array ref of the info line for --batch-check and --batch, +# which may be: [ $obj, 'missing'] +# Returns undef on error +sub read_info ($) { + my ($self) = @_; + my $rbuf = $self->{rbuf}; + my $rd = $self->{sock}; + + while (1) { + $$rbuf =~ s/\A([^\n]+)\n//s and return [ split(/ /, $1) ]; + + my $r = sysread($rd, $$rbuf, 110, length($$rbuf)); + next if $r; + return $r; + } +} + +sub event_read { + my ($self) = @_; + my $jobq = $self->{jobq}; + my ($cur, $obj, $cb, $info, $left); + my $check = $self->{check}; + my ($rbuf, $rlen, $need, $buf); +take_job: + $cur = shift @$jobq or die 'BUG: empty job queue in '.__PACKAGE__; + ($obj, $cb, $info, $left) = @$cur; + if (!$info) { + $info = read_info($self); + if (!defined $info && ($!{EAGAIN} || $!{EINTR})) { + return unshift(@$jobq, $cur) + } + $cb->($info); # $info may 0 (EOF, or undef, $cb will see $!) + return $self->close unless $info; + if ($check || (scalar(@$info) != 3)) { + # do not monopolize the event loop if we're drained: + return if ${$self->{rbuf}} eq ''; + goto take_job; + } + $cur->[2] = $info; + my $len = $info->[2]; + $left = \$len; + $cur->[3] = $left; # onto reading body... + } + ref($left) or die 'BUG: $left not ref in '.__PACKAGE__; + + $rbuf = $self->{rbuf}; + $rlen = length($$rbuf); + $need = $$left + 1; # +1 for trailing LF + $buf = ''; + + if ($rlen == $need) { +final_hunk: + $self->{rbuf} = \$buf; + $$left = undef; + my $lf = chop $$rbuf; + $lf eq "\n" or die "BUG: missing LF (got $lf)"; + $cb->($rbuf); + + return if $buf eq ''; + goto take_job; + } elsif ($rlen < $need) { + my $all = $need - $rlen; + my $n = $all > $MAX ? $MAX : $all; + my $r = sysread($self->{sock}, $$rbuf, $n, $rlen); + if ($r) { + goto final_hunk if $r == $all; + + # more to read later... + $$left -= $r; + $self->{rbuf} = \$buf; + $cb->($rbuf); + + # don't monopolize the event loop + return unshift(@$jobq, $cur); + } elsif (!defined $r) { + return unshift(@$jobq, $cur) if $!{EAGAIN} || $!{EINTR}; + } + $cb->($r); # $cb should handle 0 and undef (and see $!) + $self->close; # FAIL... + } else { # too much data in rbuf + $buf = substr($$rbuf, $need, $rlen - $need); + $$rbuf = substr($$rbuf, 0, $need); + goto final_hunk; + } +} + +sub close { + my $self = shift; + my $jobq = $self->{jobq}; + $self->{jobq} = []; + $_->[1]->(0) for @$jobq; + $self->{wr}->close; + $self->SUPER::close(@_); +} + +sub event_hup { $_[0]->close } +sub event_err { $_[0]->close } + +1; diff --git a/lib/PublicInbox/GitAsyncRd.pm b/lib/PublicInbox/GitAsyncRd.pm deleted file mode 100644 index 465db2c7..00000000 --- a/lib/PublicInbox/GitAsyncRd.pm +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright (C) 2016 all contributors -# License: AGPL-3.0+ -# -# internal class used by PublicInbox::Git + Danga::Socket -# This parses the output pipe of "git cat-file --batch/--batch-check" -package PublicInbox::GitAsyncRd; -use strict; -use warnings; -use base qw(Danga::Socket); -use fields qw(jobq rbuf wr check); -use PublicInbox::GitAsyncWr; -our $MAX = 65536; # Import may bump this in the future - -sub new { - my ($class, $rd, $wr, $check) = @_; - my $self = fields::new($class); - IO::Handle::blocking($rd, 0); - $self->SUPER::new($rd); - $self->{jobq} = []; # [ [ $obj, $cb, $state ], ... ] - my $buf = ''; - $self->{rbuf} = \$buf; - $self->{wr} = PublicInbox::GitAsyncWr->new($wr); - $self->{check} = $check; - $self->watch_read(1); - $self; -} - -sub cat_file_async { - my ($self, $obj, $cb) = @_; - # order matters - push @{$self->{jobq}}, [ $obj, $cb ]; - $self->{wr}->write(\"$obj\n"); -} - -# Returns: an array ref of the info line for --batch-check and --batch, -# which may be: [ $obj, 'missing'] -# Returns undef on error -sub read_info ($) { - my ($self) = @_; - my $rbuf = $self->{rbuf}; - my $rd = $self->{sock}; - - while (1) { - $$rbuf =~ s/\A([^\n]+)\n//s and return [ split(/ /, $1) ]; - - my $r = sysread($rd, $$rbuf, 110, length($$rbuf)); - next if $r; - return $r; - } -} - -sub event_read { - my ($self) = @_; - my $jobq = $self->{jobq}; - my ($cur, $obj, $cb, $info, $left); - my $check = $self->{check}; - my ($rbuf, $rlen, $need, $buf); -take_job: - $cur = shift @$jobq or die 'BUG: empty job queue in '.__PACKAGE__; - ($obj, $cb, $info, $left) = @$cur; - if (!$info) { - $info = read_info($self); - if (!defined $info && ($!{EAGAIN} || $!{EINTR})) { - return unshift(@$jobq, $cur) - } - $cb->($info); # $info may 0 (EOF, or undef, $cb will see $!) - return $self->close unless $info; - if ($check || (scalar(@$info) != 3)) { - # do not monopolize the event loop if we're drained: - return if ${$self->{rbuf}} eq ''; - goto take_job; - } - $cur->[2] = $info; - my $len = $info->[2]; - $left = \$len; - $cur->[3] = $left; # onto reading body... - } - ref($left) or die 'BUG: $left not ref in '.__PACKAGE__; - - $rbuf = $self->{rbuf}; - $rlen = length($$rbuf); - $need = $$left + 1; # +1 for trailing LF - $buf = ''; - - if ($rlen == $need) { -final_hunk: - $self->{rbuf} = \$buf; - $$left = undef; - my $lf = chop $$rbuf; - $lf eq "\n" or die "BUG: missing LF (got $lf)"; - $cb->($rbuf); - - return if $buf eq ''; - goto take_job; - } elsif ($rlen < $need) { - my $all = $need - $rlen; - my $n = $all > $MAX ? $MAX : $all; - my $r = sysread($self->{sock}, $$rbuf, $n, $rlen); - if ($r) { - goto final_hunk if $r == $all; - - # more to read later... - $$left -= $r; - $self->{rbuf} = \$buf; - $cb->($rbuf); - - # don't monopolize the event loop - return unshift(@$jobq, $cur); - } elsif (!defined $r) { - return unshift(@$jobq, $cur) if $!{EAGAIN} || $!{EINTR}; - } - $cb->($r); # $cb should handle 0 and undef (and see $!) - $self->close; # FAIL... - } else { # too much data in rbuf - $buf = substr($$rbuf, $need, $rlen - $need); - $$rbuf = substr($$rbuf, 0, $need); - goto final_hunk; - } -} - -sub close { - my $self = shift; - my $jobq = $self->{jobq}; - $self->{jobq} = []; - $_->[1]->(0) for @$jobq; - $self->{wr}->close; - $self->SUPER::close(@_); -} - -sub event_hup { $_[0]->close } -sub event_err { $_[0]->close } - -1; -- cgit v1.2.3-24-ge0c7